Merge branch 'main' into release_2023-06-15

This commit is contained in:
Vadim Kharitonov
2023-06-15 16:50:44 +02:00
37 changed files with 817 additions and 535 deletions

View File

@@ -623,51 +623,6 @@ jobs:
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
neon-image-depot:
# For testing, this will run side-by-side for a few merges.
# This action is not really optimized yet, but it gets the job done
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
permissions:
contents: read
id-token: write
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Setup go
uses: actions/setup-go@v3
with:
go-version: '1.19'
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Install Crane & ECR helper
run: go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Build and push
uses: depot/build-push-action@v1
with:
# if no depot.json file is at the root of your repo, you must specify the project id
project: nrdv0s4kcs
push: true
tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}}
build-args: |
GIT_VERSION=${{ github.sha }}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]

Cargo.lock generated
View File

@@ -2770,7 +2770,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -2783,7 +2783,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"native-tls",
"tokio",
@@ -2794,7 +2794,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -2812,7 +2812,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4272,7 +4272,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -140,11 +140,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
@@ -180,7 +180,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
# Changes the MAX_THREADS limit from 4096 to 32768.
# This is a temporary workaround for using tracing from many threads in safekeepers code,

View File

@@ -148,4 +148,14 @@ mod tests {
let file = File::open("tests/cluster_spec.json").unwrap();
let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
}
#[test]
fn parse_unknown_fields() {
// Forward compatibility test
let file = File::open("tests/cluster_spec.json").unwrap();
let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
let ob = json.as_object_mut().unwrap();
ob.insert("unknown_field_123123123".into(), "hello".into());
let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
}
}
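
For context, a minimal standalone sketch of the serde behavior this test relies on: by default, `#[derive(Deserialize)]` ignores unknown JSON fields, so a spec written by a newer producer still parses (the `Spec` struct below is illustrative, not the real `ComputeSpec`):

```rust
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Spec {
    name: String,
}

fn main() {
    // The extra field is silently ignored, mirroring the forward-compat test above.
    let json = r#"{ "name": "main", "unknown_field_123123123": "hello" }"#;
    let spec: Spec = serde_json::from_str(json).unwrap();
    println!("{spec:?}");

    // With #[serde(deny_unknown_fields)] on Spec, the same input would be an error.
}
```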

View File

@@ -34,6 +34,8 @@ use crate::{
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
pub(super) mod metrics {
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
@@ -424,17 +426,33 @@ impl RemoteStorage for S3Bucket {
delete_objects.push(obj_id);
}
metrics::inc_delete_objects(paths.len() as u64);
self.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(Delete::builder().set_objects(Some(delete_objects)).build())
.send()
.await
.map_err(|e| {
metrics::inc_delete_objects_fail(paths.len() as u64);
e
})?;
for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
metrics::inc_delete_objects(chunk.len() as u64);
let resp = self
.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(Delete::builder().set_objects(Some(chunk.to_vec())).build())
.send()
.await;
match resp {
Ok(resp) => {
if let Some(errors) = resp.errors {
metrics::inc_delete_objects_fail(errors.len() as u64);
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
));
}
}
Err(e) => {
metrics::inc_delete_objects_fail(chunk.len() as u64);
return Err(e.into());
}
}
}
Ok(())
}
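
For context: S3's DeleteObjects API accepts at most 1000 keys per request, which is what `MAX_DELETE_OBJECTS_REQUEST_SIZE` encodes. A minimal standalone sketch of the `slice::chunks` batching the new loop relies on:

```rust
fn main() {
    const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
    let keys: Vec<u32> = (0..2500).collect();
    // chunks() yields full batches plus one shorter final batch: 1000, 1000, 500.
    for (i, chunk) in keys.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE).enumerate() {
        println!("request {i}: {} keys", chunk.len());
    }
}
```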

View File

@@ -24,6 +24,7 @@ enum RemoteOp {
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
DeleteObjects(Vec<RemotePath>),
}
impl UnreliableWrapper {
@@ -121,8 +122,18 @@ impl RemoteStorage for UnreliableWrapper {
}
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
let mut error_counter = 0;
for path in paths {
self.delete(path).await?
if (self.delete(path).await).is_err() {
error_counter += 1;
}
}
if error_counter > 0 {
return Err(anyhow::anyhow!(
"failed to delete {} objects",
error_counter
));
}
Ok(())
}

View File

@@ -516,7 +516,7 @@ async fn collect_eviction_candidates(
if !tl.is_active() {
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction();
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers

View File

@@ -215,7 +215,7 @@ async fn build_timeline_info(
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
let mut info = build_timeline_info_common(timeline, ctx).await?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
@@ -233,7 +233,7 @@ async fn build_timeline_info(
Ok(info)
}
fn build_timeline_info_common(
async fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
@@ -264,7 +264,7 @@ fn build_timeline_info_common(
None
}
};
let current_physical_size = Some(timeline.layer_size_sum());
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -330,6 +330,7 @@ async fn timeline_create_handler(
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -591,7 +592,7 @@ async fn tenant_status(
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in tenant.list_timelines().iter() {
current_physical_size += timeline.layer_size_sum();
current_physical_size += timeline.layer_size_sum().await;
}
let state = tenant.current_state();
@@ -701,7 +702,7 @@ async fn layer_map_info_handler(
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info(reset);
let layer_map_info = timeline.layer_map_info(reset).await;
json_response(StatusCode::OK, layer_map_info)
}

View File

@@ -1,4 +1,3 @@
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
@@ -95,21 +94,19 @@ static READ_NUM_FS_LAYERS: Lazy<HistogramVec> = Lazy::new(|| {
});
// Metrics collected on operations on the storage repository.
static RECONSTRUCT_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
pub static RECONSTRUCT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value",
&["tenant_id", "timeline_id"],
"Time spent in reconstruct_value (reconstruct a page from deltas)",
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_direct_total",
"Number of cache hits from materialized page cache without redo",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
@@ -124,11 +121,10 @@ static GET_RECONSTRUCT_DATA_TIME: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
@@ -752,10 +748,7 @@ impl StorageTimeMetrics {
pub struct TimelineMetrics {
tenant_id: String,
timeline_id: String,
pub reconstruct_time_histo: Histogram,
pub get_reconstruct_data_time_histo: Histogram,
pub materialized_page_cache_hit_counter: GenericCounter<AtomicU64>,
pub materialized_page_cache_hit_upon_request_counter: GenericCounter<AtomicU64>,
pub flush_time_histo: StorageTimeMetrics,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
@@ -783,15 +776,9 @@ impl TimelineMetrics {
) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let flush_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
let compact_time_histo =
@@ -833,19 +820,18 @@ impl TimelineMetrics {
let read_num_fs_layers = READ_NUM_FS_LAYERS
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let materialized_page_cache_hit_upon_request_counter = MATERIALIZED_PAGE_CACHE_HIT_DIRECT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
// TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter
// and the integration test will error.
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
MATERIALIZED_PAGE_CACHE_HIT.get();
TimelineMetrics {
tenant_id,
timeline_id,
reconstruct_time_histo,
get_reconstruct_data_time_histo,
materialized_page_cache_hit_counter,
materialized_page_cache_hit_upon_request_counter,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
@@ -872,10 +858,7 @@ impl Drop for TimelineMetrics {
fn drop(&mut self) {
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = MATERIALIZED_PAGE_CACHE_HIT.remove_label_values(&[tenant_id, timeline_id]);
let _ = MATERIALIZED_PAGE_CACHE_HIT_DIRECT.remove_label_values(&[tenant_id, timeline_id]);
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
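
For context, a minimal sketch of the labeled-vs-unlabeled distinction behind this refactor, using the `prometheus` crate directly (assumed here; the repo's `metrics` crate wraps it): an `IntCounterVec` creates one time series per `(tenant_id, timeline_id)` label pair and must be cleaned up with `remove_label_values` on drop, while a plain `IntCounter` is a single global series with neither cost.

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_counter, IntCounter};

static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "pageserver_materialized_cache_hits_total",
        "Number of cache hits from materialized page cache"
    )
    .expect("failed to define a metric")
});

fn main() {
    // Touching the Lazy forces registration so the metric shows up in the
    // exporter even before the first real increment (cf. the TODO(chi) note).
    MATERIALIZED_PAGE_CACHE_HIT.get();
    MATERIALIZED_PAGE_CACHE_HIT.inc();
    println!("hits = {}", MATERIALIZED_PAGE_CACHE_HIT.get());
}
```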

View File

@@ -1133,16 +1133,17 @@ impl<'a> DatadirModification<'a> {
let writer = self.tline.writer().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut result: anyhow::Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, self.lsn, value);
false
let mut retained_pending_updates = HashMap::new();
for (key, value) in self.pending_updates.drain() {
if is_rel_block_key(key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, self.lsn, &value).await?;
} else {
true
retained_pending_updates.insert(key, value);
}
});
result?;
}
self.pending_updates.extend(retained_pending_updates);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
@@ -1164,10 +1165,10 @@ impl<'a> DatadirModification<'a> {
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value)?;
writer.put(key, lsn, &value).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
writer.delete(key_range, lsn).await?;
}
writer.finish_write(lsn);
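
A minimal standalone sketch of why the `retain` closure above had to become a drain loop: `HashMap::retain` takes a synchronous closure, so neither `.await` nor `?` can be used inside it (the `flush` helper and the key predicate below are illustrative):

```rust
use std::collections::HashMap;

async fn flush(key: u32, value: &str) -> anyhow::Result<()> {
    println!("flushing {key} -> {value}");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pending: HashMap<u32, String> =
        HashMap::from([(1, "rel block".into()), (2, "metadata".into())]);

    // retain() cannot await or bail with `?`; drain, await per entry, and
    // re-insert what should be kept instead.
    let mut retained = HashMap::new();
    for (key, value) in pending.drain() {
        if key == 1 {
            flush(key, &value).await?;
        } else {
            retained.insert(key, value);
        }
    }
    pending.extend(retained);
    Ok(())
}
```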

View File

@@ -473,6 +473,14 @@ pub(crate) enum ShutdownError {
AlreadyStopping,
}
struct DeletionGuard(OwnedMutexGuard<bool>);
impl DeletionGuard {
fn is_deleted(&self) -> bool {
*self.0
}
}
impl Tenant {
/// Yet another helper for timeline initialization.
/// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -519,6 +527,7 @@ impl Tenant {
);
timeline
.load_layer_map(new_disk_consistent_lsn)
.await
.with_context(|| {
format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
})?;
@@ -560,7 +569,7 @@ impl Tenant {
|| timeline
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.next()
.is_some(),
@@ -1137,7 +1146,11 @@ impl Tenant {
)
.context("create_timeline_struct")?;
let guard = Arc::clone(&timeline.delete_lock).lock_owned().await;
let guard = DeletionGuard(
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.expect("cannot happen because we're the only owner"),
);
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -1461,7 +1474,13 @@ impl Tenant {
let timelines = self.timelines.lock().unwrap();
let timelines_to_compact = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
.filter_map(|(timeline_id, timeline)| {
if timeline.is_active() {
Some((*timeline_id, timeline.clone()))
} else {
None
}
})
.collect::<Vec<_>>();
drop(timelines);
timelines_to_compact
@@ -1542,6 +1561,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
guard: DeletionGuard,
) -> anyhow::Result<()> {
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
@@ -1614,6 +1634,25 @@ impl Tenant {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
if let Some(remote_client) = &timeline.remote_client {
remote_client.delete_all().await.context("delete_all")?
};
// Have a failpoint that can use the `pause` failpoint action.
// We don't want to block the executor thread, hence, spawn_blocking + await.
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint in_progress_delete");
fail::fail_point!("in_progress_delete");
}
})
.await
.expect("spawn_blocking");
}
{
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
@@ -1634,12 +1673,7 @@ impl Tenant {
drop(timelines);
}
let remote_client = match &timeline.remote_client {
Some(remote_client) => remote_client,
None => return Ok(()),
};
remote_client.delete_all().await?;
drop(guard);
Ok(())
}
@@ -1687,23 +1721,18 @@ impl Tenant {
timeline = Arc::clone(timeline_entry.get());
// Prevent two tasks from trying to delete the timeline at the same time.
//
// XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller
// needs to poll until the operation has finished. But for now, we return an
// error, because the control plane knows to retry errors.
delete_lock_guard =
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.map_err(|_| {
DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err(
|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
})?;
},
)?);
// If another task finished the deletion just before we acquired the lock,
// return success.
if *delete_lock_guard {
if delete_lock_guard.is_deleted() {
return Ok(());
}
@@ -1777,7 +1806,7 @@ impl Tenant {
self: Arc<Self>,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
_guard: OwnedMutexGuard<bool>,
guard: DeletionGuard,
) {
let tenant_id = self.tenant_id;
let timeline_clone = Arc::clone(&timeline);
@@ -1790,7 +1819,7 @@ impl Tenant {
"timeline_delete",
false,
async move {
if let Err(err) = self.delete_timeline(timeline_id, timeline).await {
if let Err(err) = self.delete_timeline(timeline_id, timeline, guard).await {
error!("Error: {err:#}");
timeline_clone.set_broken(err.to_string())
};
@@ -3582,12 +3611,16 @@ mod tests {
.await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
drop(writer);
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3656,13 +3689,21 @@ mod tests {
let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
// Insert a value on the timeline
writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?;
writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?;
writer
.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
.await?;
writer
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
.await?;
writer.finish_write(Lsn(0x20));
writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?;
writer
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
.await?;
writer.finish_write(Lsn(0x30));
writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?;
writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
.await?;
writer.finish_write(Lsn(0x40));
//assert_current_logical_size(&tline, Lsn(0x40));
@@ -3675,7 +3716,9 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
let new_writer = newtline.writer().await;
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
new_writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
.await?;
new_writer.finish_write(Lsn(0x40));
// Check page contents on both branches
@@ -3703,36 +3746,44 @@ mod tests {
{
let writer = tline.writer().await;
// Create a relation on the timeline
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
lsn += 0x10;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
lsn += 0x10;
}
tline.freeze_and_flush().await?;
{
let writer = tline.writer().await;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
lsn += 0x10;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
}
tline.freeze_and_flush().await
@@ -4048,7 +4099,9 @@ mod tests {
.await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
drop(writer);
@@ -4056,7 +4109,9 @@ mod tests {
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -4064,7 +4119,9 @@ mod tests {
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
writer
.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
.await?;
writer.finish_write(Lsn(0x30));
drop(writer);
@@ -4072,7 +4129,9 @@ mod tests {
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
writer
.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
.await?;
writer.finish_write(Lsn(0x40));
drop(writer);
@@ -4124,11 +4183,13 @@ mod tests {
for _ in 0..10000 {
test_key.field6 = blknum;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
drop(writer);
@@ -4174,11 +4235,13 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
updated[blknum] = lsn;
drop(writer);
@@ -4192,11 +4255,13 @@ mod tests {
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
drop(writer);
updated[blknum] = lsn;
@@ -4249,11 +4314,13 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
updated[blknum] = lsn;
drop(writer);
@@ -4275,11 +4342,13 @@ mod tests {
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
println!("updating {} at {}", blknum, lsn);
writer.finish_write(lsn);
drop(writer);
@@ -4341,11 +4410,13 @@ mod tests {
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
)
.await?;
println!("updating [{}][{}] at {}", idx, blknum, lsn);
writer.finish_write(lsn);
drop(writer);
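
A minimal standalone sketch of the `DeletionGuard` pattern this file introduces: a newtype over tokio's `OwnedMutexGuard<bool>` gives the "already deleted" check a name, and `try_lock_owned` fails fast instead of queueing a second concurrent deletion:

```rust
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

struct DeletionGuard(OwnedMutexGuard<bool>);

impl DeletionGuard {
    fn is_deleted(&self) -> bool {
        *self.0
    }
}

#[tokio::main]
async fn main() {
    let delete_lock = Arc::new(Mutex::new(false));

    let guard = DeletionGuard(
        Arc::clone(&delete_lock)
            .try_lock_owned()
            .expect("no concurrent deletion in this sketch"),
    );
    assert!(!guard.is_deleted());

    // A second deleter gets an error while the guard is held, mirroring the
    // "timeline deletion is already in progress" branch above.
    assert!(Arc::clone(&delete_lock).try_lock_owned().is_err());
}
```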

View File

@@ -753,22 +753,18 @@ impl RemoteTimelineClient {
// Have a failpoint that can use the `pause` failpoint action.
// We don't want to block the executor thread, hence, spawn_blocking + await.
#[cfg(feature = "testing")]
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!(
"at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
fail::fail_point!(
"persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
}
})
.await
.expect("spawn_blocking");
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint persist_deleted_index_part");
fail::fail_point!("persist_deleted_index_part");
}
})
.await
.expect("spawn_blocking");
}
upload::upload_index_part(
self.conf,
&self.storage_impl,
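
A minimal standalone sketch of the failpoint pattern refactored above (assumes `tokio`, `tracing`, and `fail` as dependencies): a failpoint configured with the `pause` action blocks its thread, so it runs inside `spawn_blocking` while the async task awaits the JoinHandle, keeping executor threads free; the current span is captured so the blocking closure logs under the caller's context:

```rust
#[tokio::main]
async fn main() {
    tokio::task::spawn_blocking({
        let current = tracing::Span::current();
        move || {
            let _entered = current.entered();
            tracing::info!("at failpoint persist_deleted_index_part");
            // No-op unless the failpoint is configured (e.g. with `pause`):
            fail::fail_point!("persist_deleted_index_part");
        }
    })
    .await
    .expect("spawn_blocking");
}
```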

View File

@@ -389,10 +389,10 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
}
/// Returned by [`Layer::iter`]
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i>;
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i + Send>;
/// Returned by [`Layer::key_iter`]
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i>;
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send>;
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
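
A minimal standalone sketch of what the added `+ Send` bound buys: a boxed iterator without it cannot be moved into a spawned task (or held across `.await` on a multi-threaded runtime):

```rust
type LayerIter<'i> = Box<dyn Iterator<Item = u64> + 'i + Send>;

fn make_iter() -> LayerIter<'static> {
    Box::new(0..10u64)
}

#[tokio::main]
async fn main() {
    let iter = make_iter();
    // tokio::spawn requires the captured iterator to be Send; without the
    // `+ Send` in the type alias this would not compile.
    let sum = tokio::spawn(async move { iter.sum::<u64>() })
        .await
        .unwrap();
    println!("sum = {sum}");
}
```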

View File

@@ -304,7 +304,7 @@ impl InMemoryLayer {
Ok(())
}
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())

View File

@@ -47,7 +47,10 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -125,7 +128,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -539,9 +542,7 @@ impl Timeline {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => {
self.metrics
.materialized_page_cache_hit_upon_request_counter
.inc();
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
return Ok(cached_img); // exact LSN match, return the image
}
Ordering::Greater => {
@@ -563,8 +564,7 @@ impl Timeline {
.await?;
timer.stop_and_record();
self.metrics
.reconstruct_time_histo
RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
}
@@ -597,8 +597,8 @@ impl Timeline {
/// The sum of the file size of all historic layers in the layer map.
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().unwrap();
pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await;
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
@@ -908,7 +908,7 @@ impl Timeline {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let open_layer_size = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let Some(open_layer) = layers.open_layer.as_ref() else {
return Ok(());
};
@@ -1038,8 +1038,8 @@ impl Timeline {
}
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1061,7 +1061,7 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
@@ -1074,7 +1074,7 @@ impl Timeline {
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1159,7 +1159,7 @@ impl Timeline {
}
// start the batch update
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1417,7 +1417,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
layers: tokio::sync::RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -1598,15 +1598,17 @@ impl Timeline {
/// Initialize with an empty layer map. Used when creating a new timeline.
///
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.try_write().expect(
"in the context where we call this function, no other task has access to the object",
);
layers.next_open_layer_at = Some(Lsn(start_lsn.0));
}
///
/// Scan the timeline directory to populate the layer map.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1735,7 +1737,7 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1888,7 +1890,7 @@ impl Timeline {
let local_layers = self
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
@@ -2261,8 +2263,8 @@ impl Timeline {
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().await.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
@@ -2385,7 +2387,7 @@ impl Timeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(());
}
if prev_lsn <= cont_lsn {
@@ -2479,7 +2481,7 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
let layers = timeline.layers.read().await;
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2661,8 +2663,8 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().unwrap();
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().await;
ensure!(lsn.is_aligned());
@@ -2711,17 +2713,16 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
let layer = self.get_layer_for_write(lsn).await?;
layer.put_value(key, lsn, val)?;
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn).await?;
layer.put_tombstone(key_range, lsn).await?;
Ok(())
}
@@ -2740,7 +2741,7 @@ impl Timeline {
} else {
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
@@ -2778,7 +2779,7 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
@@ -2894,16 +2895,7 @@ impl Timeline {
}
}
// normal case, write out a L0 delta layer file.
let this = self.clone();
let frozen_layer = frozen_layer.clone();
let span = tracing::info_span!("blocking");
let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.create_delta_layer(&frozen_layer)
})
.await
.context("create_delta_layer spawn_blocking")
.and_then(|res| res)?;
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
HashMap::from([(delta_path, metadata)])
};
@@ -2912,7 +2904,7 @@ impl Timeline {
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let l = layers.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
@@ -3006,34 +2998,50 @@ impl Timeline {
}
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
async fn create_delta_layer(
self: &Arc<Self>,
frozen_layer: &InMemoryLayer,
frozen_layer: &Arc<InMemoryLayer>,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
let new_delta_filename = new_delta.filename();
let span = tracing::info_span!("blocking");
let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({
let _g = span.entered();
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
move || {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
// Sync it to disk.
//
// We must also fsync the timeline dir to ensure the directory entries for
// new layer files are durable
//
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
// Sync it to disk.
//
// We must also fsync the timeline dir to ensure the directory entries for
// new layer files are durable
//
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
// this with a single fsync in future refactors.
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
// Then sync the parent directory.
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
// this with a single fsync in future refactors.
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
// Then sync the parent directory.
par_fsync::par_fsync(&[self_clone
.conf
.timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)])
.context("fsync of timeline dir")?;
let sz = new_delta_path.metadata()?.len();
anyhow::Ok((new_delta, sz))
}
})
.await
.context("spawn_blocking")??;
let new_delta_name = new_delta.filename();
// Add it to the layer map
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
@@ -3044,14 +3052,12 @@ impl Timeline {
batch_updates.flush();
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
self.metrics.resident_physical_size_gauge.add(sz);
// update metrics
self.metrics.num_persistent_files_created.inc_by(1);
self.metrics.persistent_bytes_written.inc_by(sz);
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
Ok((new_delta_name, LayerFileMetadata::new(sz)))
}
async fn repartition(
@@ -3085,10 +3091,14 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
async fn time_for_new_image_layer(
&self,
partition: &KeySpace,
lsn: Lsn,
) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut max_deltas = 0;
{
@@ -3183,7 +3193,7 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
start = img_range.end;
if force || self.time_for_new_image_layer(partition, lsn)? {
if force || self.time_for_new_image_layer(partition, lsn).await? {
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -3266,7 +3276,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
@@ -3328,13 +3338,13 @@ impl Timeline {
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
/// start of level0 files compaction, the on-demand download should be revisited as well.
fn compact_level0_phase1(
async fn compact_level0_phase1(
&self,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
@@ -3457,7 +3467,7 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
let layers = self.layers.read().await; // Isn't it better to hold the original layers lock till here?
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3671,21 +3681,12 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
let this = self.clone();
let ctx_inner = ctx.clone();
let layer_removal_cs_inner = layer_removal_cs.clone();
let span = tracing::info_span!("blocking");
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
})
.await
.context("compact_level0_phase1 spawn_blocking")
.map_err(CompactionError::Other)
.and_then(|res| res)?;
} = self
.compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
.await?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
@@ -3703,7 +3704,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3790,6 +3791,7 @@ impl Timeline {
/// for example. The caller should hold `Tenant::gc_cs` lock to ensure
/// that.
///
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
pub(super) async fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,
@@ -3962,7 +3964,7 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too aggressive and avoidable
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4262,7 +4264,7 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut layers = self_clone.layers.write().await;
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
@@ -4420,7 +4422,7 @@ impl Timeline {
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
@@ -4522,8 +4524,8 @@ impl LocalLayerInfoForDiskUsageEviction {
}
impl Timeline {
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().unwrap();
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().await;
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
@@ -4611,12 +4613,12 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value)
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value).await
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn)
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn).await
}
/// Track the end of the latest digested WAL record.
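
A minimal standalone sketch of the lock migration running through this file: a `std::sync::RwLock` guard must not be held across an `.await` (it blocks an executor thread and the guard is not `Send`), so `layers` moves to `tokio::sync::RwLock`, whose `read()`/`write()` are awaited:

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Stand-in for the layer map: file sizes of historic layers.
    let layers = Arc::new(tokio::sync::RwLock::new(vec![1024u64, 2048]));

    // Analogous to the new async layer_size_sum(): readers await the lock.
    let size: u64 = layers.read().await.iter().sum();
    println!("layer size sum: {size}");

    // Writers await too, instead of blocking the thread.
    layers.write().await.push(4096);
}
```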

View File

@@ -197,7 +197,7 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {

View File

@@ -257,7 +257,7 @@ nwp_register_gucs(void)
"Walproposer reconnects to offline safekeepers once in this interval.",
NULL,
&wal_acceptor_reconnect_timeout,
5000, 0, INT_MAX, /* default, min, max */
1000, 0, INT_MAX, /* default, min, max */
PGC_SIGHUP, /* context */
GUC_UNIT_MS, /* flags */
NULL, NULL, NULL);

View File

@@ -1,5 +1,6 @@
use futures::pin_mut;
use futures::StreamExt;
use futures::TryFutureExt;
use hyper::body::HttpBody;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
@@ -11,8 +12,13 @@ use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
use tracing::error;
use tracing::info;
use tracing::instrument;
use url::Url;
use crate::proxy::invalidate_cache;
use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
use crate::{auth, config::ProxyConfig, console};
#[derive(serde::Deserialize)]
@@ -90,10 +96,17 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
}
}
struct ConnInfo {
username: String,
dbname: String,
hostname: String,
password: String,
}
fn get_conn_info(
headers: &HeaderMap,
sni_hostname: Option<String>,
) -> Result<(String, String, String, String), anyhow::Error> {
) -> Result<ConnInfo, anyhow::Error> {
let connection_string = headers
.get("Neon-Connection-String")
.ok_or(anyhow::anyhow!("missing connection string"))?
@@ -146,12 +159,12 @@ fn get_conn_info(
}
}
Ok((
username.to_owned(),
dbname.to_owned(),
hostname.to_owned(),
password.to_owned(),
))
Ok(ConnInfo {
username: username.to_owned(),
dbname: dbname.to_owned(),
hostname: hostname.to_owned(),
password: password.to_owned(),
})
}
// TODO: return different http error codes
@@ -164,10 +177,10 @@ pub async fn handle(
// Determine the destination and connection params
//
let headers = request.headers();
let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?;
let conn_info = get_conn_info(headers, sni_hostname)?;
let credential_params = StartupMessageParams::new([
("user", &username),
("database", &dbname),
("user", &conn_info.username),
("database", &conn_info.dbname),
("application_name", APP_NAME),
]);
@@ -186,21 +199,20 @@ pub async fn handle(
let creds = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names))
.map(|_| {
auth::ClientCredentials::parse(
&credential_params,
Some(&conn_info.hostname),
common_names,
)
})
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
};
let node = creds.wake_compute(&extra).await?.expect("msg");
let conf = node.value.config;
let port = *conf.get_ports().first().expect("no port");
let host = match conf.get_hosts().first().expect("no host") {
tokio_postgres::config::Host::Tcp(host) => host,
tokio_postgres::config::Host::Unix(_) => {
return Err(anyhow::anyhow!("unix socket is not supported"));
}
};
let mut node_info = creds.wake_compute(&extra).await?.expect("msg");
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
@@ -220,28 +232,10 @@ pub async fn handle(
let QueryData { query, params } = serde_json::from_slice(&body)?;
let query_params = json_to_pg_text(params)?;
//
// Connect to the destination
//
let (client, connection) = tokio_postgres::Config::new()
.host(host)
.port(port)
.user(&username)
.password(&password)
.dbname(&dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
//
// Now execute the query and return the result
//
let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?;
let row_stream = client.query_raw_txt(query, query_params).await?;
// Manually drain the stream into a vector to leave row_stream hanging
@@ -280,6 +274,11 @@ pub async fn handle(
json!({
"name": Value::String(c.name().to_owned()),
"dataTypeID": Value::Number(c.type_().oid().into()),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
})
})
.collect::<Vec<_>>()
@@ -303,6 +302,70 @@ pub async fn handle(
}))
}
/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with
/// the difference that it uses `tokio_postgres` for the connection.
#[instrument(skip_all)]
async fn connect_to_compute(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
loop {
match connect_to_compute_once(node_info, conn_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).await? {
// Update `node_info` and try one more time.
Some(new) => {
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
}
}
other => return other,
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
}
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut config = (*node_info.config).clone();
let (client, connection) = config
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.inspect_err(|e: &tokio_postgres::Error| {
error!(
"failed to connect to compute node hosts={:?} ports={:?}: {}",
node_info.config.get_hosts(),
node_info.config.get_ports(),
e
);
invalidate_cache(node_info)
})
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
Ok(client)
}
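
A minimal standalone sketch of the retry shape used by `connect_to_compute` above: try once, and on failure ask the control plane to wake the compute node and refresh the cached connection info, for up to `NUM_RETRIES_WAKE_COMPUTE` extra attempts (`connect_once` and `wake_compute` below are illustrative stand-ins):

```rust
use anyhow::anyhow;

async fn connect_once(addr: &str) -> anyhow::Result<String> {
    if addr == "stale-cached-host" {
        Err(anyhow!("stale address"))
    } else {
        Ok(format!("connected to {addr}"))
    }
}

async fn wake_compute() -> Option<String> {
    Some("fresh-host".to_string())
}

async fn connect_with_retry(mut addr: String) -> anyhow::Result<String> {
    let mut num_retries: usize = 1; // NUM_RETRIES_WAKE_COMPUTE
    loop {
        match connect_once(&addr).await {
            Err(e) if num_retries > 0 => match wake_compute().await {
                Some(new_addr) => addr = new_addr, // retry against fresh info
                None => return Err(e),             // link auth: no wake-up
            },
            other => return other,
        }
        num_retries -= 1;
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("{}", connect_with_retry("stale-cached-host".into()).await?);
    Ok(())
}
```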
//
// Convert postgres row with text-encoded values to JSON object
//

View File

@@ -26,7 +26,6 @@ use tls_listener::TlsListener;
use tokio::{
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
net::TcpListener,
select,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
@@ -193,14 +192,9 @@ async fn ws_handler(
// TODO: this deserves a refactor, as the function now also handles the HTTP JSON client besides websockets.
// Right now I don't want to blow up the sql-over-http patch with file renames, so that will be done as a follow-up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let result = select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
Err(anyhow::anyhow!("Query timed out"))
}
response = sql_over_http::handle(config, request, sni_hostname) => {
response
}
};
let result = sql_over_http::handle(config, request, sni_hostname)
.instrument(info_span!("sql-over-http"))
.await;
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,
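
A minimal standalone sketch of the `.instrument` pattern that replaces the `select!`-based timeout above: the handler future runs inside a named span so its logs are attributable, and no artificial 10-second deadline is imposed:

```rust
use tracing::{info, info_span, Instrument};

async fn handle() -> &'static str {
    info!("handling request"); // logged within the sql-over-http span
    "ok"
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    let result = handle().instrument(info_span!("sql-over-http")).await;
    info!(result, "done");
}
```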

View File

@@ -22,7 +22,7 @@ use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;
/// Number of times we should retry the `/proxy_wake_compute` http request.
const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -283,34 +283,35 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
/// If we couldn't connect, a cached connection info might be to blame
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(name = "invalidate_cache", skip_all)]
pub fn invalidate_cache(node_info: &console::CachedNodeInfo) {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
) -> Result<PostgresConnection, compute::ConnectionError> {
// If we couldn't connect, a cached connection info might be to blame
// (e.g. the compute node's address might've changed at the wrong time).
// Invalidate the cache entry (if any) to prevent subsequent errors.
let invalidate_cache = |_: &compute::ConnectionError| {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
};
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(allow_self_signed_compute)
.inspect_err(invalidate_cache)
.inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
.await
}

scripts/comment-test-report.js Normal file → Executable file
View File

@@ -1,3 +1,5 @@
#! /usr/bin/env node
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
//
@@ -19,7 +21,7 @@
// })
//
// Analog of Python's defaultdict.
// Equivalent of Python's defaultdict.
//
// const dm = new DefaultMap(() => new DefaultMap(() => []))
// dm["firstKey"]["secondKey"].push("value")
@@ -32,34 +34,7 @@ class DefaultMap extends Map {
}
}
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// Whether we run the script in a PR or in a branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id, taken from https://api.github.com/users/github-actions[bot]
const githubActionsBotId = 41898282
// Comment body itself
let commentBody = `${startMarker}\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
owner: context.repo.owner,
repo: context.repo.repo,
}
const {reportUrl, reportJsonUrl} = report
if (!reportUrl || !reportJsonUrl) {
commentBody += `#### No tests were run or test report is not available\n`
commentBody += autoupdateNotice
return
}
const parseReportJson = async ({ reportJsonUrl, fetch }) => {
const suites = await (await fetch(reportJsonUrl)).json()
// Allure distinguishes "failed" tests (with an assertion error) from "broken" ones (with any other error).
@@ -83,7 +58,7 @@ module.exports = async ({ github, context, fetch, report }) => {
let buildType, pgVersion
const match = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)?.groups
if (match) {
({buildType, pgVersion} = match)
({ buildType, pgVersion } = match)
} else {
// It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for the regress suite and not for other suites (like performance).
console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`)
@@ -123,37 +98,68 @@ module.exports = async ({ github, context, fetch, report }) => {
}
}
return {
failedTests,
failedTestsCount,
passedTests,
passedTestsCount,
skippedTests,
skippedTestsCount,
flakyTests,
flakyTestsCount,
retriedTests,
pgVersions,
}
}
const reportSummary = async (params) => {
const {
failedTests,
failedTestsCount,
passedTests,
passedTestsCount,
skippedTests,
skippedTestsCount,
flakyTests,
flakyTestsCount,
retriedTests,
pgVersions,
reportUrl,
} = params
let summary = ""
const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount
commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n`
summary += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n`
// Print test results from the newest to the oldest Postgres version for release and debug builds.
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(failedTests[pgVersion]).length > 0) {
commentBody += `#### Failures on Postgres ${pgVersion}\n\n`
summary += `#### Failures on Postgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(failedTests[pgVersion])) {
const links = []
for (const test of tests) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}`
links.push(`[${test.buildType}](${allureLink})`)
}
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
summary += `- \`${testName}\`: ${links.join(", ")}\n`
}
const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name)
const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"`
commentBody += "```\n"
commentBody += `# Run the tests that failed on Postgres ${pgVersion} locally:\n`
commentBody += `${command}\n`
commentBody += "```\n"
summary += "```\n"
summary += `# Run the tests that failed on Postgres ${pgVersion} locally:\n`
summary += `${command}\n`
summary += "```\n"
}
}
if (flakyTestsCount > 0) {
commentBody += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
summary += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(flakyTests[pgVersion]).length > 0) {
commentBody += `#### Postgres ${pgVersion}\n\n`
summary += `#### Postgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) {
const links = []
for (const test of tests) {
@@ -161,11 +167,57 @@ module.exports = async ({ github, context, fetch, report }) => {
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
links.push(`[${status} ${test.buildType}](${allureLink})`)
}
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
summary += `- \`${testName}\`: ${links.join(", ")}\n`
}
}
}
commentBody += "\n</details>\n"
summary += "\n</details>\n"
}
return summary
}
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// Whether we run the script in a PR or in a branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id, taken from https://api.github.com/users/github-actions[bot]
const githubActionsBotId = 41898282
// Comment body itself
let commentBody = `${startMarker}\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
owner: context.repo.owner,
repo: context.repo.repo,
}
const {reportUrl, reportJsonUrl} = report
if (!reportUrl || !reportJsonUrl) {
commentBody += `#### No tests were run or test report is not available\n`
commentBody += autoupdateNotice
return
}
try {
const parsed = await parseReportJson({ reportJsonUrl, fetch })
commentBody += await reportSummary({ ...parsed, reportUrl })
} catch (error) {
commentBody += `### [full report](${reportUrl})\n___\n`
commentBody += `#### Failed to create a summary for the test run: \n`
commentBody += "```\n"
commentBody += `${error.stack}\n`
commentBody += "```\n"
commentBody += "\nTo reproduce and debug the error locally run:\n"
commentBody += "```\n"
commentBody += `scripts/comment-test-report.js ${reportJsonUrl}`
commentBody += "\n```\n"
}
commentBody += autoupdateNotice
@@ -207,3 +259,60 @@ module.exports = async ({ github, context, fetch, report }) => {
})
}
}
// Equivalent of Python's `if __name__ == "__main__":`
// https://nodejs.org/docs/latest/api/modules.html#accessing-the-main-module
if (require.main === module) {
// Poor man's argument parsing: we expect the third argument to be a JSON URL (0: node binary, 1: this script, 2: JSON url)
if (process.argv.length !== 3) {
console.error(`Unexpected number of arguments\nUsage: node ${process.argv[1]} <jsonUrl>`)
process.exit(1)
}
const jsonUrl = process.argv[2]
try {
new URL(jsonUrl)
} catch (error) {
console.error(`Invalid URL: ${jsonUrl}\nUsage: node ${process.argv[1]} <jsonUrl>`)
process.exit(1)
}
const htmlUrl = jsonUrl.replace("/data/suites.json", "/index.html")
const githubMock = {
rest: {
issues: {
createComment: console.log,
listComments: async () => ({ data: [] }),
updateComment: console.log
},
repos: {
createCommitComment: console.log,
listCommentsForCommit: async () => ({ data: [] }),
updateCommitComment: console.log
}
}
}
const contextMock = {
repo: {
owner: 'testOwner',
repo: 'testRepo'
},
payload: {
number: 42,
pull_request: null,
},
sha: '0000000000000000000000000000000000000000',
}
module.exports({
github: githubMock,
context: contextMock,
fetch: fetch,
report: {
reportUrl: htmlUrl,
reportJsonUrl: jsonUrl,
}
})
}

View File

@@ -1,12 +1,14 @@
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import sys
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
import backoff
import psycopg2
import psycopg2.extras
@@ -35,9 +37,20 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
with psycopg2.connect(connstr, connect_timeout=30) as conn:
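# Retry OperationalError with exponentially growing (jittered) delays, for up to 150s in total.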
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
def connect(connstr):
conn = psycopg2.connect(connstr, connect_timeout=30)
conn.autocommit = True
return conn
conn = connect(connstr)
try:
with conn.cursor() as cur:
yield cur
finally:
if conn is not None:
conn.close()
def create_table(cur):
@@ -115,6 +128,7 @@ def main():
parser.add_argument(
"--ingest",
type=Path,
required=True,
help="Path to perf test result file, or directory with perf test result files",
)
parser.add_argument("--initdb", action="store_true", help="Initialuze database")
@@ -140,4 +154,5 @@ def main():
if __name__ == "__main__":
logging.getLogger("backoff").addHandler(logging.StreamHandler())
main()

View File

@@ -1,11 +1,13 @@
#!/usr/bin/env python3
import argparse
import logging
import os
import re
import sys
from contextlib import contextmanager
from pathlib import Path
import backoff
import psycopg2
CREATE_TABLE = """
@@ -29,9 +31,20 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
with psycopg2.connect(connstr, connect_timeout=30) as conn:
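# Reconnect on OperationalError with exponential backoff until max_time (150s) elapses.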
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
def connect(connstr):
conn = psycopg2.connect(connstr, connect_timeout=30)
conn.autocommit = True
return conn
conn = connect(connstr)
try:
with conn.cursor() as cur:
yield cur
finally:
if conn is not None:
conn.close()
def create_table(cur):
@@ -101,4 +114,5 @@ def main():
if __name__ == "__main__":
logging.getLogger("backoff").addHandler(logging.StreamHandler())
main()

View File

@@ -57,14 +57,16 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_current_logical_size",
"pageserver_resident_physical_size",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
"pageserver_getpage_get_reconstruct_data_seconds_count",
"pageserver_getpage_get_reconstruct_data_seconds_sum",
@@ -73,8 +75,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_io_operations_seconds_count",
"pageserver_io_operations_seconds_sum",
"pageserver_last_record_lsn",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_read_num_fs_layers_bucket",
"pageserver_read_num_fs_layers_count",
"pageserver_read_num_fs_layers_sum",

View File

@@ -1631,6 +1631,8 @@ class NeonPageserver(PgProtocol):
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
]
def start(

View File

@@ -342,6 +342,11 @@ class PageserverHttpClient(requests.Session):
return res_json
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
"""
Note that deletion is not instant; it is scheduled and performed mostly in the background.
So if you need to wait for it to complete, use `timeline_delete_wait_completed`.
For a longer description, consult the pageserver openapi spec.
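
Illustrative sketch (assuming a `client: PageserverHttpClient` and the helpers
from fixtures.pageserver.utils):

    client.timeline_delete(tenant_id, timeline_id)  # schedules deletion in the background
    wait_timeline_detail_404(client, tenant_id, timeline_id)  # poll until the timeline is gone
    # or, in one call:
    timeline_delete_wait_completed(client, tenant_id, timeline_id)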
"""
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs
)

View File

@@ -193,19 +193,30 @@ def wait_for_upload_queue_empty(
time.sleep(0.2)
def assert_timeline_detail_404(
def wait_timeline_detail_404(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
last_exc = None
for _ in range(2):
time.sleep(0.250)
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code == 404:
return
last_exc = e
raise last_exc or RuntimeError(f"Timeline wasn't deleted in time, state: {data['state']}")
def timeline_delete_wait_completed(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
**delete_args,
):
"""Asserts that timeline_detail returns 404, or dumps the detail."""
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.error(e)
if e.status_code == 404:
return
else:
raise
raise Exception("detail succeeded (it should return 404)")
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id)

View File

@@ -15,7 +15,11 @@ from fixtures.neon_fixtures import (
PortDistributor,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
from pytest import FixtureRequest
@@ -417,7 +421,7 @@ def check_neon_works(
)
shutil.rmtree(repo_dir / "local_fs_remote_storage")
pageserver_http.timeline_delete(tenant_id, timeline_id)
timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
pageserver_http.timeline_create(pg_version, tenant_id, timeline_id)
pg_bin.run(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]

View File

@@ -14,7 +14,11 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import subprocess_capture
@@ -151,7 +155,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
".*files not bound to index_file.json, proceeding with their deletion.*"
)
client.timeline_delete(tenant, timeline)
timeline_delete_wait_completed(client, tenant, timeline)
# Importing correct backup works
import_tar(base_tar, wal_tar)

View File

@@ -24,7 +24,13 @@ def test_basic_eviction(
test_name="test_download_remote_layers_api",
)
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(
initial_tenant_conf={
# disable gc and compaction background loops because they perform on-demand downloads
"gc_period": "0s",
"compaction_period": "0s",
}
)
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
@@ -47,6 +53,11 @@ def test_basic_eviction(
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
# disable compute & sks to avoid on-demand downloads by walreceiver / getpage
endpoint.stop()
for sk in env.safekeepers:
sk.stop()
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))

View File

@@ -20,7 +20,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
wait_until_tenant_active,
@@ -597,14 +597,11 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
env.pageserver.allowed_errors.append(
".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
)
client.timeline_delete(tenant_id, timeline_id)
env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*")
env.pageserver.allowed_errors.append(
".*files not bound to index_file.json, proceeding with their deletion.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id))
timeline_delete_wait_completed(client, tenant_id, timeline_id)
assert not timeline_path.exists()

View File

@@ -11,6 +11,7 @@ from fixtures.neon_fixtures import (
wait_for_wal_insert_lsn,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.pg_version import PgVersion, xfail_on_postgres
from fixtures.types import Lsn, TenantId, TimelineId
@@ -628,12 +629,12 @@ def test_get_tenant_size_with_multiple_branches(
size_debug_file_before.write(size_debug)
# teardown, delete branches, and the size should be going down
http_client.timeline_delete(tenant_id, first_branch_timeline_id)
timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id)
size_after_deleting_first = http_client.tenant_size(tenant_id)
assert size_after_deleting_first < size_after_thinning_branch
http_client.timeline_delete(tenant_id, second_branch_timeline_id)
timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id)
size_after_deleting_second = http_client.tenant_size(tenant_id)
assert size_after_deleting_second < size_after_deleting_first

View File

@@ -1,6 +1,10 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active
from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
wait_until_tenant_active,
)
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
@@ -24,7 +28,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
def delete_all_timelines(tenant: TenantId):
timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)]
for t in timelines:
client.timeline_delete(tenant, t)
timeline_delete_wait_completed(client, tenant, t)
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()

View File

@@ -21,6 +21,7 @@ from fixtures.neon_fixtures import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -213,7 +214,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
# Test (a subset of) pageserver global metrics
for metric in PAGESERVER_GLOBAL_METRICS:
ps_samples = ps_metrics.query_all(metric, {})
assert len(ps_samples) > 0
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
for sample in ps_samples:
labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()])
log.info(f"{sample.name}{{{labels}}} {sample.value}")
@@ -318,9 +319,10 @@ def test_pageserver_with_empty_tenants(
client.tenant_create(tenant_with_empty_timelines)
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
timeline_delete_wait_completed(
client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(

View File

@@ -17,9 +17,10 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
wait_timeline_detail_404,
wait_until_tenant_active,
wait_until_timeline_state,
)
@@ -83,7 +84,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id),
func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id),
)
assert not timeline_path.exists()
@@ -94,16 +95,16 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
) as exc:
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
# FIXME: leaves the tenant without timelines; should we prevent deletion of the root timeline?
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id),
)
assert exc.value.status_code == 404
wait_until(
number_of_iterations=3,
interval=0.2,
func=lambda: timeline_delete_wait_completed(
ps_http, env.initial_tenant, parent_timeline_id
),
)
# Check that we didn't pick up the timeline again after restart.
# See https://github.com/neondatabase/neon/issues/3560
env.pageserver.stop(immediate=True)
@@ -143,7 +144,6 @@ def test_delete_timeline_post_rm_failure(
ps_http.configure_failpoints((failpoint_name, "return"))
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
@@ -165,13 +165,7 @@ def test_delete_timeline_post_rm_failure(
# this should succeed
# this also checks that delete can be retried even when timeline is in Broken state
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2)
with pytest.raises(PageserverApiException) as e:
ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert e.value.status_code == 404
env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*")
timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline)
env.pageserver.allowed_errors.append(
f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*"
)
@@ -247,13 +241,7 @@ def test_timeline_resurrection_on_attach(
pass
# delete new timeline
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id))
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id)
##### Stop the pageserver instance, erase all its data
env.endpoints.stop_all()
@@ -338,7 +326,6 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
@@ -357,12 +344,15 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
# Wait for tenant to finish loading.
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id)
)
try:
data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
log.debug(f"detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code != 404:
raise
else:
raise Exception("detail succeeded (it should return 404)")
assert (
not leaf_timeline_path.exists()
@@ -389,13 +379,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
assert env.initial_timeline is not None
for timeline_id in (intermediate_timeline_id, env.initial_timeline):
ps_http.timeline_delete(env.initial_tenant, timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id)
timeline_delete_wait_completed(
ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id
)
assert_prefix_empty(
@@ -419,23 +404,27 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
)
def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
neon_env_builder: NeonEnvBuilder,
@pytest.mark.parametrize(
"stuck_failpoint",
["persist_deleted_index_part", "in_progress_delete"],
)
def test_concurrent_timeline_delete_stuck_on(
neon_env_builder: NeonEnvBuilder, stuck_failpoint: str
):
"""
If we're stuck uploading the index file with the is_delete flag,
eventually the console will hang up and retry.
If we're still stuck at the retry time, ensure that the retry
fails with status 500, signalling to console that it should retry
later.
Ideally, timeline_delete should return 202 Accepted and require
console to poll for completion, but, that would require changing
the API contract.
If delete is stuck, the console will eventually retry deletion,
so we need to be sure that these requests won't interleave with each other.
In this test we check the two places where we can spend a lot of time.
This is a regression test, because there was a bug where the DeletionGuard wasn't propagated
to the background task.
Ensure that when the retry comes, if we're still stuck, the request gets an immediate error response,
signalling to the console that it should retry later.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload",
test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}",
)
env = neon_env_builder.init_start()
@@ -445,13 +434,14 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
ps_http = env.pageserver.http_client()
# make the first call sleep practically forever
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))
ps_http.configure_failpoints((stuck_failpoint, "pause"))
def first_call(result_queue):
try:
log.info("first call start")
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
timeline_delete_wait_completed(
ps_http, env.initial_tenant, child_timeline_id, timeout=10
)
log.info("first call success")
result_queue.put("success")
except Exception:
@@ -466,7 +456,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
def first_call_hit_failpoint():
assert env.pageserver.log_contains(
f".*{child_timeline_id}.*at failpoint {failpoint_name}"
f".*{child_timeline_id}.*at failpoint {stuck_failpoint}"
)
wait_until(50, 0.1, first_call_hit_failpoint)
@@ -484,8 +474,12 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
)
log.info("second call failed as expected")
# ensure it is not 404 and that the timeline is in Stopping state
detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert detail["state"] == "Stopping"
# by now we know that the second call failed; let's ensure the first call will finish
ps_http.configure_failpoints((failpoint_name, "off"))
ps_http.configure_failpoints((stuck_failpoint, "off"))
result = first_call_result.get()
assert result == "success"
@@ -498,8 +492,10 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
"""
If the client hangs up before we start the index part upload but after we mark it
If the client hangs up before we start the index part upload but after deletion is scheduled,
we mark it
deleted in local memory; a subsequent delete_timeline call should be able to do
another delete timeline operation.
This tests cancel safety up to the given failpoint.
@@ -515,12 +511,18 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
ps_http = env.pageserver.http_client()
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
failpoint_name = "persist_deleted_index_part"
ps_http.configure_failpoints((failpoint_name, "pause"))
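# "pause" blocks the delete request at this failpoint, so the client call below times out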
with pytest.raises(requests.exceptions.Timeout):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*timeline deletion is already in progress.*"
)
with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# make sure the timeout was due to the failpoint
at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*"
@@ -552,12 +554,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found"
env.pageserver.allowed_errors.append(".*" + notfound_message)
with pytest.raises(PageserverApiException, match=notfound_message) as exc:
ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert exc.value.status_code == 404
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id)
@pytest.mark.parametrize(
@@ -616,12 +613,7 @@ def test_timeline_delete_works_for_remote_smoke(
for timeline_id in reversed(timeline_ids):
# note that we need to finish the previous deletion before scheduling the next one,
# otherwise we can get a "HasChildren" error if deletion is not fast enough (real_s3)
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id)
assert_prefix_empty(
neon_env_builder,

View File

@@ -24,6 +24,7 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
wait_for_upload_queue_empty,
wait_until_tenant_active,
)
@@ -272,7 +273,7 @@ def test_timeline_initial_logical_size_calculation_cancellation(
if deletion_method == "tenant_detach":
client.tenant_detach(tenant_id)
elif deletion_method == "timeline_delete":
client.timeline_delete(tenant_id, timeline_id)
timeline_delete_wait_completed(client, tenant_id, timeline_id)
delete_timeline_success.put(True)
except PageserverApiException:
delete_timeline_success.put(False)

View File

@@ -31,7 +31,11 @@ from fixtures.neon_fixtures import (
SafekeeperPort,
available_remote_storages,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -548,15 +552,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
f"sk_id={sk.id} to flush {last_lsn}",
)
ps_cli = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
ps_http = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
lag = last_lsn - pageserver_lsn
log.info(
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
)
endpoint.stop_and_destroy()
ps_cli.timeline_delete(tenant_id, timeline_id)
timeline_delete_wait_completed(ps_http, tenant_id, timeline_id)
# Also delete and manually create timeline on safekeepers -- this tests
# scenario of manual recovery on a different set of safekeepers.
@@ -583,7 +587,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
shutil.copy(f_partial_saved, f_partial_path)
# recreate timeline on pageserver from scratch
ps_cli.timeline_create(
ps_http.timeline_create(
pg_version=PgVersion(pg_version),
tenant_id=tenant_id,
new_timeline_id=timeline_id,
@@ -598,7 +602,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
if elapsed > wait_lsn_timeout:
raise RuntimeError("Timed out waiting for WAL redo")
tenant_status = ps_cli.tenant_status(tenant_id)
tenant_status = ps_http.tenant_status(tenant_id)
if tenant_status["state"]["slug"] == "Loading":
log.debug(f"Tenant {tenant_id} is still loading, retrying")
else: