From 1299df87d2c32888383a2bd0da0939bf3aabfd20 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Tue, 13 Jun 2023 13:34:56 +0200 Subject: [PATCH 01/16] [compute_ctl] Fix logging if catalog updates are skipped (#4480) Otherwise, it wasn't clear from the log when Postgres started up completely if catalog updates were skipped. Follow-up for 4936ab6 --- compute_tools/src/compute.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 977708a18f..94cebf93de 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -370,11 +370,6 @@ impl ComputeNode { // 'Close' connection drop(client); - info!( - "finished configuration of compute for project {}", - spec.cluster.cluster_id.as_deref().unwrap_or("None") - ); - Ok(()) } @@ -427,22 +422,22 @@ impl ComputeNode { #[instrument(skip(self))] pub fn start_compute(&self) -> Result { let compute_state = self.state.lock().unwrap().clone(); - let spec = compute_state.pspec.as_ref().expect("spec must be set"); + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); info!( "starting compute for project {}, operation {}, tenant {}, timeline {}", - spec.spec.cluster.cluster_id.as_deref().unwrap_or("None"), - spec.spec.operation_uuid.as_deref().unwrap_or("None"), - spec.tenant_id, - spec.timeline_id, + pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"), + pspec.spec.operation_uuid.as_deref().unwrap_or("None"), + pspec.tenant_id, + pspec.timeline_id, ); self.prepare_pgdata(&compute_state)?; let start_time = Utc::now(); - let pg = self.start_postgres(spec.storage_auth_token.clone())?; + let pg = self.start_postgres(pspec.storage_auth_token.clone())?; - if spec.spec.mode == ComputeMode::Primary && !spec.spec.skip_pg_catalog_updates { + if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates { self.apply_config(&compute_state)?; } @@ -462,6 +457,11 @@ impl ComputeNode { } self.set_status(ComputeStatus::Running); + info!( + "finished configuration of compute for project {}", + pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None") + ); + Ok(pg) } From fdf7a67ed2a8533bce6df83ef2f48ade592fd39c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 13 Jun 2023 13:49:40 +0200 Subject: [PATCH 02/16] init_empty_layer_map: use `try_write` (#4485) This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`). Or more specifically, #4441, where we turn Timeline::layers into a tokio::sync::RwLock. By using try_write() here, we can avoid turning init_empty_layer_map async, which is nice because much of its transitive call(er) graph isn't async. --- pageserver/src/tenant/timeline.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b8a7cdacb7..ec434843d1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1598,7 +1598,9 @@ impl Timeline { /// Initialize with an empty layer map. Used when creating a new timeline. /// pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.try_write().expect( + "in the context where we call this function, no other task has access to the object", + ); layers.next_open_layer_at = Some(Lsn(start_lsn.0)); } From 3693d1f431e89eec8231ab3ffa44021f1778d4cf Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 13 Jun 2023 18:38:41 +0200 Subject: [PATCH 03/16] turn Timeline::layers into tokio::sync::RwLock (#4441) This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`). # Full Stack Of Preliminary PRs Thanks to the countless preliminary PRs, this conversion is relatively straight-forward. 1. Clean-ups * https://github.com/neondatabase/neon/pull/4316 * https://github.com/neondatabase/neon/pull/4317 * https://github.com/neondatabase/neon/pull/4318 * https://github.com/neondatabase/neon/pull/4319 * https://github.com/neondatabase/neon/pull/4321 * Note: these were mostly to find an alternative to #4291, which I thought we'd need in my original plan where we would need to convert `Tenant::timelines` into an async locking primitive (#4333). In reviews, we walked away from that, but these cleanups were still quite useful. 2. https://github.com/neondatabase/neon/pull/4364 3. https://github.com/neondatabase/neon/pull/4472 4. https://github.com/neondatabase/neon/pull/4476 5. https://github.com/neondatabase/neon/pull/4477 6. https://github.com/neondatabase/neon/pull/4485 # Significant Changes In This PR ## `compact_level0_phase1` & `create_delta_layer` This commit partially reverts "pgserver: spawn_blocking in compaction (#4265)" 4e359db4c78e0fd3d3e8f6a69baac4fb5b80b752. Specifically, it reverts the `spawn_blocking`-ificiation of `compact_level0_phase1`. If we didn't revert it, we'd have to use `Timeline::layers.blocking_read()` inside `compact_level0_phase1`. That would use up a thread in the `spawn_blocking` thread pool, which is hard-capped. I considered wrapping the code that follows the second `layers.read().await` into `spawn_blocking`, but there are lifetime issues with `deltas_to_compact`. Also, this PR switches the `create_delta_layer` _function_ back to async, and uses `spawn_blocking` inside to run the code that does sync IO, while keeping the code that needs to lock `Timeline::layers` async. ## `LayerIter` and `LayerKeyIter` `Send` bounds I had to add a `Send` bound on the `dyn` type that `LayerIter` and `LayerKeyIter` wrap. Why? Because we now have the second `layers.read().await` inside `compact_level0_phase`, and these iterator instances are held across that await-point. More background: https://github.com/neondatabase/neon/pull/4462#issuecomment-1587376960 ## `DatadirModification::flush` Needed to replace the `HashMap::retain` with a hand-rolled variant because `TimelineWriter::put` is now async. --- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 11 +- pageserver/src/pgdatadir_mapping.rs | 21 +- pageserver/src/tenant.rs | 167 ++++++++++------ pageserver/src/tenant/storage_layer.rs | 4 +- .../tenant/storage_layer/inmemory_layer.rs | 2 +- pageserver/src/tenant/timeline.rs | 181 +++++++++--------- .../src/tenant/timeline/eviction_task.rs | 2 +- 8 files changed, 217 insertions(+), 173 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index ce5f81c44b..61cbd5066f 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -516,7 +516,7 @@ async fn collect_eviction_candidates( if !tl.is_active() { continue; } - let info = tl.get_local_layers_for_disk_usage_eviction(); + let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); tenant_candidates.extend( info.resident_layers diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ac230e5f4a..fc8da70cc0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -215,7 +215,7 @@ async fn build_timeline_info( ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - let mut info = build_timeline_info_common(timeline, ctx)?; + let mut info = build_timeline_info_common(timeline, ctx).await?; if include_non_incremental_logical_size { // XXX we should be using spawn_ondemand_logical_size_calculation here. // Otherwise, if someone deletes the timeline / detaches the tenant while @@ -233,7 +233,7 @@ async fn build_timeline_info( Ok(info) } -fn build_timeline_info_common( +async fn build_timeline_info_common( timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { @@ -264,7 +264,7 @@ fn build_timeline_info_common( None } }; - let current_physical_size = Some(timeline.layer_size_sum()); + let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); @@ -330,6 +330,7 @@ async fn timeline_create_handler( Ok(Some(new_timeline)) => { // Created. Construct a TimelineInfo for it. let timeline_info = build_timeline_info_common(&new_timeline, &ctx) + .await .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } @@ -591,7 +592,7 @@ async fn tenant_status( // Calculate total physical size of all timelines let mut current_physical_size = 0; for timeline in tenant.list_timelines().iter() { - current_physical_size += timeline.layer_size_sum(); + current_physical_size += timeline.layer_size_sum().await; } let state = tenant.current_state(); @@ -701,7 +702,7 @@ async fn layer_map_info_handler( check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(reset); + let layer_map_info = timeline.layer_map_info(reset).await; json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 51cac43f50..86c84ec82f 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1133,16 +1133,17 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer().await; // Flush relation and SLRU data blocks, keep metadata. - let mut result: anyhow::Result<()> = Ok(()); - self.pending_updates.retain(|&key, value| { - if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { - result = writer.put(key, self.lsn, value); - false + let mut retained_pending_updates = HashMap::new(); + for (key, value) in self.pending_updates.drain() { + if is_rel_block_key(key) || is_slru_block_key(key) { + // This bails out on first error without modifying pending_updates. + // That's Ok, cf this function's doc comment. + writer.put(key, self.lsn, &value).await?; } else { - true + retained_pending_updates.insert(key, value); } - }); - result?; + } + self.pending_updates.extend(retained_pending_updates); if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1164,10 +1165,10 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value)?; + writer.put(key, lsn, &value).await?; } for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn)?; + writer.delete(key_range, lsn).await?; } writer.finish_write(lsn); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 32390c06cf..5603bcef84 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -519,6 +519,7 @@ impl Tenant { ); timeline .load_layer_map(new_disk_consistent_lsn) + .await .with_context(|| { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; @@ -560,7 +561,7 @@ impl Tenant { || timeline .layers .read() - .unwrap() + .await .iter_historic_layers() .next() .is_some(), @@ -3582,12 +3583,16 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3656,13 +3661,21 @@ mod tests { let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap(); // Insert a value on the timeline - writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?; - writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?; + writer + .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20")) + .await?; + writer + .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20")) + .await?; writer.finish_write(Lsn(0x20)); - writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?; + writer + .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30")) + .await?; writer.finish_write(Lsn(0x30)); - writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?; + writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40")) + .await?; writer.finish_write(Lsn(0x40)); //assert_current_logical_size(&tline, Lsn(0x40)); @@ -3675,7 +3688,9 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer().await; - new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; + new_writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40")) + .await?; new_writer.finish_write(Lsn(0x40)); // Check page contents on both branches @@ -3703,36 +3718,44 @@ mod tests { { let writer = tline.writer().await; // Create a relation on the timeline - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; } tline.freeze_and_flush().await?; { let writer = tline.writer().await; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); } tline.freeze_and_flush().await @@ -4048,7 +4071,9 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -4056,7 +4081,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -4064,7 +4091,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; + writer + .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30"))) + .await?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -4072,7 +4101,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; + writer + .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40"))) + .await?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -4124,11 +4155,13 @@ mod tests { for _ in 0..10000 { test_key.field6 = blknum; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); @@ -4174,11 +4207,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4192,11 +4227,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; @@ -4249,11 +4286,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4275,11 +4314,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; println!("updating {} at {}", blknum, lsn); writer.finish_write(lsn); drop(writer); @@ -4341,11 +4382,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), + ) + .await?; println!("updating [{}][{}] at {}", idx, blknum, lsn); writer.finish_write(lsn); drop(writer); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6ac4fd9470..0af3d4ce39 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -389,10 +389,10 @@ pub trait Layer: std::fmt::Debug + Send + Sync { } /// Returned by [`Layer::iter`] -pub type LayerIter<'i> = Box> + 'i>; +pub type LayerIter<'i> = Box> + 'i + Send>; /// Returned by [`Layer::key_iter`] -pub type LayerKeyIter<'i> = Box + 'i>; +pub type LayerKeyIter<'i> = Box + 'i + Send>; /// A Layer contains all data in a "rectangle" consisting of a range of keys and /// range of LSNs. diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c453683fea..78bcfdafc0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -304,7 +304,7 @@ impl InMemoryLayer { Ok(()) } - pub fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { + pub async fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { // TODO: Currently, we just leak the storage for any deleted keys Ok(()) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ec434843d1..d642090996 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -125,7 +125,7 @@ pub struct Timeline { pub pg_version: u32, - pub(super) layers: RwLock>, + pub(crate) layers: tokio::sync::RwLock>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -597,8 +597,8 @@ impl Timeline { /// The sum of the file size of all historic layers in the layer map. /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. - pub fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_size_sum(&self) -> u64 { + let layer_map = self.layers.read().await; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -908,7 +908,7 @@ impl Timeline { pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; @@ -1038,8 +1038,8 @@ impl Timeline { } } - pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { + let layer_map = self.layers.read().await; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1061,7 +1061,7 @@ impl Timeline { #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; if self.remote_client.is_none() { return Ok(Some(false)); @@ -1074,7 +1074,7 @@ impl Timeline { /// Like [`evict_layer_batch`], but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let remote_client = self .remote_client .as_ref() @@ -1159,7 +1159,7 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1417,7 +1417,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: RwLock::new(LayerMap::default()), + layers: tokio::sync::RwLock::new(LayerMap::default()), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1607,8 +1607,8 @@ impl Timeline { /// /// Scan the timeline directory to populate the layer map. /// - pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().unwrap(); + pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1737,7 +1737,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1890,7 +1890,7 @@ impl Timeline { let local_layers = self .layers .read() - .unwrap() + .await .iter_historic_layers() .map(|l| (l.filename(), l)) .collect::>(); @@ -2263,8 +2263,8 @@ impl Timeline { } } - fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().unwrap().iter_historic_layers() { + async fn find_layer(&self, layer_file_name: &str) -> Option> { + for historic_layer in self.layers.read().await.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { return Some(historic_layer); @@ -2481,7 +2481,7 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().unwrap(); + let layers = timeline.layers.read().await; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2663,8 +2663,8 @@ impl Timeline { /// /// Get a handle to the latest layer for appending. /// - fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().unwrap(); + async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { + let mut layers = self.layers.write().await; ensure!(lsn.is_aligned()); @@ -2713,17 +2713,16 @@ impl Timeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { + async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); - let layer = self.get_layer_for_write(lsn)?; + let layer = self.get_layer_for_write(lsn).await?; layer.put_value(key, lsn, val)?; Ok(()) } - fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - let layer = self.get_layer_for_write(lsn)?; - layer.put_tombstone(key_range, lsn)?; - + async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + let layer = self.get_layer_for_write(lsn).await?; + layer.put_tombstone(key_range, lsn).await?; Ok(()) } @@ -2742,7 +2741,7 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? @@ -2780,7 +2779,7 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2896,16 +2895,7 @@ impl Timeline { } } // normal case, write out a L0 delta layer file. - let this = self.clone(); - let frozen_layer = frozen_layer.clone(); - let span = tracing::info_span!("blocking"); - let (delta_path, metadata) = tokio::task::spawn_blocking(move || { - let _g = span.entered(); - this.create_delta_layer(&frozen_layer) - }) - .await - .context("create_delta_layer spawn_blocking") - .and_then(|res| res)?; + let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; HashMap::from([(delta_path, metadata)]) }; @@ -2914,7 +2904,7 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let l = layers.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this @@ -3008,34 +2998,50 @@ impl Timeline { } // Write out the given frozen in-memory layer as a new L0 delta file - fn create_delta_layer( + async fn create_delta_layer( self: &Arc, - frozen_layer: &InMemoryLayer, + frozen_layer: &Arc, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { - // Write it out - let new_delta = frozen_layer.write_to_disk()?; - let new_delta_path = new_delta.path(); - let new_delta_filename = new_delta.filename(); + let span = tracing::info_span!("blocking"); + let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({ + let _g = span.entered(); + let self_clone = Arc::clone(self); + let frozen_layer = Arc::clone(frozen_layer); + move || { + // Write it out + let new_delta = frozen_layer.write_to_disk()?; + let new_delta_path = new_delta.path(); - // Sync it to disk. - // - // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable - // - // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. + // Sync it to disk. + // + // We must also fsync the timeline dir to ensure the directory entries for + // new layer files are durable + // + // TODO: If we're running inside 'flush_frozen_layers' and there are multiple + // files to flush, it might be better to first write them all, and then fsync + // them all in parallel. - // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace - // this with a single fsync in future refactors. - par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; - // Then sync the parent directory. - par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) - .context("fsync of timeline dir")?; + // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace + // this with a single fsync in future refactors. + par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + // Then sync the parent directory. + par_fsync::par_fsync(&[self_clone + .conf + .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) + .context("fsync of timeline dir")?; + + let sz = new_delta_path.metadata()?.len(); + + anyhow::Ok((new_delta, sz)) + } + }) + .await + .context("spawn_blocking")??; + let new_delta_name = new_delta.filename(); // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -3046,14 +3052,12 @@ impl Timeline { batch_updates.flush(); // update the timeline's physical size - let sz = new_delta_path.metadata()?.len(); - self.metrics.resident_physical_size_gauge.add(sz); // update metrics self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); - Ok((new_delta_filename, LayerFileMetadata::new(sz))) + Ok((new_delta_name, LayerFileMetadata::new(sz))) } async fn repartition( @@ -3087,10 +3091,14 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? - fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut max_deltas = 0; { @@ -3185,7 +3193,7 @@ impl Timeline { for partition in partitioning.parts.iter() { let img_range = start..partition.ranges.last().unwrap().end; start = img_range.end; - if force || self.time_for_new_image_layer(partition, lsn)? { + if force || self.time_for_new_image_layer(partition, lsn).await? { let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -3268,7 +3276,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); @@ -3330,13 +3338,13 @@ impl Timeline { /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the /// start of level0 files compaction, the on-demand download should be revisited as well. - fn compact_level0_phase1( + async fn compact_level0_phase1( &self, _layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); @@ -3459,7 +3467,7 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here? + let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3673,21 +3681,12 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { - let this = self.clone(); - let ctx_inner = ctx.clone(); - let layer_removal_cs_inner = layer_removal_cs.clone(); - let span = tracing::info_span!("blocking"); let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - } = tokio::task::spawn_blocking(move || { - let _g = span.entered(); - this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner) - }) - .await - .context("compact_level0_phase1 spawn_blocking") - .map_err(CompactionError::Other) - .and_then(|res| res)?; + } = self + .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx) + .await?; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do @@ -3705,7 +3704,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3964,7 +3963,7 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4264,7 +4263,7 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().unwrap(); + let mut layers = self_clone.layers.write().await; let mut updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { @@ -4422,7 +4421,7 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) @@ -4524,8 +4523,8 @@ impl LocalLayerInfoForDiskUsageEviction { } impl Timeline { - pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().unwrap(); + pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().await; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4613,12 +4612,12 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { - self.tl.put_value(key, lsn, value) + pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { + self.tl.put_value(key, lsn, value).await } - pub fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - self.tl.put_tombstone(key_range, lsn) + pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + self.tl.put_tombstone(key_range, lsn).await } /// Track the end of the latest digested WAL record. diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 1040dff63d..80c5210211 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -197,7 +197,7 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { if hist_layer.is_remote_layer() { From 4385e0c2913e965aa93d8e7994591db7ef8e5ad8 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Tue, 13 Jun 2023 01:32:34 +0300 Subject: [PATCH 04/16] Return more RowDescription fields via proxy json endpoint As we aim to align client-side behavior with node-postgres, it's necessary for us to return these fields, because node-postgres does so as well. --- Cargo.lock | 10 +++++----- Cargo.toml | 12 ++++++------ proxy/src/http/sql_over_http.rs | 5 +++++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6856b9e3ac..71a6699c50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2770,7 +2770,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "bytes", "fallible-iterator", @@ -2783,7 +2783,7 @@ dependencies = [ [[package]] name = "postgres-native-tls" version = "0.5.0" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "native-tls", "tokio", @@ -2794,7 +2794,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "base64 0.20.0", "byteorder", @@ -2812,7 +2812,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "bytes", "fallible-iterator", @@ -4272,7 +4272,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index dc34705f8d..551a9dc783 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,11 +140,11 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Other git libraries @@ -180,7 +180,7 @@ tonic-build = "0.9" # This is only needed for proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } # Changes the MAX_THREADS limit from 4096 to 32768. # This is a temporary workaround for using tracing from many threads in safekeepers code, diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 050f00dd7d..1007532a96 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -280,6 +280,11 @@ pub async fn handle( json!({ "name": Value::String(c.name().to_owned()), "dataTypeID": Value::Number(c.type_().oid().into()), + "tableID": c.table_oid(), + "columnID": c.column_id(), + "dataTypeSize": c.type_size(), + "dataTypeModifier": c.type_modifier(), + "format": "text", }) }) .collect::>() From a0b3990411071539e715ba8564b9c873dfd50d2b Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Tue, 13 Jun 2023 22:33:42 +0100 Subject: [PATCH 05/16] Retry data ingestion scripts on connection errors (#4382) ## Problem From time to time, we're catching a race condition when trying to upload perf or regression test results. Ref: - https://neondb.slack.com/archives/C03H1K0PGKH/p1685462717870759 - https://github.com/neondatabase/cloud/issues/3686 ## Summary of changes Wrap `psycopg2.connect` method with `@backoff.on_exception` contextmanager --- scripts/ingest_perf_test_result.py | 15 ++++++++++++++- scripts/ingest_regress_test_result.py | 14 +++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/scripts/ingest_perf_test_result.py b/scripts/ingest_perf_test_result.py index 1bfc907def..fc177b590e 100644 --- a/scripts/ingest_perf_test_result.py +++ b/scripts/ingest_perf_test_result.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 import argparse import json +import logging import os import sys from contextlib import contextmanager from datetime import datetime from pathlib import Path +import backoff import psycopg2 import psycopg2.extras @@ -35,9 +37,18 @@ def get_connection_cursor(): connstr = os.getenv("DATABASE_URL") if not connstr: err("DATABASE_URL environment variable is not set") - with psycopg2.connect(connstr, connect_timeout=30) as conn: + + @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) + def connect(connstr): + return psycopg2.connect(connstr, connect_timeout=30) + + conn = connect(connstr) + try: with conn.cursor() as cur: yield cur + finally: + if conn is not None: + conn.close() def create_table(cur): @@ -115,6 +126,7 @@ def main(): parser.add_argument( "--ingest", type=Path, + required=True, help="Path to perf test result file, or directory with perf test result files", ) parser.add_argument("--initdb", action="store_true", help="Initialuze database") @@ -140,4 +152,5 @@ def main(): if __name__ == "__main__": + logging.getLogger("backoff").addHandler(logging.StreamHandler()) main() diff --git a/scripts/ingest_regress_test_result.py b/scripts/ingest_regress_test_result.py index 974167483a..dff8e0cefa 100644 --- a/scripts/ingest_regress_test_result.py +++ b/scripts/ingest_regress_test_result.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 import argparse +import logging import os import re import sys from contextlib import contextmanager from pathlib import Path +import backoff import psycopg2 CREATE_TABLE = """ @@ -29,9 +31,18 @@ def get_connection_cursor(): connstr = os.getenv("DATABASE_URL") if not connstr: err("DATABASE_URL environment variable is not set") - with psycopg2.connect(connstr, connect_timeout=30) as conn: + + @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) + def connect(connstr): + return psycopg2.connect(connstr, connect_timeout=30) + + conn = connect(connstr) + try: with conn.cursor() as cur: yield cur + finally: + if conn is not None: + conn.close() def create_table(cur): @@ -101,4 +112,5 @@ def main(): if __name__ == "__main__": + logging.getLogger("backoff").addHandler(logging.StreamHandler()) main() From 3164ad7052fc8680538cc7659c9770d807177567 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Tue, 13 Jun 2023 21:48:09 -0400 Subject: [PATCH 06/16] compute_ctl: Spec parser forward compatibility test (#4494) --- libs/compute_api/src/spec.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index c2ad30f86f..b3f0e9ba43 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -148,4 +148,14 @@ mod tests { let file = File::open("tests/cluster_spec.json").unwrap(); let _spec: ComputeSpec = serde_json::from_reader(file).unwrap(); } + + #[test] + fn parse_unknown_fields() { + // Forward compatibility test + let file = File::open("tests/cluster_spec.json").unwrap(); + let mut json: serde_json::Value = serde_json::from_reader(file).unwrap(); + let ob = json.as_object_mut().unwrap(); + ob.insert("unknown_field_123123123".into(), "hello".into()); + let _spec: ComputeSpec = serde_json::from_value(json).unwrap(); + } } From ebee8247b54dbea641215506abc722208f8095a7 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Wed, 14 Jun 2023 15:38:01 +0300 Subject: [PATCH 07/16] Move s3 delete_objects to use chunks of 1000 OIDs (#4463) See https://github.com/neondatabase/neon/pull/4461#pullrequestreview-1474240712 --- libs/remote_storage/src/s3_bucket.rs | 40 ++++++++++++++------ libs/remote_storage/src/simulate_failures.rs | 13 ++++++- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 38e1bf00f8..dafb6dcb45 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -34,6 +34,8 @@ use crate::{ Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR, }; +const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000; + pub(super) mod metrics { use metrics::{register_int_counter_vec, IntCounterVec}; use once_cell::sync::Lazy; @@ -424,17 +426,33 @@ impl RemoteStorage for S3Bucket { delete_objects.push(obj_id); } - metrics::inc_delete_objects(paths.len() as u64); - self.client - .delete_objects() - .bucket(self.bucket_name.clone()) - .delete(Delete::builder().set_objects(Some(delete_objects)).build()) - .send() - .await - .map_err(|e| { - metrics::inc_delete_objects_fail(paths.len() as u64); - e - })?; + for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) { + metrics::inc_delete_objects(chunk.len() as u64); + + let resp = self + .client + .delete_objects() + .bucket(self.bucket_name.clone()) + .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build()) + .send() + .await; + + match resp { + Ok(resp) => { + if let Some(errors) = resp.errors { + metrics::inc_delete_objects_fail(errors.len() as u64); + return Err(anyhow::format_err!( + "Failed to delete {} objects", + errors.len() + )); + } + } + Err(e) => { + metrics::inc_delete_objects_fail(chunk.len() as u64); + return Err(e.into()); + } + } + } Ok(()) } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 2f341bb29d..741c18bf6f 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -24,6 +24,7 @@ enum RemoteOp { Upload(RemotePath), Download(RemotePath), Delete(RemotePath), + DeleteObjects(Vec), } impl UnreliableWrapper { @@ -121,8 +122,18 @@ impl RemoteStorage for UnreliableWrapper { } async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?; + let mut error_counter = 0; for path in paths { - self.delete(path).await? + if (self.delete(path).await).is_err() { + error_counter += 1; + } + } + if error_counter > 0 { + return Err(anyhow::anyhow!( + "failed to delete {} objects", + error_counter + )); } Ok(()) } From 9484b96d7cd31e3d1f91f0feb3ecd2c5afff2ca3 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Wed, 14 Jun 2023 15:07:30 +0100 Subject: [PATCH 08/16] GitHub Autocomment: do not fail the job (#4478) ## Problem If the script fails to generate a test summary, the step also fails the job/workflow (despite this could be a non-fatal problem). ## Summary of changes - Separate JSON parsing and summarisation into separate functions - Wrap functions calling into try..catch block, add an error message to GitHub comment and do not fail the step - Make `scripts/comment-test-report.js` a CLI script that can be run locally (mock GitHub calls) to make it easier to debug issues locally --- scripts/comment-test-report.js | 191 ++++++++++++++++++++++++++------- 1 file changed, 150 insertions(+), 41 deletions(-) mode change 100644 => 100755 scripts/comment-test-report.js diff --git a/scripts/comment-test-report.js b/scripts/comment-test-report.js old mode 100644 new mode 100755 index a7fd5b0bef..432c78d1af --- a/scripts/comment-test-report.js +++ b/scripts/comment-test-report.js @@ -1,3 +1,5 @@ +#! /usr/bin/env node + // // The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch. // @@ -19,7 +21,7 @@ // }) // -// Analog of Python's defaultdict. +// Equivalent of Python's defaultdict. // // const dm = new DefaultMap(() => new DefaultMap(() => [])) // dm["firstKey"]["secondKey"].push("value") @@ -32,34 +34,7 @@ class DefaultMap extends Map { } } -module.exports = async ({ github, context, fetch, report }) => { - // Marker to find the comment in the subsequent runs - const startMarker = `` - // If we run the script in the PR or in the branch (main/release/...) - const isPullRequest = !!context.payload.pull_request - // Latest commit in PR or in the branch - const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha - // Let users know that the comment is updated automatically - const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${commitSha} at ${new Date().toISOString()} :recycle:
` - // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) - const githubActionsBotId = 41898282 - // Commend body itself - let commentBody = `${startMarker}\n` - - // Common parameters for GitHub API requests - const ownerRepoParams = { - owner: context.repo.owner, - repo: context.repo.repo, - } - - const {reportUrl, reportJsonUrl} = report - - if (!reportUrl || !reportJsonUrl) { - commentBody += `#### No tests were run or test report is not available\n` - commentBody += autoupdateNotice - return - } - +const parseReportJson = async ({ reportJsonUrl, fetch }) => { const suites = await (await fetch(reportJsonUrl)).json() // Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests. @@ -83,7 +58,7 @@ module.exports = async ({ github, context, fetch, report }) => { let buildType, pgVersion const match = test.name.match(/[\[-](?debug|release)-pg(?\d+)[-\]]/)?.groups if (match) { - ({buildType, pgVersion} = match) + ({ buildType, pgVersion } = match) } else { // It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance). console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`) @@ -123,37 +98,68 @@ module.exports = async ({ github, context, fetch, report }) => { } } + return { + failedTests, + failedTestsCount, + passedTests, + passedTestsCount, + skippedTests, + skippedTestsCount, + flakyTests, + flakyTestsCount, + retriedTests, + pgVersions, + } +} + +const reportSummary = async (params) => { + const { + failedTests, + failedTestsCount, + passedTests, + passedTestsCount, + skippedTests, + skippedTestsCount, + flakyTests, + flakyTestsCount, + retriedTests, + pgVersions, + reportUrl, + } = params + + let summary = "" + const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount - commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n` + summary += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n` // Print test resuls from the newest to the oldest Postgres version for release and debug builds. for (const pgVersion of Array.from(pgVersions).sort().reverse()) { if (Object.keys(failedTests[pgVersion]).length > 0) { - commentBody += `#### Failures on Posgres ${pgVersion}\n\n` + summary += `#### Failures on Posgres ${pgVersion}\n\n` for (const [testName, tests] of Object.entries(failedTests[pgVersion])) { const links = [] for (const test of tests) { const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}` links.push(`[${test.buildType}](${allureLink})`) } - commentBody += `- \`${testName}\`: ${links.join(", ")}\n` + summary += `- \`${testName}\`: ${links.join(", ")}\n` } const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name) const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"` - commentBody += "```\n" - commentBody += `# Run failed on Postgres ${pgVersion} tests locally:\n` - commentBody += `${command}\n` - commentBody += "```\n" + summary += "```\n" + summary += `# Run failed on Postgres ${pgVersion} tests locally:\n` + summary += `${command}\n` + summary += "```\n" } } if (flakyTestsCount > 0) { - commentBody += `
\nFlaky tests (${flakyTestsCount})\n\n` + summary += `
\nFlaky tests (${flakyTestsCount})\n\n` for (const pgVersion of Array.from(pgVersions).sort().reverse()) { if (Object.keys(flakyTests[pgVersion]).length > 0) { - commentBody += `#### Postgres ${pgVersion}\n\n` + summary += `#### Postgres ${pgVersion}\n\n` for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) { const links = [] for (const test of tests) { @@ -161,11 +167,57 @@ module.exports = async ({ github, context, fetch, report }) => { const status = test.status === "passed" ? ":white_check_mark:" : ":x:" links.push(`[${status} ${test.buildType}](${allureLink})`) } - commentBody += `- \`${testName}\`: ${links.join(", ")}\n` + summary += `- \`${testName}\`: ${links.join(", ")}\n` } } } - commentBody += "\n
\n" + summary += "\n
\n" + } + + return summary +} + +module.exports = async ({ github, context, fetch, report }) => { + // Marker to find the comment in the subsequent runs + const startMarker = `` + // If we run the script in the PR or in the branch (main/release/...) + const isPullRequest = !!context.payload.pull_request + // Latest commit in PR or in the branch + const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha + // Let users know that the comment is updated automatically + const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${commitSha} at ${new Date().toISOString()} :recycle:
` + // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) + const githubActionsBotId = 41898282 + // Commend body itself + let commentBody = `${startMarker}\n` + + // Common parameters for GitHub API requests + const ownerRepoParams = { + owner: context.repo.owner, + repo: context.repo.repo, + } + + const {reportUrl, reportJsonUrl} = report + + if (!reportUrl || !reportJsonUrl) { + commentBody += `#### No tests were run or test report is not available\n` + commentBody += autoupdateNotice + return + } + + try { + const parsed = await parseReportJson({ reportJsonUrl, fetch }) + commentBody += await reportSummary({ ...parsed, reportUrl }) + } catch (error) { + commentBody += `### [full report](${reportUrl})\n___\n` + commentBody += `#### Failed to create a summary for the test run: \n` + commentBody += "```\n" + commentBody += `${error.stack}\n` + commentBody += "```\n" + commentBody += "\nTo reproduce and debug the error locally run:\n" + commentBody += "```\n" + commentBody += `scripts/comment-test-report.js ${reportJsonUrl}` + commentBody += "\n```\n" } commentBody += autoupdateNotice @@ -207,3 +259,60 @@ module.exports = async ({ github, context, fetch, report }) => { }) } } + +// Equivalent of Python's `if __name__ == "__main__":` +// https://nodejs.org/docs/latest/api/modules.html#accessing-the-main-module +if (require.main === module) { + // Poor man's argument parsing: we expect the third argument is a JSON URL (0: node binary, 1: this script, 2: JSON url) + if (process.argv.length !== 3) { + console.error(`Unexpected number of arguments\nUsage: node ${process.argv[1]} `) + process.exit(1) + } + const jsonUrl = process.argv[2] + + try { + new URL(jsonUrl) + } catch (error) { + console.error(`Invalid URL: ${jsonUrl}\nUsage: node ${process.argv[1]} `) + process.exit(1) + } + + const htmlUrl = jsonUrl.replace("/data/suites.json", "/index.html") + + const githubMock = { + rest: { + issues: { + createComment: console.log, + listComments: async () => ({ data: [] }), + updateComment: console.log + }, + repos: { + createCommitComment: console.log, + listCommentsForCommit: async () => ({ data: [] }), + updateCommitComment: console.log + } + } + } + + const contextMock = { + repo: { + owner: 'testOwner', + repo: 'testRepo' + }, + payload: { + number: 42, + pull_request: null, + }, + sha: '0000000000000000000000000000000000000000', + } + + module.exports({ + github: githubMock, + context: contextMock, + fetch: fetch, + report: { + reportUrl: htmlUrl, + reportJsonUrl: jsonUrl, + } + }) +} From ee9a5bae43da5cac32cc71326f6e482ed5eeb389 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 14 Jun 2023 19:07:42 +0300 Subject: [PATCH 09/16] Filter only active timelines for compaction (#4487) Previously we may've included Stopping/Broken timelines here, which leads to errors in logs -> causes tests to sporadically fail resolves #4467 --- pageserver/src/tenant.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5603bcef84..3ed4621112 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1462,7 +1462,13 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() - .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) + .filter_map(|(timeline_id, timeline)| { + if timeline.is_active() { + Some((*timeline_id, timeline.clone())) + } else { + None + } + }) .collect::>(); drop(timelines); timelines_to_compact From a7a0c3cd278c485a027620cfd373d6b9ca7e6c0c Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 14 Jun 2023 19:24:46 +0300 Subject: [PATCH 10/16] Invalidate proxy cache in http-over-sql (#4500) HTTP queries failed with errors `error connecting to server: failed to lookup address information: Name or service not known\n\nCaused by:\n failed to lookup address information: Name or service not known` The fix reused cache invalidation logic in proxy from usual postgres connections and added it to HTTP-over-SQL queries. Also removed a timeout for HTTP request, because it almost never worked on staging (50s+ time just to start the compute), and we can have the similar case in production. Should be ok, since we have a limits for the requests and responses. --- proxy/src/http/sql_over_http.rs | 136 +++++++++++++++++++++++--------- proxy/src/http/websocket.rs | 12 +-- proxy/src/proxy.rs | 39 ++++----- 3 files changed, 120 insertions(+), 67 deletions(-) diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 1007532a96..e8ad2d04f3 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -1,5 +1,6 @@ use futures::pin_mut; use futures::StreamExt; +use futures::TryFutureExt; use hyper::body::HttpBody; use hyper::http::HeaderName; use hyper::http::HeaderValue; @@ -11,8 +12,13 @@ use serde_json::Value; use tokio_postgres::types::Kind; use tokio_postgres::types::Type; use tokio_postgres::Row; +use tracing::error; +use tracing::info; +use tracing::instrument; use url::Url; +use crate::proxy::invalidate_cache; +use crate::proxy::NUM_RETRIES_WAKE_COMPUTE; use crate::{auth, config::ProxyConfig, console}; #[derive(serde::Deserialize)] @@ -90,10 +96,17 @@ fn json_array_to_pg_array(value: &Value) -> Result, serde_json::E } } +struct ConnInfo { + username: String, + dbname: String, + hostname: String, + password: String, +} + fn get_conn_info( headers: &HeaderMap, sni_hostname: Option, -) -> Result<(String, String, String, String), anyhow::Error> { +) -> Result { let connection_string = headers .get("Neon-Connection-String") .ok_or(anyhow::anyhow!("missing connection string"))? @@ -146,12 +159,12 @@ fn get_conn_info( } } - Ok(( - username.to_owned(), - dbname.to_owned(), - hostname.to_owned(), - password.to_owned(), - )) + Ok(ConnInfo { + username: username.to_owned(), + dbname: dbname.to_owned(), + hostname: hostname.to_owned(), + password: password.to_owned(), + }) } // TODO: return different http error codes @@ -164,10 +177,10 @@ pub async fn handle( // Determine the destination and connection params // let headers = request.headers(); - let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?; + let conn_info = get_conn_info(headers, sni_hostname)?; let credential_params = StartupMessageParams::new([ - ("user", &username), - ("database", &dbname), + ("user", &conn_info.username), + ("database", &conn_info.dbname), ("application_name", APP_NAME), ]); @@ -186,21 +199,20 @@ pub async fn handle( let creds = config .auth_backend .as_ref() - .map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names)) + .map(|_| { + auth::ClientCredentials::parse( + &credential_params, + Some(&conn_info.hostname), + common_names, + ) + }) .transpose()?; let extra = console::ConsoleReqExtra { session_id: uuid::Uuid::new_v4(), application_name: Some(APP_NAME), }; - let node = creds.wake_compute(&extra).await?.expect("msg"); - let conf = node.value.config; - let port = *conf.get_ports().first().expect("no port"); - let host = match conf.get_hosts().first().expect("no host") { - tokio_postgres::config::Host::Tcp(host) => host, - tokio_postgres::config::Host::Unix(_) => { - return Err(anyhow::anyhow!("unix socket is not supported")); - } - }; + + let mut node_info = creds.wake_compute(&extra).await?.expect("msg"); let request_content_length = match request.body().size_hint().upper() { Some(v) => v, @@ -220,28 +232,10 @@ pub async fn handle( let QueryData { query, params } = serde_json::from_slice(&body)?; let query_params = json_to_pg_text(params)?; - // - // Connenct to the destination - // - let (client, connection) = tokio_postgres::Config::new() - .host(host) - .port(port) - .user(&username) - .password(&password) - .dbname(&dbname) - .max_backend_message_size(MAX_RESPONSE_SIZE) - .connect(tokio_postgres::NoTls) - .await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - // // Now execute the query and return the result // + let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?; let row_stream = client.query_raw_txt(query, query_params).await?; // Manually drain the stream into a vector to leave row_stream hanging @@ -308,6 +302,70 @@ pub async fn handle( })) } +/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with +/// the difference that it uses `tokio_postgres` for the connection. +#[instrument(skip_all)] +async fn connect_to_compute( + node_info: &mut console::CachedNodeInfo, + extra: &console::ConsoleReqExtra<'_>, + creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + + loop { + match connect_to_compute_once(node_info, conn_info).await { + Err(e) if num_retries > 0 => { + info!("compute node's state has changed; requesting a wake-up"); + match creds.wake_compute(extra).await? { + // Update `node_info` and try one more time. + Some(new) => { + *node_info = new; + } + // Link auth doesn't work that way, so we just exit. + None => return Err(e), + } + } + other => return other, + } + + num_retries -= 1; + info!("retrying after wake-up ({num_retries} attempts left)"); + } +} + +async fn connect_to_compute_once( + node_info: &console::CachedNodeInfo, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut config = (*node_info.config).clone(); + + let (client, connection) = config + .user(&conn_info.username) + .password(&conn_info.password) + .dbname(&conn_info.dbname) + .max_backend_message_size(MAX_RESPONSE_SIZE) + .connect(tokio_postgres::NoTls) + .inspect_err(|e: &tokio_postgres::Error| { + error!( + "failed to connect to compute node hosts={:?} ports={:?}: {}", + node_info.config.get_hosts(), + node_info.config.get_ports(), + e + ); + invalidate_cache(node_info) + }) + .await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("connection error: {}", e); + } + }); + + Ok(client) +} + // // Convert postgres row with text-encoded values to JSON object // diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index fbb602e3d2..9f467aceb7 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -26,7 +26,6 @@ use tls_listener::TlsListener; use tokio::{ io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}, net::TcpListener, - select, }; use tokio_util::sync::CancellationToken; use tracing::{error, info, info_span, warn, Instrument}; @@ -193,14 +192,9 @@ async fn ws_handler( // TODO: that deserves a refactor as now this function also handles http json client besides websockets. // Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead. } else if request.uri().path() == "/sql" && request.method() == Method::POST { - let result = select! { - _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - Err(anyhow::anyhow!("Query timed out")) - } - response = sql_over_http::handle(config, request, sni_hostname) => { - response - } - }; + let result = sql_over_http::handle(config, request, sni_hostname) + .instrument(info_span!("sql-over-http")) + .await; let status_code = match result { Ok(_) => StatusCode::OK, Err(_) => StatusCode::BAD_REQUEST, diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cf2dd000db..8efb7005c8 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -22,7 +22,7 @@ use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. -const NUM_RETRIES_WAKE_COMPUTE: usize = 1; +pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -283,34 +283,35 @@ async fn handshake( } } +/// If we couldn't connect, a cached connection info might be to blame +/// (e.g. the compute node's address might've changed at the wrong time). +/// Invalidate the cache entry (if any) to prevent subsequent errors. +#[tracing::instrument(name = "invalidate_cache", skip_all)] +pub fn invalidate_cache(node_info: &console::CachedNodeInfo) { + let is_cached = node_info.cached(); + if is_cached { + warn!("invalidating stalled compute node info cache entry"); + node_info.invalidate(); + } + + let label = match is_cached { + true => "compute_cached", + false => "compute_uncached", + }; + NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); +} + /// Try to connect to the compute node once. #[tracing::instrument(name = "connect_once", skip_all)] async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, ) -> Result { - // If we couldn't connect, a cached connection info might be to blame - // (e.g. the compute node's address might've changed at the wrong time). - // Invalidate the cache entry (if any) to prevent subsequent errors. - let invalidate_cache = |_: &compute::ConnectionError| { - let is_cached = node_info.cached(); - if is_cached { - warn!("invalidating stalled compute node info cache entry"); - node_info.invalidate(); - } - - let label = match is_cached { - true => "compute_cached", - false => "compute_uncached", - }; - NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); - }; - let allow_self_signed_compute = node_info.allow_self_signed_compute; node_info .config .connect(allow_self_signed_compute) - .inspect_err(invalidate_cache) + .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info)) .await } From cd3faa8c0ccef3a80ff04d5582393450d6693fd6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Jun 2023 19:04:22 +0200 Subject: [PATCH 11/16] test_basic_eviction: avoid some sources of flakiness (#4504) We've seen the download_layer() call return 304 in prod because of a spurious on-demand download caused by a GetPage request from compute. Avoid these and some other sources of on-demand downloads by shutting down compute, SKs, and by disabling background loops. CF https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4498/5258914461/index.html#suites/2599693fa27db8427603ba822bcf2a20/357808fd552fede3 --- test_runner/regress/test_layer_eviction.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index a96532c0d8..b22e545f20 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -24,7 +24,13 @@ def test_basic_eviction( test_name="test_download_remote_layers_api", ) - env = neon_env_builder.init_start() + env = neon_env_builder.init_start( + initial_tenant_conf={ + # disable gc and compaction background loops because they perform on-demand downloads + "gc_period": "0s", + "compaction_period": "0s", + } + ) client = env.pageserver.http_client() endpoint = env.endpoints.create_start("main") @@ -47,6 +53,11 @@ def test_basic_eviction( client.timeline_checkpoint(tenant_id, timeline_id) wait_for_upload(client, tenant_id, timeline_id, current_lsn) + # disable compute & sks to avoid on-demand downloads by walreceiver / getpage + endpoint.stop() + for sk in env.safekeepers: + sk.stop() + timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) initial_local_layers = sorted( list(filter(lambda path: path.name != "metadata", timeline_path.glob("*"))) From 94f315d490af8f3dc29f291b34b95f86678843ac Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Wed, 14 Jun 2023 19:03:09 +0100 Subject: [PATCH 12/16] Remove neon-image-depot job (#4506) ## Problem `neon-image-depot` is an experimental job we use to compare with the main `neon-image` job. But it's not stable and right now we don't have the capacity to properly fix and evaluate it. We can come back to this later. ## Summary of changes Remove `neon-image-depot` job --- .github/workflows/build_and_test.yml | 45 ---------------------------- 1 file changed, 45 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 897e1a7aad..471dc68df9 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -623,51 +623,6 @@ jobs: - name: Cleanup ECR folder run: rm -rf ~/.ecr - - neon-image-depot: - # For testing this will run side-by-side for a few merges. - # This action is not really optimized yet, but gets the job done - runs-on: [ self-hosted, gen3, large ] - needs: [ tag ] - container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned - permissions: - contents: read - id-token: write - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - submodules: true - fetch-depth: 0 - - - name: Setup go - uses: actions/setup-go@v3 - with: - go-version: '1.19' - - - name: Set up Depot CLI - uses: depot/setup-action@v1 - - - name: Install Crane & ECR helper - run: go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0 - - - name: Configure ECR login - run: | - mkdir /github/home/.docker/ - echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json - - - name: Build and push - uses: depot/build-push-action@v1 - with: - # if no depot.json file is at the root of your repo, you must specify the project id - project: nrdv0s4kcs - push: true - tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}} - build-args: | - GIT_VERSION=${{ github.sha }} - REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com - compute-tools-image: runs-on: [ self-hosted, gen3, large ] needs: [ tag ] From 2252c5c282e8463b0f1dc1d9c7484e50706392e9 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 14 Jun 2023 17:12:34 -0400 Subject: [PATCH 13/16] metrics: convert some metrics to pageserver-level (#4490) ## Problem Some metrics are better to be observed at page-server level. Otherwise, as we have a lot of tenants in production, we cannot do a sum b/c Prometheus has limit on how many time series we can aggregate. This also helps reduce metrics scraping size. ## Summary of changes Some integration tests are likely not to pass as it will check the existence of some metrics. Waiting for CI complete and fix them. Metrics downgraded: page cache hit (where we are likely to have a page-server level page cache in the future instead of per-tenant), and reconstruct time (this would better be tenant-level, as we have one pg replayer for each tenant, but now we make it page-server level as we do not need that fine-grained data). --------- Signed-off-by: Alex Chi --- pageserver/src/metrics.rs | 41 +++++++++-------------------- pageserver/src/tenant/timeline.rs | 14 +++++----- test_runner/fixtures/metrics.py | 10 +++---- test_runner/regress/test_tenants.py | 2 +- 4 files changed, 25 insertions(+), 42 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index cc444c479a..43d06db6d8 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,4 +1,3 @@ -use metrics::core::{AtomicU64, GenericCounter}; use metrics::{ register_counter_vec, register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, @@ -95,21 +94,19 @@ static READ_NUM_FS_LAYERS: Lazy = Lazy::new(|| { }); // Metrics collected on operations on the storage repository. -static RECONSTRUCT_TIME: Lazy = Lazy::new(|| { - register_histogram_vec!( +pub static RECONSTRUCT_TIME: Lazy = Lazy::new(|| { + register_histogram!( "pageserver_getpage_reconstruct_seconds", - "Time spent in reconstruct_value", - &["tenant_id", "timeline_id"], + "Time spent in reconstruct_value (reconstruct a page from deltas)", CRITICAL_OP_BUCKETS.into(), ) .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_direct_total", "Number of cache hits from materialized page cache without redo", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -124,11 +121,10 @@ static GET_RECONSTRUCT_DATA_TIME: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_total", "Number of cache hits from materialized page cache", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -752,10 +748,7 @@ impl StorageTimeMetrics { pub struct TimelineMetrics { tenant_id: String, timeline_id: String, - pub reconstruct_time_histo: Histogram, pub get_reconstruct_data_time_histo: Histogram, - pub materialized_page_cache_hit_counter: GenericCounter, - pub materialized_page_cache_hit_upon_request_counter: GenericCounter, pub flush_time_histo: StorageTimeMetrics, pub compact_time_histo: StorageTimeMetrics, pub create_images_time_histo: StorageTimeMetrics, @@ -783,15 +776,9 @@ impl TimelineMetrics { ) -> Self { let tenant_id = tenant_id.to_string(); let timeline_id = timeline_id.to_string(); - let reconstruct_time_histo = RECONSTRUCT_TIME - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let flush_time_histo = StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id); let compact_time_histo = @@ -833,19 +820,18 @@ impl TimelineMetrics { let read_num_fs_layers = READ_NUM_FS_LAYERS .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let materialized_page_cache_hit_upon_request_counter = MATERIALIZED_PAGE_CACHE_HIT_DIRECT - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let evictions_with_low_residence_duration = evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id); + // TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter + // and integration test will error. + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get(); + MATERIALIZED_PAGE_CACHE_HIT.get(); + TimelineMetrics { tenant_id, timeline_id, - reconstruct_time_histo, get_reconstruct_data_time_histo, - materialized_page_cache_hit_counter, - materialized_page_cache_hit_upon_request_counter, flush_time_histo, compact_time_histo, create_images_time_histo, @@ -872,10 +858,7 @@ impl Drop for TimelineMetrics { fn drop(&mut self) { let tenant_id = &self.tenant_id; let timeline_id = &self.timeline_id; - let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT_DIRECT.remove_label_values(&[tenant_id, timeline_id]); let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]); let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d642090996..855896c832 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -47,7 +47,10 @@ use crate::tenant::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum}; -use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; +use crate::metrics::{ + TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT, + RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS, +}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; @@ -539,9 +542,7 @@ impl Timeline { match cached_lsn.cmp(&lsn) { Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check Ordering::Equal => { - self.metrics - .materialized_page_cache_hit_upon_request_counter - .inc(); + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc(); return Ok(cached_img); // exact LSN match, return the image } Ordering::Greater => { @@ -563,8 +564,7 @@ impl Timeline { .await?; timer.stop_and_record(); - self.metrics - .reconstruct_time_histo + RECONSTRUCT_TIME .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state)) } @@ -2387,7 +2387,7 @@ impl Timeline { ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { - self.metrics.materialized_page_cache_hit_counter.inc_by(1); + MATERIALIZED_PAGE_CACHE_HIT.inc_by(1); return Ok(()); } if prev_lsn <= cont_lsn { diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index b4c237cfa6..d55d159037 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -57,14 +57,16 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = ( "libmetrics_launch_timestamp", "libmetrics_build_info", "libmetrics_tracing_event_count_total", + "pageserver_materialized_cache_hits_total", + "pageserver_materialized_cache_hits_direct_total", + "pageserver_getpage_reconstruct_seconds_bucket", + "pageserver_getpage_reconstruct_seconds_count", + "pageserver_getpage_reconstruct_seconds_sum", ) PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( "pageserver_current_logical_size", "pageserver_resident_physical_size", - "pageserver_getpage_reconstruct_seconds_bucket", - "pageserver_getpage_reconstruct_seconds_count", - "pageserver_getpage_reconstruct_seconds_sum", "pageserver_getpage_get_reconstruct_data_seconds_bucket", "pageserver_getpage_get_reconstruct_data_seconds_count", "pageserver_getpage_get_reconstruct_data_seconds_sum", @@ -73,8 +75,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( "pageserver_io_operations_seconds_count", "pageserver_io_operations_seconds_sum", "pageserver_last_record_lsn", - "pageserver_materialized_cache_hits_total", - "pageserver_materialized_cache_hits_direct_total", "pageserver_read_num_fs_layers_bucket", "pageserver_read_num_fs_layers_count", "pageserver_read_num_fs_layers_sum", diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index aef2df4932..4a1d659be3 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -213,7 +213,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder): # Test (a subset of) pageserver global metrics for metric in PAGESERVER_GLOBAL_METRICS: ps_samples = ps_metrics.query_all(metric, {}) - assert len(ps_samples) > 0 + assert len(ps_samples) > 0, f"expected at least one sample for {metric}" for sample in ps_samples: labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()]) log.info(f"{sample.name}{{{labels}}} {sample.value}") From e60b70b4759406283eebf4d6f16c458512b2b63f Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 15 Jun 2023 13:01:06 +0100 Subject: [PATCH 14/16] Fix data ingestion scripts (#4515) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem When I switched `psycopg2.connect` from context manager to a regular function call in https://github.com/neondatabase/neon/pull/4382 I embarrassingly forgot about commit, so it doesn't really put data into DB 😞 ## Summary of changes - Enable autocommit for data ingestion scripts --- scripts/ingest_perf_test_result.py | 4 +++- scripts/ingest_regress_test_result.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/ingest_perf_test_result.py b/scripts/ingest_perf_test_result.py index fc177b590e..35a1e29720 100644 --- a/scripts/ingest_perf_test_result.py +++ b/scripts/ingest_perf_test_result.py @@ -40,7 +40,9 @@ def get_connection_cursor(): @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) def connect(connstr): - return psycopg2.connect(connstr, connect_timeout=30) + conn = psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn conn = connect(connstr) try: diff --git a/scripts/ingest_regress_test_result.py b/scripts/ingest_regress_test_result.py index dff8e0cefa..39c1c02941 100644 --- a/scripts/ingest_regress_test_result.py +++ b/scripts/ingest_regress_test_result.py @@ -34,7 +34,9 @@ def get_connection_cursor(): @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) def connect(connstr): - return psycopg2.connect(connstr, connect_timeout=30) + conn = psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn conn = connect(connstr) try: From 76413a0fb8df249a3ea7ae82f2766c50ea6e980b Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 15 Jun 2023 15:26:59 +0300 Subject: [PATCH 15/16] Revert reconnect_timeout to improve performance (#4512) Default value for `wal_acceptor_reconnect_timeout` was changed in https://github.com/neondatabase/neon/pull/4428 and it affected performance up to 20% in some cases. Revert the value back. --- pgxn/neon/walproposer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 64d980d2e4..8d82de6dc4 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -257,7 +257,7 @@ nwp_register_gucs(void) "Walproposer reconnects to offline safekeepers once in this interval.", NULL, &wal_acceptor_reconnect_timeout, - 5000, 0, INT_MAX, /* default, min, max */ + 1000, 0, INT_MAX, /* default, min, max */ PGC_SIGHUP, /* context */ GUC_UNIT_MS, /* flags */ NULL, NULL, NULL); From 472cc17b7aba4f78bc7a71a2c04d2e7cb8b696d8 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Thu, 15 Jun 2023 17:30:12 +0300 Subject: [PATCH 16/16] propagate lock guard to background deletion task (#4495) ## Problem 1. During the rollout we got a panic: "timeline that we were deleting was concurrently removed from 'timelines' map" that was caused by lock guard not being propagated to the background part of the deletion. Existing test didnt catch it because failpoint that was used for verification was placed earlier prior to background task spawning. 2. When looking at surrounding code one more bug was detected. We removed timeline from the map before deletion is finished, which breaks client retry logic, because it will indicate 404 before actual deletion is completed which can lead to client stopping its retry poll earlier. ## Summary of changes 1. Carry the lock guard over to background deletion. Ensure existing test case fails without applied patch (second deletion becomes stuck without it, which eventually leads to a test failure). 2. Move delete_all call earlier so timeline is removed from the map is the last thing done during deletion. Additionally I've added timeline_id to the `update_gc_info` span, because `debug_assert_current_span_has_tenant_and_timeline_id` in `download_remote_layer` was firing when `update_gc_info` lead to on-demand downloads via `find_lsn_for_timestamp` (caught by @problame). This is not directly related to the PR but fixes possible flakiness. Another smaller set of changes involves deletion wrapper used in python tests. Now there is a simpler wrapper that waits for deletions to complete `timeline_delete_wait_completed`. Most of the test_delete_timeline.py tests make negative tests, i.e., "does ps_http.timeline_delete() fail in this and that scenario". These can be left alone. Other places when we actually do the deletions, we need to use the helper that polls for completion. Discussion https://neondb.slack.com/archives/C03F5SM1N02/p1686668007396639 resolves #4496 --------- Co-authored-by: Christian Schwarz --- pageserver/src/tenant.rs | 60 +++++--- .../src/tenant/remote_timeline_client.rs | 28 ++-- pageserver/src/tenant/timeline.rs | 1 + test_runner/fixtures/neon_fixtures.py | 2 + test_runner/fixtures/pageserver/http.py | 5 + test_runner/fixtures/pageserver/utils.py | 35 +++-- test_runner/regress/test_compatibility.py | 8 +- test_runner/regress/test_import.py | 8 +- test_runner/regress/test_remote_storage.py | 7 +- test_runner/regress/test_tenant_size.py | 5 +- test_runner/regress/test_tenant_tasks.py | 8 +- test_runner/regress/test_tenants.py | 6 +- test_runner/regress/test_timeline_delete.py | 130 ++++++++---------- test_runner/regress/test_timeline_size.py | 3 +- test_runner/regress/test_wal_acceptor.py | 16 ++- 15 files changed, 184 insertions(+), 138 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3ed4621112..7fdd047c96 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -473,6 +473,14 @@ pub(crate) enum ShutdownError { AlreadyStopping, } +struct DeletionGuard(OwnedMutexGuard); + +impl DeletionGuard { + fn is_deleted(&self) -> bool { + *self.0 + } +} + impl Tenant { /// Yet another helper for timeline initialization. /// Contains the common part of `load_local_timeline` and `load_remote_timeline`. @@ -1138,7 +1146,11 @@ impl Tenant { ) .context("create_timeline_struct")?; - let guard = Arc::clone(&timeline.delete_lock).lock_owned().await; + let guard = DeletionGuard( + Arc::clone(&timeline.delete_lock) + .try_lock_owned() + .expect("cannot happen because we're the only owner"), + ); // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. @@ -1549,6 +1561,7 @@ impl Tenant { &self, timeline_id: TimelineId, timeline: Arc, + guard: DeletionGuard, ) -> anyhow::Result<()> { { // Grab the layer_removal_cs lock, and actually perform the deletion. @@ -1621,6 +1634,25 @@ impl Tenant { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? }); + if let Some(remote_client) = &timeline.remote_client { + remote_client.delete_all().await.context("delete_all")? + }; + + // Have a failpoint that can use the `pause` failpoint action. + // We don't want to block the executor thread, hence, spawn_blocking + await. + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint in_progress_delete"); + fail::fail_point!("in_progress_delete"); + } + }) + .await + .expect("spawn_blocking"); + } + { // Remove the timeline from the map. let mut timelines = self.timelines.lock().unwrap(); @@ -1641,12 +1673,7 @@ impl Tenant { drop(timelines); } - let remote_client = match &timeline.remote_client { - Some(remote_client) => remote_client, - None => return Ok(()), - }; - - remote_client.delete_all().await?; + drop(guard); Ok(()) } @@ -1694,23 +1721,18 @@ impl Tenant { timeline = Arc::clone(timeline_entry.get()); // Prevent two tasks from trying to delete the timeline at the same time. - // - // XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller - // needs to poll until the operation has finished. But for now, we return an - // error, because the control plane knows to retry errors. - delete_lock_guard = - Arc::clone(&timeline.delete_lock) - .try_lock_owned() - .map_err(|_| { + DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err( + |_| { DeleteTimelineError::Other(anyhow::anyhow!( "timeline deletion is already in progress" )) - })?; + }, + )?); // If another task finished the deletion just before we acquired the lock, // return success. - if *delete_lock_guard { + if delete_lock_guard.is_deleted() { return Ok(()); } @@ -1784,7 +1806,7 @@ impl Tenant { self: Arc, timeline_id: TimelineId, timeline: Arc, - _guard: OwnedMutexGuard, + guard: DeletionGuard, ) { let tenant_id = self.tenant_id; let timeline_clone = Arc::clone(&timeline); @@ -1797,7 +1819,7 @@ impl Tenant { "timeline_delete", false, async move { - if let Err(err) = self.delete_timeline(timeline_id, timeline).await { + if let Err(err) = self.delete_timeline(timeline_id, timeline, guard).await { error!("Error: {err:#}"); timeline_clone.set_broken(err.to_string()) }; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 2c84c59dcb..8db2bc4eb2 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -753,22 +753,18 @@ impl RemoteTimelineClient { // Have a failpoint that can use the `pause` failpoint action. // We don't want to block the executor thread, hence, spawn_blocking + await. - #[cfg(feature = "testing")] - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); - move || { - let _entered = current.entered(); - tracing::info!( - "at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - fail::fail_point!( - "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - } - }) - .await - .expect("spawn_blocking"); - + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint persist_deleted_index_part"); + fail::fail_point!("persist_deleted_index_part"); + } + }) + .await + .expect("spawn_blocking"); + } upload::upload_index_part( self.conf, &self.storage_impl, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 855896c832..d42fdf5e55 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3791,6 +3791,7 @@ impl Timeline { /// for example. The caller should hold `Tenant::gc_cs` lock to ensure /// that. /// + #[instrument(skip_all, fields(timline_id=%self.timeline_id))] pub(super) async fn update_gc_info( &self, retain_lsns: Vec, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a8610e24df..64c71d2a59 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1631,6 +1631,8 @@ class NeonPageserver(PgProtocol): r".*ERROR.*ancestor timeline \S+ is being stopped", # this is expected given our collaborative shutdown approach for the UploadQueue ".*Compaction failed, retrying in .*: queue is in state Stopped.*", + # Pageserver timeline deletion should be polled until it gets 404, so ignore it globally + ".*Error processing HTTP request: NotFound: Timeline .* was not found", ] def start( diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index f258a3a24d..5c4f5177d0 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -342,6 +342,11 @@ class PageserverHttpClient(requests.Session): return res_json def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs): + """ + Note that deletion is not instant, it is scheduled and performed mostly in the background. + So if you need to wait for it to complete use `timeline_delete_wait_completed`. + For longer description consult with pageserver openapi spec. + """ res = self.delete( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs ) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 83880abc77..ad89ebad00 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -193,19 +193,30 @@ def wait_for_upload_queue_empty( time.sleep(0.2) -def assert_timeline_detail_404( +def wait_timeline_detail_404( + pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId +): + last_exc = None + for _ in range(2): + time.sleep(0.250) + try: + data = pageserver_http.timeline_detail(tenant_id, timeline_id) + log.error(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code == 404: + return + + last_exc = e + + raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}") + + +def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId, + **delete_args, ): - """Asserts that timeline_detail returns 404, or dumps the detail.""" - try: - data = pageserver_http.timeline_detail(tenant_id, timeline_id) - log.error(f"detail {data}") - except PageserverApiException as e: - log.error(e) - if e.status_code == 404: - return - else: - raise - raise Exception("detail succeeded (it should return 404)") + pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) + wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id) diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 2635dbd93c..61f86dc3ce 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -15,7 +15,11 @@ from fixtures.neon_fixtures import ( PortDistributor, ) from fixtures.pageserver.http import PageserverHttpClient -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.types import Lsn from pytest import FixtureRequest @@ -417,7 +421,7 @@ def check_neon_works( ) shutil.rmtree(repo_dir / "local_fs_remote_storage") - pageserver_http.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id) pageserver_http.timeline_create(pg_version, tenant_id, timeline_id) pg_bin.run( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"] diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 5c3948b027..141c69b230 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -14,7 +14,11 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, PgBin, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import subprocess_capture @@ -151,7 +155,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build ".*files not bound to index_file.json, proceeding with their deletion.*" ) - client.timeline_delete(tenant, timeline) + timeline_delete_wait_completed(client, tenant, timeline) # Importing correct backup works import_tar(base_tar, wal_tar) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 11ac9e2555..f2b954a822 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -20,7 +20,7 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, wait_until_tenant_active, @@ -597,14 +597,11 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue( env.pageserver.allowed_errors.append( ".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping" ) - client.timeline_delete(tenant_id, timeline_id) - env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*") env.pageserver.allowed_errors.append( ".*files not bound to index_file.json, proceeding with their deletion.*" ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id)) + timeline_delete_wait_completed(client, tenant_id, timeline_id) assert not timeline_path.exists() diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index e9dcd1e5cd..a0f9f854ed 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -11,6 +11,7 @@ from fixtures.neon_fixtures import ( wait_for_wal_insert_lsn, ) from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.pg_version import PgVersion, xfail_on_postgres from fixtures.types import Lsn, TenantId, TimelineId @@ -628,12 +629,12 @@ def test_get_tenant_size_with_multiple_branches( size_debug_file_before.write(size_debug) # teardown, delete branches, and the size should be going down - http_client.timeline_delete(tenant_id, first_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id) size_after_deleting_first = http_client.tenant_size(tenant_id) assert size_after_deleting_first < size_after_thinning_branch - http_client.timeline_delete(tenant_id, second_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id) size_after_deleting_second = http_client.tenant_size(tenant_id) assert size_after_deleting_second < size_after_deleting_first diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py index 21e4af4127..75e5c2c91c 100644 --- a/test_runner/regress/test_tenant_tasks.py +++ b/test_runner/regress/test_tenant_tasks.py @@ -1,6 +1,10 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder -from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active +from fixtures.pageserver.utils import ( + assert_tenant_state, + timeline_delete_wait_completed, + wait_until_tenant_active, +) from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until @@ -24,7 +28,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): def delete_all_timelines(tenant: TenantId): timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)] for t in timelines: - client.timeline_delete(tenant, t) + timeline_delete_wait_completed(client, tenant, t) # Create tenant, start compute tenant, _ = env.neon_cli.create_tenant() diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 4a1d659be3..4dbfa8bc1f 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -21,6 +21,7 @@ from fixtures.neon_fixtures import ( RemoteStorageKind, available_remote_storages, ) +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import wait_until from prometheus_client.samples import Sample @@ -318,9 +319,10 @@ def test_pageserver_with_empty_tenants( client.tenant_create(tenant_with_empty_timelines) temp_timelines = client.timeline_list(tenant_with_empty_timelines) for temp_timeline in temp_timelines: - client.timeline_delete( - tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) + timeline_delete_wait_completed( + client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) ) + files_in_timelines_dir = sum( 1 for _p in Path.iterdir( diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 28b15d03ca..ddd9ffd755 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -17,9 +17,10 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, + wait_timeline_detail_404, wait_until_tenant_active, wait_until_timeline_state, ) @@ -83,7 +84,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): wait_until( number_of_iterations=3, interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id), + func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id), ) assert not timeline_path.exists() @@ -94,16 +95,16 @@ def test_timeline_delete(neon_simple_env: NeonEnv): match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ) as exc: ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) - - # FIXME leaves tenant without timelines, should we prevent deletion of root timeline? - wait_until( - number_of_iterations=3, - interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id), - ) - assert exc.value.status_code == 404 + wait_until( + number_of_iterations=3, + interval=0.2, + func=lambda: timeline_delete_wait_completed( + ps_http, env.initial_tenant, parent_timeline_id + ), + ) + # Check that we didn't pick up the timeline again after restart. # See https://github.com/neondatabase/neon/issues/3560 env.pageserver.stop(immediate=True) @@ -143,7 +144,6 @@ def test_delete_timeline_post_rm_failure( ps_http.configure_failpoints((failpoint_name, "return")) ps_http.timeline_delete(env.initial_tenant, env.initial_timeline) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -165,13 +165,7 @@ def test_delete_timeline_post_rm_failure( # this should succeed # this also checks that delete can be retried even when timeline is in Broken state - ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2) - with pytest.raises(PageserverApiException) as e: - ps_http.timeline_detail(env.initial_tenant, env.initial_timeline) - - assert e.value.status_code == 404 - - env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*") + timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline) env.pageserver.allowed_errors.append( f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*" ) @@ -247,13 +241,7 @@ def test_timeline_resurrection_on_attach( pass # delete new timeline - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*" - ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id) ##### Stop the pageserver instance, erase all its data env.endpoints.stop_all() @@ -338,7 +326,6 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -357,12 +344,15 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild # Wait for tenant to finish loading. wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1) - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id) - ) + try: + data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) + log.debug(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code != 404: + raise + else: + raise Exception("detail succeeded (it should return 404)") assert ( not leaf_timeline_path.exists() @@ -389,13 +379,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild assert env.initial_timeline is not None for timeline_id in (intermediate_timeline_id, env.initial_timeline): - ps_http.timeline_delete(env.initial_tenant, timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id) + timeline_delete_wait_completed( + ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id ) assert_prefix_empty( @@ -419,23 +404,27 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) -def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( - neon_env_builder: NeonEnvBuilder, +@pytest.mark.parametrize( + "stuck_failpoint", + ["persist_deleted_index_part", "in_progress_delete"], +) +def test_concurrent_timeline_delete_stuck_on( + neon_env_builder: NeonEnvBuilder, stuck_failpoint: str ): """ - If we're stuck uploading the index file with the is_delete flag, - eventually console will hand up and retry. - If we're still stuck at the retry time, ensure that the retry - fails with status 500, signalling to console that it should retry - later. - Ideally, timeline_delete should return 202 Accepted and require - console to poll for completion, but, that would require changing - the API contract. + If delete is stuck console will eventually retry deletion. + So we need to be sure that these requests wont interleave with each other. + In this tests we check two places where we can spend a lot of time. + This is a regression test because there was a bug when DeletionGuard wasnt propagated + to the background task. + + Ensure that when retry comes if we're still stuck request will get an immediate error response, + signalling to console that it should retry later. """ neon_env_builder.enable_remote_storage( remote_storage_kind=RemoteStorageKind.MOCK_S3, - test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload", + test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}", ) env = neon_env_builder.init_start() @@ -445,13 +434,14 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ps_http = env.pageserver.http_client() # make the first call sleep practically forever - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ps_http.configure_failpoints((failpoint_name, "pause")) + ps_http.configure_failpoints((stuck_failpoint, "pause")) def first_call(result_queue): try: log.info("first call start") - ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10) + timeline_delete_wait_completed( + ps_http, env.initial_tenant, child_timeline_id, timeout=10 + ) log.info("first call success") result_queue.put("success") except Exception: @@ -466,7 +456,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def first_call_hit_failpoint(): assert env.pageserver.log_contains( - f".*{child_timeline_id}.*at failpoint {failpoint_name}" + f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) wait_until(50, 0.1, first_call_hit_failpoint) @@ -484,8 +474,12 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ) log.info("second call failed as expected") + # ensure it is not 404 and stopping + detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id) + assert detail["state"] == "Stopping" + # by now we know that the second call failed, let's ensure the first call will finish - ps_http.configure_failpoints((failpoint_name, "off")) + ps_http.configure_failpoints((stuck_failpoint, "off")) result = first_call_result.get() assert result == "success" @@ -498,8 +492,10 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): """ - If the client hangs up before we start the index part upload but after we mark it + If the client hangs up before we start the index part upload but after deletion is scheduled + we mark it deleted in local memory, a subsequent delete_timeline call should be able to do + another delete timeline operation. This tests cancel safety up to the given failpoint. @@ -515,12 +511,18 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): ps_http = env.pageserver.http_client() - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" + failpoint_name = "persist_deleted_index_part" ps_http.configure_failpoints((failpoint_name, "pause")) with pytest.raises(requests.exceptions.Timeout): ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + env.pageserver.allowed_errors.append( + f".*{child_timeline_id}.*timeline deletion is already in progress.*" + ) + with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"): + ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + # make sure the timeout was due to the failpoint at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" @@ -552,12 +554,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): wait_until(50, 0.1, first_request_finished) # check that the timeline is gone - notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found" - env.pageserver.allowed_errors.append(".*" + notfound_message) - with pytest.raises(PageserverApiException, match=notfound_message) as exc: - ps_http.timeline_detail(env.initial_tenant, child_timeline_id) - - assert exc.value.status_code == 404 + wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id) @pytest.mark.parametrize( @@ -616,12 +613,7 @@ def test_timeline_delete_works_for_remote_smoke( for timeline_id in reversed(timeline_ids): # note that we need to finish previous deletion before scheduling next one # otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3) - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id) assert_prefix_empty( neon_env_builder, diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 1460172afe..5bdbc18927 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -24,6 +24,7 @@ from fixtures.neon_fixtures import ( from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( assert_tenant_state, + timeline_delete_wait_completed, wait_for_upload_queue_empty, wait_until_tenant_active, ) @@ -272,7 +273,7 @@ def test_timeline_initial_logical_size_calculation_cancellation( if deletion_method == "tenant_detach": client.tenant_detach(tenant_id) elif deletion_method == "timeline_delete": - client.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(client, tenant_id, timeline_id) delete_timeline_success.put(True) except PageserverApiException: delete_timeline_success.put(False) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 8b595596cb..a837501678 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -31,7 +31,11 @@ from fixtures.neon_fixtures import ( SafekeeperPort, available_remote_storages, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import get_dir_size, query_scalar, start_in_background @@ -548,15 +552,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re f"sk_id={sk.id} to flush {last_lsn}", ) - ps_cli = env.pageserver.http_client() - pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) + ps_http = env.pageserver.http_client() + pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) lag = last_lsn - pageserver_lsn log.info( f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb" ) endpoint.stop_and_destroy() - ps_cli.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(ps_http, tenant_id, timeline_id) # Also delete and manually create timeline on safekeepers -- this tests # scenario of manual recovery on different set of safekeepers. @@ -583,7 +587,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re shutil.copy(f_partial_saved, f_partial_path) # recreate timeline on pageserver from scratch - ps_cli.timeline_create( + ps_http.timeline_create( pg_version=PgVersion(pg_version), tenant_id=tenant_id, new_timeline_id=timeline_id, @@ -598,7 +602,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re if elapsed > wait_lsn_timeout: raise RuntimeError("Timed out waiting for WAL redo") - tenant_status = ps_cli.tenant_status(tenant_id) + tenant_status = ps_http.tenant_status(tenant_id) if tenant_status["state"]["slug"] == "Loading": log.debug(f"Tenant {tenant_id} is still loading, retrying") else: