diff --git a/.github/workflows/trigger-e2e-tests.yml b/.github/workflows/trigger-e2e-tests.yml index 77928a343e..0a615b3e37 100644 --- a/.github/workflows/trigger-e2e-tests.yml +++ b/.github/workflows/trigger-e2e-tests.yml @@ -10,11 +10,13 @@ defaults: run: shell: bash -euxo pipefail {0} +concurrency: + group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }} + cancel-in-progress: true + env: # A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }} - AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }} jobs: cancel-previous-e2e-tests: @@ -64,19 +66,35 @@ jobs: needs: [ tag ] runs-on: ubuntu-22.04 env: + EVENT_ACTION: ${{ github.event.action }} + GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }} TAG: ${{ needs.tag.outputs.build-tag }} steps: - - name: check if ecr image are present - env: - AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }} + - name: Wait for `promote-images` job to finish + # It's important to have a timeout here, the script in the step can run infinitely + timeout-minutes: 60 run: | - for REPO in neon compute-tools compute-node-v14 vm-compute-node-v14 compute-node-v15 vm-compute-node-v15 compute-node-v16 vm-compute-node-v16; do - OUTPUT=$(aws ecr describe-images --repository-name ${REPO} --region eu-central-1 --query "imageDetails[?imageTags[?contains(@, '${TAG}')]]" --output text) - if [ "$OUTPUT" == "" ]; then - echo "$REPO with image tag $TAG not found" >> $GITHUB_OUTPUT - exit 1 - fi + if [ "${GITHUB_EVENT_NAME}" != "pull_request" ] || [ "${EVENT_ACTION}" != "ready_for_review" ]; then + exit 0 + fi + + # For PRs we use the run id as the tag + BUILD_AND_TEST_RUN_ID=${TAG} + while true; do + conclusion=$(gh run --repo ${GITHUB_REPOSITORY} view ${BUILD_AND_TEST_RUN_ID} --json jobs --jq '.jobs[] | select(.name == "promote-images") | .conclusion') + case "$conclusion" in + success) + break + ;; + failure | cancelled | skipped) + echo "The 'promote-images' job didn't succeed: '${conclusion}'. Exiting..." + exit 1 + ;; + *) + echo "The 'promote-images' hasn't succeed yet. Waiting..." + sleep 60 + ;; + esac done - name: Set e2e-platforms diff --git a/Cargo.lock b/Cargo.lock index 2677699702..764c0fbd30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5703,6 +5703,7 @@ dependencies = [ "pageserver_client", "postgres_connection", "r2d2", + "rand 0.8.5", "reqwest 0.12.4", "routerify", "scopeguard", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 8ceb8f2ad2..8af0ed43ce 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -4,6 +4,11 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +default = [] +# Enables test specific features. 
+testing = [] + [dependencies] anyhow.workspace = true async-compression.workspace = true diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 91855d954d..5bd6897fe3 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -400,7 +400,15 @@ impl ComputeNode { pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> { let mut retry_period_ms = 500.0; let mut attempts = 0; - let max_attempts = 10; + const DEFAULT_ATTEMPTS: u16 = 10; + #[cfg(feature = "testing")] + let max_attempts = if let Ok(v) = env::var("NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES") { + u16::from_str(&v).unwrap() + } else { + DEFAULT_ATTEMPTS + }; + #[cfg(not(feature = "testing"))] + let max_attempts = DEFAULT_ATTEMPTS; loop { let result = self.try_get_basebackup(compute_state, lsn); match result { diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index a272c306e7..bf8a27e550 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -289,7 +289,7 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command { fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command { for (var, val) in std::env::vars() { - if var.starts_with("NEON_PAGESERVER_") { + if var.starts_with("NEON_") { cmd = cmd.env(var, val); } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index fed666ca45..9e021c7e35 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1378,6 +1378,18 @@ impl RemoteTimelineClient { .dirty .layer_metadata .drain() + .filter(|(_file_name, meta)| { + // Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from + // all shards anyway, we _could_ delete these, but + // - it creates a potential race if other shards are still + // using the layers while this shard deletes them. + // - it means that if we rolled back the shard split, the ancestor shards would be in a state where + // these timelines are present but corrupt (their index exists but some layers don't) + // + // These layers will eventually be cleaned up by the scrubber when it does physical GC. + meta.shard.shard_number == self.tenant_shard_id.shard_number + && meta.shard.shard_count == self.tenant_shard_id.shard_count + }) .map(|(file_name, meta)| { remote_layer_path( &self.tenant_shard_id.tenant_id, diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 4fd110359b..59d3e1ce09 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -8,6 +8,9 @@ mod layer_desc; mod layer_name; pub mod merge_iterator; +#[cfg(test)] +pub mod split_writer; + use crate::context::{AccessStatsBehavior, RequestContext}; use crate::repository::Value; use crate::walrecord::NeonWalRecord; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index ba5d291694..44a6f3f240 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -382,6 +382,9 @@ struct DeltaLayerWriterInner { tree: DiskBtreeBuilder, blob_writer: BlobWriter, + + // Number of key-lsns in the layer. 
+ num_keys: usize, } impl DeltaLayerWriterInner { @@ -422,6 +425,7 @@ impl DeltaLayerWriterInner { lsn_range, tree: tree_builder, blob_writer, + num_keys: 0, }) } @@ -472,6 +476,9 @@ impl DeltaLayerWriterInner { let delta_key = DeltaKey::from_key_lsn(&key, lsn); let res = self.tree.append(&delta_key.0, blob_ref.0); + + self.num_keys += 1; + (val, res.map_err(|e| anyhow::anyhow!(e))) } @@ -674,6 +681,17 @@ impl DeltaLayerWriter { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { self.inner.take().unwrap().finish(key_end, ctx).await } + + #[cfg(test)] + pub(crate) fn num_keys(&self) -> usize { + self.inner.as_ref().unwrap().num_keys + } + + #[cfg(test)] + pub(crate) fn estimated_size(&self) -> u64 { + let inner = self.inner.as_ref().unwrap(); + inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 + } } impl Drop for DeltaLayerWriter { diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 08db27514a..aa308ba3c1 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -742,8 +742,14 @@ struct ImageLayerWriterInner { // where we have chosen their compressed form uncompressed_bytes_chosen: u64, + // Number of keys in the layer. + num_keys: usize, + blob_writer: BlobWriter, tree: DiskBtreeBuilder, + + #[cfg_attr(not(feature = "testing"), allow(dead_code))] + last_written_key: Key, } impl ImageLayerWriterInner { @@ -800,6 +806,8 @@ impl ImageLayerWriterInner { uncompressed_bytes: 0, uncompressed_bytes_eligible: 0, uncompressed_bytes_chosen: 0, + num_keys: 0, + last_written_key: Key::MIN, }; Ok(writer) @@ -820,6 +828,7 @@ impl ImageLayerWriterInner { let compression = self.conf.image_compression; let uncompressed_len = img.len() as u64; self.uncompressed_bytes += uncompressed_len; + self.num_keys += 1; let (_img, res) = self .blob_writer .write_blob_maybe_compressed(img, ctx, compression) @@ -839,6 +848,11 @@ impl ImageLayerWriterInner { key.write_to_byte_slice(&mut keybuf); self.tree.append(&keybuf, off)?; + #[cfg(feature = "testing")] + { + self.last_written_key = key; + } + Ok(()) } @@ -849,6 +863,7 @@ impl ImageLayerWriterInner { self, timeline: &Arc, ctx: &RequestContext, + end_key: Option, ) -> anyhow::Result { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -899,11 +914,23 @@ impl ImageLayerWriterInner { let desc = PersistentLayerDesc::new_img( self.tenant_shard_id, self.timeline_id, - self.key_range.clone(), + if let Some(end_key) = end_key { + self.key_range.start..end_key + } else { + self.key_range.clone() + }, self.lsn, metadata.len(), ); + #[cfg(feature = "testing")] + if let Some(end_key) = end_key { + assert!( + self.last_written_key < end_key, + "written key violates end_key range" + ); + } + // Note: Because we open the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't // set inner.file here. The first read will have to re-open it. @@ -980,6 +1007,18 @@ impl ImageLayerWriter { self.inner.as_mut().unwrap().put_image(key, img, ctx).await } + #[cfg(test)] + /// Estimated size of the image layer. + pub(crate) fn estimated_size(&self) -> u64 { + let inner = self.inner.as_ref().unwrap(); + inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 + } + + #[cfg(test)] + pub(crate) fn num_keys(&self) -> usize { + self.inner.as_ref().unwrap().num_keys + } + /// /// Finish writing the image layer. 
/// @@ -988,7 +1027,22 @@ impl ImageLayerWriter { timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { - self.inner.take().unwrap().finish(timeline, ctx).await + self.inner.take().unwrap().finish(timeline, ctx, None).await + } + + #[cfg(test)] + /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive. + pub(super) async fn finish_with_end_key( + mut self, + timeline: &Arc, + end_key: Key, + ctx: &RequestContext, + ) -> anyhow::Result { + self.inner + .take() + .unwrap() + .finish(timeline, ctx, Some(end_key)) + .await } } diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs new file mode 100644 index 0000000000..a966775f9e --- /dev/null +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -0,0 +1,449 @@ +use std::{ops::Range, sync::Arc}; + +use bytes::Bytes; +use pageserver_api::key::{Key, KEY_SIZE}; +use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; + +use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline}; + +use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; + +/// An image writer that takes images and produces multiple image layers. The interface does not +/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files +/// to be cleaned up) +#[must_use] +pub struct SplitImageLayerWriter { + inner: ImageLayerWriter, + target_layer_size: u64, + generated_layers: Vec, + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + lsn: Lsn, +} + +impl SplitImageLayerWriter { + pub async fn new( + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + start_key: Key, + lsn: Lsn, + target_layer_size: u64, + ctx: &RequestContext, + ) -> anyhow::Result { + Ok(Self { + target_layer_size, + inner: ImageLayerWriter::new( + conf, + timeline_id, + tenant_shard_id, + &(start_key..Key::MAX), + lsn, + ctx, + ) + .await?, + generated_layers: Vec::new(), + conf, + timeline_id, + tenant_shard_id, + lsn, + }) + } + + pub async fn put_image( + &mut self, + key: Key, + img: Bytes, + tline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + // The current estimation is an upper bound of the space that the key/image could take + // because we did not consider compression in this estimation. The resulting image layer + // could be smaller than the target size. + let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64; + if self.inner.num_keys() >= 1 + && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + { + let next_image_writer = ImageLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + &(key..Key::MAX), + self.lsn, + ctx, + ) + .await?; + let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); + self.generated_layers.push( + prev_image_writer + .finish_with_end_key(tline, key, ctx) + .await?, + ); + } + self.inner.put_image(key, img, ctx).await + } + + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + let Self { + mut generated_layers, + inner, + .. 
+ } = self; + generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?); + Ok(generated_layers) + } + + /// When split writer fails, the caller should call this function and handle partially generated layers. + #[allow(dead_code)] + pub(crate) async fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { + Ok((self.generated_layers, self.inner)) + } +} + +/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not +/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files +/// to be cleaned up). +#[must_use] +pub struct SplitDeltaLayerWriter { + inner: DeltaLayerWriter, + target_layer_size: u64, + generated_layers: Vec, + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + lsn_range: Range, +} + +impl SplitDeltaLayerWriter { + pub async fn new( + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + start_key: Key, + lsn_range: Range, + target_layer_size: u64, + ctx: &RequestContext, + ) -> anyhow::Result { + Ok(Self { + target_layer_size, + inner: DeltaLayerWriter::new( + conf, + timeline_id, + tenant_shard_id, + start_key, + lsn_range.clone(), + ctx, + ) + .await?, + generated_layers: Vec::new(), + conf, + timeline_id, + tenant_shard_id, + lsn_range, + }) + } + + pub async fn put_value( + &mut self, + key: Key, + lsn: Lsn, + val: Value, + tline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate + // number, and therefore the final layer size could be a little bit larger or smaller than the target. + let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; + if self.inner.num_keys() >= 1 + && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + { + let next_delta_writer = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + key, + self.lsn_range.clone(), + ctx, + ) + .await?; + let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); + self.generated_layers + .push(prev_delta_writer.finish(key, tline, ctx).await?); + } + self.inner.put_value(key, lsn, val, ctx).await + } + + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + let Self { + mut generated_layers, + inner, + .. + } = self; + generated_layers.push(inner.finish(end_key, tline, ctx).await?); + Ok(generated_layers) + } + + /// When split writer fails, the caller should call this function and handle partially generated layers. 
+ #[allow(dead_code)] + pub(crate) async fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { + Ok((self.generated_layers, self.inner)) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + tenant::{ + harness::{TenantHarness, TIMELINE_ID}, + storage_layer::AsLayerDesc, + }, + DEFAULT_PG_VERSION, + }; + + use super::*; + + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + fn get_img(id: u32) -> Bytes { + format!("{id:064}").into() + } + + fn get_large_img() -> Bytes { + vec![0; 8192].into() + } + + #[tokio::test] + async fn write_one_image() { + let harness = TenantHarness::create("split_writer_write_one_image") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let mut image_writer = SplitImageLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18)..Lsn(0x20), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + + image_writer + .put_image(get_key(0), get_img(0), &tline, &ctx) + .await + .unwrap(); + let layers = image_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); + assert_eq!(layers.len(), 1); + + delta_writer + .put_value( + get_key(0), + Lsn(0x18), + Value::Image(get_img(0)), + &tline, + &ctx, + ) + .await + .unwrap(); + let layers = delta_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); + assert_eq!(layers.len(), 1); + } + + #[tokio::test] + async fn write_split() { + let harness = TenantHarness::create("split_writer_write_split") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let mut image_writer = SplitImageLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18)..Lsn(0x20), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + const N: usize = 2000; + for i in 0..N { + let i = i as u32; + image_writer + .put_image(get_key(i), get_large_img(), &tline, &ctx) + .await + .unwrap(); + delta_writer + .put_value( + get_key(i), + Lsn(0x20), + Value::Image(get_large_img()), + &tline, + &ctx, + ) + .await + .unwrap(); + } + let image_layers = image_writer + .finish(&tline, &ctx, get_key(N as u32)) + .await + .unwrap(); + let delta_layers = delta_writer + .finish(&tline, &ctx, get_key(N as u32)) + .await + .unwrap(); + assert_eq!(image_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.len(), N / 512 + 1); + for idx in 0..image_layers.len() { + assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); + assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); + if idx > 0 { + assert_eq!( + image_layers[idx - 1].layer_desc().key_range.end, + image_layers[idx].layer_desc().key_range.start + ); + assert_eq!( + delta_layers[idx - 
1].layer_desc().key_range.end, + delta_layers[idx].layer_desc().key_range.start + ); + } + } + } + + #[tokio::test] + async fn write_large_img() { + let harness = TenantHarness::create("split_writer_write_large_img") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let mut image_writer = SplitImageLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18), + 4 * 1024, + &ctx, + ) + .await + .unwrap(); + + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18)..Lsn(0x20), + 4 * 1024, + &ctx, + ) + .await + .unwrap(); + + image_writer + .put_image(get_key(0), get_img(0), &tline, &ctx) + .await + .unwrap(); + image_writer + .put_image(get_key(1), get_large_img(), &tline, &ctx) + .await + .unwrap(); + let layers = image_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); + assert_eq!(layers.len(), 2); + + delta_writer + .put_value( + get_key(0), + Lsn(0x18), + Value::Image(get_img(0)), + &tline, + &ctx, + ) + .await + .unwrap(); + delta_writer + .put_value( + get_key(1), + Lsn(0x1A), + Value::Image(get_large_img()), + &tline, + &ctx, + ) + .await + .unwrap(); + let layers = delta_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); + assert_eq!(layers.len(), 2); + } +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index abda60bb11..490e8e6011 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -143,7 +143,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::{config::TenantConf, upload_queue::NotInitialized}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; -use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer}; +use super::{ + remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError, + storage_layer::ReadableLayer, +}; use super::{ secondary::heatmap::{HeatMapLayer, HeatMapTimeline}, GcError, @@ -4089,6 +4092,21 @@ impl Timeline { // release lock on 'layers' }; + // Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files. + // This makes us refuse ingest until the new layers have been persisted to the remote. + self.remote_client + .wait_completion() + .await + .map_err(|e| match e { + WaitCompletionError::UploadQueueShutDownOrStopped + | WaitCompletionError::NotInitialized( + NotInitialized::ShuttingDown | NotInitialized::Stopped, + ) => FlushLayerError::Cancelled, + WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => { + FlushLayerError::Other(anyhow!(e).into()) + } + })?; + // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, // a compaction can delete the file and then it won't be available for uploads any more. 
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 1e2df7a90a..1daae300aa 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1649,19 +1649,23 @@ impl Timeline { use std::collections::BTreeSet; // Block other compaction/GC tasks from running for now. GC-compaction could run along - // with legacy compaction tasks in the future. + // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc. + // Note that we already acquired the compaction lock when the outer `compact` function gets called. - let _compaction_lock = tokio::select! { - guard = self.compaction_lock.lock() => guard, - // TODO: refactor to CompactionError to correctly pass cancelled error - _ = cancel.cancelled() => return Err(anyhow!("cancelled")), + let gc_lock = async { + tokio::select! { + guard = self.gc_lock.lock() => Ok(guard), + // TODO: refactor to CompactionError to correctly pass cancelled error + _ = cancel.cancelled() => Err(anyhow!("cancelled")), + } }; - let _gc = tokio::select! { - guard = self.gc_lock.lock() => guard, - // TODO: refactor to CompactionError to correctly pass cancelled error - _ = cancel.cancelled() => return Err(anyhow!("cancelled")), - }; + let gc_lock = crate::timed( + gc_lock, + "acquires gc lock", + std::time::Duration::from_secs(5), + ) + .await?; info!("running enhanced gc bottom-most compaction"); @@ -2068,9 +2072,11 @@ impl Timeline { let mut guard = self.layers.write().await; guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics) }; - self.remote_client .schedule_compaction_update(&layer_selection, &compact_to)?; + + drop(gc_lock); + Ok(()) } } diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 9b2403f899..05178c38b4 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -63,10 +63,19 @@ pub(super) async fn delete_local_timeline_directory( tenant_shard_id: TenantShardId, timeline: &Timeline, ) -> anyhow::Result<()> { - let guards = async { tokio::join!(timeline.gc_lock.lock(), timeline.compaction_lock.lock()) }; - let guards = crate::timed( - guards, - "acquire gc and compaction locks", + // Always ensure the lock order is compaction -> gc. + let compaction_lock = timeline.compaction_lock.lock(); + let compaction_lock = crate::timed( + compaction_lock, + "acquires compaction lock", + std::time::Duration::from_secs(5), + ) + .await; + + let gc_lock = timeline.gc_lock.lock(); + let gc_lock = crate::timed( + gc_lock, + "acquires gc lock", std::time::Duration::from_secs(5), ) .await; @@ -107,7 +116,8 @@ pub(super) async fn delete_local_timeline_directory( .context("fsync_pre_mark_remove")?; info!("finished deleting layer files, releasing locks"); - drop(guards); + drop(gc_lock); + drop(compaction_lock); fail::fail_point!("timeline-delete-after-rm", |_| { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 2365fd0587..41c2d3fe08 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -170,11 +170,6 @@ struct Args { /// still needed for existing replication connection. #[arg(long)] walsenders_keep_horizon: bool, - /// Enable partial backup. 
If disabled, safekeeper will not upload partial - /// segments to remote storage. - /// TODO: now partial backup is always enabled, remove this flag. - #[arg(long)] - partial_backup_enabled: bool, /// Controls how long backup will wait until uploading the partial segment. #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)] partial_backup_timeout: Duration, @@ -347,7 +342,6 @@ async fn main() -> anyhow::Result<()> { sk_auth_token, current_thread_runtime: args.current_thread_runtime, walsenders_keep_horizon: args.walsenders_keep_horizon, - partial_backup_enabled: true, partial_backup_timeout: args.partial_backup_timeout, disable_periodic_broker_push: args.disable_periodic_broker_push, enable_offload: args.enable_offload, diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 8f2920ada3..2e11a279ca 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -21,6 +21,7 @@ pub mod json_ctrl; pub mod metrics; pub mod patch_control_file; pub mod pull_timeline; +pub mod rate_limit; pub mod receive_wal; pub mod recovery; pub mod remove_wal; @@ -53,6 +54,7 @@ pub mod defaults { pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m"; pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s"; pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5"; + pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2; // By default, our required residency before eviction is the same as the period that passes // before uploading a partial segment, so that in normal operation the eviction can happen @@ -91,7 +93,6 @@ pub struct SafeKeeperConf { pub sk_auth_token: Option, pub current_thread_runtime: bool, pub walsenders_keep_horizon: bool, - pub partial_backup_enabled: bool, pub partial_backup_timeout: Duration, pub disable_periodic_broker_push: bool, pub enable_offload: bool, @@ -135,7 +136,6 @@ impl SafeKeeperConf { max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES, current_thread_runtime: false, walsenders_keep_horizon: false, - partial_backup_enabled: false, partial_backup_timeout: Duration::from_secs(0), disable_periodic_broker_push: false, enable_offload: false, diff --git a/safekeeper/src/rate_limit.rs b/safekeeper/src/rate_limit.rs new file mode 100644 index 0000000000..72373b5786 --- /dev/null +++ b/safekeeper/src/rate_limit.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use rand::Rng; + +use crate::metrics::MISC_OPERATION_SECONDS; + +/// Global rate limiter for background tasks. +#[derive(Clone)] +pub struct RateLimiter { + partial_backup: Arc, + eviction: Arc, +} + +impl RateLimiter { + /// Create a new rate limiter. + /// - `partial_backup_max`: maximum number of concurrent partial backups. + /// - `eviction_max`: maximum number of concurrent timeline evictions. + pub fn new(partial_backup_max: usize, eviction_max: usize) -> Self { + Self { + partial_backup: Arc::new(tokio::sync::Semaphore::new(partial_backup_max)), + eviction: Arc::new(tokio::sync::Semaphore::new(eviction_max)), + } + } + + /// Get a permit for partial backup. This will block if the maximum number of concurrent + /// partial backups is reached. + pub async fn acquire_partial_backup(&self) -> tokio::sync::OwnedSemaphorePermit { + let _timer = MISC_OPERATION_SECONDS + .with_label_values(&["partial_permit_acquire"]) + .start_timer(); + self.partial_backup + .clone() + .acquire_owned() + .await + .expect("semaphore is closed") + } + + /// Try to get a permit for timeline eviction. 
This will return None if the maximum number of + /// concurrent timeline evictions is reached. + pub fn try_acquire_eviction(&self) -> Option { + self.eviction.clone().try_acquire_owned().ok() + } +} + +/// Generate a random duration that is a fraction of the given duration. +pub fn rand_duration(duration: &std::time::Duration) -> std::time::Duration { + let randf64 = rand::thread_rng().gen_range(0.0..1.0); + duration.mul_f64(randf64) +} diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 132e5ec32f..57935d879f 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -25,6 +25,7 @@ use utils::{ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; +use crate::rate_limit::RateLimiter; use crate::receive_wal::WalReceivers; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn, @@ -36,7 +37,7 @@ use crate::timeline_guard::ResidenceGuard; use crate::timeline_manager::{AtomicStatus, ManagerCtl}; use crate::timelines_set::TimelinesSet; use crate::wal_backup::{self}; -use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter}; +use crate::wal_backup_partial::PartialRemoteSegment; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS}; diff --git a/safekeeper/src/timeline_eviction.rs b/safekeeper/src/timeline_eviction.rs index 7947d83eb4..ae6f3f4b7e 100644 --- a/safekeeper/src/timeline_eviction.rs +++ b/safekeeper/src/timeline_eviction.rs @@ -5,7 +5,6 @@ use anyhow::Context; use camino::Utf8PathBuf; use remote_storage::RemotePath; -use std::time::Instant; use tokio::{ fs::File, io::{AsyncRead, AsyncWriteExt}, @@ -15,6 +14,7 @@ use utils::crashsafe::durable_rename; use crate::{ metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED}, + rate_limit::rand_duration, timeline_manager::{Manager, StateSnapshot}, wal_backup, wal_backup_partial::{self, PartialRemoteSegment}, @@ -50,7 +50,6 @@ impl Manager { .flush_lsn .segment_number(self.wal_seg_size) == self.last_removed_segno + 1 - && self.resident_since.elapsed() >= self.conf.eviction_min_resident } /// Evict the timeline to remote storage. 
@@ -112,7 +111,8 @@ impl Manager { return; } - self.resident_since = Instant::now(); + self.evict_not_before = + tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident); info!("successfully restored evicted timeline"); } diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index debf8c824f..482614fac7 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -23,6 +23,7 @@ use utils::lsn::Lsn; use crate::{ control_file::{FileStorage, Storage}, metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS}, + rate_limit::{rand_duration, RateLimiter}, recovery::recovery_main, remove_wal::calc_horizon_lsn, safekeeper::Term, @@ -32,7 +33,7 @@ use crate::{ timeline_guard::{AccessService, GuardId, ResidenceGuard}, timelines_set::{TimelineSetGuard, TimelinesSet}, wal_backup::{self, WalBackupTaskHandle}, - wal_backup_partial::{self, PartialRemoteSegment, RateLimiter}, + wal_backup_partial::{self, PartialRemoteSegment}, SafeKeeperConf, }; @@ -185,11 +186,11 @@ pub(crate) struct Manager { // misc pub(crate) access_service: AccessService, - pub(crate) partial_backup_rate_limiter: RateLimiter, + pub(crate) global_rate_limiter: RateLimiter, // Anti-flapping state: we evict timelines eagerly if they are inactive, but should not // evict them if they go inactive very soon after being restored. - pub(crate) resident_since: std::time::Instant, + pub(crate) evict_not_before: Instant, } /// This task gets spawned alongside each timeline and is responsible for managing the timeline's @@ -202,7 +203,7 @@ pub async fn main_task( broker_active_set: Arc, manager_tx: tokio::sync::mpsc::UnboundedSender, mut manager_rx: tokio::sync::mpsc::UnboundedReceiver, - partial_backup_rate_limiter: RateLimiter, + global_rate_limiter: RateLimiter, ) { tli.set_status(Status::Started); @@ -220,7 +221,7 @@ pub async fn main_task( conf, broker_active_set, manager_tx, - partial_backup_rate_limiter, + global_rate_limiter, ) .await; @@ -254,9 +255,29 @@ pub async fn main_task( mgr.set_status(Status::UpdatePartialBackup); mgr.update_partial_backup(&state_snapshot).await; - if mgr.conf.enable_offload && mgr.ready_for_eviction(&next_event, &state_snapshot) { - mgr.set_status(Status::EvictTimeline); - mgr.evict_timeline().await; + let now = Instant::now(); + if mgr.evict_not_before > now { + // we should wait until evict_not_before + update_next_event(&mut next_event, mgr.evict_not_before); + } + + if mgr.conf.enable_offload + && mgr.evict_not_before <= now + && mgr.ready_for_eviction(&next_event, &state_snapshot) + { + // check rate limiter and evict timeline if possible + match mgr.global_rate_limiter.try_acquire_eviction() { + Some(_permit) => { + mgr.set_status(Status::EvictTimeline); + mgr.evict_timeline().await; + } + None => { + // we can't evict timeline now, will try again later + mgr.evict_not_before = + Instant::now() + rand_duration(&mgr.conf.eviction_min_resident); + update_next_event(&mut next_event, mgr.evict_not_before); + } + } } } @@ -334,11 +355,10 @@ impl Manager { conf: SafeKeeperConf, broker_active_set: Arc, manager_tx: tokio::sync::mpsc::UnboundedSender, - partial_backup_rate_limiter: RateLimiter, + global_rate_limiter: RateLimiter, ) -> Manager { let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await; Manager { - conf, wal_seg_size: tli.get_wal_seg_size().await, walsenders: tli.get_walsenders().clone(), state_version_rx: tli.get_state_version_rx(), @@ -353,8 +373,10 @@ impl Manager { 
partial_backup_uploaded, access_service: AccessService::new(manager_tx), tli, - partial_backup_rate_limiter, - resident_since: std::time::Instant::now(), + global_rate_limiter, + // to smooth out evictions spike after restart + evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident), + conf, } } @@ -522,8 +544,8 @@ impl Manager { /// Spawns partial WAL backup task if needed. async fn update_partial_backup(&mut self, state: &StateSnapshot) { - // check if partial backup is enabled and should be started - if !self.conf.is_wal_backup_enabled() || !self.conf.partial_backup_enabled { + // check if WAL backup is enabled and should be started + if !self.conf.is_wal_backup_enabled() { return; } @@ -541,7 +563,7 @@ impl Manager { self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task( self.wal_resident_timeline(), self.conf.clone(), - self.partial_backup_rate_limiter.clone(), + self.global_rate_limiter.clone(), ))); } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index f57da5c7cb..6662e18817 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -2,10 +2,11 @@ //! All timelines should always be present in this map, this is done by loading them //! all from the disk on startup and keeping them in memory. +use crate::defaults::DEFAULT_EVICTION_CONCURRENCY; +use crate::rate_limit::RateLimiter; use crate::safekeeper::ServerInfo; use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError}; use crate::timelines_set::TimelinesSet; -use crate::wal_backup_partial::RateLimiter; use crate::SafeKeeperConf; use anyhow::{bail, Context, Result}; use camino::Utf8PathBuf; @@ -31,7 +32,7 @@ struct GlobalTimelinesState { conf: Option, broker_active_set: Arc, load_lock: Arc>, - partial_backup_rate_limiter: RateLimiter, + global_rate_limiter: RateLimiter, } // Used to prevent concurrent timeline loading. @@ -50,7 +51,7 @@ impl GlobalTimelinesState { ( self.get_conf().clone(), self.broker_active_set.clone(), - self.partial_backup_rate_limiter.clone(), + self.global_rate_limiter.clone(), ) } @@ -85,7 +86,7 @@ static TIMELINES_STATE: Lazy> = Lazy::new(|| { conf: None, broker_active_set: Arc::new(TimelinesSet::default()), load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)), - partial_backup_rate_limiter: RateLimiter::new(1), + global_rate_limiter: RateLimiter::new(1, 1), }) }); @@ -99,7 +100,10 @@ impl GlobalTimelines { // lock, so use explicit block let tenants_dir = { let mut state = TIMELINES_STATE.lock().unwrap(); - state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency); + state.global_rate_limiter = RateLimiter::new( + conf.partial_backup_concurrency, + DEFAULT_EVICTION_CONCURRENCY, + ); state.conf = Some(conf); // Iterate through all directories and load tenants for all directories diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index b1efa9749f..52765b0e98 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -18,8 +18,6 @@ //! This way control file stores information about all potentially existing //! remote partial segments and can clean them up after uploading a newer version. 
-use std::sync::Arc; - use camino::Utf8PathBuf; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; use remote_storage::RemotePath; @@ -30,6 +28,7 @@ use utils::lsn::Lsn; use crate::{ metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS}, + rate_limit::{rand_duration, RateLimiter}, safekeeper::Term, timeline::WalResidentTimeline, timeline_manager::StateSnapshot, @@ -37,30 +36,6 @@ use crate::{ SafeKeeperConf, }; -#[derive(Clone)] -pub struct RateLimiter { - semaphore: Arc, -} - -impl RateLimiter { - pub fn new(permits: usize) -> Self { - Self { - semaphore: Arc::new(tokio::sync::Semaphore::new(permits)), - } - } - - async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit { - let _timer = MISC_OPERATION_SECONDS - .with_label_values(&["partial_permit_acquire"]) - .start_timer(); - self.semaphore - .clone() - .acquire_owned() - .await - .expect("semaphore is closed") - } -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum UploadStatus { /// Upload is in progress. This status should be used only for garbage collection, @@ -352,6 +327,7 @@ pub async fn main_task( ) -> Option { debug!("started"); let await_duration = conf.partial_backup_timeout; + let mut first_iteration = true; let (_, persistent_state) = tli.get_state().await; let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx(); @@ -419,6 +395,15 @@ pub async fn main_task( } } + // smoothing the load after restart, by sleeping for a random time. + // if this is not the first iteration, we will wait for the full await_duration + let await_duration = if first_iteration { + first_iteration = false; + rand_duration(&await_duration) + } else { + await_duration + }; + // fixing the segno and waiting some time to prevent reuploading the same segment too often let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn); let timeout = tokio::time::sleep(await_duration); @@ -454,7 +439,7 @@ pub async fn main_task( } // limit concurrent uploads - let _upload_permit = limiter.acquire_owned().await; + let _upload_permit = limiter.acquire_partial_backup().await; let prepared = backup.prepare_upload().await; if let Some(seg) = &uploaded_segment { diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 0c6d97ddfa..771d905c90 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -181,7 +181,6 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { sk_auth_token: None, current_thread_runtime: false, walsenders_keep_horizon: false, - partial_backup_enabled: false, partial_backup_timeout: Duration::from_secs(0), disable_periodic_broker_push: false, enable_offload: false, diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index d14b235046..ecaac04915 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -32,6 +32,7 @@ once_cell.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true postgres_connection.workspace = true +rand.workspace = true reqwest = { workspace = true, features = ["stream"] } routerify.workspace = true serde.workspace = true diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index adbf5c6496..2799f21fdc 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -9,12 +9,14 @@ use std::time::Duration; use storage_controller::http::make_router; use storage_controller::metrics::preinitialize_metrics; use 
storage_controller::persistence::Persistence; +use storage_controller::service::chaos_injector::ChaosInjector; use storage_controller::service::{ Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, }; use tokio::signal::unix::SignalKind; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::logging::{self, LogFormat}; @@ -86,6 +88,10 @@ struct Cli { // TODO: make `cfg(feature = "testing")` #[arg(long)] neon_local_repo_dir: Option, + + /// Chaos testing + #[arg(long)] + chaos_interval: Option, } enum StrictMode { @@ -309,6 +315,22 @@ async fn async_main() -> anyhow::Result<()> { tracing::info!("Serving on {0}", args.listen); let server_task = tokio::task::spawn(server); + let chaos_task = args.chaos_interval.map(|interval| { + let service = service.clone(); + let cancel = CancellationToken::new(); + let cancel_bg = cancel.clone(); + ( + tokio::task::spawn( + async move { + let mut chaos_injector = ChaosInjector::new(service, interval.into()); + chaos_injector.run(cancel_bg).await + } + .instrument(tracing::info_span!("chaos_injector")), + ), + cancel, + ) + }); + // Wait until we receive a signal let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?; @@ -337,6 +359,12 @@ async fn async_main() -> anyhow::Result<()> { } } + // If we were injecting chaos, stop that so that we're not calling into Service while it shuts down + if let Some((chaos_jh, chaos_cancel)) = chaos_task { + chaos_cancel.cancel(); + chaos_jh.await.ok(); + } + service.shutdown().await; tracing::info!("Service shutdown complete"); diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ea515f67da..6940bf2c64 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -84,6 +84,8 @@ use crate::{ }; use serde::{Deserialize, Serialize}; +pub mod chaos_injector; + // For operations that should be quick, like attaching a new tenant const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/storage_controller/src/service/chaos_injector.rs b/storage_controller/src/service/chaos_injector.rs new file mode 100644 index 0000000000..99961d691c --- /dev/null +++ b/storage_controller/src/service/chaos_injector.rs @@ -0,0 +1,71 @@ +use std::{sync::Arc, time::Duration}; + +use rand::seq::SliceRandom; +use rand::thread_rng; +use tokio_util::sync::CancellationToken; + +use super::Service; + +pub struct ChaosInjector { + service: Arc, + interval: Duration, +} + +impl ChaosInjector { + pub fn new(service: Arc, interval: Duration) -> Self { + Self { service, interval } + } + + pub async fn run(&mut self, cancel: CancellationToken) { + let mut interval = tokio::time::interval(self.interval); + + loop { + tokio::select! 
{ + _ = interval.tick() => {} + _ = cancel.cancelled() => { + tracing::info!("Shutting down"); + return; + } + } + + self.inject_chaos().await; + + tracing::info!("Chaos iteration..."); + } + } + + async fn inject_chaos(&mut self) { + // Pick some shards to interfere with + let batch_size = 128; + let mut inner = self.service.inner.write().unwrap(); + let (nodes, tenants, scheduler) = inner.parts_mut(); + let tenant_ids = tenants.keys().cloned().collect::>(); + let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size); + + for victim in victims { + let shard = tenants + .get_mut(victim) + .expect("Held lock between choosing ID and this get"); + + // Pick a secondary to promote + let Some(new_location) = shard + .intent + .get_secondary() + .choose(&mut thread_rng()) + .cloned() + else { + tracing::info!("Skipping shard {victim}: no secondary location, can't migrate"); + continue; + }; + + let Some(old_location) = *shard.intent.get_attached() else { + tracing::info!("Skipping shard {victim}: currently has no attached location"); + continue; + }; + + shard.intent.demote_attached(scheduler, old_location); + shard.intent.promote_attached(scheduler, new_location); + self.service.maybe_reconcile_shard(shard, nodes); + } + } +} diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs index 152319b731..1fc94cc174 100644 --- a/storage_scrubber/src/lib.rs +++ b/storage_scrubber/src/lib.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; +use aws_config::retry::{RetryConfigBuilder, RetryMode}; use aws_sdk_s3::config::Region; use aws_sdk_s3::error::DisplayErrorContext; use aws_sdk_s3::Client; @@ -314,8 +315,15 @@ pub fn init_logging(file_name: &str) -> Option { } async fn init_s3_client(bucket_region: Region) -> Client { + let mut retry_config_builder = RetryConfigBuilder::new(); + + retry_config_builder + .set_max_attempts(Some(3)) + .set_mode(Some(RetryMode::Adaptive)); + let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28()) .region(bucket_region) + .retry_config(retry_config_builder.build()) .load() .await; Client::new(&config) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 0c33dec784..7289472de2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1943,11 +1943,15 @@ class NeonCli(AbstractNeonCli): remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None, allow_multiple=False, + basebackup_request_tries: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", "start", ] + extra_env_vars = {} + if basebackup_request_tries is not None: + extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries) if remote_ext_config is not None: args.extend(["--remote-ext-config", remote_ext_config]) @@ -1960,7 +1964,7 @@ class NeonCli(AbstractNeonCli): if allow_multiple: args.extend(["--allow-multiple"]) - res = self.raw_cli(args) + res = self.raw_cli(args, extra_env_vars) res.check_returncode() return res @@ -3812,6 +3816,7 @@ class Endpoint(PgProtocol, LogUtils): pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None, allow_multiple: bool = False, + basebackup_request_tries: Optional[int] = None, ) -> "Endpoint": """ Start the Postgres instance. 
@@ -3833,6 +3838,7 @@ class Endpoint(PgProtocol, LogUtils): remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, allow_multiple=allow_multiple, + basebackup_request_tries=basebackup_request_tries, ) self._running.release(1) @@ -3979,6 +3985,7 @@ class Endpoint(PgProtocol, LogUtils): remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None, allow_multiple=False, + basebackup_request_tries: Optional[int] = None, ) -> "Endpoint": """ Create an endpoint, apply config, and start Postgres. @@ -3999,6 +4006,7 @@ class Endpoint(PgProtocol, LogUtils): remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, allow_multiple=allow_multiple, + basebackup_request_tries=basebackup_request_tries, ) log.info(f"Postgres startup took {time.time() - started_at} seconds") @@ -4042,6 +4050,7 @@ class EndpointFactory: config_lines: Optional[List[str]] = None, remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None, + basebackup_request_tries: Optional[int] = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -4060,6 +4069,7 @@ class EndpointFactory: lsn=lsn, remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, + basebackup_request_tries=basebackup_request_tries, ) def create( @@ -4529,6 +4539,13 @@ def test_output_dir( yield test_dir + # Allure artifacts creation might involve the creation of `.tar.zst` archives, + # which aren't going to be used if Allure results collection is not enabled + # (i.e. --alluredir is not set). + # Skip `allure_attach_from_dir` in this case + if not request.config.getoption("--alluredir"): + return + preserve_database_files = False for k, v in request.node.user_properties: # NB: the neon_env_builder fixture uses this fixture (test_output_dir). diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index c6df6b5baf..192324f086 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -663,6 +663,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): force_image_layer_creation=False, wait_until_uploaded=False, compact: Optional[bool] = None, + **kwargs, ): self.is_testing_enabled_or_skip() query = {} @@ -680,6 +681,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", params=query, + **kwargs, ) log.info(f"Got checkpoint request response code: {res.status_code}") self.verbose_error(res) diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 190b624a54..fc74707639 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -18,7 +18,6 @@ from fixtures.pageserver.utils import wait_until_tenant_active from fixtures.utils import query_scalar from performance.test_perf_pgbench import get_scales_matrix from requests import RequestException -from requests.exceptions import RetryError # Test branch creation @@ -151,7 +150,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE env.pageserver.allowed_errors.extend( [ ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*", - ".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading", + ".*page_service_conn_main.*: query handler for 'basebackup .* ERROR: Not found: Timeline", ] ) ps_http = env.pageserver.http_client() @@ -176,10 +175,12 @@ def 
test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline) - with pytest.raises(RuntimeError, match="is not active, state: Loading"): - env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant) + with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"): + env.endpoints.create_start( + initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2 + ) + ps_http.configure_failpoints(("before-upload-index-pausable", "off")) finally: - # FIXME: paused uploads bother shutdown env.pageserver.stop(immediate=True) t.join() @@ -193,8 +194,11 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder env = neon_env_builder.init_configs() env.start() - env.pageserver.allowed_errors.append( - ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*" + env.pageserver.allowed_errors.extend( + [ + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*", + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: .*Cannot branch off the timeline that's not present in pageserver.*", + ] ) ps_http = env.pageserver.http_client() @@ -216,7 +220,10 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder branch_id = TimelineId.generate() - with pytest.raises(RetryError, match="too many 503 error responses"): + with pytest.raises( + PageserverApiException, + match="Cannot branch off the timeline that's not present in pageserver", + ): ps_http.timeline_create( env.pg_version, env.initial_tenant, diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 411b20b2c4..137b0e931d 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -3,18 +3,15 @@ import re import shutil import subprocess import tempfile +from dataclasses import dataclass from pathlib import Path from typing import List, Optional import pytest import toml -from fixtures.common_types import Lsn +from fixtures.common_types import Lsn, TenantId, TimelineId from fixtures.log_helper import log -from fixtures.neon_fixtures import ( - NeonEnv, - NeonEnvBuilder, - PgBin, -) +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import ( timeline_delete_wait_completed, @@ -22,7 +19,8 @@ from fixtures.pageserver.utils import ( wait_for_upload, ) from fixtures.pg_version import PgVersion -from fixtures.remote_storage import RemoteStorageKind +from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage +from fixtures.workload import Workload # # A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases. @@ -409,3 +407,133 @@ def dump_differs( break return differs + + +@dataclass +class HistoricDataSet: + name: str + tenant_id: TenantId + pg_version: PgVersion + url: str + + def __str__(self): + return self.name + + +HISTORIC_DATA_SETS = [ + # From before we enabled image layer compression. 
+ # - IndexPart::LATEST_VERSION 7 + # - STORAGE_FORMAT_VERSION 3 + HistoricDataSet( + "2024-07-18", + TenantId("17bf64a53509714687664b3a84e9b3ba"), + PgVersion.V16, + "https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2024-07-18-pgv16.tar.zst", + ), +] + + +@pytest.mark.parametrize("dataset", HISTORIC_DATA_SETS) +@pytest.mark.xdist_group("compatibility") +def test_historic_storage_formats( + neon_env_builder: NeonEnvBuilder, + test_output_dir: Path, + pg_version: PgVersion, + dataset: HistoricDataSet, +): + """ + This test is like test_backward_compatibility, but it looks back further to examples of our storage format from long ago. + """ + + ARTIFACT_CACHE_DIR = "./artifact_cache" + + import tarfile + from contextlib import closing + + import requests + import zstandard + + artifact_unpack_path = ARTIFACT_CACHE_DIR / Path("unpacked") / Path(dataset.name) + + # Note: we assume that when running across a matrix of PG versions, the matrix includes all the versions needed by + # HISTORIC_DATA_SETS. If we ever remove a PG version from the matrix, then historic datasets built using that version + # will no longer be covered by this test. + if pg_version != dataset.pg_version: + pytest.skip(f"Dataset {dataset} is for different PG version, skipping") + + with closing(requests.get(dataset.url, stream=True)) as r: + unzstd = zstandard.ZstdDecompressor() + with unzstd.stream_reader(r.raw) as stream: + with tarfile.open(mode="r|", fileobj=stream) as tf: + tf.extractall(artifact_unpack_path) + + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + neon_env_builder.pg_version = dataset.pg_version + env = neon_env_builder.init_configs() + env.start() + assert isinstance(env.pageserver_remote_storage, S3Storage) + + # Link artifact data into test's remote storage. We don't want the whole repo dir, just the remote storage part: we are not testing + # compat of local disk data across releases (test_backward_compat does that), we're testing really long-lived data in S3 like layer files and indices. + # + # The code generating the snapshot uses local_fs, but this test uses S3Storage, so we are copying a tree of files into a bucket. 
We use + # S3Storage so that the scrubber can run (the scrubber doesn't speak local_fs) + artifact_pageserver_path = ( + artifact_unpack_path / Path("repo") / Path("local_fs_remote_storage") / Path("pageserver") + ) + for root, _dirs, files in os.walk(artifact_pageserver_path): + for file in files: + local_path = os.path.join(root, file) + remote_key = ( + env.pageserver_remote_storage.prefix_in_bucket + + str(local_path)[len(str(artifact_pageserver_path)) :] + ) + log.info(f"Uploading {local_path} -> {remote_key}") + env.pageserver_remote_storage.client.upload_file( + local_path, env.pageserver_remote_storage.bucket_name, remote_key + ) + + # Check the scrubber handles this old data correctly (can read it and doesn't consider it corrupt) + # + # Do this _before_ importing to the pageserver, as that import may start writing immediately + metadata_summary = env.storage_scrubber.scan_metadata() + assert metadata_summary["tenant_count"] >= 1 + assert metadata_summary["timeline_count"] >= 1 + assert not metadata_summary["with_errors"] + assert not metadata_summary["with_warnings"] + + env.neon_cli.import_tenant(dataset.tenant_id) + + # Discover timelines + timelines = env.pageserver.http_client().timeline_list(dataset.tenant_id) + # All our artifacts should contain at least one timeline + assert len(timelines) > 0 + + # TODO: ensure that the snapshots we're importing contain a sensible variety of content, at the very + # least they should include a mixture of deltas and image layers. Preferably they should also + # contain some "exotic" stuff like aux files from logical replication. + + # Check we can start an endpoint and read the SQL that the artifact is meant to contain + reference_sql_dump = artifact_unpack_path / Path("dump.sql") + ep = env.endpoints.create_start("main", tenant_id=dataset.tenant_id) + pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version) + pg_bin.run_capture( + ["pg_dumpall", f"--dbname={ep.connstr()}", f"--file={test_output_dir / 'dump.sql'}"] + ) + assert not dump_differs( + reference_sql_dump, + test_output_dir / "dump.sql", + test_output_dir / "dump.filediff", + ) + ep.stop() + + # Check we can also do writes to the database + existing_timeline_id = TimelineId(timelines[0]["timeline_id"]) + workload = Workload(env, dataset.tenant_id, existing_timeline_id) + workload.init() + workload.write_rows(100) + + # Check that compaction works + env.pageserver.http_client().timeline_compact( + dataset.tenant_id, existing_timeline_id, force_image_layer_creation=True + ) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 09f941f582..2e5260ca78 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -12,7 +12,6 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, wait_for_last_flush_lsn, ) -from fixtures.pageserver.common_types import parse_layer_file_name from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( timeline_delete_wait_completed, @@ -313,6 +312,7 @@ def test_remote_storage_upload_queue_retries( def churn_while_failpoints_active(result): overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c") + # this call will wait for the failpoints to be turned off client.timeline_checkpoint(tenant_id, timeline_id) client.timeline_compact(tenant_id, timeline_id) overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d") @@ -332,8 +332,8 @@ def test_remote_storage_upload_queue_retries( # 
Exponential back-off in upload queue, so, gracious timeouts. wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0)) - wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2)) - wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0)) + wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 1)) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # unblock churn operations configure_storage_sync_failpoints("off") @@ -769,11 +769,11 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv create_thread.join() -def test_compaction_waits_for_upload( +def test_paused_upload_stalls_checkpoint( neon_env_builder: NeonEnvBuilder, ): """ - This test forces a race between upload and compaction. + This test checks that checkpoints block on uploads to remote storage. """ neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) @@ -788,6 +788,10 @@ def test_compaction_waits_for_upload( } ) + env.pageserver.allowed_errors.append( + f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing" + ) + tenant_id = env.initial_tenant timeline_id = env.initial_timeline @@ -808,76 +812,9 @@ def test_compaction_waits_for_upload( endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)") wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - client.timeline_checkpoint(tenant_id, timeline_id) - deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers()) - assert ( - deltas_at_first == 2 - ), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement." - - endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)") - endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") - wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - - layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name() - upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name() - - assert len(upload_stuck_layers) > 0 - - for name in upload_stuck_layers: - assert env.pageserver.layer_exists( - tenant_id, timeline_id, parse_layer_file_name(name) - ), "while uploads are stuck the layers should be present on disk" - - # now this will do the L0 => L1 compaction and want to remove - # upload_stuck_layers and the original initdb L0 - client.timeline_checkpoint(tenant_id, timeline_id) - - # as uploads are paused, the upload_stuck_layers should still be with us - for name in upload_stuck_layers: - assert env.pageserver.layer_exists( - tenant_id, timeline_id, parse_layer_file_name(name) - ), "uploads are stuck still over compaction" - - compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name() - overlap = compacted_layers.intersection(upload_stuck_layers) - assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction" - assert ( - len(compacted_layers) == 1 - ), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)" - - def layer_deletes_completed(): - m = client.get_metric_value("pageserver_layer_completed_deletes_total") - if m is None: - return 0 - return int(m) - - # if initdb created an initial delta layer, it might already be gc'd - # because it was uploaded before the failpoint was enabled. 
however, the - # deletion is not guaranteed to be complete. - assert layer_deletes_completed() <= 1 - - client.configure_failpoints(("before-upload-layer-pausable", "off")) - - # Ensure that this actually terminates - wait_upload_queue_empty(client, tenant_id, timeline_id) - - def until_layer_deletes_completed(): - deletes = layer_deletes_completed() - log.info(f"layer_deletes: {deletes}") - # ensure that initdb delta layer AND the previously stuck are now deleted - assert deletes >= len(upload_stuck_layers) + 1 - - wait_until(10, 1, until_layer_deletes_completed) - - for name in upload_stuck_layers: - assert not env.pageserver.layer_exists( - tenant_id, timeline_id, parse_layer_file_name(name) - ), "l0 should now be removed because of L0 => L1 compaction and completed uploads" - - # We should not have hit the error handling path in uploads where a uploaded file is gone - assert not env.pageserver.log_contains( - "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." - ) + with pytest.raises(ReadTimeout): + client.timeline_checkpoint(tenant_id, timeline_id, timeout=5) + client.configure_failpoints(("before-upload-layer-pausable", "off")) def wait_upload_queue_empty( diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py index fadf438788..e3f627b6a6 100644 --- a/test_runner/regress/test_storage_scrubber.py +++ b/test_runner/regress/test_storage_scrubber.py @@ -13,6 +13,7 @@ from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, ) +from fixtures.pg_version import PgVersion from fixtures.remote_storage import S3Storage, s3_storage from fixtures.utils import wait_until from fixtures.workload import Workload @@ -265,10 +266,85 @@ def test_scrubber_physical_gc_ancestors( # attach it, to drop any local state, then check it's still readable. workload.stop() drop_local_state(env, tenant_id) - workload.validate() +def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder): + """ + When we delete a timeline after a shard split, the child shards do not directly delete the + layers in the ancestor shards. They rely on the scrubber to clean up. + """ + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + neon_env_builder.num_pageservers = 2 + + env = neon_env_builder.init_configs() + env.start() + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + env.neon_cli.create_tenant( + tenant_id, + timeline_id, + shard_count=None, + conf={ + # Small layers and low compaction thresholds, so that when we split we can expect some to + # be dropped by child shards + "checkpoint_distance": f"{1024 * 1024}", + "compaction_threshold": "1", + "compaction_target_size": f"{1024 * 1024}", + "image_creation_threshold": "2", + "image_layer_creation_check_threshold": "0", + # Disable background compaction, we will do it explicitly + "compaction_period": "0s", + # No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas + # and makes them GC'able + "pitr_interval": "0s", + }, + ) + + # Make sure the original shard has some layers + workload = Workload(env, tenant_id, timeline_id) + workload.init() + workload.write_rows(100) + + new_shard_count = 4 + shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count) + + # Create a second timeline so that when we delete the first one, child shards still have some content in S3. 
+ # + # This is a limitation of the scrubber: if a shard isn't in S3 (because it has no timelines), then the scrubber + # doesn't know about it, and won't perceive its ancestors as ancestors. + other_timeline_id = TimelineId.generate() + env.storage_controller.pageserver_api().timeline_create( + PgVersion.NOT_SET, tenant_id, other_timeline_id + ) + + # Write after split so that child shards have some indices in S3 + workload.write_rows(100, upload=False) + for shard in shards: + ps = env.get_tenant_pageserver(shard) + log.info(f"Waiting for shard {shard} on pageserver {ps.id}") + ps.http_client().timeline_checkpoint( + shard, timeline_id, compact=False, wait_until_uploaded=True + ) + + # The timeline still exists in child shards and they reference its layers, so scrubbing + # now shouldn't delete anything. + gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full") + assert gc_summary["remote_storage_errors"] == 0 + assert gc_summary["indices_deleted"] == 0 + assert gc_summary["ancestor_layers_deleted"] == 0 + + # Delete the timeline + env.storage_controller.pageserver_api().timeline_delete(tenant_id, timeline_id) + + # Subsequently doing physical GC should clean up the ancestor layers + gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full") + assert gc_summary["remote_storage_errors"] == 0 + assert gc_summary["indices_deleted"] == 0 + assert gc_summary["ancestor_layers_deleted"] > 0 + + def test_scrubber_physical_gc_ancestors_split(neon_env_builder: NeonEnvBuilder): """ Exercise ancestor GC while a tenant is partly split: this test ensures that if we have some child shards diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index dbd0e6428b..7bbe834c8c 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit dbd0e6428b9274d72a10ac29bd3e3162faf109d4 +Subproject commit 7bbe834c8c2dc37802eca8484311599bc47341f6 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 035b73a9c5..9eba7dd382 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 035b73a9c5998f9a0ef35cc8df1bae680bf770fc +Subproject commit 9eba7dd382606ffca43aca865f337ec21bcdac73 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index b39f316137..5377f5ed72 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit b39f316137fdd29e2da15d2af2fdd1cfd18163be +Subproject commit 5377f5ed7290af45b7cb6b0d98d43cbf4a4e77f3 diff --git a/vendor/revisions.json b/vendor/revisions.json index eeebd646f5..570dfc1550 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "v16": ["16.3", "b39f316137fdd29e2da15d2af2fdd1cfd18163be"], - "v15": ["15.7", "035b73a9c5998f9a0ef35cc8df1bae680bf770fc"], - "v14": ["14.12", "dbd0e6428b9274d72a10ac29bd3e3162faf109d4"] + "v16": ["16.3", "5377f5ed7290af45b7cb6b0d98d43cbf4a4e77f3"], + "v15": ["15.7", "9eba7dd382606ffca43aca865f337ec21bcdac73"], + "v14": ["14.12", "7bbe834c8c2dc37802eca8484311599bc47341f6"] }
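
The three postgres submodule bumps above have to stay in lockstep with vendor/revisions.json. A minimal local check, assuming it is run from the repository root with the vendor/postgres-v14/-v15/-v16 submodules checked out; the script is an illustration, not part of this change:

    import json
    import subprocess

    with open("vendor/revisions.json") as f:
        revisions = json.load(f)

    for version, (_pg_release, expected_commit) in revisions.items():
        # Each key ("v14", "v15", "v16") names a submodule under vendor/postgres-<key>.
        actual_commit = subprocess.check_output(
            ["git", "-C", f"vendor/postgres-{version}", "rev-parse", "HEAD"],
            text=True,
        ).strip()
        assert actual_commit == expected_commit, (
            f"vendor/postgres-{version} is at {actual_commit}, "
            f"but revisions.json expects {expected_commit}"
        )
    print("vendor/revisions.json matches the checked-out submodules")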
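Earlier in the patch, test_historic_storage_formats streams each snapshot through zstandard and tarfile on every run, even though the ARTIFACT_CACHE_DIR name suggests the unpacked data is meant to be reused. A sketch of a guard that skips the download when a previous run already unpacked the artifact; the sentinel-file convention is an assumption, not something the test does:

    import tarfile
    from contextlib import closing
    from pathlib import Path

    import requests
    import zstandard


    def ensure_unpacked(url: str, unpack_path: Path) -> None:
        # Skip the download entirely if a previous run finished unpacking this artifact.
        sentinel = unpack_path / ".unpack-complete"
        if sentinel.exists():
            return
        unpack_path.mkdir(parents=True, exist_ok=True)
        with closing(requests.get(url, stream=True)) as r:
            r.raise_for_status()
            unzstd = zstandard.ZstdDecompressor()
            with unzstd.stream_reader(r.raw) as stream:
                with tarfile.open(mode="r|", fileobj=stream) as tf:
                    tf.extractall(unpack_path)
        sentinel.touch()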
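The remote-storage and branch/endpoint tests above all follow the same shape: switch a pausable failpoint on, assert that an operation fails or times out, then switch the failpoint off again. A context-manager sketch of that pattern; it assumes only the configure_failpoints((name, action)) call visible in the tests, and the default "pause" action string is an assumption (only "off" appears above):

    from contextlib import contextmanager


    @contextmanager
    def failpoint(client, name: str, action: str = "pause"):
        # Enable the failpoint for the duration of the block and always turn it off
        # again, even if the assertions inside the block raise.
        client.configure_failpoints((name, action))
        try:
            yield
        finally:
            client.configure_failpoints((name, "off"))

For example, the ReadTimeout assertion in test_paused_upload_stalls_checkpoint could sit inside "with failpoint(client, 'before-upload-layer-pausable'):" so the failpoint is released on every exit path.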
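test_scrubber_physical_gc_timeline_deletion checks the same three keys of the pageserver_physical_gc summary before and after the timeline deletion. A small helper makes that invariant explicit; a sketch only, reusing the call and summary keys that appear in the test:

    def run_physical_gc(env, expect_ancestor_deletions: bool) -> dict:
        # Run a full physical GC pass and assert the invariants used in the test:
        # no remote storage errors, no index deletions, and ancestor-layer deletions
        # only once the timeline has actually been deleted.
        summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full")
        assert summary["remote_storage_errors"] == 0
        assert summary["indices_deleted"] == 0
        if expect_ancestor_deletions:
            assert summary["ancestor_layers_deleted"] > 0
        else:
            assert summary["ancestor_layers_deleted"] == 0
        return summary

The test would then call run_physical_gc(env, expect_ancestor_deletions=False) before the timeline_delete and run_physical_gc(env, expect_ancestor_deletions=True) afterwards.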