diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 897e1a7aad..94fbb02cf6 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -623,51 +623,6 @@ jobs: - name: Cleanup ECR folder run: rm -rf ~/.ecr - - neon-image-depot: - # For testing this will run side-by-side for a few merges. - # This action is not really optimized yet, but gets the job done - runs-on: [ self-hosted, gen3, large ] - needs: [ tag ] - container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned - permissions: - contents: read - id-token: write - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - submodules: true - fetch-depth: 0 - - - name: Setup go - uses: actions/setup-go@v3 - with: - go-version: '1.19' - - - name: Set up Depot CLI - uses: depot/setup-action@v1 - - - name: Install Crane & ECR helper - run: go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0 - - - name: Configure ECR login - run: | - mkdir /github/home/.docker/ - echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json - - - name: Build and push - uses: depot/build-push-action@v1 - with: - # if no depot.json file is at the root of your repo, you must specify the project id - project: nrdv0s4kcs - push: true - tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}} - build-args: | - GIT_VERSION=${{ github.sha }} - REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com - compute-tools-image: runs-on: [ self-hosted, gen3, large ] needs: [ tag ] @@ -704,6 +659,7 @@ jobs: --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} + --build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}} --build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} @@ -761,6 +717,7 @@ jobs: --context . 
--build-arg GIT_VERSION=${{ github.sha }} --build-arg PG_VERSION=${{ matrix.version }} + --build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}} --build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com --dockerfile Dockerfile.compute-node --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} @@ -959,6 +916,20 @@ jobs: exit 1 fi + - name: Create tag "release-${{ needs.tag.outputs.build-tag }}" + if: github.ref_name == 'release' + uses: actions/github-script@v6 + with: + # Retry script for 5XX server errors: https://github.com/actions/github-script#retries + retries: 5 + script: | + github.rest.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}", + sha: context.sha, + }) + promote-compatibility-data: runs-on: [ self-hosted, gen3, small ] container: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4bce9cdd1e..595ee05514 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,6 +3,7 @@ name: Create Release Branch on: schedule: - cron: '0 10 * * 2' + workflow_dispatch: jobs: create_release_branch: diff --git a/Cargo.lock b/Cargo.lock index 71a6699c50..4be74614c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2349,9 +2349,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "openssl" -version = "0.10.52" +version = "0.10.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" +checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" dependencies = [ "bitflags", "cfg-if", @@ -2381,9 +2381,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.87" +version = "0.9.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e" +checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" dependencies = [ "cc", "libc", diff --git a/Dockerfile.compute-node b/Dockerfile.compute-node index ae330d8a20..fc575536bc 100644 --- a/Dockerfile.compute-node +++ b/Dockerfile.compute-node @@ -2,6 +2,7 @@ ARG PG_VERSION ARG REPOSITORY=neondatabase ARG IMAGE=rust ARG TAG=pinned +ARG BUILD_TAG ######################################################################################### # @@ -634,6 +635,9 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \ # ######################################################################################### FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools +ARG BUILD_TAG +ENV BUILD_TAG=$BUILD_TAG + USER nonroot # Copy entire project to get Cargo.* files with proper dependencies for the whole project COPY --chown=nonroot . . diff --git a/Dockerfile.compute-tools b/Dockerfile.compute-tools index e86fb40ca4..3066e3f7ca 100644 --- a/Dockerfile.compute-tools +++ b/Dockerfile.compute-tools @@ -3,6 +3,7 @@ ARG REPOSITORY=neondatabase ARG IMAGE=rust ARG TAG=pinned +ARG BUILD_TAG FROM $REPOSITORY/$IMAGE:$TAG AS rust-build WORKDIR /home/nonroot @@ -16,6 +17,8 @@ ENV CACHEPOT_S3_KEY_PREFIX=cachepot ARG CACHEPOT_BUCKET=neon-github-dev #ARG AWS_ACCESS_KEY_ID #ARG AWS_SECRET_ACCESS_KEY +ARG BUILD_TAG +ENV BUILD_TAG=$BUILD_TAG COPY . . 
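A minimal sketch (not part of this diff; the helper name is hypothetical) of how the BUILD_TAG plumbing above reaches the binary: `ARG BUILD_TAG` plus `ENV BUILD_TAG=$BUILD_TAG` expose the value inside the build stage, and `option_env!` bakes it into the compiled binary, falling back to a default when the variable is not set (local builds). The real change is the inline `option_env!` call in compute_ctl.rs below.

    // Compile-time capture of the Docker build argument; "local" is used for
    // builds outside CI where BUILD_TAG is not exported.
    const BUILD_TAG_DEFAULT: &str = "local";

    fn build_tag() -> &'static str {
        option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT)
    }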
diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index c6cfde1d1a..90b39e9dd9 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -54,9 +54,15 @@ use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::spec::*; +const BUILD_TAG_DEFAULT: &str = "local"; + fn main() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; + let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT); + + info!("build_tag: {build_tag}"); + let matches = cli().get_matches(); let http_port = *matches diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index d3131ac476..52683ff1c3 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -67,6 +67,7 @@ pub struct EndpointConf { pg_port: u16, http_port: u16, pg_version: u32, + skip_pg_catalog_updates: bool, } // @@ -135,6 +136,7 @@ impl ComputeControlPlane { mode, tenant_id, pg_version, + skip_pg_catalog_updates: false, }); ep.create_endpoint_dir()?; @@ -148,6 +150,7 @@ impl ComputeControlPlane { http_port, pg_port, pg_version, + skip_pg_catalog_updates: false, })?, )?; std::fs::write( @@ -183,6 +186,9 @@ pub struct Endpoint { // the endpoint runs in. pub env: LocalEnv, pageserver: Arc, + + // Optimizations + skip_pg_catalog_updates: bool, } impl Endpoint { @@ -216,6 +222,7 @@ impl Endpoint { mode: conf.mode, tenant_id: conf.tenant_id, pg_version: conf.pg_version, + skip_pg_catalog_updates: conf.skip_pg_catalog_updates, }) } @@ -450,7 +457,7 @@ impl Endpoint { // Create spec file let spec = ComputeSpec { - skip_pg_catalog_updates: false, + skip_pg_catalog_updates: self.skip_pg_catalog_updates, format_version: 1.0, operation_uuid: None, cluster: Cluster { diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index ac1f8a357e..0e9c237e1e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -70,6 +70,14 @@ impl RemotePath { pub fn join(&self, segment: &Path) -> Self { Self(self.0.join(segment)) } + + pub fn get_path(&self) -> &PathBuf { + &self.0 + } + + pub fn extension(&self) -> Option<&str> { + self.0.extension()?.to_str() + } } /// Storage (potentially remote) API to manage its state. @@ -86,6 +94,19 @@ pub trait RemoteStorage: Send + Sync + 'static { prefix: Option<&RemotePath>, ) -> Result<Vec<RemotePath>, DownloadError>; + /// Lists all files in directory "recursively" + /// (not really recursively, because AWS has a flat namespace) + /// Note: This is subtly different from list_prefixes, + /// because it is for listing files instead of listing + /// names sharing common prefixes. + /// For example, + /// list_files("foo/bar") = ["foo/bar/cat123.txt", + /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"] + /// whereas, + /// list_prefixes("foo/bar/") = ["cat", "dog"] + /// See `test_real_s3.rs` for more details. + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>; + /// Streams the local file contents into remote into the remote storage entry. 
async fn upload( &self, @@ -174,6 +195,14 @@ impl GenericRemoteStorage { } } + pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + match self { + Self::LocalFs(s) => s.list_files(folder).await, + Self::AwsS3(s) => s.list_files(folder).await, + Self::Unreliable(s) => s.list_files(folder).await, + } + } + pub async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 59304c2481..ca5fbd5de5 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -48,6 +48,14 @@ impl LocalFs { Ok(Self { storage_root }) } + // mirrors S3Bucket::s3_object_to_relative_path + fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath { + let relative_path = key + .strip_prefix(&self.storage_root) + .expect("relative path must contain storage_root as prefix"); + RemotePath(relative_path.into()) + } + async fn read_storage_metadata( &self, file_path: &Path, @@ -132,6 +140,34 @@ impl RemoteStorage for LocalFs { Ok(prefixes) } + // recursively lists all files in a directory, + // mirroring the `list_files` for `s3_bucket` + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + let full_path = match folder { + Some(folder) => folder.with_base(&self.storage_root), + None => self.storage_root.clone(), + }; + let mut files = vec![]; + let mut directory_queue = vec![full_path.clone()]; + + while !directory_queue.is_empty() { + let cur_folder = directory_queue + .pop() + .expect("queue cannot be empty: we just checked"); + let mut entries = fs::read_dir(cur_folder.clone()).await?; + while let Some(entry) = entries.next_entry().await? { + let file_name: PathBuf = entry.file_name().into(); + let full_file_name = cur_folder.clone().join(&file_name); + let file_remote_path = self.local_file_to_relative_path(full_file_name.clone()); + files.push(file_remote_path.clone()); + if full_file_name.is_dir() { + directory_queue.push(full_file_name); + } + } + } + Ok(files) + } + async fn upload( &self, data: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index dafb6dcb45..43d818dfb9 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -347,6 +347,51 @@ impl RemoteStorage for S3Bucket { Ok(document_keys) } + /// See the doc for `RemoteStorage::list_files` + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + let folder_name = folder + .map(|p| self.relative_path_to_s3_object(p)) + .or_else(|| self.prefix_in_bucket.clone()); + + // AWS may need to break the response into several parts + let mut continuation_token = None; + let mut all_files = vec![]; + loop { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list_files")?; + metrics::inc_list_objects(); + + let response = self + .client + .list_objects_v2() + .bucket(self.bucket_name.clone()) + .set_prefix(folder_name.clone()) + .set_continuation_token(continuation_token) + .set_max_keys(self.max_keys_per_list_response) + .send() + .await + .map_err(|e| { + metrics::inc_list_objects_fail(); + e + }) + .context("Failed to list files in S3 bucket")?; + + for object in response.contents().unwrap_or_default() { + let object_path = object.key().expect("response does not contain a key"); + let remote_path = 
self.s3_object_to_relative_path(object_path); + all_files.push(remote_path); + } + match response.next_continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, + } + } + Ok(all_files) + } + async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 741c18bf6f..c46ca14ace 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -83,6 +83,11 @@ impl RemoteStorage for UnreliableWrapper { self.inner.list_prefixes(prefix).await } + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> { + self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?; + self.inner.list_files(folder).await + } + async fn upload( &self, data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 5f52b0754c..6fe65a0362 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -88,6 +88,58 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any Ok(()) } +/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries. +/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars to be specified. The test skips the real code and passes if the env vars are not set. +/// See `s3_pagination_should_work` for more information. +/// +/// First, it creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_s3_data`] +/// Then it performs the following queries: +/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt` +/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt` +#[test_context(MaybeEnabledS3WithSimpleTestBlobs)] +#[tokio::test] +async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx, + MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()), + MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => { + anyhow::bail!("S3 init failed: {e:?}") + } + }; + let test_client = Arc::clone(&ctx.enabled.client); + let base_prefix = + RemotePath::new(Path::new("folder1")).context("common_prefix construction")?; + let root_files = test_client + .list_files(None) + .await + .context("client list root files failure")? + .into_iter() + .collect::<HashSet<_>>(); + assert_eq!( + root_files, + ctx.remote_blobs.clone(), + "remote storage list_files on root mismatches with the uploads." + ); + let nested_remote_files = test_client + .list_files(Some(&base_prefix)) + .await + .context("client list nested files failure")? + .into_iter() + .collect::<HashSet<_>>(); + let trim_remote_blobs: HashSet<_> = ctx + .remote_blobs + .iter() + .map(|x| x.get_path().to_str().expect("must be valid name")) + .filter(|x| x.starts_with("folder1")) + .map(|x| RemotePath::new(Path::new(x)).expect("must be valid name")) + .collect(); + assert_eq!( + nested_remote_files, trim_remote_blobs, + "remote storage list_files on subdirectory mismatches with the uploads." 
+ ); + Ok(()) +} + #[test_context(MaybeEnabledS3)] #[tokio::test] async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { @@ -248,6 +300,66 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs { } } +// NOTE: the setups for the list_prefixes test and the list_files test are very similar +// However, they are not identical. The list_prefixes function is concerned with listing prefixes, +// whereas the list_files function is concerned with listing files. +// See `RemoteStorage::list_files` documentation for more details +enum MaybeEnabledS3WithSimpleTestBlobs { + Enabled(S3WithSimpleTestBlobs), + Disabled, + UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs), +} +struct S3WithSimpleTestBlobs { + enabled: EnabledS3, + remote_blobs: HashSet<RemotePath>, } + +#[async_trait::async_trait] +impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs { + async fn setup() -> Self { + ensure_logging_ready(); + if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { + info!( + "`{}` env variable is not set, skipping the test", + ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME + ); + return Self::Disabled; + } + + let max_keys_in_list_response = 10; + let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap()); + + let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await; + + match upload_simple_s3_data(&enabled.client, upload_tasks_count).await { + ControlFlow::Continue(uploads) => { + info!("Remote objects created successfully"); + + Self::Enabled(S3WithSimpleTestBlobs { + enabled, + remote_blobs: uploads, + }) + } + ControlFlow::Break(uploads) => Self::UploadsFailed( + anyhow::anyhow!("One or multiple blobs failed to upload to S3"), + S3WithSimpleTestBlobs { + enabled, + remote_blobs: uploads, + }, + ), + } + } + + async fn teardown(self) { + match self { + Self::Disabled => {} + Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => { + cleanup(&ctx.enabled.client, ctx.remote_blobs).await; + } + } + } +} + fn create_s3_client( max_keys_per_list_response: Option<i32>, ) -> anyhow::Result<Arc<GenericRemoteStorage>> { @@ -258,7 +370,7 @@ fn create_s3_client( let random_prefix_part = std::time::SystemTime::now() .duration_since(UNIX_EPOCH) .context("random s3 test prefix part calculation")? - .as_millis(); + .as_nanos(); let remote_storage_config = RemoteStorageConfig { max_concurrent_syncs: NonZeroUsize::new(100).unwrap(), max_sync_errors: NonZeroU32::new(5).unwrap(), @@ -364,3 +476,52 @@ async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet< } } } + +// Uploads files `folder{j}/blob_{i}.txt`. See test description for more details. 
+async fn upload_simple_s3_data( + client: &Arc, + upload_tasks_count: usize, +) -> ControlFlow, HashSet> { + info!("Creating {upload_tasks_count} S3 files"); + let mut upload_tasks = JoinSet::new(); + for i in 1..upload_tasks_count + 1 { + let task_client = Arc::clone(client); + upload_tasks.spawn(async move { + let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i)); + let blob_path = RemotePath::new(&blob_path) + .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?; + debug!("Creating remote item {i} at path {blob_path:?}"); + + let data = format!("remote blob data {i}").into_bytes(); + let data_len = data.len(); + task_client + .upload(std::io::Cursor::new(data), data_len, &blob_path, None) + .await?; + + Ok::<_, anyhow::Error>(blob_path) + }); + } + + let mut upload_tasks_failed = false; + let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count); + while let Some(task_run_result) = upload_tasks.join_next().await { + match task_run_result + .context("task join failed") + .and_then(|task_result| task_result.context("upload task failed")) + { + Ok(upload_path) => { + uploaded_blobs.insert(upload_path); + } + Err(e) => { + error!("Upload task failed: {e:?}"); + upload_tasks_failed = true; + } + } + } + + if upload_tasks_failed { + ControlFlow::Break(uploaded_blobs) + } else { + ControlFlow::Continue(uploaded_blobs) + } +} diff --git a/libs/utils/src/http/error.rs b/libs/utils/src/http/error.rs index f9c06453df..527e486fd0 100644 --- a/libs/utils/src/http/error.rs +++ b/libs/utils/src/http/error.rs @@ -1,5 +1,6 @@ use hyper::{header, Body, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use std::error::Error as StdError; use thiserror::Error; use tracing::error; @@ -15,7 +16,7 @@ pub enum ApiError { Unauthorized(String), #[error("NotFound: {0}")] - NotFound(anyhow::Error), + NotFound(Box), #[error("Conflict: {0}")] Conflict(String), diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 17e6e3fb2a..2046d27b1e 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -96,12 +96,12 @@ pub mod defaults { #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}' -# [tenant_config] +[tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes #compaction_period = '{DEFAULT_COMPACTION_PERIOD}' -#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}' +#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD} #gc_period = '{DEFAULT_GC_PERIOD}' #gc_horizon = {DEFAULT_GC_HORIZON} @@ -111,7 +111,8 @@ pub mod defaults { #min_resident_size_override = .. 
# in bytes #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}' #gc_feedback = false -# [remote_storage] + +[remote_storage] "### ); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index fc8da70cc0..5bec07b74a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -142,7 +142,7 @@ impl From for ApiError { impl From for ApiError { fn from(tse: TenantStateError) -> ApiError { match tse { - TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)), + TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()), _ => ApiError::InternalServerError(anyhow::Error::new(tse)), } } @@ -151,7 +151,7 @@ impl From for ApiError { impl From for ApiError { fn from(tse: GetTenantError) -> ApiError { match tse { - GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)), + GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()), e @ GetTenantError::NotActive(_) => { // Why is this not `ApiError::NotFound`? // Because we must be careful to never return 404 for a tenant if it does @@ -169,7 +169,7 @@ impl From for ApiError { fn from(e: SetNewTenantConfigError) -> ApiError { match e { SetNewTenantConfigError::GetTenant(tid) => { - ApiError::NotFound(anyhow!("tenant {}", tid)) + ApiError::NotFound(anyhow!("tenant {}", tid).into()) } e @ SetNewTenantConfigError::Persist(_) => { ApiError::InternalServerError(anyhow::Error::new(e)) @@ -182,7 +182,7 @@ impl From for ApiError { fn from(value: crate::tenant::DeleteTimelineError) -> Self { use crate::tenant::DeleteTimelineError::*; match value { - NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found")), + NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()), HasChildren(children) => ApiError::PreconditionFailed( format!("Cannot delete timeline which has child timelines: {children:?}") .into_boxed_str(), @@ -397,7 +397,7 @@ async fn timeline_detail_handler( let timeline = tenant .get_timeline(timeline_id, false) - .map_err(ApiError::NotFound)?; + .map_err(|e| ApiError::NotFound(e.into()))?; let timeline_info = build_timeline_info( &timeline, @@ -1061,7 +1061,7 @@ async fn timeline_download_remote_layers_handler_get( let info = timeline .get_download_all_remote_layers_task_info() .context("task never started since last pageserver process start") - .map_err(ApiError::NotFound)?; + .map_err(|e| ApiError::NotFound(e.into()))?; json_response(StatusCode::OK, info) } @@ -1072,7 +1072,7 @@ async fn active_timeline_of_active_tenant( let tenant = mgr::get_tenant(tenant_id, true).await?; tenant .get_timeline(timeline_id, true) - .map_err(ApiError::NotFound) + .map_err(|e| ApiError::NotFound(e.into())) } async fn always_panic_handler( diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 9ad0124a80..5bff5337bd 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -148,17 +148,17 @@ async fn import_rel( // because there is no guarantee about the order in which we are processing segments. // ignore "relation already exists" error // - // FIXME: use proper error type for this, instead of parsing the error message. - // Or better yet, keep track of which relations we've already created + // FIXME: Keep track of which relations we've already created? 
// https://github.com/neondatabase/neon/issues/3309 if let Err(e) = modification .put_rel_creation(rel, nblocks as u32, ctx) .await { - if e.to_string().contains("already exists") { - debug!("relation {} already exists. we must be extending it", rel); - } else { - return Err(e); + match e { + RelationError::AlreadyExists => { + debug!("Relation {} already exists. We must be extending it.", rel) + } + _ => return Err(e.into()), } } diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 89bca5fa9f..7227bb63b6 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,4 +1,3 @@ -use metrics::core::{AtomicU64, GenericCounter}; use metrics::{ register_counter_vec, register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, @@ -95,21 +94,19 @@ static READ_NUM_FS_LAYERS: Lazy<HistogramVec> = Lazy::new(|| { }); // Metrics collected on operations on the storage repository. -static RECONSTRUCT_TIME: Lazy<HistogramVec> = Lazy::new(|| { - register_histogram_vec!( +pub static RECONSTRUCT_TIME: Lazy<Histogram> = Lazy::new(|| { + register_histogram!( "pageserver_getpage_reconstruct_seconds", - "Time spent in reconstruct_value", - &["tenant_id", "timeline_id"], + "Time spent in reconstruct_value (reconstruct a page from deltas)", CRITICAL_OP_BUCKETS.into(), ) .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounterVec> = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_direct_total", "Number of cache hits from materialized page cache without redo", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -124,11 +121,10 @@ static GET_RECONSTRUCT_DATA_TIME: Lazy<HistogramVec> = Lazy::new(|| { .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounterVec> = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_total", "Number of cache hits from materialized page cache", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -753,10 +749,7 @@ pub struct TimelineMetrics { fake: bool, tenant_id: String, timeline_id: String, - pub reconstruct_time_histo: Histogram, pub get_reconstruct_data_time_histo: Histogram, - pub materialized_page_cache_hit_counter: GenericCounter<AtomicU64>, - pub materialized_page_cache_hit_upon_request_counter: GenericCounter<AtomicU64>, pub flush_time_histo: StorageTimeMetrics, pub compact_time_histo: StorageTimeMetrics, pub create_images_time_histo: StorageTimeMetrics, @@ -785,15 +778,9 @@ impl TimelineMetrics { ) -> Self { let tenant_id = tenant_id.to_string(); let timeline_id = timeline_id.to_string(); - let reconstruct_time_histo = RECONSTRUCT_TIME - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let flush_time_histo = StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id); let compact_time_histo = @@ -835,20 +822,19 @@ impl TimelineMetrics { let read_num_fs_layers = READ_NUM_FS_LAYERS .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let 
materialized_page_cache_hit_upon_request_counter = MATERIALIZED_PAGE_CACHE_HIT_DIRECT - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let evictions_with_low_residence_duration = evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id); + // TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter + // and integration test will error. + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get(); + MATERIALIZED_PAGE_CACHE_HIT.get(); + let m = TimelineMetrics { fake, tenant_id, timeline_id, - reconstruct_time_histo, get_reconstruct_data_time_histo, - materialized_page_cache_hit_counter, - materialized_page_cache_hit_upon_request_counter, flush_time_histo, compact_time_histo, create_images_time_histo, @@ -879,10 +865,7 @@ impl TimelineMetrics { fn remove_metrics(&self) { let tenant_id = &self.tenant_id; let timeline_id = &self.timeline_id; - let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT_DIRECT.remove_label_values(&[tenant_id, timeline_id]); let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]); let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 22338c83e4..90e404976f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -394,7 +394,9 @@ impl PageServerHandler { }; // Check that the timeline exists - let timeline = tenant.get_timeline(timeline_id, true)?; + let timeline = tenant + .get_timeline(timeline_id, true) + .map_err(|e| anyhow::anyhow!(e))?; // switch client to COPYBOTH pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; @@ -1273,6 +1275,6 @@ async fn get_active_tenant_timeline( .map_err(GetActiveTimelineError::Tenant)?; let timeline = tenant .get_timeline(timeline_id, true) - .map_err(GetActiveTimelineError::Timeline)?; + .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?; Ok(timeline) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 86c84ec82f..998c199ba6 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -43,6 +43,16 @@ pub enum CalculateLogicalSizeError { Other(#[from] anyhow::Error), } +#[derive(Debug, thiserror::Error)] +pub enum RelationError { + #[error("Relation Already Exists")] + AlreadyExists, + #[error("invalid relnode")] + InvalidRelnode, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + /// /// This impl provides all the functionality to store PostgreSQL relations, SLRUs, /// and other special kinds of files, in a versioned key-value store. 
The @@ -101,9 +111,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result<Bytes, PageReconstructError> { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?; @@ -148,9 +158,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result<BlockNumber, PageReconstructError> { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { @@ -193,9 +203,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result<bool, PageReconstructError> { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } // first try to lookup relation in cache @@ -724,7 +734,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, rec: NeonWalRecord, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec)); Ok(()) } @@ -751,7 +761,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, img: Bytes, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::Image(img)); Ok(()) } @@ -875,32 +885,38 @@ impl<'a> DatadirModification<'a> { rel: RelTag, nblocks: BlockNumber, ctx: &RequestContext, - ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + ) -> Result<(), RelationError> { + if rel.relnode == 0 { + return Err(RelationError::InvalidRelnode); + } // It's possible that this is the first rel for this db in this // tablespace. Create the reldir entry for it if so. - let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?; + let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?) + .context("deserialize db")?; let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() { // Didn't exist. Update dbdir dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false); - let buf = DbDirectory::ser(&dbdir)?; + let buf = DbDirectory::ser(&dbdir).context("serialize db")?; self.put(DBDIR_KEY, Value::Image(buf.into())); // and create the RelDirectory RelDirectory::default() } else { // reldir already exists, fetch it - RelDirectory::des(&self.get(rel_dir_key, ctx).await?)? + RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?) + .context("deserialize db")? 
}; // Add the new relation to the rel directory entry, and write it back if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { - anyhow::bail!("rel {rel} already exists"); + return Err(RelationError::AlreadyExists); } self.put( rel_dir_key, - Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)), + Value::Image(Bytes::from( + RelDirectory::ser(&rel_dir).context("serialize")?, + )), ); // Put size @@ -925,7 +941,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); let last_lsn = self.tline.get_last_record_lsn(); if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? { let size_key = rel_size_to_key(rel); @@ -956,7 +972,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Put size let size_key = rel_size_to_key(rel); @@ -977,7 +993,7 @@ impl<'a> DatadirModification<'a> { /// Drop a relation. pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Remove it from the directory entry let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 71dc061485..f5816ae08d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -387,6 +387,21 @@ remote: } } +#[derive(Debug, thiserror::Error, PartialEq, Eq)] +pub enum GetTimelineError { + #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")] + NotActive { + tenant_id: TenantId, + timeline_id: TimelineId, + state: TimelineState, + }, + #[error("Timeline {tenant_id}/{timeline_id} was not found")] + NotFound { + tenant_id: TenantId, + timeline_id: TimelineId, + }, +} + #[derive(Debug, thiserror::Error)] pub enum DeleteTimelineError { #[error("NotFound")] @@ -467,6 +482,14 @@ pub(crate) enum ShutdownError { AlreadyStopping, } +struct DeletionGuard(OwnedMutexGuard); + +impl DeletionGuard { + fn is_deleted(&self) -> bool { + *self.0 + } +} + impl Tenant { /// Yet another helper for timeline initialization. /// Contains the common part of `load_local_timeline` and `load_remote_timeline`. @@ -1003,6 +1026,117 @@ impl Tenant { tenant } + pub fn scan_and_sort_timelines_dir( + self: Arc, + ) -> anyhow::Result> { + let timelines_dir = self.conf.timelines_path(&self.tenant_id); + let mut timelines_to_load: HashMap = HashMap::new(); + + for entry in + std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")? 
+ { + let entry = entry.context("read timeline dir entry")?; + let timeline_dir = entry.path(); + + if crate::is_temporary(&timeline_dir) { + info!( + "Found temporary timeline directory, removing: {}", + timeline_dir.display() + ); + if let Err(e) = std::fs::remove_dir_all(&timeline_dir) { + error!( + "Failed to remove temporary directory '{}': {:?}", + timeline_dir.display(), + e + ); + } + } else if is_uninit_mark(&timeline_dir) { + if !timeline_dir.exists() { + warn!( + "Timeline dir entry become invalid: {}", + timeline_dir.display() + ); + continue; + } + let timeline_uninit_mark_file = &timeline_dir; + info!( + "Found an uninit mark file {}, removing the timeline and its uninit mark", + timeline_uninit_mark_file.display() + ); + let timeline_id = timeline_uninit_mark_file + .file_stem() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .with_context(|| { + format!( + "Could not parse timeline id out of the timeline uninit mark name {}", + timeline_uninit_mark_file.display() + ) + })?; + let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id); + if let Err(e) = + remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file) + { + error!("Failed to clean up uninit marked timeline: {e:?}"); + } + } else { + if !timeline_dir.exists() { + warn!( + "Timeline dir entry become invalid: {}", + timeline_dir.display() + ); + continue; + } + let timeline_id = timeline_dir + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .with_context(|| { + format!( + "Could not parse timeline id out of the timeline dir name {}", + timeline_dir.display() + ) + })?; + let timeline_uninit_mark_file = self + .conf + .timeline_uninit_mark_file_path(self.tenant_id, timeline_id); + if timeline_uninit_mark_file.exists() { + info!( + %timeline_id, + "Found an uninit mark file, removing the timeline and its uninit mark", + ); + if let Err(e) = + remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file) + { + error!("Failed to clean up uninit marked timeline: {e:?}"); + } + continue; + } + + let file_name = entry.file_name(); + if let Ok(timeline_id) = + file_name.to_str().unwrap_or_default().parse::() + { + let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) + .context("failed to load metadata")?; + timelines_to_load.insert(timeline_id, metadata); + } else { + // A file or directory that doesn't look like a timeline ID + warn!( + "unexpected file or directory in timelines directory: {}", + file_name.to_string_lossy() + ); + } + } + } + + // Sort the array of timeline IDs into tree-order, so that parent comes before + // all its children. + tree_sort_timelines(timelines_to_load) + } + /// /// Background task to load in-memory data structures for this tenant, from /// files on disk. Used at pageserver startup. @@ -1020,124 +1154,16 @@ impl Tenant { utils::failpoint_sleep_millis_async!("before-loading-tenant"); - // TODO split this into two functions, scan and actual load - // Load in-memory state to reflect the local files on disk // // Scan the directory, peek into the metadata file of each timeline, and // collect a list of timelines and their ancestors. 
- let tenant_id = self.tenant_id; - let conf = self.conf; let span = info_span!("blocking"); + let cloned = Arc::clone(self); let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || { let _g = span.entered(); - let mut timelines_to_load: HashMap = HashMap::new(); - let timelines_dir = conf.timelines_path(&tenant_id); - - for entry in - std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")? - { - let entry = entry.context("read timeline dir entry")?; - let timeline_dir = entry.path(); - - if crate::is_temporary(&timeline_dir) { - info!( - "Found temporary timeline directory, removing: {}", - timeline_dir.display() - ); - if let Err(e) = std::fs::remove_dir_all(&timeline_dir) { - error!( - "Failed to remove temporary directory '{}': {:?}", - timeline_dir.display(), - e - ); - } - } else if is_uninit_mark(&timeline_dir) { - if !timeline_dir.exists() { - warn!( - "Timeline dir entry become invalid: {}", - timeline_dir.display() - ); - continue; - } - let timeline_uninit_mark_file = &timeline_dir; - info!( - "Found an uninit mark file {}, removing the timeline and its uninit mark", - timeline_uninit_mark_file.display() - ); - let timeline_id = timeline_uninit_mark_file - .file_stem() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .with_context(|| { - format!( - "Could not parse timeline id out of the timeline uninit mark name {}", - timeline_uninit_mark_file.display() - ) - })?; - let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id); - if let Err(e) = - remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file) - { - error!("Failed to clean up uninit marked timeline: {e:?}"); - } - } else { - if !timeline_dir.exists() { - warn!( - "Timeline dir entry become invalid: {}", - timeline_dir.display() - ); - continue; - } - let timeline_id = timeline_dir - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .with_context(|| { - format!( - "Could not parse timeline id out of the timeline dir name {}", - timeline_dir.display() - ) - })?; - let timeline_uninit_mark_file = - conf.timeline_uninit_mark_file_path(tenant_id, timeline_id); - if timeline_uninit_mark_file.exists() { - info!( - %timeline_id, - "Found an uninit mark file, removing the timeline and its uninit mark", - ); - if let Err(e) = remove_timeline_and_uninit_mark( - &timeline_dir, - &timeline_uninit_mark_file, - ) { - error!("Failed to clean up uninit marked timeline: {e:?}"); - } - continue; - } - - let file_name = entry.file_name(); - if let Ok(timeline_id) = - file_name.to_str().unwrap_or_default().parse::() - { - let metadata = load_metadata(conf, timeline_id, tenant_id) - .context("failed to load metadata")?; - timelines_to_load.insert(timeline_id, metadata); - } else { - // A file or directory that doesn't look like a timeline ID - warn!( - "unexpected file or directory in timelines directory: {}", - file_name.to_string_lossy() - ); - } - } - } - - // Sort the array of timeline IDs into tree-order, so that parent comes before - // all its children. 
- tree_sort_timelines(timelines_to_load) + cloned.scan_and_sort_timelines_dir() }) .await .context("load spawn_blocking") @@ -1272,19 +1298,21 @@ impl Tenant { &self, timeline_id: TimelineId, active_only: bool, - ) -> anyhow::Result> { + ) -> Result, GetTimelineError> { let timelines_accessor = self.timelines.lock().unwrap(); - let timeline = timelines_accessor.get(&timeline_id).with_context(|| { - format!("Timeline {}/{} was not found", self.tenant_id, timeline_id) - })?; + let timeline = timelines_accessor + .get(&timeline_id) + .ok_or(GetTimelineError::NotFound { + tenant_id: self.tenant_id, + timeline_id, + })?; if active_only && !timeline.is_active() { - anyhow::bail!( - "Timeline {}/{} is not active, state: {:?}", - self.tenant_id, + Err(GetTimelineError::NotActive { + tenant_id: self.tenant_id, timeline_id, - timeline.current_state() - ) + state: timeline.current_state(), + }) } else { Ok(Arc::clone(timeline)) } @@ -1820,6 +1848,7 @@ impl Tenant { &self, timeline_id: TimelineId, timeline: Arc, + guard: DeletionGuard, ) -> anyhow::Result<()> { { // Grab the layer_removal_cs lock, and actually perform the deletion. @@ -1892,6 +1921,25 @@ impl Tenant { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? }); + if let Some(remote_client) = &timeline.remote_client { + remote_client.delete_all().await.context("delete_all")? + }; + + // Have a failpoint that can use the `pause` failpoint action. + // We don't want to block the executor thread, hence, spawn_blocking + await. + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint in_progress_delete"); + fail::fail_point!("in_progress_delete"); + } + }) + .await + .expect("spawn_blocking"); + } + { // Remove the timeline from the map. let mut timelines = self.timelines.lock().unwrap(); @@ -1912,12 +1960,7 @@ impl Tenant { drop(timelines); } - let remote_client = match &timeline.remote_client { - Some(remote_client) => remote_client, - None => return Ok(()), - }; - - remote_client.delete_all().await?; + drop(guard); Ok(()) } @@ -1970,23 +2013,18 @@ impl Tenant { } // Prevent two tasks from trying to delete the timeline at the same time. - // - // XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller - // needs to poll until the operation has finished. But for now, we return an - // error, because the control plane knows to retry errors. - delete_lock_guard = - Arc::clone(&timeline.delete_lock) - .try_lock_owned() - .map_err(|_| { + DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err( + |_| { DeleteTimelineError::Other(anyhow::anyhow!( "timeline deletion is already in progress" )) - })?; + }, + )?); // If another task finished the deletion just before we acquired the lock, // return success. 
- if *delete_lock_guard { + if delete_lock_guard.is_deleted() { return Ok(()); } @@ -2060,7 +2098,7 @@ impl Tenant { self: Arc<Self>, timeline_id: TimelineId, timeline: Arc<Timeline>, - _guard: OwnedMutexGuard<bool>, + guard: DeletionGuard, ) { let tenant_id = self.tenant_id; let timeline_clone = Arc::clone(&timeline); @@ -2073,7 +2111,7 @@ impl Tenant { "timeline_delete", false, async move { - if let Err(err) = self.delete_timeline(timeline_id, timeline).await { + if let Err(err) = self.delete_timeline(timeline_id, timeline, guard).await { error!("Error: {err:#}"); timeline_clone.set_broken(err.to_string()) }; @@ -3741,9 +3779,8 @@ pub fn dump_layerfile_from_path( #[cfg(test)] pub mod harness { use bytes::{Bytes, BytesMut}; - use once_cell::sync::Lazy; use once_cell::sync::OnceCell; - use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; + use std::sync::Arc; use std::{fs, path::PathBuf}; use utils::logging; use utils::lsn::Lsn; @@ -3776,8 +3813,6 @@ pub mod harness { buf.freeze() } - static LOCK: Lazy<RwLock<()>> = Lazy::new(|| RwLock::new(())); - impl From<TenantConf> for TenantConfOpt { fn from(tenant_conf: TenantConf) -> Self { Self { @@ -3804,33 +3839,16 @@ pub mod harness { } } - pub struct TenantHarness<'a> { + pub struct TenantHarness { pub conf: &'static PageServerConf, pub tenant_conf: TenantConf, pub tenant_id: TenantId, - - pub lock_guard: ( - Option<RwLockReadGuard<'a, ()>>, - Option<RwLockWriteGuard<'a, ()>>, - ), } static LOG_HANDLE: OnceCell<()> = OnceCell::new(); - impl<'a> TenantHarness<'a> { + impl TenantHarness { pub fn create(test_name: &'static str) -> anyhow::Result<Self> { - Self::create_internal(test_name, false) - } - pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> { - Self::create_internal(test_name, true) - } - fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> { - let lock_guard = if exclusive { - (None, Some(LOCK.write().unwrap())) - } else { - (Some(LOCK.read().unwrap()), None) - }; - LOG_HANDLE.get_or_init(|| { logging::init( logging::LogFormat::Test, @@ -3866,7 +3884,6 @@ pub mod harness { conf, tenant_conf, tenant_id, - lock_guard, }) } @@ -3891,26 +3908,12 @@ pub mod harness { self.tenant_id, None, )); - // populate tenant with locally available timelines - let mut timelines_to_load = HashMap::new(); - for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id)) - .expect("should be able to read timelines dir") - { - let timeline_dir_entry = timeline_dir_entry?; - let timeline_id: TimelineId = timeline_dir_entry - .path() - .file_name() - .unwrap() - .to_string_lossy() - .parse()?; - - let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?; - timelines_to_load.insert(timeline_id, timeline_metadata); - } tenant .load(TimelineLoadCause::Test, None, ctx) .instrument(info_span!("try_load", tenant_id=%self.tenant_id)) .await?; + + // TODO reuse Tenant::activate (needs broker) tenant.state.send_replace(TenantState::Active); for timeline in tenant.timelines.lock().unwrap().values() { timeline.set_state(TimelineState::Active); @@ -4436,9 +4439,13 @@ mod tests { std::fs::write(metadata_path, metadata_bytes)?; let err = harness.try_load(&ctx).await.err().expect("should fail"); - assert!(err - .to_string() - .starts_with("Failed to parse metadata bytes from path")); + // get the whole error chain with all .context, not only the last one + let message = format!("{err:#}"); + let expected = "Failed to parse metadata bytes from path"; + assert!( + message.contains(expected), + "message '{message}' expected to contain {expected}" + ); let mut found_error_message = false; 
let mut err_source = err.source(); @@ -4873,6 +4880,45 @@ mod tests { assert!(expect_initdb_optimization); assert!(initdb_optimization_count > 0); } + Ok(()) + } + + #[tokio::test] + async fn test_uninit_mark_crash() -> anyhow::Result<()> { + let name = "test_uninit_mark_crash"; + let harness = TenantHarness::create(name)?; + { + let (tenant, ctx) = harness.load().await; + let (guard, _tline) = tenant + .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; + // Keeps uninit mark in place + std::mem::forget(guard); + } + + let (tenant, _) = harness.load().await; + match tenant.get_timeline(TIMELINE_ID, false) { + Ok(_) => panic!("timeline should've been removed during load"), + Err(e) => { + assert_eq!( + e, + GetTimelineError::NotFound { + tenant_id: tenant.tenant_id, + timeline_id: TIMELINE_ID, + } + ) + } + } + + assert!(!harness + .conf + .timeline_path(&TIMELINE_ID, &tenant.tenant_id) + .exists()); + + assert!(!harness + .conf + .timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID) + .exists()); Ok(()) } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 80d153661a..ffe2c5eab6 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -38,8 +38,8 @@ pub mod defaults { pub const DEFAULT_GC_PERIOD: &str = "1 hr"; pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3; pub const DEFAULT_PITR_INTERVAL: &str = "7 days"; - pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds"; - pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds"; + pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds"; + pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds"; pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024; pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour"; } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b0e19309d0..b498f8f377 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -681,7 +681,7 @@ pub async fn immediate_gc( .get(&tenant_id) .map(Arc::clone) .with_context(|| format!("tenant {tenant_id}")) - .map_err(ApiError::NotFound)?; + .map_err(|e| ApiError::NotFound(e.into()))?; let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon()); // Use tenant's pitr setting @@ -730,11 +730,11 @@ pub async fn immediate_compact( .get(&tenant_id) .map(Arc::clone) .with_context(|| format!("tenant {tenant_id}")) - .map_err(ApiError::NotFound)?; + .map_err(|e| ApiError::NotFound(e.into()))?; let timeline = tenant .get_timeline(timeline_id, true) - .map_err(ApiError::NotFound)?; + .map_err(|e| ApiError::NotFound(e.into()))?; // Run in task_mgr to avoid race with tenant_detach operation let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 2c84c59dcb..7808b64d35 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -753,22 +753,18 @@ impl RemoteTimelineClient { // Have a failpoint that can use the `pause` failpoint action. // We don't want to block the executor thread, hence, spawn_blocking + await. 
- #[cfg(feature = "testing")] - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); - move || { - let _entered = current.entered(); - tracing::info!( - "at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - fail::fail_point!( - "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - } - }) - .await - .expect("spawn_blocking"); - + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint persist_deleted_index_part"); + fail::fail_point!("persist_deleted_index_part"); + } + }) + .await + .expect("spawn_blocking"); + } upload::upload_index_part( self.conf, &self.storage_impl, @@ -1371,7 +1367,7 @@ mod tests { struct TestSetup { runtime: &'static tokio::runtime::Runtime, entered_runtime: EnterGuard<'static>, - harness: TenantHarness<'static>, + harness: TenantHarness, tenant: Arc, tenant_ctx: RequestContext, remote_fs_dir: PathBuf, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 75c4bf962d..3bac321dd1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -15,6 +15,7 @@ use pageserver_api::models::{ TimelineState, }; use remote_storage::GenericRemoteStorage; +use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; @@ -47,7 +48,10 @@ use crate::tenant::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum}; -use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; +use crate::metrics::{ + TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT, + RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS, +}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; @@ -540,9 +544,7 @@ impl Timeline { match cached_lsn.cmp(&lsn) { Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check Ordering::Equal => { - self.metrics - .materialized_page_cache_hit_upon_request_counter - .inc(); + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc(); return Ok(cached_img); // exact LSN match, return the image } Ordering::Greater => { @@ -564,8 +566,7 @@ impl Timeline { .await?; timer.stop_and_record(); - self.metrics - .reconstruct_time_histo + RECONSTRUCT_TIME .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state)) } @@ -2474,7 +2475,7 @@ impl Timeline { ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { - self.metrics.materialized_page_cache_hit_counter.inc_by(1); + MATERIALIZED_PAGE_CACHE_HIT.inc_by(1); return Ok(()); } if prev_lsn <= cont_lsn { @@ -3090,7 +3091,7 @@ impl Timeline { frozen_layer: &Arc, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { let span = tracing::info_span!("blocking"); - let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({ + let new_delta: DeltaLayer = tokio::task::spawn_blocking({ let _g = span.entered(); let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); @@ -3102,29 +3103,31 @@ impl Timeline { // Sync it to disk. 
// // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable + // new layer files are durable. + // + // NB: timeline dir must be synced _after_ the file contents are durable. + // So, two separate fsyncs are required; they mustn't be batched. // // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. - - // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace - // this with a single fsync in future refactors. - par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; - // Then sync the parent directory. + // files to flush, the fsync overhead can be reduced as follows: + // 1. write them all to temporary file names + // 2. fsync them + // 3. rename to the final name + // 4. fsync the parent directory. + // Note that (1),(2),(3) today happen inside write_to_disk(). + par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?; par_fsync::par_fsync(&[self_clone .conf .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) .context("fsync of timeline dir")?; - let sz = new_delta_path.metadata()?.len(); - - anyhow::Ok((new_delta, sz)) + anyhow::Ok(new_delta) } }) .await .context("spawn_blocking")??; let new_delta_name = new_delta.filename(); + let sz = new_delta.desc.file_size; // Add it to the layer map let l = Arc::new(new_delta); @@ -3138,9 +3141,8 @@ impl Timeline { batch_updates.insert_historic(l.layer_desc().clone(), l); batch_updates.flush(); - // update the timeline's physical size - self.metrics.resident_physical_size_gauge.add(sz); // update metrics + self.metrics.resident_physical_size_gauge.add(sz); self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); @@ -3419,6 +3421,130 @@ impl From for CompactionError { } } +#[serde_as] +#[derive(serde::Serialize)] +struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration); + +#[derive(Default)] +enum DurationRecorder { + #[default] + NotStarted, + Recorded(RecordedDuration, tokio::time::Instant), +} + +impl DurationRecorder { + pub fn till_now(&self) -> DurationRecorder { + match self { + DurationRecorder::NotStarted => { + panic!("must only call on recorded measurements") + } + DurationRecorder::Recorded(_, ended) => { + let now = tokio::time::Instant::now(); + DurationRecorder::Recorded(RecordedDuration(now - *ended), now) + } + } + } + pub fn into_recorded(self) -> Option<RecordedDuration> { + match self { + DurationRecorder::NotStarted => None, + DurationRecorder::Recorded(recorded, _) => Some(recorded), + } + } +} + +#[derive(Default)] +struct CompactLevel0Phase1StatsBuilder { + version: Option<u64>, + tenant_id: Option<TenantId>, + timeline_id: Option<TimelineId>, + first_read_lock_acquisition_micros: DurationRecorder, + get_level0_deltas_plus_drop_lock_micros: DurationRecorder, + level0_deltas_count: Option<usize>, + time_spent_between_locks: DurationRecorder, + second_read_lock_acquisition_micros: DurationRecorder, + second_read_lock_held_micros: DurationRecorder, + sort_holes_micros: DurationRecorder, + write_layer_files_micros: DurationRecorder, + new_deltas_count: Option<usize>, + new_deltas_size: Option<u64>, +} + +#[serde_as] +#[derive(serde::Serialize)] +struct CompactLevel0Phase1Stats { + version: u64, + #[serde_as(as = "serde_with::DisplayFromStr")] + tenant_id: TenantId, + #[serde_as(as = "serde_with::DisplayFromStr")] + timeline_id: 
TimelineId, + first_read_lock_acquisition_micros: RecordedDuration, + get_level0_deltas_plus_drop_lock_micros: RecordedDuration, + level0_deltas_count: usize, + time_spent_between_locks: RecordedDuration, + second_read_lock_acquisition_micros: RecordedDuration, + second_read_lock_held_micros: RecordedDuration, + sort_holes_micros: RecordedDuration, + write_layer_files_micros: RecordedDuration, + new_deltas_count: usize, + new_deltas_size: u64, +} + +impl TryFrom for CompactLevel0Phase1Stats { + type Error = anyhow::Error; + + fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result { + let CompactLevel0Phase1StatsBuilder { + version, + tenant_id, + timeline_id, + first_read_lock_acquisition_micros, + get_level0_deltas_plus_drop_lock_micros, + level0_deltas_count, + time_spent_between_locks, + second_read_lock_acquisition_micros, + second_read_lock_held_micros, + sort_holes_micros, + write_layer_files_micros, + new_deltas_count, + new_deltas_size, + } = value; + Ok(CompactLevel0Phase1Stats { + version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?, + tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?, + timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?, + first_read_lock_acquisition_micros: first_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?, + get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros + .into_recorded() + .ok_or_else(|| { + anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set") + })?, + level0_deltas_count: level0_deltas_count + .ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?, + time_spent_between_locks: time_spent_between_locks + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?, + second_read_lock_acquisition_micros: second_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?, + second_read_lock_held_micros: second_read_lock_held_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?, + sort_holes_micros: sort_holes_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?, + write_layer_files_micros: write_layer_files_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?, + new_deltas_count: new_deltas_count + .ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?, + new_deltas_size: new_deltas_size + .ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?, + }) + } +} + impl Timeline { /// Level0 files first phase of compaction, explained in the [`compact_inner`] comment. 
/// @@ -3431,9 +3557,23 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { + let mut stats = CompactLevel0Phase1StatsBuilder { + version: Some(1), + tenant_id: Some(self.tenant_id), + timeline_id: Some(self.timeline_id), + ..Default::default() + }; + + let begin = tokio::time::Instant::now(); let layers = self.layers.read().await; + let now = tokio::time::Instant::now(); + stats.first_read_lock_acquisition_micros = + DurationRecorder::Recorded(RecordedDuration(now - begin), now); let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); + stats.level0_deltas_count = Some(level0_deltas.len()); + stats.get_level0_deltas_plus_drop_lock_micros = + stats.first_read_lock_acquisition_micros.till_now(); // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); @@ -3554,7 +3694,9 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); + stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now(); let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? + stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now(); let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3588,9 +3730,11 @@ impl Timeline { prev = Some(next_key.next()); } drop(layers); + stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now(); let mut holes = heap.into_vec(); holes.sort_unstable_by_key(|hole| hole.key_range.start); let mut next_hole = 0; // index of next hole in holes vector + stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now(); // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. @@ -3750,8 +3894,26 @@ impl Timeline { layer_paths.pop().unwrap(); } + stats.write_layer_files_micros = stats.sort_holes_micros.till_now(); + stats.new_deltas_count = Some(new_layers.len()); + stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); + drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed + match TryInto::::try_into(stats) + .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string")) + { + Ok(stats_json) => { + info!( + stats_json = stats_json.as_str(), + "compact_level0_phase1 stats available" + ) + } + Err(e) => { + warn!("compact_level0_phase1 stats failed to serialize: {:#}", e); + } + } + Ok(CompactLevel0Phase1Result { new_layers, deltas_to_compact, @@ -3878,6 +4040,7 @@ impl Timeline { /// for example. The caller should hold `Tenant::gc_cs` lock to ensure /// that. 
/// + #[instrument(skip_all, fields(timeline_id=%self.timeline_id))] pub(super) async fn update_gc_info( &self, retain_lsns: Vec, diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 3abd90bef9..595725fc09 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -1322,7 +1322,7 @@ mod tests { const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr"; - async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState { + async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState { let (tenant, ctx) = harness.load().await; let timeline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 68cf2a4645..8d4c1842bd 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; @@ -1082,7 +1082,10 @@ impl<'a> WalIngest<'a> { .await? { // create it with 0 size initially, the logic below will extend it - modification.put_rel_creation(rel, 0, ctx).await?; + modification + .put_rel_creation(rel, 0, ctx) + .await + .context("Relation Error")?; 0 } else { self.timeline.get_rel_size(rel, last_lsn, true, ctx).await? diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 64d980d2e4..8d82de6dc4 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -257,7 +257,7 @@ nwp_register_gucs(void) "Walproposer reconnects to offline safekeepers once in this interval.", NULL, &wal_acceptor_reconnect_timeout, - 5000, 0, INT_MAX, /* default, min, max */ + 1000, 0, INT_MAX, /* default, min, max */ PGC_SIGHUP, /* context */ GUC_UNIT_MS, /* flags */ NULL, NULL, NULL); diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 52c3e8d4be..30036cc7f2 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -266,7 +266,7 @@ impl From for ApiError { fn from(te: TimelineError) -> ApiError { match te { TimelineError::NotFound(ttid) => { - ApiError::NotFound(anyhow!("timeline {} not found", ttid)) + ApiError::NotFound(anyhow!("timeline {} not found", ttid).into()) } _ => ApiError::InternalServerError(anyhow!("{}", te)), } diff --git a/scripts/ingest_perf_test_result.py b/scripts/ingest_perf_test_result.py index fc177b590e..35a1e29720 100644 --- a/scripts/ingest_perf_test_result.py +++ b/scripts/ingest_perf_test_result.py @@ -40,7 +40,9 @@ def get_connection_cursor(): @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) def connect(connstr): - return psycopg2.connect(connstr, connect_timeout=30) + conn = psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn conn = connect(connstr) try: diff --git a/scripts/ingest_regress_test_result.py b/scripts/ingest_regress_test_result.py index dff8e0cefa..39c1c02941 100644 --- a/scripts/ingest_regress_test_result.py +++ b/scripts/ingest_regress_test_result.py @@ -34,7 +34,9 @@ def get_connection_cursor(): @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) def connect(connstr): - return 
psycopg2.connect(connstr, connect_timeout=30) + conn = psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn conn = connect(connstr) try: diff --git a/test_runner/fixtures/flaky.py b/test_runner/fixtures/flaky.py index 9d7f8ead9a..d13f3318b0 100644 --- a/test_runner/fixtures/flaky.py +++ b/test_runner/fixtures/flaky.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import List +from typing import Any, List, MutableMapping, cast import pytest from _pytest.config import Config @@ -56,3 +56,15 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]): # Rerun 3 times = 1 original run + 2 reruns log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times") item.add_marker(pytest.mark.flaky(reruns=2)) + + # pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns), + # we can work around it by setting `timeout_func_only` to True[1]. + # Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2], + # but we can still do it using a pytest marker. + # + # - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99 + # - [2] https://github.com/pytest-dev/pytest-timeout/issues/142 + timeout_marker = item.get_closest_marker("timeout") + if timeout_marker is not None: + kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs) + kwargs["func_only"] = True diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index b4c237cfa6..d55d159037 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -57,14 +57,16 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = ( "libmetrics_launch_timestamp", "libmetrics_build_info", "libmetrics_tracing_event_count_total", + "pageserver_materialized_cache_hits_total", + "pageserver_materialized_cache_hits_direct_total", + "pageserver_getpage_reconstruct_seconds_bucket", + "pageserver_getpage_reconstruct_seconds_count", + "pageserver_getpage_reconstruct_seconds_sum", ) PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( "pageserver_current_logical_size", "pageserver_resident_physical_size", - "pageserver_getpage_reconstruct_seconds_bucket", - "pageserver_getpage_reconstruct_seconds_count", - "pageserver_getpage_reconstruct_seconds_sum", "pageserver_getpage_get_reconstruct_data_seconds_bucket", "pageserver_getpage_get_reconstruct_data_seconds_count", "pageserver_getpage_get_reconstruct_data_seconds_sum", @@ -73,8 +75,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] 
= ( "pageserver_io_operations_seconds_count", "pageserver_io_operations_seconds_sum", "pageserver_last_record_lsn", - "pageserver_materialized_cache_hits_total", - "pageserver_materialized_cache_hits_direct_total", "pageserver_read_num_fs_layers_bucket", "pageserver_read_num_fs_layers_count", "pageserver_read_num_fs_layers_sum", diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 809603e9f1..e4ec4f6f5a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1631,6 +1631,8 @@ class NeonPageserver(PgProtocol): r".*ERROR.*ancestor timeline \S+ is being stopped", # this is expected given our collaborative shutdown approach for the UploadQueue ".*Compaction failed, retrying in .*: queue is in state Stopped.*", + # Pageserver timeline deletion should be polled until it gets 404, so ignore it globally + ".*Error processing HTTP request: NotFound: Timeline .* was not found", ] def start( @@ -2413,6 +2415,17 @@ class Endpoint(PgProtocol): return self + def respec(self, **kwargs): + """Update the endpoint.json file used by control_plane.""" + # Read config + config_path = os.path.join(self.endpoint_path(), "endpoint.json") + with open(config_path, "r") as f: + data_dict = json.load(f) + + # Write it back with the updates applied + with open(config_path, "w") as file: + json.dump(dict(data_dict, **kwargs), file, indent=4) + def stop(self) -> "Endpoint": """ Stop the Postgres instance if it's running. diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index f258a3a24d..5c4f5177d0 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -342,6 +342,11 @@ class PageserverHttpClient(requests.Session): return res_json def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs): + """ + Note that deletion is not instant: it is scheduled and performed mostly in the background. + So if you need to wait for it to complete, use `timeline_delete_wait_completed`. + For a longer description, consult the pageserver openapi spec. 
+ """ res = self.delete( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs ) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 83880abc77..ad89ebad00 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -193,19 +193,30 @@ def wait_for_upload_queue_empty( time.sleep(0.2) -def assert_timeline_detail_404( +def wait_timeline_detail_404( + pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId +): + last_exc = None + for _ in range(2): + time.sleep(0.250) + try: + data = pageserver_http.timeline_detail(tenant_id, timeline_id) + log.error(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code == 404: + return + + last_exc = e + + raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}") + + +def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId, + **delete_args, ): - """Asserts that timeline_detail returns 404, or dumps the detail.""" - try: - data = pageserver_http.timeline_detail(tenant_id, timeline_id) - log.error(f"detail {data}") - except PageserverApiException as e: - log.error(e) - if e.status_code == 404: - return - else: - raise - raise Exception("detail succeeded (it should return 404)") + pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) + wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id) diff --git a/test_runner/performance/test_startup.py b/test_runner/performance/test_startup.py index 9c45088d62..8babbbe132 100644 --- a/test_runner/performance/test_startup.py +++ b/test_runner/performance/test_startup.py @@ -32,13 +32,18 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc env.neon_cli.create_branch("test_startup") + endpoint = None + # We do two iterations so we can see if the second startup is faster. It should # be because the compute node should already be configured with roles, databases, # extensions, etc from the first run. for i in range(2): # Start with zenbenchmark.record_duration(f"{i}_start_and_select"): - endpoint = env.endpoints.create_start("test_startup") + if endpoint: + endpoint.start() + else: + endpoint = env.endpoints.create_start("test_startup") endpoint.safe_psql("select 1;") # Get metrics @@ -57,6 +62,9 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc # Stop so we can restart endpoint.stop() + # Imitate optimizations that console would do for the second start + endpoint.respec(skip_pg_catalog_updates=True) + # This test sometimes runs for longer than the global 5 minute timeout. 
@pytest.mark.timeout(600) diff --git a/test_runner/pg_clients/python/pg8000/requirements.txt b/test_runner/pg_clients/python/pg8000/requirements.txt index 7bba8da06d..a8407c3cb0 100644 --- a/test_runner/pg_clients/python/pg8000/requirements.txt +++ b/test_runner/pg_clients/python/pg8000/requirements.txt @@ -1,2 +1,2 @@ -pg8000==1.29.4 +pg8000==1.29.8 scramp>=1.4.3 diff --git a/test_runner/pg_clients/rust/tokio-postgres/Cargo.lock b/test_runner/pg_clients/rust/tokio-postgres/Cargo.lock index 30deb3ff20..bdbbe0ad69 100644 --- a/test_runner/pg_clients/rust/tokio-postgres/Cargo.lock +++ b/test_runner/pg_clients/rust/tokio-postgres/Cargo.lock @@ -396,9 +396,9 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "openssl" -version = "0.10.52" +version = "0.10.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" +checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" dependencies = [ "bitflags", "cfg-if", @@ -428,9 +428,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.87" +version = "0.9.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e" +checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" dependencies = [ "cc", "libc", diff --git a/test_runner/pg_clients/rust/tokio-postgres/Dockerfile b/test_runner/pg_clients/rust/tokio-postgres/Dockerfile index 43fc6f6c92..35ae25a470 100644 --- a/test_runner/pg_clients/rust/tokio-postgres/Dockerfile +++ b/test_runner/pg_clients/rust/tokio-postgres/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.69 +FROM rust:1.70 WORKDIR /source COPY . . 
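As a usage note for the new `Endpoint.respec()` helper added to neon_fixtures.py above: it rewrites the endpoint.json that control_plane reads on the next start. A condensed sketch of how the startup benchmark above uses it (the `env` fixture and the "test_startup" branch come from test_startup_simple; this is an illustration, not an additional test):

endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
endpoint.stop()

# Imitate the console: skip pg_catalog updates when the endpoint is started the second time
endpoint.respec(skip_pg_catalog_updates=True)

endpoint.start()
endpoint.safe_psql("select 1;")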
diff --git a/test_runner/pg_clients/swift/PostgresNIOExample/Package.resolved b/test_runner/pg_clients/swift/PostgresNIOExample/Package.resolved index cc12acda4c..9f13106011 100644 --- a/test_runner/pg_clients/swift/PostgresNIOExample/Package.resolved +++ b/test_runner/pg_clients/swift/PostgresNIOExample/Package.resolved @@ -5,8 +5,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/vapor/postgres-nio.git", "state" : { - "revision" : "dbf9c2eb596df39cba8ff3f74d74b2e6a31bd937", - "version" : "1.14.1" + "revision" : "061a0836d7c1887e04a975d1d2eaa2ef5fd7dfab", + "version" : "1.16.0" } }, { @@ -59,8 +59,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-nio.git", "state" : { - "revision" : "d1690f85419fdac8d54e350fb6d2ab9fd95afd75", - "version" : "2.51.1" + "revision" : "6213ba7a06febe8fef60563a4a7d26a4085783cf", + "version" : "2.54.0" } }, { diff --git a/test_runner/pg_clients/swift/PostgresNIOExample/Package.swift b/test_runner/pg_clients/swift/PostgresNIOExample/Package.swift index ac32b982e2..a80590daa2 100644 --- a/test_runner/pg_clients/swift/PostgresNIOExample/Package.swift +++ b/test_runner/pg_clients/swift/PostgresNIOExample/Package.swift @@ -4,7 +4,7 @@ import PackageDescription let package = Package( name: "PostgresNIOExample", dependencies: [ - .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.14.1") + .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.16.0") ], targets: [ .executableTarget( diff --git a/test_runner/pg_clients/typescript/postgresql-client/package-lock.json b/test_runner/pg_clients/typescript/postgresql-client/package-lock.json index e4dfd1dd9d..4cedf56acd 100644 --- a/test_runner/pg_clients/typescript/postgresql-client/package-lock.json +++ b/test_runner/pg_clients/typescript/postgresql-client/package-lock.json @@ -5,23 +5,7 @@ "packages": { "": { "dependencies": { - "postgresql-client": "2.5.5" - } - }, - "node_modules/debug": { - "version": "4.3.4", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", - "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dependencies": { - "ms": "2.1.2" - }, - "engines": { - "node": ">=6.0" - }, - "peerDependenciesMeta": { - "supports-color": { - "optional": true - } + "postgresql-client": "2.5.9" } }, "node_modules/doublylinked": { @@ -41,11 +25,6 @@ "putil-promisify": "^1.8.6" } }, - "node_modules/ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" - }, "node_modules/obuf": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz", @@ -63,30 +42,28 @@ } }, "node_modules/postgresql-client": { - "version": "2.5.5", - "resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.5.tgz", - "integrity": "sha512-2Mu3i+6NQ9cnkoZNd0XeSZo9WoUpuWf4ZSiCCoDWSj82T93py2/SKXZ1aUaP8mVaU0oKpyyGe0IwLYZ1VHShnA==", + "version": "2.5.9", + "resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.9.tgz", + "integrity": "sha512-s+kgTN6TfWLzehEyxw4Im4odnxVRCbZ0DEJzWS6SLowPAmB2m1/DOiOvZC0+ZVoi5AfbGE6SBqFxKguSyVAXZg==", "dependencies": { - "debug": "^4.3.4", "doublylinked": "^2.5.2", "lightning-pool": "^4.2.1", "postgres-bytea": "^3.0.0", - "power-tasks": "^1.6.4", + "power-tasks": "^1.7.0", "putil-merge": "^3.10.3", "putil-promisify": "^1.10.0", "putil-varhelpers": "^1.6.5" 
}, "engines": { - "node": ">=14.0", + "node": ">=16.0", "npm": ">=7.0.0" } }, "node_modules/power-tasks": { - "version": "1.6.4", - "resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.6.4.tgz", - "integrity": "sha512-LX8GGgEIP1N7jsZqlqZ275e6f1Ehq97APCEGj8uVO0NoEoB+77QUX12BFv3LmlNKfq4fIuNSPiHhyHFjqn2gfA==", + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.7.0.tgz", + "integrity": "sha512-rndZXCDxhuIDjPUJJvQwBDHaYagCkjvbPF/NA+omh/Ef4rAI9KtnvdA0k98dyiGpn1zXOpc6c2c0JWzg/xAhJg==", "dependencies": { - "debug": "^4.3.4", "doublylinked": "^2.5.2", "strict-typed-events": "^2.3.1" }, @@ -132,9 +109,9 @@ } }, "node_modules/ts-gems": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.3.0.tgz", - "integrity": "sha512-bUvrwrzlct7vfaNvtgMhynDf6lAki/kTtrNsIGhX6l7GJGK3s6b8Ro7dazOLXabV0m2jyShBzDQ8X1+h/C2Cug==" + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.4.0.tgz", + "integrity": "sha512-SdugYAXoWvbqrxLodIObzxhEKacDxh5LfAJIiIkiH7q5thvuuCzdmkdTVQYf7uEDrEpPhfx4tokDMamdO3be9A==" } } } diff --git a/test_runner/pg_clients/typescript/postgresql-client/package.json b/test_runner/pg_clients/typescript/postgresql-client/package.json index 9eaa13437a..12703ce89f 100644 --- a/test_runner/pg_clients/typescript/postgresql-client/package.json +++ b/test_runner/pg_clients/typescript/postgresql-client/package.json @@ -1,6 +1,6 @@ { "type": "module", "dependencies": { - "postgresql-client": "2.5.5" + "postgresql-client": "2.5.9" } } diff --git a/test_runner/pg_clients/typescript/serverless-driver/Dockerfile b/test_runner/pg_clients/typescript/serverless-driver/Dockerfile index a5ad832a5c..07e98c586b 100644 --- a/test_runner/pg_clients/typescript/serverless-driver/Dockerfile +++ b/test_runner/pg_clients/typescript/serverless-driver/Dockerfile @@ -1,4 +1,4 @@ -FROM node:18 +FROM node:20 WORKDIR /source COPY . . 
diff --git a/test_runner/pg_clients/typescript/serverless-driver/package-lock.json b/test_runner/pg_clients/typescript/serverless-driver/package-lock.json index 0fb84cf5b7..72cc452817 100644 --- a/test_runner/pg_clients/typescript/serverless-driver/package-lock.json +++ b/test_runner/pg_clients/typescript/serverless-driver/package-lock.json @@ -5,16 +5,16 @@ "packages": { "": { "dependencies": { - "@neondatabase/serverless": "0.4.3", + "@neondatabase/serverless": "0.4.18", "ws": "8.13.0" } }, "node_modules/@neondatabase/serverless": { - "version": "0.4.3", - "resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.3.tgz", - "integrity": "sha512-U8tpuF5f0R5WRsciR7iaJ5S2h54DWa6Z6CEW+J4KgwyvRN3q3qDz0MibdfFXU0WqnRoi/9RSf/2XN4TfeaOCbQ==", + "version": "0.4.18", + "resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.18.tgz", + "integrity": "sha512-2TZnIyRGC/+0fjZ8TKCzaSTPUD94PM7NBGuantGZbUrbWyqBwGnUoRtdZAQ95qBKVHqORLVfymlv2NE+HQMFeA==", "dependencies": { - "@types/pg": "^8.6.6" + "@types/pg": "8.6.6" } }, "node_modules/@types/node": { diff --git a/test_runner/pg_clients/typescript/serverless-driver/package.json b/test_runner/pg_clients/typescript/serverless-driver/package.json index 71ba181afc..840c7a5c4c 100644 --- a/test_runner/pg_clients/typescript/serverless-driver/package.json +++ b/test_runner/pg_clients/typescript/serverless-driver/package.json @@ -1,7 +1,7 @@ { "type": "module", "dependencies": { - "@neondatabase/serverless": "0.4.3", + "@neondatabase/serverless": "0.4.18", "ws": "8.13.0" } } diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 2635dbd93c..61f86dc3ce 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -15,7 +15,11 @@ from fixtures.neon_fixtures import ( PortDistributor, ) from fixtures.pageserver.http import PageserverHttpClient -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.types import Lsn from pytest import FixtureRequest @@ -417,7 +421,7 @@ def check_neon_works( ) shutil.rmtree(repo_dir / "local_fs_remote_storage") - pageserver_http.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id) pageserver_http.timeline_create(pg_version, tenant_id, timeline_id) pg_bin.run( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"] diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 852ae55757..1c7d8e8190 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -14,7 +14,11 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, PgBin, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import subprocess_capture @@ -158,7 +162,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build ".*files not bound to index_file.json, proceeding with their deletion.*" ) - client.timeline_delete(tenant, timeline) + timeline_delete_wait_completed(client, tenant, timeline) # Importing correct backup works 
import_tar(base_tar, wal_tar) diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index a96532c0d8..b22e545f20 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -24,7 +24,13 @@ def test_basic_eviction( test_name="test_download_remote_layers_api", ) - env = neon_env_builder.init_start() + env = neon_env_builder.init_start( + initial_tenant_conf={ + # disable gc and compaction background loops because they perform on-demand downloads + "gc_period": "0s", + "compaction_period": "0s", + } + ) client = env.pageserver.http_client() endpoint = env.endpoints.create_start("main") @@ -47,6 +53,11 @@ def test_basic_eviction( client.timeline_checkpoint(tenant_id, timeline_id) wait_for_upload(client, tenant_id, timeline_id, current_lsn) + # disable compute & sks to avoid on-demand downloads by walreceiver / getpage + endpoint.stop() + for sk in env.safekeepers: + sk.stop() + timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) initial_local_layers = sorted( list(filter(lambda path: path.name != "metadata", timeline_path.glob("*"))) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 87f4a148b4..36a0ec3c60 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -19,7 +19,7 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, wait_until_tenant_active, @@ -596,14 +596,11 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue( env.pageserver.allowed_errors.append( ".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping" ) - client.timeline_delete(tenant_id, timeline_id) - env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*") env.pageserver.allowed_errors.append( ".*files not bound to index_file.json, proceeding with their deletion.*" ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id)) + timeline_delete_wait_completed(client, tenant_id, timeline_id) assert not timeline_path.exists() diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index e9dcd1e5cd..a0f9f854ed 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -11,6 +11,7 @@ from fixtures.neon_fixtures import ( wait_for_wal_insert_lsn, ) from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.pg_version import PgVersion, xfail_on_postgres from fixtures.types import Lsn, TenantId, TimelineId @@ -628,12 +629,12 @@ def test_get_tenant_size_with_multiple_branches( size_debug_file_before.write(size_debug) # teardown, delete branches, and the size should be going down - http_client.timeline_delete(tenant_id, first_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id) size_after_deleting_first = http_client.tenant_size(tenant_id) assert size_after_deleting_first < size_after_thinning_branch - http_client.timeline_delete(tenant_id, second_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id) 
size_after_deleting_second = http_client.tenant_size(tenant_id) assert size_after_deleting_second < size_after_deleting_first diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py index 21e4af4127..75e5c2c91c 100644 --- a/test_runner/regress/test_tenant_tasks.py +++ b/test_runner/regress/test_tenant_tasks.py @@ -1,6 +1,10 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder -from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active +from fixtures.pageserver.utils import ( + assert_tenant_state, + timeline_delete_wait_completed, + wait_until_tenant_active, +) from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until @@ -24,7 +28,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): def delete_all_timelines(tenant: TenantId): timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)] for t in timelines: - client.timeline_delete(tenant, t) + timeline_delete_wait_completed(client, tenant, t) # Create tenant, start compute tenant, _ = env.neon_cli.create_tenant() diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 946bb4ef34..989f31f1fa 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -23,6 +23,7 @@ from fixtures.neon_fixtures import ( last_flush_lsn_checkpoint, last_flush_lsn_upload, ) +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import wait_until from prometheus_client.samples import Sample @@ -215,7 +216,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder): # Test (a subset of) pageserver global metrics for metric in PAGESERVER_GLOBAL_METRICS: ps_samples = ps_metrics.query_all(metric, {}) - assert len(ps_samples) > 0 + assert len(ps_samples) > 0, f"expected at least one sample for {metric}" for sample in ps_samples: labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()]) log.info(f"{sample.name}{{{labels}}} {sample.value}") @@ -331,9 +332,10 @@ def test_pageserver_with_empty_tenants( client.tenant_create(tenant_with_empty_timelines) temp_timelines = client.timeline_list(tenant_with_empty_timelines) for temp_timeline in temp_timelines: - client.timeline_delete( - tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) + timeline_delete_wait_completed( + client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) ) + files_in_timelines_dir = sum( 1 for _p in Path.iterdir( diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 8f23c0615b..1307b3c74c 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -17,9 +17,10 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, + wait_timeline_detail_404, wait_until_tenant_active, wait_until_timeline_state, ) @@ -83,7 +84,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): wait_until( number_of_iterations=3, interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id), + func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id), ) assert not timeline_path.exists() @@ -94,16 
+95,16 @@ def test_timeline_delete(neon_simple_env: NeonEnv): match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ) as exc: ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) - - # FIXME leaves tenant without timelines, should we prevent deletion of root timeline? - wait_until( - number_of_iterations=3, - interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id), - ) - assert exc.value.status_code == 404 + wait_until( + number_of_iterations=3, + interval=0.2, + func=lambda: timeline_delete_wait_completed( + ps_http, env.initial_tenant, parent_timeline_id + ), + ) + # Check that we didn't pick up the timeline again after restart. # See https://github.com/neondatabase/neon/issues/3560 env.pageserver.stop(immediate=True) @@ -143,7 +144,6 @@ def test_delete_timeline_post_rm_failure( ps_http.configure_failpoints((failpoint_name, "return")) ps_http.timeline_delete(env.initial_tenant, env.initial_timeline) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -165,13 +165,7 @@ def test_delete_timeline_post_rm_failure( # this should succeed # this also checks that delete can be retried even when timeline is in Broken state - ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2) - with pytest.raises(PageserverApiException) as e: - ps_http.timeline_detail(env.initial_tenant, env.initial_timeline) - - assert e.value.status_code == 404 - - env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*") + timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline) env.pageserver.allowed_errors.append( f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*" ) @@ -247,13 +241,7 @@ def test_timeline_resurrection_on_attach( pass # delete new timeline - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*" - ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id) ##### Stop the pageserver instance, erase all its data env.endpoints.stop_all() @@ -338,7 +326,6 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -357,12 +344,15 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild # Wait for tenant to finish loading. 
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1) - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id) - ) + try: + data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) + log.debug(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code != 404: + raise + else: + raise Exception("detail succeeded (it should return 404)") assert ( not leaf_timeline_path.exists() @@ -389,13 +379,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild assert env.initial_timeline is not None for timeline_id in (intermediate_timeline_id, env.initial_timeline): - ps_http.timeline_delete(env.initial_tenant, timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id) + timeline_delete_wait_completed( + ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id ) assert_prefix_empty( @@ -419,23 +404,27 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) -def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( - neon_env_builder: NeonEnvBuilder, +@pytest.mark.parametrize( + "stuck_failpoint", + ["persist_deleted_index_part", "in_progress_delete"], +) +def test_concurrent_timeline_delete_stuck_on( + neon_env_builder: NeonEnvBuilder, stuck_failpoint: str ): """ - If we're stuck uploading the index file with the is_delete flag, - eventually console will hand up and retry. - If we're still stuck at the retry time, ensure that the retry - fails with status 500, signalling to console that it should retry - later. - Ideally, timeline_delete should return 202 Accepted and require - console to poll for completion, but, that would require changing - the API contract. + If delete is stuck, console will eventually retry deletion. + So we need to be sure that these requests won't interleave with each other. + In this test we check two places where we can spend a lot of time. + This is a regression test because there was a bug where the DeletionGuard wasn't propagated + to the background task. + + Ensure that when the retry comes while we're still stuck, the request gets an immediate error response, + signalling to console that it should retry later. 
""" neon_env_builder.enable_remote_storage( remote_storage_kind=RemoteStorageKind.MOCK_S3, - test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload", + test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}", ) env = neon_env_builder.init_start() @@ -445,13 +434,14 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ps_http = env.pageserver.http_client() # make the first call sleep practically forever - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ps_http.configure_failpoints((failpoint_name, "pause")) + ps_http.configure_failpoints((stuck_failpoint, "pause")) def first_call(result_queue): try: log.info("first call start") - ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10) + timeline_delete_wait_completed( + ps_http, env.initial_tenant, child_timeline_id, timeout=10 + ) log.info("first call success") result_queue.put("success") except Exception: @@ -466,7 +456,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def first_call_hit_failpoint(): assert env.pageserver.log_contains( - f".*{child_timeline_id}.*at failpoint {failpoint_name}" + f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) wait_until(50, 0.1, first_call_hit_failpoint) @@ -484,8 +474,12 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ) log.info("second call failed as expected") + # ensure it is not 404 and stopping + detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id) + assert detail["state"] == "Stopping" + # by now we know that the second call failed, let's ensure the first call will finish - ps_http.configure_failpoints((failpoint_name, "off")) + ps_http.configure_failpoints((stuck_failpoint, "off")) result = first_call_result.get() assert result == "success" @@ -498,8 +492,10 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): """ - If the client hangs up before we start the index part upload but after we mark it + If the client hangs up before we start the index part upload but after deletion is scheduled + we mark it deleted in local memory, a subsequent delete_timeline call should be able to do + another delete timeline operation. This tests cancel safety up to the given failpoint. 
@@ -515,12 +511,18 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): ps_http = env.pageserver.http_client() - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" + failpoint_name = "persist_deleted_index_part" ps_http.configure_failpoints((failpoint_name, "pause")) with pytest.raises(requests.exceptions.Timeout): ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + env.pageserver.allowed_errors.append( + f".*{child_timeline_id}.*timeline deletion is already in progress.*" + ) + with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"): + ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + # make sure the timeout was due to the failpoint at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" @@ -552,12 +554,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): wait_until(50, 0.1, first_request_finished) # check that the timeline is gone - notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found" - env.pageserver.allowed_errors.append(".*" + notfound_message) - with pytest.raises(PageserverApiException, match=notfound_message) as exc: - ps_http.timeline_detail(env.initial_tenant, child_timeline_id) - - assert exc.value.status_code == 404 + wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id) @pytest.mark.parametrize( @@ -616,12 +613,7 @@ def test_timeline_delete_works_for_remote_smoke( for timeline_id in reversed(timeline_ids): # note that we need to finish previous deletion before scheduling next one # otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3) - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id) assert_prefix_empty( neon_env_builder, diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 1460172afe..5bdbc18927 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -24,6 +24,7 @@ from fixtures.neon_fixtures import ( from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( assert_tenant_state, + timeline_delete_wait_completed, wait_for_upload_queue_empty, wait_until_tenant_active, ) @@ -272,7 +273,7 @@ def test_timeline_initial_logical_size_calculation_cancellation( if deletion_method == "tenant_detach": client.tenant_detach(tenant_id) elif deletion_method == "timeline_delete": - client.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(client, tenant_id, timeline_id) delete_timeline_success.put(True) except PageserverApiException: delete_timeline_success.put(False) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 8b595596cb..994858edf7 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -31,7 +31,11 @@ from fixtures.neon_fixtures import ( SafekeeperPort, available_remote_storages, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + 
timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import get_dir_size, query_scalar, start_in_background @@ -548,15 +552,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re f"sk_id={sk.id} to flush {last_lsn}", ) - ps_cli = env.pageserver.http_client() - pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) + ps_http = env.pageserver.http_client() + pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) lag = last_lsn - pageserver_lsn log.info( f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb" ) endpoint.stop_and_destroy() - ps_cli.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(ps_http, tenant_id, timeline_id) # Also delete and manually create timeline on safekeepers -- this tests # scenario of manual recovery on different set of safekeepers. @@ -571,11 +575,21 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version + # Terminate all safekeepers first to prevent communication from unexpectedly + # advancing peer_horizon_lsn. for sk in env.safekeepers: cli = sk.http_client() cli.timeline_delete_force(tenant_id, timeline_id) # restart safekeeper to clear its in-memory state - sk.stop().start() + sk.stop() + # wait for all potential in-flight pushes to the broker to arrive before starting + # safekeepers (even without sleep, it is very unlikely they are not + # delivered yet). + time.sleep(1) + + for sk in env.safekeepers: + sk.start() + cli = sk.http_client() cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn) f_partial_path = ( Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name @@ -583,7 +597,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re shutil.copy(f_partial_saved, f_partial_path) # recreate timeline on pageserver from scratch - ps_cli.timeline_create( + ps_http.timeline_create( pg_version=PgVersion(pg_version), tenant_id=tenant_id, new_timeline_id=timeline_id, @@ -598,7 +612,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re if elapsed > wait_lsn_timeout: raise RuntimeError("Timed out waiting for WAL redo") - tenant_status = ps_cli.tenant_status(tenant_id) + tenant_status = ps_http.tenant_status(tenant_id) if tenant_status["state"]["slug"] == "Loading": log.debug(f"Tenant {tenant_id} is still loading, retrying") else: diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 515d47c079..7ac6e6332c 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -1,3 +1,5 @@ +import time + from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.types import Lsn, TenantId @@ -40,7 +42,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): # Kills one of the safekeepers and ensures that only the active ones are printed in the state. 
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder): # Trigger WAL wait timeout faster - neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" + neon_env_builder.pageserver_config_override = """ + wait_lsn_timeout = "1s" + tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"} + """ # Have notable SK ids to ensure we check logs for their presence, not some other random numbers neon_env_builder.safekeepers_id_start = 12345 neon_env_builder.num_safekeepers = 3 @@ -70,6 +75,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil stopped_safekeeper_id = stopped_safekeeper.id log.info(f"Stopping safekeeper {stopped_safekeeper.id}") stopped_safekeeper.stop() + # sleep until stopped safekeeper is removed from candidates + time.sleep(2) # Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats. insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert)