diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 897e1a7aad..471dc68df9 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -623,51 +623,6 @@ jobs: - name: Cleanup ECR folder run: rm -rf ~/.ecr - - neon-image-depot: - # For testing this will run side-by-side for a few merges. - # This action is not really optimized yet, but gets the job done - runs-on: [ self-hosted, gen3, large ] - needs: [ tag ] - container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned - permissions: - contents: read - id-token: write - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - submodules: true - fetch-depth: 0 - - - name: Setup go - uses: actions/setup-go@v3 - with: - go-version: '1.19' - - - name: Set up Depot CLI - uses: depot/setup-action@v1 - - - name: Install Crane & ECR helper - run: go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0 - - - name: Configure ECR login - run: | - mkdir /github/home/.docker/ - echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json - - - name: Build and push - uses: depot/build-push-action@v1 - with: - # if no depot.json file is at the root of your repo, you must specify the project id - project: nrdv0s4kcs - push: true - tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}} - build-args: | - GIT_VERSION=${{ github.sha }} - REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com - compute-tools-image: runs-on: [ self-hosted, gen3, large ] needs: [ tag ] diff --git a/Cargo.lock b/Cargo.lock index 6856b9e3ac..71a6699c50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2770,7 +2770,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "bytes", "fallible-iterator", @@ -2783,7 +2783,7 @@ dependencies = [ [[package]] name = "postgres-native-tls" version = "0.5.0" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "native-tls", "tokio", @@ -2794,7 +2794,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "base64 0.20.0", "byteorder", @@ -2812,7 +2812,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "bytes", "fallible-iterator", @@ 
-4272,7 +4272,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index dc34705f8d..551a9dc783 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,11 +140,11 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Other git libraries @@ -180,7 +180,7 @@ tonic-build = "0.9" # This is only needed for proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } # Changes the MAX_THREADS limit from 4096 to 32768. 
# This is a temporary workaround for using tracing from many threads in safekeepers code, diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index c2ad30f86f..b3f0e9ba43 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -148,4 +148,14 @@ mod tests { let file = File::open("tests/cluster_spec.json").unwrap(); let _spec: ComputeSpec = serde_json::from_reader(file).unwrap(); } + + #[test] + fn parse_unknown_fields() { + // Forward compatibility test + let file = File::open("tests/cluster_spec.json").unwrap(); + let mut json: serde_json::Value = serde_json::from_reader(file).unwrap(); + let ob = json.as_object_mut().unwrap(); + ob.insert("unknown_field_123123123".into(), "hello".into()); + let _spec: ComputeSpec = serde_json::from_value(json).unwrap(); + } } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 38e1bf00f8..dafb6dcb45 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -34,6 +34,8 @@ use crate::{ Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR, }; +const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000; + pub(super) mod metrics { use metrics::{register_int_counter_vec, IntCounterVec}; use once_cell::sync::Lazy; @@ -424,17 +426,33 @@ impl RemoteStorage for S3Bucket { delete_objects.push(obj_id); } - metrics::inc_delete_objects(paths.len() as u64); - self.client - .delete_objects() - .bucket(self.bucket_name.clone()) - .delete(Delete::builder().set_objects(Some(delete_objects)).build()) - .send() - .await - .map_err(|e| { - metrics::inc_delete_objects_fail(paths.len() as u64); - e - })?; + for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) { + metrics::inc_delete_objects(chunk.len() as u64); + + let resp = self + .client + .delete_objects() + .bucket(self.bucket_name.clone()) + .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build()) + .send() + .await; + + match resp { + Ok(resp) => { + if let Some(errors) = resp.errors { + metrics::inc_delete_objects_fail(errors.len() as u64); + return Err(anyhow::format_err!( + "Failed to delete {} objects", + errors.len() + )); + } + } + Err(e) => { + metrics::inc_delete_objects_fail(chunk.len() as u64); + return Err(e.into()); + } + } + } Ok(()) } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 2f341bb29d..741c18bf6f 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -24,6 +24,7 @@ enum RemoteOp { Upload(RemotePath), Download(RemotePath), Delete(RemotePath), + DeleteObjects(Vec), } impl UnreliableWrapper { @@ -121,8 +122,18 @@ impl RemoteStorage for UnreliableWrapper { } async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?; + let mut error_counter = 0; for path in paths { - self.delete(path).await? 
+ if (self.delete(path).await).is_err() { + error_counter += 1; + } + } + if error_counter > 0 { + return Err(anyhow::anyhow!( + "failed to delete {} objects", + error_counter + )); } Ok(()) } diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index ce5f81c44b..61cbd5066f 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -516,7 +516,7 @@ async fn collect_eviction_candidates( if !tl.is_active() { continue; } - let info = tl.get_local_layers_for_disk_usage_eviction(); + let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); tenant_candidates.extend( info.resident_layers diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ac230e5f4a..fc8da70cc0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -215,7 +215,7 @@ async fn build_timeline_info( ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - let mut info = build_timeline_info_common(timeline, ctx)?; + let mut info = build_timeline_info_common(timeline, ctx).await?; if include_non_incremental_logical_size { // XXX we should be using spawn_ondemand_logical_size_calculation here. // Otherwise, if someone deletes the timeline / detaches the tenant while @@ -233,7 +233,7 @@ async fn build_timeline_info( Ok(info) } -fn build_timeline_info_common( +async fn build_timeline_info_common( timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { @@ -264,7 +264,7 @@ fn build_timeline_info_common( None } }; - let current_physical_size = Some(timeline.layer_size_sum()); + let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); @@ -330,6 +330,7 @@ async fn timeline_create_handler( Ok(Some(new_timeline)) => { // Created. Construct a TimelineInfo for it. let timeline_info = build_timeline_info_common(&new_timeline, &ctx) + .await .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } @@ -591,7 +592,7 @@ async fn tenant_status( // Calculate total physical size of all timelines let mut current_physical_size = 0; for timeline in tenant.list_timelines().iter() { - current_physical_size += timeline.layer_size_sum(); + current_physical_size += timeline.layer_size_sum().await; } let state = tenant.current_state(); @@ -701,7 +702,7 @@ async fn layer_map_info_handler( check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(reset); + let layer_map_info = timeline.layer_map_info(reset).await; json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index cc444c479a..43d06db6d8 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,4 +1,3 @@ -use metrics::core::{AtomicU64, GenericCounter}; use metrics::{ register_counter_vec, register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, @@ -95,21 +94,19 @@ static READ_NUM_FS_LAYERS: Lazy = Lazy::new(|| { }); // Metrics collected on operations on the storage repository. 
-static RECONSTRUCT_TIME: Lazy = Lazy::new(|| { - register_histogram_vec!( +pub static RECONSTRUCT_TIME: Lazy = Lazy::new(|| { + register_histogram!( "pageserver_getpage_reconstruct_seconds", - "Time spent in reconstruct_value", - &["tenant_id", "timeline_id"], + "Time spent in reconstruct_value (reconstruct a page from deltas)", CRITICAL_OP_BUCKETS.into(), ) .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_direct_total", "Number of cache hits from materialized page cache without redo", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -124,11 +121,10 @@ static GET_RECONSTRUCT_DATA_TIME: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_total", "Number of cache hits from materialized page cache", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -752,10 +748,7 @@ impl StorageTimeMetrics { pub struct TimelineMetrics { tenant_id: String, timeline_id: String, - pub reconstruct_time_histo: Histogram, pub get_reconstruct_data_time_histo: Histogram, - pub materialized_page_cache_hit_counter: GenericCounter, - pub materialized_page_cache_hit_upon_request_counter: GenericCounter, pub flush_time_histo: StorageTimeMetrics, pub compact_time_histo: StorageTimeMetrics, pub create_images_time_histo: StorageTimeMetrics, @@ -783,15 +776,9 @@ impl TimelineMetrics { ) -> Self { let tenant_id = tenant_id.to_string(); let timeline_id = timeline_id.to_string(); - let reconstruct_time_histo = RECONSTRUCT_TIME - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let flush_time_histo = StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id); let compact_time_histo = @@ -833,19 +820,18 @@ impl TimelineMetrics { let read_num_fs_layers = READ_NUM_FS_LAYERS .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let materialized_page_cache_hit_upon_request_counter = MATERIALIZED_PAGE_CACHE_HIT_DIRECT - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let evictions_with_low_residence_duration = evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id); + // TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter + // and integration test will error. 
+ MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get(); + MATERIALIZED_PAGE_CACHE_HIT.get(); + TimelineMetrics { tenant_id, timeline_id, - reconstruct_time_histo, get_reconstruct_data_time_histo, - materialized_page_cache_hit_counter, - materialized_page_cache_hit_upon_request_counter, flush_time_histo, compact_time_histo, create_images_time_histo, @@ -872,10 +858,7 @@ impl Drop for TimelineMetrics { fn drop(&mut self) { let tenant_id = &self.tenant_id; let timeline_id = &self.timeline_id; - let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT_DIRECT.remove_label_values(&[tenant_id, timeline_id]); let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]); let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 51cac43f50..86c84ec82f 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1133,16 +1133,17 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer().await; // Flush relation and SLRU data blocks, keep metadata. - let mut result: anyhow::Result<()> = Ok(()); - self.pending_updates.retain(|&key, value| { - if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { - result = writer.put(key, self.lsn, value); - false + let mut retained_pending_updates = HashMap::new(); + for (key, value) in self.pending_updates.drain() { + if is_rel_block_key(key) || is_slru_block_key(key) { + // This bails out on first error without modifying pending_updates. + // That's Ok, cf this function's doc comment. + writer.put(key, self.lsn, &value).await?; } else { - true + retained_pending_updates.insert(key, value); } - }); - result?; + } + self.pending_updates.extend(retained_pending_updates); if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1164,10 +1165,10 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value)?; + writer.put(key, lsn, &value).await?; } for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn)?; + writer.delete(key_range, lsn).await?; } writer.finish_write(lsn); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 32390c06cf..7fdd047c96 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -473,6 +473,14 @@ pub(crate) enum ShutdownError { AlreadyStopping, } +struct DeletionGuard(OwnedMutexGuard); + +impl DeletionGuard { + fn is_deleted(&self) -> bool { + *self.0 + } +} + impl Tenant { /// Yet another helper for timeline initialization. /// Contains the common part of `load_local_timeline` and `load_remote_timeline`. 
@@ -519,6 +527,7 @@ impl Tenant { ); timeline .load_layer_map(new_disk_consistent_lsn) + .await .with_context(|| { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; @@ -560,7 +569,7 @@ impl Tenant { || timeline .layers .read() - .unwrap() + .await .iter_historic_layers() .next() .is_some(), @@ -1137,7 +1146,11 @@ impl Tenant { ) .context("create_timeline_struct")?; - let guard = Arc::clone(&timeline.delete_lock).lock_owned().await; + let guard = DeletionGuard( + Arc::clone(&timeline.delete_lock) + .try_lock_owned() + .expect("cannot happen because we're the only owner"), + ); // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. @@ -1461,7 +1474,13 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() - .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) + .filter_map(|(timeline_id, timeline)| { + if timeline.is_active() { + Some((*timeline_id, timeline.clone())) + } else { + None + } + }) .collect::>(); drop(timelines); timelines_to_compact @@ -1542,6 +1561,7 @@ impl Tenant { &self, timeline_id: TimelineId, timeline: Arc, + guard: DeletionGuard, ) -> anyhow::Result<()> { { // Grab the layer_removal_cs lock, and actually perform the deletion. @@ -1614,6 +1634,25 @@ impl Tenant { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? }); + if let Some(remote_client) = &timeline.remote_client { + remote_client.delete_all().await.context("delete_all")? + }; + + // Have a failpoint that can use the `pause` failpoint action. + // We don't want to block the executor thread, hence, spawn_blocking + await. + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint in_progress_delete"); + fail::fail_point!("in_progress_delete"); + } + }) + .await + .expect("spawn_blocking"); + } + { // Remove the timeline from the map. let mut timelines = self.timelines.lock().unwrap(); @@ -1634,12 +1673,7 @@ impl Tenant { drop(timelines); } - let remote_client = match &timeline.remote_client { - Some(remote_client) => remote_client, - None => return Ok(()), - }; - - remote_client.delete_all().await?; + drop(guard); Ok(()) } @@ -1687,23 +1721,18 @@ impl Tenant { timeline = Arc::clone(timeline_entry.get()); // Prevent two tasks from trying to delete the timeline at the same time. - // - // XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller - // needs to poll until the operation has finished. But for now, we return an - // error, because the control plane knows to retry errors. - delete_lock_guard = - Arc::clone(&timeline.delete_lock) - .try_lock_owned() - .map_err(|_| { + DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err( + |_| { DeleteTimelineError::Other(anyhow::anyhow!( "timeline deletion is already in progress" )) - })?; + }, + )?); // If another task finished the deletion just before we acquired the lock, // return success. 
- if *delete_lock_guard { + if delete_lock_guard.is_deleted() { return Ok(()); } @@ -1777,7 +1806,7 @@ impl Tenant { self: Arc, timeline_id: TimelineId, timeline: Arc, - _guard: OwnedMutexGuard, + guard: DeletionGuard, ) { let tenant_id = self.tenant_id; let timeline_clone = Arc::clone(&timeline); @@ -1790,7 +1819,7 @@ impl Tenant { "timeline_delete", false, async move { - if let Err(err) = self.delete_timeline(timeline_id, timeline).await { + if let Err(err) = self.delete_timeline(timeline_id, timeline, guard).await { error!("Error: {err:#}"); timeline_clone.set_broken(err.to_string()) }; @@ -3582,12 +3611,16 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3656,13 +3689,21 @@ mod tests { let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap(); // Insert a value on the timeline - writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?; - writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?; + writer + .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20")) + .await?; + writer + .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20")) + .await?; writer.finish_write(Lsn(0x20)); - writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?; + writer + .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30")) + .await?; writer.finish_write(Lsn(0x30)); - writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?; + writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40")) + .await?; writer.finish_write(Lsn(0x40)); //assert_current_logical_size(&tline, Lsn(0x40)); @@ -3675,7 +3716,9 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer().await; - new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; + new_writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40")) + .await?; new_writer.finish_write(Lsn(0x40)); // Check page contents on both branches @@ -3703,36 +3746,44 @@ mod tests { { let writer = tline.writer().await; // Create a relation on the timeline - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; } tline.freeze_and_flush().await?; { let writer = tline.writer().await; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); } 
tline.freeze_and_flush().await @@ -4048,7 +4099,9 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -4056,7 +4109,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -4064,7 +4119,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; + writer + .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30"))) + .await?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -4072,7 +4129,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; + writer + .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40"))) + .await?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -4124,11 +4183,13 @@ mod tests { for _ in 0..10000 { test_key.field6 = blknum; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); @@ -4174,11 +4235,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4192,11 +4255,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; @@ -4249,11 +4314,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4275,11 +4342,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; println!("updating {} at {}", blknum, lsn); writer.finish_write(lsn); drop(writer); @@ -4341,11 +4410,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, 
blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), + ) + .await?; println!("updating [{}][{}] at {}", idx, blknum, lsn); writer.finish_write(lsn); drop(writer); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 2c84c59dcb..8db2bc4eb2 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -753,22 +753,18 @@ impl RemoteTimelineClient { // Have a failpoint that can use the `pause` failpoint action. // We don't want to block the executor thread, hence, spawn_blocking + await. - #[cfg(feature = "testing")] - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); - move || { - let _entered = current.entered(); - tracing::info!( - "at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - fail::fail_point!( - "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - } - }) - .await - .expect("spawn_blocking"); - + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint persist_deleted_index_part"); + fail::fail_point!("persist_deleted_index_part"); + } + }) + .await + .expect("spawn_blocking"); + } upload::upload_index_part( self.conf, &self.storage_impl, diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6ac4fd9470..0af3d4ce39 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -389,10 +389,10 @@ pub trait Layer: std::fmt::Debug + Send + Sync { } /// Returned by [`Layer::iter`] -pub type LayerIter<'i> = Box> + 'i>; +pub type LayerIter<'i> = Box> + 'i + Send>; /// Returned by [`Layer::key_iter`] -pub type LayerKeyIter<'i> = Box + 'i>; +pub type LayerKeyIter<'i> = Box + 'i + Send>; /// A Layer contains all data in a "rectangle" consisting of a range of keys and /// range of LSNs. 
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c453683fea..78bcfdafc0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -304,7 +304,7 @@ impl InMemoryLayer { Ok(()) } - pub fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { + pub async fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { // TODO: Currently, we just leak the storage for any deleted keys Ok(()) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b8a7cdacb7..d42fdf5e55 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -47,7 +47,10 @@ use crate::tenant::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum}; -use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; +use crate::metrics::{ + TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT, + RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS, +}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; @@ -125,7 +128,7 @@ pub struct Timeline { pub pg_version: u32, - pub(super) layers: RwLock>, + pub(crate) layers: tokio::sync::RwLock>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -539,9 +542,7 @@ impl Timeline { match cached_lsn.cmp(&lsn) { Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check Ordering::Equal => { - self.metrics - .materialized_page_cache_hit_upon_request_counter - .inc(); + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc(); return Ok(cached_img); // exact LSN match, return the image } Ordering::Greater => { @@ -563,8 +564,7 @@ impl Timeline { .await?; timer.stop_and_record(); - self.metrics - .reconstruct_time_histo + RECONSTRUCT_TIME .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state)) } @@ -597,8 +597,8 @@ impl Timeline { /// The sum of the file size of all historic layers in the layer map. /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. 
- pub fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_size_sum(&self) -> u64 { + let layer_map = self.layers.read().await; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -908,7 +908,7 @@ impl Timeline { pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; @@ -1038,8 +1038,8 @@ impl Timeline { } } - pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { + let layer_map = self.layers.read().await; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1061,7 +1061,7 @@ impl Timeline { #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; if self.remote_client.is_none() { return Ok(Some(false)); @@ -1074,7 +1074,7 @@ impl Timeline { /// Like [`evict_layer_batch`], but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let remote_client = self .remote_client .as_ref() @@ -1159,7 +1159,7 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1417,7 +1417,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: RwLock::new(LayerMap::default()), + layers: tokio::sync::RwLock::new(LayerMap::default()), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1598,15 +1598,17 @@ impl Timeline { /// Initialize with an empty layer map. Used when creating a new timeline. /// pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.try_write().expect( + "in the context where we call this function, no other task has access to the object", + ); layers.next_open_layer_at = Some(Lsn(start_lsn.0)); } /// /// Scan the timeline directory to populate the layer map. 
/// - pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().unwrap(); + pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1735,7 +1737,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1888,7 +1890,7 @@ impl Timeline { let local_layers = self .layers .read() - .unwrap() + .await .iter_historic_layers() .map(|l| (l.filename(), l)) .collect::>(); @@ -2261,8 +2263,8 @@ impl Timeline { } } - fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().unwrap().iter_historic_layers() { + async fn find_layer(&self, layer_file_name: &str) -> Option> { + for historic_layer in self.layers.read().await.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { return Some(historic_layer); @@ -2385,7 +2387,7 @@ impl Timeline { ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { - self.metrics.materialized_page_cache_hit_counter.inc_by(1); + MATERIALIZED_PAGE_CACHE_HIT.inc_by(1); return Ok(()); } if prev_lsn <= cont_lsn { @@ -2479,7 +2481,7 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().unwrap(); + let layers = timeline.layers.read().await; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2661,8 +2663,8 @@ impl Timeline { /// /// Get a handle to the latest layer for appending. /// - fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().unwrap(); + async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { + let mut layers = self.layers.write().await; ensure!(lsn.is_aligned()); @@ -2711,17 +2713,16 @@ impl Timeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { + async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); - let layer = self.get_layer_for_write(lsn)?; + let layer = self.get_layer_for_write(lsn).await?; layer.put_value(key, lsn, val)?; Ok(()) } - fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - let layer = self.get_layer_for_write(lsn)?; - layer.put_tombstone(key_range, lsn)?; - + async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + let layer = self.get_layer_for_write(lsn).await?; + layer.put_tombstone(key_range, lsn).await?; Ok(()) } @@ -2740,7 +2741,7 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? 
@@ -2778,7 +2779,7 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2894,16 +2895,7 @@ impl Timeline { } } // normal case, write out a L0 delta layer file. - let this = self.clone(); - let frozen_layer = frozen_layer.clone(); - let span = tracing::info_span!("blocking"); - let (delta_path, metadata) = tokio::task::spawn_blocking(move || { - let _g = span.entered(); - this.create_delta_layer(&frozen_layer) - }) - .await - .context("create_delta_layer spawn_blocking") - .and_then(|res| res)?; + let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; HashMap::from([(delta_path, metadata)]) }; @@ -2912,7 +2904,7 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let l = layers.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this @@ -3006,34 +2998,50 @@ impl Timeline { } // Write out the given frozen in-memory layer as a new L0 delta file - fn create_delta_layer( + async fn create_delta_layer( self: &Arc, - frozen_layer: &InMemoryLayer, + frozen_layer: &Arc, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { - // Write it out - let new_delta = frozen_layer.write_to_disk()?; - let new_delta_path = new_delta.path(); - let new_delta_filename = new_delta.filename(); + let span = tracing::info_span!("blocking"); + let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({ + let _g = span.entered(); + let self_clone = Arc::clone(self); + let frozen_layer = Arc::clone(frozen_layer); + move || { + // Write it out + let new_delta = frozen_layer.write_to_disk()?; + let new_delta_path = new_delta.path(); - // Sync it to disk. - // - // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable - // - // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. + // Sync it to disk. + // + // We must also fsync the timeline dir to ensure the directory entries for + // new layer files are durable + // + // TODO: If we're running inside 'flush_frozen_layers' and there are multiple + // files to flush, it might be better to first write them all, and then fsync + // them all in parallel. - // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace - // this with a single fsync in future refactors. - par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; - // Then sync the parent directory. - par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) - .context("fsync of timeline dir")?; + // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace + // this with a single fsync in future refactors. + par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + // Then sync the parent directory. 
+ par_fsync::par_fsync(&[self_clone + .conf + .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) + .context("fsync of timeline dir")?; + + let sz = new_delta_path.metadata()?.len(); + + anyhow::Ok((new_delta, sz)) + } + }) + .await + .context("spawn_blocking")??; + let new_delta_name = new_delta.filename(); // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -3044,14 +3052,12 @@ impl Timeline { batch_updates.flush(); // update the timeline's physical size - let sz = new_delta_path.metadata()?.len(); - self.metrics.resident_physical_size_gauge.add(sz); // update metrics self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); - Ok((new_delta_filename, LayerFileMetadata::new(sz))) + Ok((new_delta_name, LayerFileMetadata::new(sz))) } async fn repartition( @@ -3085,10 +3091,14 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? - fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut max_deltas = 0; { @@ -3183,7 +3193,7 @@ impl Timeline { for partition in partitioning.parts.iter() { let img_range = start..partition.ranges.last().unwrap().end; start = img_range.end; - if force || self.time_for_new_image_layer(partition, lsn)? { + if force || self.time_for_new_image_layer(partition, lsn).await? { let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -3266,7 +3276,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); @@ -3328,13 +3338,13 @@ impl Timeline { /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the /// start of level0 files compaction, the on-demand download should be revisited as well. - fn compact_level0_phase1( + async fn compact_level0_phase1( &self, _layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); @@ -3457,7 +3467,7 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here? + let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? 
@@ -3671,21 +3681,12 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { - let this = self.clone(); - let ctx_inner = ctx.clone(); - let layer_removal_cs_inner = layer_removal_cs.clone(); - let span = tracing::info_span!("blocking"); let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - } = tokio::task::spawn_blocking(move || { - let _g = span.entered(); - this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner) - }) - .await - .context("compact_level0_phase1 spawn_blocking") - .map_err(CompactionError::Other) - .and_then(|res| res)?; + } = self + .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx) + .await?; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do @@ -3703,7 +3704,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3790,6 +3791,7 @@ impl Timeline { /// for example. The caller should hold `Tenant::gc_cs` lock to ensure /// that. /// + #[instrument(skip_all, fields(timline_id=%self.timeline_id))] pub(super) async fn update_gc_info( &self, retain_lsns: Vec, @@ -3962,7 +3964,7 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4262,7 +4264,7 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().unwrap(); + let mut layers = self_clone.layers.write().await; let mut updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { @@ -4420,7 +4422,7 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) @@ -4522,8 +4524,8 @@ impl LocalLayerInfoForDiskUsageEviction { } impl Timeline { - pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().unwrap(); + pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().await; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4611,12 +4613,12 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { - self.tl.put_value(key, lsn, value) + pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { + self.tl.put_value(key, lsn, value).await } - pub fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - self.tl.put_tombstone(key_range, lsn) + pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + self.tl.put_tombstone(key_range, lsn).await } /// Track the end of the latest digested WAL record. 
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 1040dff63d..80c5210211 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -197,7 +197,7 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { if hist_layer.is_remote_layer() { diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 64d980d2e4..8d82de6dc4 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -257,7 +257,7 @@ nwp_register_gucs(void) "Walproposer reconnects to offline safekeepers once in this interval.", NULL, &wal_acceptor_reconnect_timeout, - 5000, 0, INT_MAX, /* default, min, max */ + 1000, 0, INT_MAX, /* default, min, max */ PGC_SIGHUP, /* context */ GUC_UNIT_MS, /* flags */ NULL, NULL, NULL); diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 050f00dd7d..e8ad2d04f3 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -1,5 +1,6 @@ use futures::pin_mut; use futures::StreamExt; +use futures::TryFutureExt; use hyper::body::HttpBody; use hyper::http::HeaderName; use hyper::http::HeaderValue; @@ -11,8 +12,13 @@ use serde_json::Value; use tokio_postgres::types::Kind; use tokio_postgres::types::Type; use tokio_postgres::Row; +use tracing::error; +use tracing::info; +use tracing::instrument; use url::Url; +use crate::proxy::invalidate_cache; +use crate::proxy::NUM_RETRIES_WAKE_COMPUTE; use crate::{auth, config::ProxyConfig, console}; #[derive(serde::Deserialize)] @@ -90,10 +96,17 @@ fn json_array_to_pg_array(value: &Value) -> Result, serde_json::E } } +struct ConnInfo { + username: String, + dbname: String, + hostname: String, + password: String, +} + fn get_conn_info( headers: &HeaderMap, sni_hostname: Option, -) -> Result<(String, String, String, String), anyhow::Error> { +) -> Result { let connection_string = headers .get("Neon-Connection-String") .ok_or(anyhow::anyhow!("missing connection string"))? 
@@ -146,12 +159,12 @@ fn get_conn_info( } } - Ok(( - username.to_owned(), - dbname.to_owned(), - hostname.to_owned(), - password.to_owned(), - )) + Ok(ConnInfo { + username: username.to_owned(), + dbname: dbname.to_owned(), + hostname: hostname.to_owned(), + password: password.to_owned(), + }) } // TODO: return different http error codes @@ -164,10 +177,10 @@ pub async fn handle( // Determine the destination and connection params // let headers = request.headers(); - let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?; + let conn_info = get_conn_info(headers, sni_hostname)?; let credential_params = StartupMessageParams::new([ - ("user", &username), - ("database", &dbname), + ("user", &conn_info.username), + ("database", &conn_info.dbname), ("application_name", APP_NAME), ]); @@ -186,21 +199,20 @@ pub async fn handle( let creds = config .auth_backend .as_ref() - .map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names)) + .map(|_| { + auth::ClientCredentials::parse( + &credential_params, + Some(&conn_info.hostname), + common_names, + ) + }) .transpose()?; let extra = console::ConsoleReqExtra { session_id: uuid::Uuid::new_v4(), application_name: Some(APP_NAME), }; - let node = creds.wake_compute(&extra).await?.expect("msg"); - let conf = node.value.config; - let port = *conf.get_ports().first().expect("no port"); - let host = match conf.get_hosts().first().expect("no host") { - tokio_postgres::config::Host::Tcp(host) => host, - tokio_postgres::config::Host::Unix(_) => { - return Err(anyhow::anyhow!("unix socket is not supported")); - } - }; + + let mut node_info = creds.wake_compute(&extra).await?.expect("msg"); let request_content_length = match request.body().size_hint().upper() { Some(v) => v, @@ -220,28 +232,10 @@ pub async fn handle( let QueryData { query, params } = serde_json::from_slice(&body)?; let query_params = json_to_pg_text(params)?; - // - // Connenct to the destination - // - let (client, connection) = tokio_postgres::Config::new() - .host(host) - .port(port) - .user(&username) - .password(&password) - .dbname(&dbname) - .max_backend_message_size(MAX_RESPONSE_SIZE) - .connect(tokio_postgres::NoTls) - .await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - // // Now execute the query and return the result // + let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?; let row_stream = client.query_raw_txt(query, query_params).await?; // Manually drain the stream into a vector to leave row_stream hanging @@ -280,6 +274,11 @@ pub async fn handle( json!({ "name": Value::String(c.name().to_owned()), "dataTypeID": Value::Number(c.type_().oid().into()), + "tableID": c.table_oid(), + "columnID": c.column_id(), + "dataTypeSize": c.type_size(), + "dataTypeModifier": c.type_modifier(), + "format": "text", }) }) .collect::>() @@ -303,6 +302,70 @@ pub async fn handle( })) } +/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with +/// the difference that it uses `tokio_postgres` for the connection. 
+#[instrument(skip_all)] +async fn connect_to_compute( + node_info: &mut console::CachedNodeInfo, + extra: &console::ConsoleReqExtra<'_>, + creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + + loop { + match connect_to_compute_once(node_info, conn_info).await { + Err(e) if num_retries > 0 => { + info!("compute node's state has changed; requesting a wake-up"); + match creds.wake_compute(extra).await? { + // Update `node_info` and try one more time. + Some(new) => { + *node_info = new; + } + // Link auth doesn't work that way, so we just exit. + None => return Err(e), + } + } + other => return other, + } + + num_retries -= 1; + info!("retrying after wake-up ({num_retries} attempts left)"); + } +} + +async fn connect_to_compute_once( + node_info: &console::CachedNodeInfo, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut config = (*node_info.config).clone(); + + let (client, connection) = config + .user(&conn_info.username) + .password(&conn_info.password) + .dbname(&conn_info.dbname) + .max_backend_message_size(MAX_RESPONSE_SIZE) + .connect(tokio_postgres::NoTls) + .inspect_err(|e: &tokio_postgres::Error| { + error!( + "failed to connect to compute node hosts={:?} ports={:?}: {}", + node_info.config.get_hosts(), + node_info.config.get_ports(), + e + ); + invalidate_cache(node_info) + }) + .await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("connection error: {}", e); + } + }); + + Ok(client) +} + // // Convert postgres row with text-encoded values to JSON object // diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index fbb602e3d2..9f467aceb7 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -26,7 +26,6 @@ use tls_listener::TlsListener; use tokio::{ io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}, net::TcpListener, - select, }; use tokio_util::sync::CancellationToken; use tracing::{error, info, info_span, warn, Instrument}; @@ -193,14 +192,9 @@ async fn ws_handler( // TODO: that deserves a refactor as now this function also handles http json client besides websockets. // Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead. } else if request.uri().path() == "/sql" && request.method() == Method::POST { - let result = select! { - _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - Err(anyhow::anyhow!("Query timed out")) - } - response = sql_over_http::handle(config, request, sni_hostname) => { - response - } - }; + let result = sql_over_http::handle(config, request, sni_hostname) + .instrument(info_span!("sql-over-http")) + .await; let status_code = match result { Ok(_) => StatusCode::OK, Err(_) => StatusCode::BAD_REQUEST, diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cf2dd000db..8efb7005c8 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -22,7 +22,7 @@ use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. -const NUM_RETRIES_WAKE_COMPUTE: usize = 1; +pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -283,34 +283,35 @@ async fn handshake( } } +/// If we couldn't connect, a cached connection info might be to blame +/// (e.g. 
the compute node's address might've changed at the wrong time). +/// Invalidate the cache entry (if any) to prevent subsequent errors. +#[tracing::instrument(name = "invalidate_cache", skip_all)] +pub fn invalidate_cache(node_info: &console::CachedNodeInfo) { + let is_cached = node_info.cached(); + if is_cached { + warn!("invalidating stalled compute node info cache entry"); + node_info.invalidate(); + } + + let label = match is_cached { + true => "compute_cached", + false => "compute_uncached", + }; + NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); +} + /// Try to connect to the compute node once. #[tracing::instrument(name = "connect_once", skip_all)] async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, ) -> Result { - // If we couldn't connect, a cached connection info might be to blame - // (e.g. the compute node's address might've changed at the wrong time). - // Invalidate the cache entry (if any) to prevent subsequent errors. - let invalidate_cache = |_: &compute::ConnectionError| { - let is_cached = node_info.cached(); - if is_cached { - warn!("invalidating stalled compute node info cache entry"); - node_info.invalidate(); - } - - let label = match is_cached { - true => "compute_cached", - false => "compute_uncached", - }; - NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); - }; - let allow_self_signed_compute = node_info.allow_self_signed_compute; node_info .config .connect(allow_self_signed_compute) - .inspect_err(invalidate_cache) + .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info)) .await } diff --git a/scripts/comment-test-report.js b/scripts/comment-test-report.js old mode 100644 new mode 100755 index a7fd5b0bef..432c78d1af --- a/scripts/comment-test-report.js +++ b/scripts/comment-test-report.js @@ -1,3 +1,5 @@ +#! /usr/bin/env node + // // The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch. // @@ -19,7 +21,7 @@ // }) // -// Analog of Python's defaultdict. +// Equivalent of Python's defaultdict. // // const dm = new DefaultMap(() => new DefaultMap(() => [])) // dm["firstKey"]["secondKey"].push("value") @@ -32,34 +34,7 @@ class DefaultMap extends Map { } } -module.exports = async ({ github, context, fetch, report }) => { - // Marker to find the comment in the subsequent runs - const startMarker = `` - // If we run the script in the PR or in the branch (main/release/...) - const isPullRequest = !!context.payload.pull_request - // Latest commit in PR or in the branch - const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha - // Let users know that the comment is updated automatically - const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${commitSha} at ${new Date().toISOString()} :recycle:
` - // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) - const githubActionsBotId = 41898282 - // Commend body itself - let commentBody = `${startMarker}\n` - - // Common parameters for GitHub API requests - const ownerRepoParams = { - owner: context.repo.owner, - repo: context.repo.repo, - } - - const {reportUrl, reportJsonUrl} = report - - if (!reportUrl || !reportJsonUrl) { - commentBody += `#### No tests were run or test report is not available\n` - commentBody += autoupdateNotice - return - } - +const parseReportJson = async ({ reportJsonUrl, fetch }) => { const suites = await (await fetch(reportJsonUrl)).json() // Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests. @@ -83,7 +58,7 @@ module.exports = async ({ github, context, fetch, report }) => { let buildType, pgVersion const match = test.name.match(/[\[-](?debug|release)-pg(?\d+)[-\]]/)?.groups if (match) { - ({buildType, pgVersion} = match) + ({ buildType, pgVersion } = match) } else { // It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance). console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`) @@ -123,37 +98,68 @@ module.exports = async ({ github, context, fetch, report }) => { } } + return { + failedTests, + failedTestsCount, + passedTests, + passedTestsCount, + skippedTests, + skippedTestsCount, + flakyTests, + flakyTestsCount, + retriedTests, + pgVersions, + } +} + +const reportSummary = async (params) => { + const { + failedTests, + failedTestsCount, + passedTests, + passedTestsCount, + skippedTests, + skippedTestsCount, + flakyTests, + flakyTestsCount, + retriedTests, + pgVersions, + reportUrl, + } = params + + let summary = "" + const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount - commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n` + summary += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n` // Print test resuls from the newest to the oldest Postgres version for release and debug builds. for (const pgVersion of Array.from(pgVersions).sort().reverse()) { if (Object.keys(failedTests[pgVersion]).length > 0) { - commentBody += `#### Failures on Posgres ${pgVersion}\n\n` + summary += `#### Failures on Posgres ${pgVersion}\n\n` for (const [testName, tests] of Object.entries(failedTests[pgVersion])) { const links = [] for (const test of tests) { const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}` links.push(`[${test.buildType}](${allureLink})`) } - commentBody += `- \`${testName}\`: ${links.join(", ")}\n` + summary += `- \`${testName}\`: ${links.join(", ")}\n` } const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name) const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"` - commentBody += "```\n" - commentBody += `# Run failed on Postgres ${pgVersion} tests locally:\n` - commentBody += `${command}\n` - commentBody += "```\n" + summary += "```\n" + summary += `# Run failed on Postgres ${pgVersion} tests locally:\n` + summary += `${command}\n` + summary += "```\n" } } if (flakyTestsCount > 0) { - commentBody += `
\nFlaky tests (${flakyTestsCount})\n\n` + summary += `
\nFlaky tests (${flakyTestsCount})\n\n` for (const pgVersion of Array.from(pgVersions).sort().reverse()) { if (Object.keys(flakyTests[pgVersion]).length > 0) { - commentBody += `#### Postgres ${pgVersion}\n\n` + summary += `#### Postgres ${pgVersion}\n\n` for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) { const links = [] for (const test of tests) { @@ -161,11 +167,57 @@ module.exports = async ({ github, context, fetch, report }) => { const status = test.status === "passed" ? ":white_check_mark:" : ":x:" links.push(`[${status} ${test.buildType}](${allureLink})`) } - commentBody += `- \`${testName}\`: ${links.join(", ")}\n` + summary += `- \`${testName}\`: ${links.join(", ")}\n` } } } - commentBody += "\n
\n" + summary += "\n
\n" + } + + return summary +} + +module.exports = async ({ github, context, fetch, report }) => { + // Marker to find the comment in the subsequent runs + const startMarker = `` + // If we run the script in the PR or in the branch (main/release/...) + const isPullRequest = !!context.payload.pull_request + // Latest commit in PR or in the branch + const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha + // Let users know that the comment is updated automatically + const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${commitSha} at ${new Date().toISOString()} :recycle:
` + // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) + const githubActionsBotId = 41898282 + // Commend body itself + let commentBody = `${startMarker}\n` + + // Common parameters for GitHub API requests + const ownerRepoParams = { + owner: context.repo.owner, + repo: context.repo.repo, + } + + const {reportUrl, reportJsonUrl} = report + + if (!reportUrl || !reportJsonUrl) { + commentBody += `#### No tests were run or test report is not available\n` + commentBody += autoupdateNotice + return + } + + try { + const parsed = await parseReportJson({ reportJsonUrl, fetch }) + commentBody += await reportSummary({ ...parsed, reportUrl }) + } catch (error) { + commentBody += `### [full report](${reportUrl})\n___\n` + commentBody += `#### Failed to create a summary for the test run: \n` + commentBody += "```\n" + commentBody += `${error.stack}\n` + commentBody += "```\n" + commentBody += "\nTo reproduce and debug the error locally run:\n" + commentBody += "```\n" + commentBody += `scripts/comment-test-report.js ${reportJsonUrl}` + commentBody += "\n```\n" } commentBody += autoupdateNotice @@ -207,3 +259,60 @@ module.exports = async ({ github, context, fetch, report }) => { }) } } + +// Equivalent of Python's `if __name__ == "__main__":` +// https://nodejs.org/docs/latest/api/modules.html#accessing-the-main-module +if (require.main === module) { + // Poor man's argument parsing: we expect the third argument is a JSON URL (0: node binary, 1: this script, 2: JSON url) + if (process.argv.length !== 3) { + console.error(`Unexpected number of arguments\nUsage: node ${process.argv[1]} `) + process.exit(1) + } + const jsonUrl = process.argv[2] + + try { + new URL(jsonUrl) + } catch (error) { + console.error(`Invalid URL: ${jsonUrl}\nUsage: node ${process.argv[1]} `) + process.exit(1) + } + + const htmlUrl = jsonUrl.replace("/data/suites.json", "/index.html") + + const githubMock = { + rest: { + issues: { + createComment: console.log, + listComments: async () => ({ data: [] }), + updateComment: console.log + }, + repos: { + createCommitComment: console.log, + listCommentsForCommit: async () => ({ data: [] }), + updateCommitComment: console.log + } + } + } + + const contextMock = { + repo: { + owner: 'testOwner', + repo: 'testRepo' + }, + payload: { + number: 42, + pull_request: null, + }, + sha: '0000000000000000000000000000000000000000', + } + + module.exports({ + github: githubMock, + context: contextMock, + fetch: fetch, + report: { + reportUrl: htmlUrl, + reportJsonUrl: jsonUrl, + } + }) +} diff --git a/scripts/ingest_perf_test_result.py b/scripts/ingest_perf_test_result.py index 1bfc907def..35a1e29720 100644 --- a/scripts/ingest_perf_test_result.py +++ b/scripts/ingest_perf_test_result.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 import argparse import json +import logging import os import sys from contextlib import contextmanager from datetime import datetime from pathlib import Path +import backoff import psycopg2 import psycopg2.extras @@ -35,9 +37,20 @@ def get_connection_cursor(): connstr = os.getenv("DATABASE_URL") if not connstr: err("DATABASE_URL environment variable is not set") - with psycopg2.connect(connstr, connect_timeout=30) as conn: + + @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) + def connect(connstr): + conn = psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn + + conn = connect(connstr) + try: with conn.cursor() as cur: yield cur + finally: + if conn is not None: + conn.close() def 
create_table(cur): @@ -115,6 +128,7 @@ def main(): parser.add_argument( "--ingest", type=Path, + required=True, help="Path to perf test result file, or directory with perf test result files", ) parser.add_argument("--initdb", action="store_true", help="Initialuze database") @@ -140,4 +154,5 @@ def main(): if __name__ == "__main__": + logging.getLogger("backoff").addHandler(logging.StreamHandler()) main() diff --git a/scripts/ingest_regress_test_result.py b/scripts/ingest_regress_test_result.py index 974167483a..39c1c02941 100644 --- a/scripts/ingest_regress_test_result.py +++ b/scripts/ingest_regress_test_result.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 import argparse +import logging import os import re import sys from contextlib import contextmanager from pathlib import Path +import backoff import psycopg2 CREATE_TABLE = """ @@ -29,9 +31,20 @@ def get_connection_cursor(): connstr = os.getenv("DATABASE_URL") if not connstr: err("DATABASE_URL environment variable is not set") - with psycopg2.connect(connstr, connect_timeout=30) as conn: + + @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) + def connect(connstr): + conn = psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn + + conn = connect(connstr) + try: with conn.cursor() as cur: yield cur + finally: + if conn is not None: + conn.close() def create_table(cur): @@ -101,4 +114,5 @@ def main(): if __name__ == "__main__": + logging.getLogger("backoff").addHandler(logging.StreamHandler()) main() diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index b4c237cfa6..d55d159037 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -57,14 +57,16 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = ( "libmetrics_launch_timestamp", "libmetrics_build_info", "libmetrics_tracing_event_count_total", + "pageserver_materialized_cache_hits_total", + "pageserver_materialized_cache_hits_direct_total", + "pageserver_getpage_reconstruct_seconds_bucket", + "pageserver_getpage_reconstruct_seconds_count", + "pageserver_getpage_reconstruct_seconds_sum", ) PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( "pageserver_current_logical_size", "pageserver_resident_physical_size", - "pageserver_getpage_reconstruct_seconds_bucket", - "pageserver_getpage_reconstruct_seconds_count", - "pageserver_getpage_reconstruct_seconds_sum", "pageserver_getpage_get_reconstruct_data_seconds_bucket", "pageserver_getpage_get_reconstruct_data_seconds_count", "pageserver_getpage_get_reconstruct_data_seconds_sum", @@ -73,8 +75,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] 
= ( "pageserver_io_operations_seconds_count", "pageserver_io_operations_seconds_sum", "pageserver_last_record_lsn", - "pageserver_materialized_cache_hits_total", - "pageserver_materialized_cache_hits_direct_total", "pageserver_read_num_fs_layers_bucket", "pageserver_read_num_fs_layers_count", "pageserver_read_num_fs_layers_sum", diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a8610e24df..64c71d2a59 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1631,6 +1631,8 @@ class NeonPageserver(PgProtocol): r".*ERROR.*ancestor timeline \S+ is being stopped", # this is expected given our collaborative shutdown approach for the UploadQueue ".*Compaction failed, retrying in .*: queue is in state Stopped.*", + # Pageserver timeline deletion should be polled until it gets 404, so ignore it globally + ".*Error processing HTTP request: NotFound: Timeline .* was not found", ] def start( diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index f258a3a24d..5c4f5177d0 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -342,6 +342,11 @@ class PageserverHttpClient(requests.Session): return res_json def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs): + """ + Note that deletion is not instant, it is scheduled and performed mostly in the background. + So if you need to wait for it to complete use `timeline_delete_wait_completed`. + For longer description consult with pageserver openapi spec. + """ res = self.delete( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs ) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 83880abc77..ad89ebad00 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -193,19 +193,30 @@ def wait_for_upload_queue_empty( time.sleep(0.2) -def assert_timeline_detail_404( +def wait_timeline_detail_404( + pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId +): + last_exc = None + for _ in range(2): + time.sleep(0.250) + try: + data = pageserver_http.timeline_detail(tenant_id, timeline_id) + log.error(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code == 404: + return + + last_exc = e + + raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}") + + +def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId, + **delete_args, ): - """Asserts that timeline_detail returns 404, or dumps the detail.""" - try: - data = pageserver_http.timeline_detail(tenant_id, timeline_id) - log.error(f"detail {data}") - except PageserverApiException as e: - log.error(e) - if e.status_code == 404: - return - else: - raise - raise Exception("detail succeeded (it should return 404)") + pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) + wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id) diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 2635dbd93c..61f86dc3ce 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -15,7 +15,11 @@ from fixtures.neon_fixtures import ( PortDistributor, ) from fixtures.pageserver.http import PageserverHttpClient 
-from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.types import Lsn from pytest import FixtureRequest @@ -417,7 +421,7 @@ def check_neon_works( ) shutil.rmtree(repo_dir / "local_fs_remote_storage") - pageserver_http.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id) pageserver_http.timeline_create(pg_version, tenant_id, timeline_id) pg_bin.run( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"] diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 5c3948b027..141c69b230 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -14,7 +14,11 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, PgBin, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import subprocess_capture @@ -151,7 +155,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build ".*files not bound to index_file.json, proceeding with their deletion.*" ) - client.timeline_delete(tenant, timeline) + timeline_delete_wait_completed(client, tenant, timeline) # Importing correct backup works import_tar(base_tar, wal_tar) diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index a96532c0d8..b22e545f20 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -24,7 +24,13 @@ def test_basic_eviction( test_name="test_download_remote_layers_api", ) - env = neon_env_builder.init_start() + env = neon_env_builder.init_start( + initial_tenant_conf={ + # disable gc and compaction background loops because they perform on-demand downloads + "gc_period": "0s", + "compaction_period": "0s", + } + ) client = env.pageserver.http_client() endpoint = env.endpoints.create_start("main") @@ -47,6 +53,11 @@ def test_basic_eviction( client.timeline_checkpoint(tenant_id, timeline_id) wait_for_upload(client, tenant_id, timeline_id, current_lsn) + # disable compute & sks to avoid on-demand downloads by walreceiver / getpage + endpoint.stop() + for sk in env.safekeepers: + sk.stop() + timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) initial_local_layers = sorted( list(filter(lambda path: path.name != "metadata", timeline_path.glob("*"))) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 11ac9e2555..f2b954a822 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -20,7 +20,7 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, wait_until_tenant_active, @@ -597,14 +597,11 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue( env.pageserver.allowed_errors.append( ".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping" ) - 
client.timeline_delete(tenant_id, timeline_id) - env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*") env.pageserver.allowed_errors.append( ".*files not bound to index_file.json, proceeding with their deletion.*" ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id)) + timeline_delete_wait_completed(client, tenant_id, timeline_id) assert not timeline_path.exists() diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index e9dcd1e5cd..a0f9f854ed 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -11,6 +11,7 @@ from fixtures.neon_fixtures import ( wait_for_wal_insert_lsn, ) from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.pg_version import PgVersion, xfail_on_postgres from fixtures.types import Lsn, TenantId, TimelineId @@ -628,12 +629,12 @@ def test_get_tenant_size_with_multiple_branches( size_debug_file_before.write(size_debug) # teardown, delete branches, and the size should be going down - http_client.timeline_delete(tenant_id, first_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id) size_after_deleting_first = http_client.tenant_size(tenant_id) assert size_after_deleting_first < size_after_thinning_branch - http_client.timeline_delete(tenant_id, second_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id) size_after_deleting_second = http_client.tenant_size(tenant_id) assert size_after_deleting_second < size_after_deleting_first diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py index 21e4af4127..75e5c2c91c 100644 --- a/test_runner/regress/test_tenant_tasks.py +++ b/test_runner/regress/test_tenant_tasks.py @@ -1,6 +1,10 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder -from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active +from fixtures.pageserver.utils import ( + assert_tenant_state, + timeline_delete_wait_completed, + wait_until_tenant_active, +) from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until @@ -24,7 +28,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): def delete_all_timelines(tenant: TenantId): timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)] for t in timelines: - client.timeline_delete(tenant, t) + timeline_delete_wait_completed(client, tenant, t) # Create tenant, start compute tenant, _ = env.neon_cli.create_tenant() diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index aef2df4932..4dbfa8bc1f 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -21,6 +21,7 @@ from fixtures.neon_fixtures import ( RemoteStorageKind, available_remote_storages, ) +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import wait_until from prometheus_client.samples import Sample @@ -213,7 +214,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder): # Test (a subset of) pageserver global metrics for metric in PAGESERVER_GLOBAL_METRICS: ps_samples = ps_metrics.query_all(metric, {}) - assert len(ps_samples) > 0 + assert len(ps_samples) > 0, f"expected at least one sample 
for {metric}" for sample in ps_samples: labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()]) log.info(f"{sample.name}{{{labels}}} {sample.value}") @@ -318,9 +319,10 @@ def test_pageserver_with_empty_tenants( client.tenant_create(tenant_with_empty_timelines) temp_timelines = client.timeline_list(tenant_with_empty_timelines) for temp_timeline in temp_timelines: - client.timeline_delete( - tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) + timeline_delete_wait_completed( + client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) ) + files_in_timelines_dir = sum( 1 for _p in Path.iterdir( diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 28b15d03ca..ddd9ffd755 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -17,9 +17,10 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, + wait_timeline_detail_404, wait_until_tenant_active, wait_until_timeline_state, ) @@ -83,7 +84,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): wait_until( number_of_iterations=3, interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id), + func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id), ) assert not timeline_path.exists() @@ -94,16 +95,16 @@ def test_timeline_delete(neon_simple_env: NeonEnv): match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ) as exc: ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) - - # FIXME leaves tenant without timelines, should we prevent deletion of root timeline? - wait_until( - number_of_iterations=3, - interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id), - ) - assert exc.value.status_code == 404 + wait_until( + number_of_iterations=3, + interval=0.2, + func=lambda: timeline_delete_wait_completed( + ps_http, env.initial_tenant, parent_timeline_id + ), + ) + # Check that we didn't pick up the timeline again after restart. 
# See https://github.com/neondatabase/neon/issues/3560 env.pageserver.stop(immediate=True) @@ -143,7 +144,6 @@ def test_delete_timeline_post_rm_failure( ps_http.configure_failpoints((failpoint_name, "return")) ps_http.timeline_delete(env.initial_tenant, env.initial_timeline) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -165,13 +165,7 @@ def test_delete_timeline_post_rm_failure( # this should succeed # this also checks that delete can be retried even when timeline is in Broken state - ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2) - with pytest.raises(PageserverApiException) as e: - ps_http.timeline_detail(env.initial_tenant, env.initial_timeline) - - assert e.value.status_code == 404 - - env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*") + timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline) env.pageserver.allowed_errors.append( f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*" ) @@ -247,13 +241,7 @@ def test_timeline_resurrection_on_attach( pass # delete new timeline - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*" - ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id) ##### Stop the pageserver instance, erase all its data env.endpoints.stop_all() @@ -338,7 +326,6 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -357,12 +344,15 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild # Wait for tenant to finish loading. 
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1) - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id) - ) + try: + data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) + log.debug(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code != 404: + raise + else: + raise Exception("detail succeeded (it should return 404)") assert ( not leaf_timeline_path.exists() @@ -389,13 +379,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild assert env.initial_timeline is not None for timeline_id in (intermediate_timeline_id, env.initial_timeline): - ps_http.timeline_delete(env.initial_tenant, timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id) + timeline_delete_wait_completed( + ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id ) assert_prefix_empty( @@ -419,23 +404,27 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) -def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( - neon_env_builder: NeonEnvBuilder, +@pytest.mark.parametrize( + "stuck_failpoint", + ["persist_deleted_index_part", "in_progress_delete"], +) +def test_concurrent_timeline_delete_stuck_on( + neon_env_builder: NeonEnvBuilder, stuck_failpoint: str ): """ - If we're stuck uploading the index file with the is_delete flag, - eventually console will hand up and retry. - If we're still stuck at the retry time, ensure that the retry - fails with status 500, signalling to console that it should retry - later. - Ideally, timeline_delete should return 202 Accepted and require - console to poll for completion, but, that would require changing - the API contract. + If delete is stuck console will eventually retry deletion. + So we need to be sure that these requests wont interleave with each other. + In this tests we check two places where we can spend a lot of time. + This is a regression test because there was a bug when DeletionGuard wasnt propagated + to the background task. + + Ensure that when retry comes if we're still stuck request will get an immediate error response, + signalling to console that it should retry later. 
""" neon_env_builder.enable_remote_storage( remote_storage_kind=RemoteStorageKind.MOCK_S3, - test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload", + test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}", ) env = neon_env_builder.init_start() @@ -445,13 +434,14 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ps_http = env.pageserver.http_client() # make the first call sleep practically forever - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ps_http.configure_failpoints((failpoint_name, "pause")) + ps_http.configure_failpoints((stuck_failpoint, "pause")) def first_call(result_queue): try: log.info("first call start") - ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10) + timeline_delete_wait_completed( + ps_http, env.initial_tenant, child_timeline_id, timeout=10 + ) log.info("first call success") result_queue.put("success") except Exception: @@ -466,7 +456,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def first_call_hit_failpoint(): assert env.pageserver.log_contains( - f".*{child_timeline_id}.*at failpoint {failpoint_name}" + f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) wait_until(50, 0.1, first_call_hit_failpoint) @@ -484,8 +474,12 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ) log.info("second call failed as expected") + # ensure it is not 404 and stopping + detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id) + assert detail["state"] == "Stopping" + # by now we know that the second call failed, let's ensure the first call will finish - ps_http.configure_failpoints((failpoint_name, "off")) + ps_http.configure_failpoints((stuck_failpoint, "off")) result = first_call_result.get() assert result == "success" @@ -498,8 +492,10 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): """ - If the client hangs up before we start the index part upload but after we mark it + If the client hangs up before we start the index part upload but after deletion is scheduled + we mark it deleted in local memory, a subsequent delete_timeline call should be able to do + another delete timeline operation. This tests cancel safety up to the given failpoint. 
@@ -515,12 +511,18 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): ps_http = env.pageserver.http_client() - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" + failpoint_name = "persist_deleted_index_part" ps_http.configure_failpoints((failpoint_name, "pause")) with pytest.raises(requests.exceptions.Timeout): ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + env.pageserver.allowed_errors.append( + f".*{child_timeline_id}.*timeline deletion is already in progress.*" + ) + with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"): + ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + # make sure the timeout was due to the failpoint at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" @@ -552,12 +554,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): wait_until(50, 0.1, first_request_finished) # check that the timeline is gone - notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found" - env.pageserver.allowed_errors.append(".*" + notfound_message) - with pytest.raises(PageserverApiException, match=notfound_message) as exc: - ps_http.timeline_detail(env.initial_tenant, child_timeline_id) - - assert exc.value.status_code == 404 + wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id) @pytest.mark.parametrize( @@ -616,12 +613,7 @@ def test_timeline_delete_works_for_remote_smoke( for timeline_id in reversed(timeline_ids): # note that we need to finish previous deletion before scheduling next one # otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3) - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id) assert_prefix_empty( neon_env_builder, diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 1460172afe..5bdbc18927 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -24,6 +24,7 @@ from fixtures.neon_fixtures import ( from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( assert_tenant_state, + timeline_delete_wait_completed, wait_for_upload_queue_empty, wait_until_tenant_active, ) @@ -272,7 +273,7 @@ def test_timeline_initial_logical_size_calculation_cancellation( if deletion_method == "tenant_detach": client.tenant_detach(tenant_id) elif deletion_method == "timeline_delete": - client.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(client, tenant_id, timeline_id) delete_timeline_success.put(True) except PageserverApiException: delete_timeline_success.put(False) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 8b595596cb..a837501678 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -31,7 +31,11 @@ from fixtures.neon_fixtures import ( SafekeeperPort, available_remote_storages, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + 
timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import get_dir_size, query_scalar, start_in_background @@ -548,15 +552,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re f"sk_id={sk.id} to flush {last_lsn}", ) - ps_cli = env.pageserver.http_client() - pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) + ps_http = env.pageserver.http_client() + pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) lag = last_lsn - pageserver_lsn log.info( f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb" ) endpoint.stop_and_destroy() - ps_cli.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(ps_http, tenant_id, timeline_id) # Also delete and manually create timeline on safekeepers -- this tests # scenario of manual recovery on different set of safekeepers. @@ -583,7 +587,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re shutil.copy(f_partial_saved, f_partial_path) # recreate timeline on pageserver from scratch - ps_cli.timeline_create( + ps_http.timeline_create( pg_version=PgVersion(pg_version), tenant_id=tenant_id, new_timeline_id=timeline_id, @@ -598,7 +602,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re if elapsed > wait_lsn_timeout: raise RuntimeError("Timed out waiting for WAL redo") - tenant_status = ps_cli.tenant_status(tenant_id) + tenant_status = ps_http.tenant_status(tenant_id) if tenant_status["state"]["slug"] == "Loading": log.debug(f"Tenant {tenant_id} is still loading, retrying") else:
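For reference, the connection-retry change applied further up to both ingest scripts (`scripts/ingest_perf_test_result.py` and `scripts/ingest_regress_test_result.py`) reduces to the following self-contained sketch; the `SELECT 1` probe at the bottom is illustrative only and not part of the patch:

```python
import logging
import os
import sys
from contextlib import contextmanager

import backoff
import psycopg2


@contextmanager
def get_connection_cursor():
    connstr = os.getenv("DATABASE_URL")
    if not connstr:
        sys.exit("DATABASE_URL environment variable is not set")

    # Retry transient connection failures with exponential backoff,
    # giving up after roughly 150 seconds overall.
    @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
    def connect(connstr):
        conn = psycopg2.connect(connstr, connect_timeout=30)
        conn.autocommit = True
        return conn

    conn = connect(connstr)
    try:
        with conn.cursor() as cur:
            yield cur
    finally:
        conn.close()


if __name__ == "__main__":
    # Attach a handler to backoff's logger, mirroring the scripts' main().
    logging.getLogger("backoff").addHandler(logging.StreamHandler())
    with get_connection_cursor() as cur:
        cur.execute("SELECT 1")  # illustrative probe only
```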