From 84bc3380cc82a27b06717d2f239ac1fea74880fc Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 28 Apr 2025 17:34:47 +0200 Subject: [PATCH 01/15] Remove SAFEKEEPER_AUTH_TOKEN env var parsing from safekeeper (#11698) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Remove SAFEKEEPER_AUTH_TOKEN env var parsing from safekeeper This PR is a follow-up to #11443 that removes the parsing of the `SAFEKEEPER_AUTH_TOKEN` environment variable from the safekeeper codebase while keeping the `auth_token_path` CLI flag functionality. ## Changes: - Removed code that checks for the `SAFEKEEPER_AUTH_TOKEN` environment variable - Updated comments to reflect that only the `auth_token_path` CLI flag is now used As mentioned in PR #11443, the environment variable approach was planned to be deprecated and removed in favor of the file-based approach, which is more secure since environment variables can be quite public in both procfs and unit files. Link to Devin run: https://app.devin.ai/sessions/d6f56cf1b4164ea9880a9a06358a58ac Requested by: arpad@neon.tech --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: arpad@neon.tech Co-authored-by: Arpad Müller --- control_plane/src/safekeeper.rs | 39 ++++++++++++++++++++------------ safekeeper/src/bin/safekeeper.rs | 31 ++++++------------------- 2 files changed, 32 insertions(+), 38 deletions(-) diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 948e3c8c93..eec2c997e6 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -112,7 +112,7 @@ impl SafekeeperNode { } /// Initializes a safekeeper node by creating all necessary files, - /// e.g. SSL certificates. + /// e.g. SSL certificates and JWT token file. pub fn initialize(&self) -> anyhow::Result<()> { if self.env.generate_local_ssl_certs { self.env.generate_ssl_cert( @@ -120,6 +120,17 @@ impl SafekeeperNode { &self.datadir_path().join("server.key"), )?; } + + // Generate a token file for authentication with other safekeepers + if self.conf.auth_enabled { + let token = self + .env + .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?; + + let token_path = self.datadir_path().join("peer_jwt_token"); + std::fs::write(token_path, token)?; + } + Ok(()) } @@ -218,14 +229,26 @@ impl SafekeeperNode { args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap())); } + if self.conf.auth_enabled { + let token_path = self.datadir_path().join("peer_jwt_token"); + let token_path_str = token_path + .to_str() + .with_context(|| { + format!("Token path {token_path:?} cannot be represented as a unicode string") + })? 
+ .to_owned(); + args.extend(["--auth-token-path".to_owned(), token_path_str]); + } + args.extend_from_slice(extra_opts); + let env_variables = Vec::new(); background_process::start_process( &format!("safekeeper-{id}"), &datadir, &self.env.safekeeper_bin(), &args, - self.safekeeper_env_variables()?, + env_variables, background_process::InitialPidFile::Expect(self.pid_file()), retry_timeout, || async { @@ -239,18 +262,6 @@ impl SafekeeperNode { .await } - fn safekeeper_env_variables(&self) -> anyhow::Result> { - // Generate a token to connect from safekeeper to peers - if self.conf.auth_enabled { - let token = self - .env - .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?; - Ok(vec![("SAFEKEEPER_AUTH_TOKEN".to_owned(), token)]) - } else { - Ok(Vec::new()) - } - } - /// /// Stop the server. /// diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index dd71420efb..c267a55cb6 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -1,7 +1,6 @@ // // Main entry point for the safekeeper executable // -use std::env::{VarError, var}; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::str::FromStr; @@ -354,29 +353,13 @@ async fn main() -> anyhow::Result<()> { }; // Load JWT auth token to connect to other safekeepers for pull_timeline. - // First check if the env var is present, then check the arg with the path. - // We want to deprecate and remove the env var method in the future. - let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") { - Ok(v) => { - info!("loaded JWT token for authentication with safekeepers"); - Some(SecretString::from(v)) - } - Err(VarError::NotPresent) => { - if let Some(auth_token_path) = args.auth_token_path.as_ref() { - info!( - "loading JWT token for authentication with safekeepers from {auth_token_path}" - ); - let auth_token = tokio::fs::read_to_string(auth_token_path).await?; - Some(SecretString::from(auth_token.trim().to_owned())) - } else { - info!("no JWT token for authentication with safekeepers detected"); - None - } - } - Err(_) => { - warn!("JWT token for authentication with safekeepers is not unicode"); - None - } + let sk_auth_token = if let Some(auth_token_path) = args.auth_token_path.as_ref() { + info!("loading JWT token for authentication with safekeepers from {auth_token_path}"); + let auth_token = tokio::fs::read_to_string(auth_token_path).await?; + Some(SecretString::from(auth_token.trim().to_owned())) + } else { + info!("no JWT token for authentication with safekeepers detected"); + None }; let ssl_ca_certs = match args.ssl_ca_file.as_ref() { From b1fa68f6592672a8e988b85f0de83749fece2dab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JC=20Gr=C3=BCnhage?= Date: Mon, 28 Apr 2025 18:37:36 +0200 Subject: [PATCH 02/15] impr(ci): switch release PR creation over to use python based action (#11679) ## Problem Our different repositories had both had code to achieve very similar results in terms of release PR creation, but they were structured differently and had different extensions. This was likely to cause maintainability problems in the long run. ## Summary of changes Switch to a python cli based composite action for creating the release PRs that will also be introduced in our other repos later. ## To Do - [ ] Adjust our docs to reflect the changes from this. 
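
For reference, a sketch of how a release PR could be triggered manually once this lands, assuming the `component` and `cherry-pick` inputs defined in `release.yml` below (the commit SHAs are placeholders, not real commits):

```
# Regular storage release PR
gh workflow run release.yml -f component=storage

# Hotfix proxy release PR based on the previous release, cherry-picking two commits
gh workflow run release.yml -f component=proxy -f cherry-pick="<sha1> <sha2>"
```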
--- .github/workflows/_create-release-pr.yml | 103 ----------------------- .github/workflows/release-compute.yml | 12 +++ .github/workflows/release-proxy.yml | 12 +++ .github/workflows/release-storage.yml | 12 +++ .github/workflows/release.yml | 93 ++++++++++---------- 5 files changed, 82 insertions(+), 150 deletions(-) delete mode 100644 .github/workflows/_create-release-pr.yml create mode 100644 .github/workflows/release-compute.yml create mode 100644 .github/workflows/release-proxy.yml create mode 100644 .github/workflows/release-storage.yml diff --git a/.github/workflows/_create-release-pr.yml b/.github/workflows/_create-release-pr.yml deleted file mode 100644 index f96ed7d69b..0000000000 --- a/.github/workflows/_create-release-pr.yml +++ /dev/null @@ -1,103 +0,0 @@ -name: Create Release PR - -on: - workflow_call: - inputs: - component-name: - description: 'Component name' - required: true - type: string - source-branch: - description: 'Source branch' - required: true - type: string - secrets: - ci-access-token: - description: 'CI access token' - required: true - -defaults: - run: - shell: bash -euo pipefail {0} - -permissions: - contents: read - -jobs: - create-release-branch: - runs-on: ubuntu-22.04 - - permissions: - contents: write # for `git push` - - steps: - - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 - with: - egress-policy: audit - - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - with: - ref: ${{ inputs.source-branch }} - fetch-depth: 0 - - - name: Set variables - id: vars - env: - COMPONENT_NAME: ${{ inputs.component-name }} - RELEASE_BRANCH: >- - ${{ - false - || inputs.component-name == 'Storage' && 'release' - || inputs.component-name == 'Proxy' && 'release-proxy' - || inputs.component-name == 'Compute' && 'release-compute' - }} - run: | - now_date=$(date -u +'%Y-%m-%d') - now_time=$(date -u +'%H-%M-%Z') - { - echo "title=${COMPONENT_NAME} release ${now_date}" - echo "rc-branch=rc/${RELEASE_BRANCH}/${now_date}_${now_time}" - echo "release-branch=${RELEASE_BRANCH}" - } | tee -a ${GITHUB_OUTPUT} - - - name: Configure git - run: | - git config user.name "github-actions[bot]" - git config user.email "41898282+github-actions[bot]@users.noreply.github.com" - - - name: Create RC branch - env: - RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }} - RC_BRANCH: ${{ steps.vars.outputs.rc-branch }} - TITLE: ${{ steps.vars.outputs.title }} - run: | - git switch -c "${RC_BRANCH}" - - # Manually create a merge commit on the current branch, keeping the - # tree and setting the parents to the current HEAD and the HEAD of the - # release branch. This commit is what we'll fast-forward the release - # branch to when merging the release branch. 
- # For details on why, look at - # https://docs.neon.build/overview/repositories/neon.html#background-on-commit-history-of-release-prs - current_tree=$(git rev-parse 'HEAD^{tree}') - release_head=$(git rev-parse "origin/${RELEASE_BRANCH}") - current_head=$(git rev-parse HEAD) - merge_commit=$(git commit-tree -p "${current_head}" -p "${release_head}" -m "${TITLE}" "${current_tree}") - - # Fast-forward the current branch to the newly created merge_commit - git merge --ff-only ${merge_commit} - - git push origin "${RC_BRANCH}" - - - name: Create a PR into ${{ steps.vars.outputs.release-branch }} - env: - GH_TOKEN: ${{ secrets.ci-access-token }} - RC_BRANCH: ${{ steps.vars.outputs.rc-branch }} - RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }} - TITLE: ${{ steps.vars.outputs.title }} - run: | - gh pr create --title "${TITLE}" \ - --body "" \ - --head "${RC_BRANCH}" \ - --base "${RELEASE_BRANCH}" diff --git a/.github/workflows/release-compute.yml b/.github/workflows/release-compute.yml new file mode 100644 index 0000000000..f123dd2f44 --- /dev/null +++ b/.github/workflows/release-compute.yml @@ -0,0 +1,12 @@ +name: Create compute release PR + +on: + schedule: + - cron: '0 7 * * FRI' + +jobs: + create-release-pr: + uses: ./.github/workflows/release.yml + with: + component: compute + secrets: inherit diff --git a/.github/workflows/release-proxy.yml b/.github/workflows/release-proxy.yml new file mode 100644 index 0000000000..d9055984d2 --- /dev/null +++ b/.github/workflows/release-proxy.yml @@ -0,0 +1,12 @@ +name: Create proxy release PR + +on: + schedule: + - cron: '0 6 * * TUE' + +jobs: + create-release-pr: + uses: ./.github/workflows/release.yml + with: + component: proxy + secrets: inherit diff --git a/.github/workflows/release-storage.yml b/.github/workflows/release-storage.yml new file mode 100644 index 0000000000..91f02fddda --- /dev/null +++ b/.github/workflows/release-storage.yml @@ -0,0 +1,12 @@ +name: Create storage release PR + +on: + schedule: + - cron: '0 6 * * FRI' + +jobs: + create-release-pr: + uses: ./.github/workflows/release.yml + with: + component: storage + secrets: inherit diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4068eafb95..4b19d6aa3f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,25 +1,34 @@ -name: Create Release Branch +name: Create release PR on: - schedule: - # It should be kept in sync with if-condition in jobs - - cron: '0 6 * * TUE' # Proxy release - - cron: '0 6 * * FRI' # Storage release - - cron: '0 7 * * FRI' # Compute release workflow_dispatch: inputs: - create-storage-release-branch: - type: boolean - description: 'Create Storage release PR' + component: + description: "Component to release" + required: true + type: choice + options: + - compute + - proxy + - storage + cherry-pick: + description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)" required: false - create-proxy-release-branch: - type: boolean - description: 'Create Proxy release PR' - required: false - create-compute-release-branch: - type: boolean - description: 'Create Compute release PR' + type: string + default: '' + + workflow_call: + inputs: + component: + description: "Component to release" + required: true + type: string + cherry-pick: + description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)" required: false + type: string + default: '' + # No permission for GITHUB_TOKEN by default; the **minimal required** set of 
permissions should be granted in each job. permissions: {} @@ -29,41 +38,31 @@ defaults: shell: bash -euo pipefail {0} jobs: - create-storage-release-branch: - if: ${{ github.event.schedule == '0 6 * * FRI' || inputs.create-storage-release-branch }} + create-release-pr: + runs-on: ubuntu-22.04 permissions: contents: write - uses: ./.github/workflows/_create-release-pr.yml - with: - component-name: 'Storage' - source-branch: ${{ github.ref_name }} - secrets: - ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }} + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit - create-proxy-release-branch: - if: ${{ github.event.schedule == '0 6 * * TUE' || inputs.create-proxy-release-branch }} + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + fetch-depth: 0 - permissions: - contents: write + - name: Configure git + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" - uses: ./.github/workflows/_create-release-pr.yml - with: - component-name: 'Proxy' - source-branch: ${{ github.ref_name }} - secrets: - ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }} - - create-compute-release-branch: - if: ${{ github.event.schedule == '0 7 * * FRI' || inputs.create-compute-release-branch }} - - permissions: - contents: write - - uses: ./.github/workflows/_create-release-pr.yml - with: - component-name: 'Compute' - source-branch: ${{ github.ref_name }} - secrets: - ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }} + - name: Create release PR + uses: neondatabase/dev-actions/release-pr@02b41460646b70d12dd33e5f56ebc5af2384c993 + with: + component: ${{ inputs.component }} + cherry-pick: ${{ inputs.cherry-pick }} + env: + GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }} From 998d2c2ce9315f2ec7b7bc2b5ce0253e6c758d9d Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 28 Apr 2025 17:43:35 +0100 Subject: [PATCH 03/15] storcon: use shard 0's initdb for timeline creation (#11727) ## Problem In princple, pageservers with different postgres binaries might generate different initdbs, resulting in inconsistency between shards. To avoid that, we should have shard 0 generate the initdb and other shards re-use it. Fixes: https://github.com/neondatabase/neon/issues/11340 ## Summary of changes - For shards with index greater than zero, set `existing_initdb_timeline_id` in timeline creation to consume the existing initdb rather than creating a new one --- storage_controller/src/service.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ca9b911c4d..0f71a87f13 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3720,6 +3720,10 @@ impl Service { // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard // that will get the first creation request, and propagate the LSN to all the >0 shards. + // + // This also enables non-zero shards to use the initdb that shard 0 generated and uploaded to S3, rather than + // independently generating their own initdb. This guarantees that shards cannot end up with different initial + // states if e.g. they have different postgres binary versions. 
let timeline_info = create_one( shard_zero_tid, shard_zero_locations, @@ -3729,11 +3733,16 @@ impl Service { ) .await?; - // Propagate the LSN that shard zero picked, if caller didn't provide one + // Update the create request for shards >= 0 match &mut create_req.mode { models::TimelineCreateRequestMode::Branch { ancestor_start_lsn, .. } if ancestor_start_lsn.is_none() => { + // Propagate the LSN that shard zero picked, if caller didn't provide one *ancestor_start_lsn = timeline_info.ancestor_lsn; }, + models::TimelineCreateRequestMode::Bootstrap { existing_initdb_timeline_id, .. } => { + // For shards >= 0, do not run initdb: use the one that shard 0 uploaded to S3 + *existing_initdb_timeline_id = Some(create_req.new_timeline_id) + } _ => {} } From a750026c2e69344908464c59ba820fc0639abeb1 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Mon, 28 Apr 2025 12:09:48 -0500 Subject: [PATCH 04/15] Fix compiler warning in libpagestore.c when WITH_SANITIZERS=yes (#11755) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Postgres has a nice self-documenting macro called pg_unreachable() when you want to assert that a location in code won't be hit. Warning in question: ``` /home/tristan957/Projects/work/neon//pgxn/neon/libpagestore.c: In function ‘pageserver_connect’: /home/tristan957/Projects/work/neon//pgxn/neon/libpagestore.c:739:1: warning: control reaches end of non-void function [-Wreturn-type] 739 | } | ^ ``` Signed-off-by: Tristan Partin --- pgxn/neon/libpagestore.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index ccb072d6f9..5287c12a84 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -736,8 +736,8 @@ pageserver_connect(shardno_t shard_no, int elevel) default: neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state); } - /* This shouldn't be hit */ - Assert(false); + + pg_unreachable(); } static void From 04826905340da9ac80aa96b2892a137eb91b3b7d Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 28 Apr 2025 18:24:55 +0100 Subject: [PATCH 05/15] pageserver: make control_plane_api & generations fully mandatory (#10715) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem We had retained the ability to run in a generation-less mode to support test_generations_upgrade, which was replaced with a cleaner backward compat test in https://github.com/neondatabase/neon/pull/10701 ## Summary of changes - Remove all the special cases for "if no generation" or "if no control plane api" - Make control_plane_api config mandatory --------- Co-authored-by: Arpad Müller --- .../pageserver_config/pageserver.toml | 2 ++ pageserver/src/bin/pageserver.rs | 2 +- pageserver/src/config.rs | 13 +++++++--- pageserver/src/controller_upcall_client.rs | 20 ++++++-------- pageserver/src/deletion_queue.rs | 11 +++----- pageserver/src/deletion_queue/validator.rs | 26 ++++++++----------- pageserver/src/tenant.rs | 4 +-- pageserver/src/tenant/mgr.rs | 19 +++----------- .../src/tenant/timeline/import_pgdata.rs | 3 +-- test_runner/fixtures/neon_fixtures.py | 3 +-- .../regress/test_pageserver_generations.py | 2 +- 11 files changed, 43 insertions(+), 62 deletions(-) diff --git a/docker-compose/pageserver_config/pageserver.toml b/docker-compose/pageserver_config/pageserver.toml index 76935453b6..7d603b6c65 100644 --- a/docker-compose/pageserver_config/pageserver.toml +++ 
b/docker-compose/pageserver_config/pageserver.toml @@ -3,3 +3,5 @@ pg_distrib_dir='/usr/local/' listen_pg_addr='0.0.0.0:6400' listen_http_addr='0.0.0.0:9898' remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' } +control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address +control_plane_emergency_mode=true diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 6cfaec955b..4c2572a577 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -504,7 +504,7 @@ fn start_pageserver( // Set up deletion queue let (deletion_queue, deletion_workers) = DeletionQueue::new( remote_storage.clone(), - StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?, + StorageControllerUpcallClient::new(conf, &shutdown_pageserver), conf, ); deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 95143e58b7..ded2805602 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -150,7 +150,7 @@ pub struct PageServerConf { /// not terrible. pub background_task_maximum_delay: Duration, - pub control_plane_api: Option, + pub control_plane_api: Url, /// JWT token for use with the control plane API. pub control_plane_api_token: Option, @@ -438,7 +438,8 @@ impl PageServerConf { test_remote_failures, ondemand_download_behavior_treat_error_as_warn, background_task_maximum_delay, - control_plane_api, + control_plane_api: control_plane_api + .ok_or_else(|| anyhow::anyhow!("`control_plane_api` must be set"))?, control_plane_emergency_mode, heatmap_upload_concurrency, secondary_download_concurrency, @@ -573,6 +574,7 @@ impl PageServerConf { background_task_maximum_delay: Duration::ZERO, load_previous_heatmap: Some(true), generate_unarchival_heatmap: Some(true), + control_plane_api: Some(Url::parse("http://localhost:6666").unwrap()), ..Default::default() }; PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap() @@ -641,9 +643,12 @@ mod tests { use super::PageServerConf; #[test] - fn test_empty_config_toml_is_valid() { - // we use Default impl of everything in this situation + fn test_minimal_config_toml_is_valid() { + // The minimal valid config for running a pageserver: + // - control_plane_api is mandatory, as pageservers cannot run in isolation + // - we use Default impl of everything else in this situation let input = r#" + control_plane_api = "http://localhost:6666" "#; let config_toml = toml_edit::de::from_str::(input) .expect("empty config is valid"); diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 59c94f1549..468e5463b0 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -58,14 +58,8 @@ pub trait StorageControllerUpcallApi { impl StorageControllerUpcallClient { /// A None return value indicates that the input `conf` object does not have control /// plane API enabled. 
- pub fn new( - conf: &'static PageServerConf, - cancel: &CancellationToken, - ) -> Result, reqwest::Error> { - let mut url = match conf.control_plane_api.as_ref() { - Some(u) => u.clone(), - None => return Ok(None), - }; + pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self { + let mut url = conf.control_plane_api.clone(); if let Ok(mut segs) = url.path_segments_mut() { // This ensures that `url` ends with a slash if it doesn't already. @@ -85,15 +79,17 @@ impl StorageControllerUpcallClient { } for cert in &conf.ssl_ca_certs { - client = client.add_root_certificate(Certificate::from_der(cert.contents())?); + client = client.add_root_certificate( + Certificate::from_der(cert.contents()).expect("Invalid certificate in config"), + ); } - Ok(Some(Self { - http_client: client.build()?, + Self { + http_client: client.build().expect("Failed to construct HTTP client"), base_url: url, node_id: conf.id, cancel: cancel.clone(), - })) + } } #[tracing::instrument(skip_all)] diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 6dd7d741c1..4d62bc4ab5 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -585,7 +585,7 @@ impl DeletionQueue { /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice. pub fn new( remote_storage: GenericRemoteStorage, - controller_upcall_client: Option, + controller_upcall_client: C, conf: &'static PageServerConf, ) -> (Self, DeletionQueueWorkers) where @@ -701,7 +701,7 @@ mod test { async fn restart(&mut self) { let (deletion_queue, workers) = DeletionQueue::new( self.storage.clone(), - Some(self.mock_control_plane.clone()), + self.mock_control_plane.clone(), self.harness.conf, ); @@ -821,11 +821,8 @@ mod test { let mock_control_plane = MockStorageController::new(); - let (deletion_queue, worker) = DeletionQueue::new( - storage.clone(), - Some(mock_control_plane.clone()), - harness.conf, - ); + let (deletion_queue, worker) = + DeletionQueue::new(storage.clone(), mock_control_plane.clone(), harness.conf); let worker_join = worker.spawn_with(&tokio::runtime::Handle::current()); diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index 4e775f15eb..363b1427f5 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -53,7 +53,7 @@ where tx: tokio::sync::mpsc::Sender, // Client for calling into control plane API for validation of deletes - controller_upcall_client: Option, + controller_upcall_client: C, // DeletionLists which are waiting generation validation. Not safe to // execute until [`validate`] has processed them. 
@@ -86,7 +86,7 @@ where conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, - controller_upcall_client: Option, + controller_upcall_client: C, lsn_table: Arc>, cancel: CancellationToken, ) -> Self { @@ -137,20 +137,16 @@ where return Ok(()); } - let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client { - match controller_upcall_client - .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect()) - .await - { - Ok(tenants) => tenants, - Err(RetryForeverError::ShuttingDown) => { - // The only way a validation call returns an error is when the cancellation token fires - return Err(DeletionQueueError::ShuttingDown); - } + let tenants_valid = match self + .controller_upcall_client + .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect()) + .await + { + Ok(tenants) => tenants, + Err(RetryForeverError::ShuttingDown) => { + // The only way a validation call returns an error is when the cancellation token fires + return Err(DeletionQueueError::ShuttingDown); } - } else { - // Control plane API disabled. In legacy mode we consider everything valid. - tenant_generations.keys().map(|k| (*k, true)).collect() }; let mut validated_sequence: Option = None; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 698579e8fb..dcd043c4a1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4254,9 +4254,7 @@ impl TenantShard { deletion_queue_client: DeletionQueueClient, l0_flush_global_state: L0FlushGlobalState, ) -> TenantShard { - debug_assert!( - !attached_conf.location.generation.is_none() || conf.control_plane_api.is_none() - ); + assert!(!attached_conf.location.generation.is_none()); let (state, mut rx) = watch::channel(state); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 2ae7e1e875..86aef9b42c 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -346,7 +346,8 @@ async fn init_load_generations( "Emergency mode! Tenants will be attached unsafely using their last known generation" ); emergency_generations(tenant_confs) - } else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? { + } else { + let client = StorageControllerUpcallClient::new(conf, cancel); info!("Calling {} API to re-attach tenants", client.base_url()); // If we are configured to use the control plane API, then it is the source of truth for what tenants to load. match client.re_attach(conf).await { @@ -360,9 +361,6 @@ async fn init_load_generations( anyhow::bail!("Shut down while waiting for control plane re-attach response") } } - } else { - info!("Control plane API not configured, tenant generations are disabled"); - return Ok(None); }; // The deletion queue needs to know about the startup attachment state to decide which (if any) stored @@ -1153,17 +1151,8 @@ impl TenantManager { // Testing hack: if we are configured with no control plane, then drop the generation // from upserts. This enables creating generation-less tenants even though neon_local // always uses generations when calling the location conf API. - let attached_conf = if cfg!(feature = "testing") { - let mut conf = AttachedTenantConf::try_from(new_location_config) - .map_err(UpsertLocationError::BadRequest)?; - if self.conf.control_plane_api.is_none() { - conf.location.generation = Generation::none(); - } - conf - } else { - AttachedTenantConf::try_from(new_location_config) - .map_err(UpsertLocationError::BadRequest)? 
- }; + let attached_conf = AttachedTenantConf::try_from(new_location_config) + .map_err(UpsertLocationError::BadRequest)?; let tenant = tenant_spawn( self.conf, diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index b917fdbfd8..6ab6b90cb6 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -163,8 +163,7 @@ pub async fn doit( // Ensure at-least-once delivery of the upcall to storage controller // before we mark the task as done and never come here again. // - let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)? - .expect("storcon configured"); + let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); storcon_client .put_timeline_import_status( timeline.tenant_shard_id, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 1d668d4b2d..b93df4ede4 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1194,8 +1194,7 @@ class NeonEnv: else: cfg["broker"]["listen_addr"] = self.broker.listen_addr() - if self.control_plane_api is not None: - cfg["control_plane_api"] = self.control_plane_api + cfg["control_plane_api"] = self.control_plane_api if self.control_plane_hooks_api is not None: cfg["control_plane_hooks_api"] = self.control_plane_hooks_api diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index fa1cd61206..e3f9982486 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -3,7 +3,7 @@ Tests in this module exercise the pageserver's behavior around generation numbers, as defined in docs/rfcs/025-generation-numbers.md. 
Briefly, the behaviors we require of the pageserver are: -- Do not start a tenant without a generation number if control_plane_api is set +- Do not start a tenant without a generation number - Remote objects must be suffixed with generation - Deletions may only be executed after validating generation - Updates to remote_consistent_lsn may only be made visible after validating generation From 6d6b83e737b5b89796f6d84e06a3f7f420bccd6c Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 28 Apr 2025 21:17:03 +0300 Subject: [PATCH 06/15] Prewarm implementation (#11741) ## Problem Continue work on prewarm started in PR https://github.com/neondatabase/neon/pull/11740 ## Summary of changes Implement prewarm using prefetch --------- Co-authored-by: Konstantin Knizhnik --- pgxn/neon/file_cache.c | 389 ++++++++++++++++++++++++++++++++++++++++- pgxn/neon/file_cache.h | 14 ++ 2 files changed, 397 insertions(+), 6 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index e2c1f7682f..924e0055c1 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -98,7 +98,6 @@ #define MB ((uint64)1024*1024) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log)) - #define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1)) /* @@ -135,6 +134,15 @@ typedef struct FileCacheEntry #define N_COND_VARS 64 #define CV_WAIT_TIMEOUT 10 +#define MAX_PREWARM_WORKERS 8 + +typedef struct PrewarmWorkerState +{ + uint32 prewarmed_pages; + uint32 skipped_pages; + TimestampTz completed; +} PrewarmWorkerState; + typedef struct FileCacheControl { uint64 generation; /* generation is needed to handle correct hash @@ -156,25 +164,43 @@ typedef struct FileCacheControl dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ + PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; + size_t n_prewarm_workers; + size_t n_prewarm_entries; + size_t total_prewarm_pages; + size_t prewarm_batch; + bool prewarm_active; + bool prewarm_canceled; + dsm_handle prewarm_lfc_state_handle; } FileCacheControl; -bool lfc_store_prefetch_result; +#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc + +#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks]) +#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8) +#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8) static HTAB *lfc_hash; static int lfc_desc = -1; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; +static int lfc_prewarm_limit; +static int lfc_prewarm_batch; static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG; static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK; static char *lfc_path; static uint64 lfc_generation; static FileCacheControl *lfc_ctl; +static bool lfc_do_prewarm; static shmem_startup_hook_type prev_shmem_startup_hook; #if PG_VERSION_NUM>=150000 static shmem_request_hook_type prev_shmem_request_hook; #endif +bool lfc_store_prefetch_result; +bool lfc_prewarm_update_ws_estimation; + #define LFC_ENABLED() (lfc_ctl->limit != 0) /* @@ -500,6 +526,17 @@ lfc_init(void) NULL, NULL); + DefineCustomBoolVariable("neon.prewarm_update_ws_estimation", + "Consider prewarmed pages for working set estimation", + NULL, + 
&lfc_prewarm_update_ws_estimation, + true, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + DefineCustomIntVariable("neon.max_file_cache_size", "Maximal size of Neon local file cache", NULL, @@ -550,6 +587,32 @@ lfc_init(void) lfc_change_chunk_size, NULL); + DefineCustomIntVariable("neon.file_cache_prewarm_limit", + "Maximal number of prewarmed chunks", + NULL, + &lfc_prewarm_limit, + INT_MAX, /* no limit by default */ + 0, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("neon.file_cache_prewarm_batch", + "Number of pages retrivied by prewarm from page server", + NULL, + &lfc_prewarm_batch, + 64, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + if (lfc_max_size == 0) return; @@ -563,6 +626,314 @@ lfc_init(void) #endif } +FileCacheState* +lfc_get_state(size_t max_entries) +{ + FileCacheState* fcs = NULL; + + if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */ + return NULL; + + LWLockAcquire(lfc_lock, LW_SHARED); + + if (LFC_ENABLED()) + { + dlist_iter iter; + size_t i = 0; + uint8* bitmap; + size_t n_pages = 0; + size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned); + size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries); + fcs = (FileCacheState*)palloc0(state_size); + SET_VARSIZE(fcs, state_size); + fcs->magic = FILE_CACHE_STATE_MAGIC; + fcs->chunk_size_log = lfc_chunk_size_log; + fcs->n_chunks = n_entries; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + dlist_reverse_foreach(iter, &lfc_ctl->lru) + { + FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur); + fcs->chunks[i] = entry->key; + for (int j = 0; j < lfc_blocks_per_chunk; j++) + { + if (GET_STATE(entry, j) != UNAVAILABLE) + { + BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j); + n_pages += 1; + } + } + if (++i == n_entries) + break; + } + Assert(i == n_entries); + fcs->n_pages = n_pages; + Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages); + elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages); + } + + LWLockRelease(lfc_lock); + + return fcs; +} + +/* + * Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock + * and avoid race conditions with other backends. + */ +void +lfc_prewarm(FileCacheState* fcs, uint32 n_workers) +{ + size_t fcs_chunk_size_log; + size_t n_entries; + size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size); + size_t fcs_size; + dsm_segment *seg; + BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS]; + + + if (!lfc_ensure_opened()) + return; + + if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0) + { + elog(LOG, "LFC: prewarm is disabled"); + return; + } + + if (n_workers > MAX_PREWARM_WORKERS) + { + elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS); + } + + if (fcs == NULL || fcs->n_chunks == 0) + { + elog(LOG, "LFC: nothing to prewarm"); + return; + } + + if (fcs->magic != FILE_CACHE_STATE_MAGIC) + { + elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic); + } + + fcs_size = VARSIZE(fcs); + if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size) + { + elog(ERROR, "LFC: Invalid file cache state size: %u vs. 
%u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs)); + } + + fcs_chunk_size_log = fcs->chunk_size_log; + if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG) + { + elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log); + } + + n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); + Assert(n_entries != 0); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + /* Do not prewarm more entries than LFC limit */ + if (lfc_ctl->limit <= lfc_ctl->size) + { + elog(LOG, "LFC: skip prewarm because LFC is already filled"); + LWLockRelease(lfc_lock); + return; + } + + if (lfc_ctl->prewarm_active) + { + LWLockRelease(lfc_lock); + elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); + } + lfc_ctl->n_prewarm_entries = n_entries; + lfc_ctl->n_prewarm_workers = n_workers; + lfc_ctl->prewarm_active = true; + lfc_ctl->prewarm_canceled = false; + lfc_ctl->prewarm_batch = prewarm_batch; + memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState)); + + LWLockRelease(lfc_lock); + + /* Calculate total number of pages to be prewarmed */ + lfc_ctl->total_prewarm_pages = fcs->n_pages; + + seg = dsm_create(fcs_size, 0); + memcpy(dsm_segment_address(seg), fcs, fcs_size); + lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg); + + /* Spawn background workers */ + for (uint32 i = 0; i < n_workers; i++) + { + BackgroundWorker worker = {0}; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy(worker.bgw_library_name, "neon"); + strcpy(worker.bgw_function_name, "lfc_prewarm_main"); + snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1); + strcpy(worker.bgw_type, "LFC prewarm worker"); + worker.bgw_main_arg = Int32GetDatum(i); + /* must set notify PID to wait for shutdown */ + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i])) + { + ereport(LOG, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("LFC: registering dynamic bgworker prewarm failed"), + errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes"))); + n_workers = i; + lfc_ctl->prewarm_canceled = true; + break; + } + } + + for (uint32 i = 0; i < n_workers; i++) + { + while (true) + { + PG_TRY(); + { + BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]); + if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED) + { + elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status); + } + break; + } + PG_CATCH(); + { + elog(LOG, "LFC: cancel prewarm"); + lfc_ctl->prewarm_canceled = true; + } + PG_END_TRY(); + } + if (!lfc_ctl->prewarm_workers[i].completed) + { + /* Background worker doesn't set completion time: it means that it was abnormally terminated */ + elog(LOG, "LFC: prewarm worker %d failed", i+1); + /* Set completion time to prevent get_prewarm_info from considering this worker as active */ + lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp(); + } + } + dsm_detach(seg); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_ctl->prewarm_active = false; + LWLockRelease(lfc_lock); +} + +void +lfc_prewarm_main(Datum main_arg) +{ + size_t snd_idx = 0, rcv_idx = 0; + size_t n_sent = 0, n_received = 0; + size_t fcs_chunk_size_log; + size_t max_prefetch_pages; + size_t prewarm_batch; + size_t n_workers; + dsm_segment *seg; + FileCacheState* fcs; + uint8* bitmap; + BufferTag tag; + PrewarmWorkerState* ws; + uint32 worker_id = DatumGetInt32(main_arg); + + pqsignal(SIGTERM, 
die); + BackgroundWorkerUnblockSignals(); + + seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + fcs = (FileCacheState*) dsm_segment_address(seg); + prewarm_batch = lfc_ctl->prewarm_batch; + fcs_chunk_size_log = fcs->chunk_size_log; + n_workers = lfc_ctl->n_prewarm_workers; + max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log; + ws = &lfc_ctl->prewarm_workers[worker_id]; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + /* enable prefetch in LFC */ + lfc_store_prefetch_result = true; + lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */ + + elog(LOG, "LFC: worker %d start prewarming", worker_id); + while (!lfc_ctl->prewarm_canceled) + { + if (snd_idx < max_prefetch_pages) + { + if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* If there are multiple workers, split chunks between them */ + snd_idx += 1 << fcs_chunk_size_log; + } + else + { + if (BITMAP_ISSET(bitmap, snd_idx)) + { + tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; + tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); + if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum)) + { + (void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); + n_sent += 1; + } + else + { + ws->skipped_pages += 1; + BITMAP_CLR(bitmap, snd_idx); + } + } + snd_idx += 1; + } + } + if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages) + { + if (n_received == n_sent && snd_idx == max_prefetch_pages) + { + break; + } + if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* Skip chunks processed by other workers */ + rcv_idx += 1 << fcs_chunk_size_log; + continue; + } + + /* Locate next block to prefetch */ + while (!BITMAP_ISSET(bitmap, rcv_idx)) + { + rcv_idx += 1; + } + tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; + tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); + if (communicator_prefetch_receive(tag)) + { + ws->prewarmed_pages += 1; + } + else + { + ws->skipped_pages += 1; + } + rcv_idx += 1; + n_received += 1; + } + } + /* No need to perform prefetch cleanup here because prewarm worker will be terminated and + * connection to PS dropped just after return from this function. + */ + Assert(n_sent == n_received || lfc_ctl->prewarm_canceled); + elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received); + lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp(); +} + + /* * Check if page is present in the cache. * Returns true if page is found in local cache. @@ -1001,8 +1372,11 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) * If we can't (e.g. because all other slots are being accessed) * then we will remove this entry from the hash and continue * on to the next chunk, as we may not exceed the limit. + * + * While prewarming LFC we do not want to replace existed entries, + * so we just stop prewarm is LFC cache is full. 
*/ - else if (!dlist_is_empty(&lfc_ctl->lru)) + else if (!dlist_is_empty(&lfc_ctl->lru) && !lfc_do_prewarm) { /* Cache overflow: evict least recently used chunk */ FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, @@ -1026,6 +1400,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) /* Can't add this chunk - we don't have the space for it */ hash_search_with_hash_value(lfc_hash, &entry->key, hash, HASH_REMOVE, NULL); + lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */ return false; } @@ -1112,9 +1487,11 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); - tag.blockNum = blkno; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - + if (lfc_prewarm_update_ws_estimation) + { + tag.blockNum = blkno; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } if (found) { state = GET_STATE(entry, chunk_offs); diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index 849558b83d..c7b6b09f72 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -13,6 +13,17 @@ #include "neon_pgversioncompat.h" +typedef struct FileCacheState +{ + int32 vl_len_; /* varlena header (do not touch directly!) */ + uint32 magic; + uint32 n_chunks; + uint32 n_pages; + uint16 chunk_size_log; + BufferTag chunks[FLEXIBLE_ARRAY_MEMBER]; + /* followed by bitmap */ +} FileCacheState; + /* GUCs */ extern bool lfc_store_prefetch_result; @@ -32,7 +43,10 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, extern void lfc_init(void); extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, const void* buffer, XLogRecPtr lsn); +extern FileCacheState* lfc_get_state(size_t max_entries); +extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers); +PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); static inline bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, From c1ff7db1874c6b5ff8ee134b944f1c54616ba37b Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Mon, 28 Apr 2025 14:54:26 -0400 Subject: [PATCH 07/15] fix(pageserver): consider tombstones in replorigin (#11752) ## Problem We didn't consider tombstones in replorigin read path in the past. This was fine because tombstones are stored as LSN::Invalid before we universally define what the tombstone is for sparse keyspaces. Now we remove non-inherited keys during detach ancestor and write the universal tombstone "empty image". So we need to consider it across all the read paths. related: https://github.com/neondatabase/neon/pull/11299 ## Summary of changes Empty value gets ignored for replorigin scans. --------- Signed-off-by: Alex Chi Z --- pageserver/src/pgdatadir_mapping.rs | 16 +++++- pageserver/src/tenant.rs | 52 ++++++++++++++++++- .../src/tenant/timeline/detach_ancestor.rs | 2 +- 3 files changed, 67 insertions(+), 3 deletions(-) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 81e548a095..ccb48d8bc1 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1084,8 +1084,17 @@ impl Timeline { let mut result = HashMap::new(); for (k, v) in kv { let v = v?; + if v.is_empty() { + // This is a tombstone -- we can skip it. + // Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. 
However, as it part of + // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone, + // we also need to consider that. Such tombstones might be written on the detach ancestor code path to + // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.) + continue; + } let origin_id = k.field6 as RepOriginId; - let origin_lsn = Lsn::des(&v).unwrap(); + let origin_lsn = Lsn::des(&v) + .with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?; if origin_lsn != Lsn::INVALID { result.insert(origin_id, origin_lsn); } @@ -2578,6 +2587,11 @@ impl DatadirModification<'_> { } } + #[cfg(test)] + pub fn put_for_unit_test(&mut self, key: Key, val: Value) { + self.put(key, val); + } + fn put(&mut self, key: Key, val: Value) { if Self::is_data_key(&key) { self.put_data(key.to_compact(), val) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dcd043c4a1..e59db74479 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5947,7 +5947,9 @@ mod tests { use itertools::Itertools; #[cfg(feature = "testing")] use models::CompactLsnRange; - use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX}; + use pageserver_api::key::{ + AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX, repl_origin_key, + }; use pageserver_api::keyspace::KeySpace; #[cfg(feature = "testing")] use pageserver_api::keyspace::KeySpaceRandomAccum; @@ -8183,6 +8185,54 @@ mod tests { assert_eq!(files.get("pg_logical/mappings/test2"), None); } + #[tokio::test] + async fn test_repl_origin_tombstones() { + let harness = TenantHarness::create("test_repl_origin_tombstones") + .await + .unwrap(); + + let (tenant, ctx) = harness.load().await; + let io_concurrency = IoConcurrency::spawn_for_test(); + + let mut lsn = Lsn(0x08); + + let tline: Arc = tenant + .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let repl_lsn = Lsn(0x10); + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification.put_for_unit_test(repl_origin_key(2), Value::Image(Bytes::new())); + modification.set_replorigin(1, repl_lsn).await.unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + // we can read everything from the storage + let repl_origins = tline + .get_replorigins(lsn, &ctx, io_concurrency.clone()) + .await + .unwrap(); + assert_eq!(repl_origins.len(), 1); + assert_eq!(repl_origins[&1], lsn); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification.put_for_unit_test( + repl_origin_key(3), + Value::Image(Bytes::copy_from_slice(b"cannot_decode_this")), + ); + modification.commit(&ctx).await.unwrap(); + } + let result = tline + .get_replorigins(lsn, &ctx, io_concurrency.clone()) + .await; + assert!(result.is_err()); + } + #[tokio::test] async fn test_metadata_image_creation() -> anyhow::Result<()> { let harness = TenantHarness::create("test_metadata_image_creation").await?; diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 8e95c3a8ff..649b33e294 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -178,7 +178,7 @@ impl Attempt { } } -async fn generate_tombstone_image_layer( +pub(crate) async fn generate_tombstone_image_layer( detached: &Arc, ancestor: &Arc, ancestor_lsn: Lsn, From 
9e8ab2ab4f653d1ca96869a1c52d4aec0deb202c Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Mon, 28 Apr 2025 14:13:35 -0500 Subject: [PATCH 08/15] Skip remote extensions WITH_LIB test when sanitizers are enabled (#11758) In order for the test to work when sanitizers are enabled, we would need to compile the dummy Postgres extension with the same sanitizer flags that we compile Postgres and the neon extension with. Doing this work would be a little more than trivial, so skipping is the best option, at least for now. Signed-off-by: Tristan Partin --- test_runner/regress/test_download_extensions.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index 3b6c94a268..d28240c722 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -14,7 +14,7 @@ from fixtures.log_helper import log from fixtures.metrics import parse_metrics from fixtures.paths import BASE_DIR from fixtures.pg_config import PgConfigKey -from fixtures.utils import subprocess_capture +from fixtures.utils import WITH_SANITIZERS, subprocess_capture from werkzeug.wrappers.response import Response if TYPE_CHECKING: @@ -148,6 +148,15 @@ def test_remote_extensions( pg_config: PgConfig, extension: RemoteExtension, ): + if WITH_SANITIZERS and extension is RemoteExtension.WITH_LIB: + pytest.skip( + """ + For this test to work with sanitizers enabled, we would need to + compile the dummy Postgres extension with the same CFLAGS that we + compile Postgres and the neon extension with to link the sanitizers. + """ + ) + # Setup a mock nginx S3 gateway which will return our test extension. (host, port) = httpserver_listen_address extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway" From 3593356c1055f8462011542c561d90b02c84e4eb Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 29 Apr 2025 09:44:28 +0300 Subject: [PATCH 09/15] Prewarm sql api (#11742) ## Problem Continue work on prewarm, see https://github.com/neondatabase/neon/pull/11740 https://github.com/neondatabase/neon/pull/11741 ## Summary of changes Add SQL API to prewarm --------- Co-authored-by: Konstantin Knizhnik --- pgxn/neon/Makefile | 2 + pgxn/neon/file_cache.c | 88 +++++++++++++- pgxn/neon/neon--1.5--1.6.sql | 22 ++++ pgxn/neon/neon--1.6--1.5.sql | 7 ++ test_runner/regress/test_lfc_prewarm.py | 147 ++++++++++++++++++++++++ 5 files changed, 263 insertions(+), 3 deletions(-) create mode 100644 pgxn/neon/neon--1.5--1.6.sql create mode 100644 pgxn/neon/neon--1.6--1.5.sql create mode 100644 test_runner/regress/test_lfc_prewarm.py diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 426b176af9..8bcc6bf924 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -36,6 +36,8 @@ DATA = \ neon--1.2--1.3.sql \ neon--1.3--1.4.sql \ neon--1.4--1.5.sql \ + neon--1.5--1.6.sql \ + neon--1.6--1.5.sql \ neon--1.5--1.4.sql \ neon--1.4--1.3.sql \ neon--1.3--1.2.sql \ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 924e0055c1..ecc55bb540 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -793,8 +793,10 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers) for (uint32 i = 0; i < n_workers; i++) { - while (true) + bool interrupted; + do { + interrupted = false; PG_TRY(); { BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]); @@ -802,15 +804,16 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers) { elog(LOG, "LFC: Unexpected status 
of prewarm worker termination: %d", status); } - break; } PG_CATCH(); { elog(LOG, "LFC: cancel prewarm"); lfc_ctl->prewarm_canceled = true; + interrupted = true; } PG_END_TRY(); - } + } while (interrupted); + if (!lfc_ctl->prewarm_workers[i].completed) { /* Background worker doesn't set completion time: it means that it was abnormally terminated */ @@ -2125,3 +2128,82 @@ approximate_working_set_size(PG_FUNCTION_ARGS) } PG_RETURN_NULL(); } + +PG_FUNCTION_INFO_V1(get_local_cache_state); + +Datum +get_local_cache_state(PG_FUNCTION_ARGS) +{ + size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); + FileCacheState* fcs = lfc_get_state(max_entries); + if (fcs != NULL) + PG_RETURN_BYTEA_P((bytea*)fcs); + else + PG_RETURN_NULL(); +} + +PG_FUNCTION_INFO_V1(prewarm_local_cache); + +Datum +prewarm_local_cache(PG_FUNCTION_ARGS) +{ + bytea* state = PG_GETARG_BYTEA_PP(0); + uint32 n_workers = PG_GETARG_INT32(1); + FileCacheState* fcs = (FileCacheState*)state; + + lfc_prewarm(fcs, n_workers); + + PG_RETURN_NULL(); +} + +PG_FUNCTION_INFO_V1(get_prewarm_info); + +Datum +get_prewarm_info(PG_FUNCTION_ARGS) +{ + Datum values[4]; + bool nulls[4]; + TupleDesc tupdesc; + uint32 prewarmed_pages = 0; + uint32 skipped_pages = 0; + uint32 active_workers = 0; + uint32 total_pages; + size_t n_workers; + + if (lfc_size_limit == 0) + PG_RETURN_NULL(); + + LWLockAcquire(lfc_lock, LW_SHARED); + if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0) + { + LWLockRelease(lfc_lock); + PG_RETURN_NULL(); + } + n_workers = lfc_ctl->n_prewarm_workers; + total_pages = lfc_ctl->total_prewarm_pages; + for (size_t i = 0; i < n_workers; i++) + { + PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i]; + prewarmed_pages += ws->prewarmed_pages; + skipped_pages += ws->skipped_pages; + active_workers += ws->completed != 0; + } + LWLockRelease(lfc_lock); + + tupdesc = CreateTemplateTupleDesc(4); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0); + tupdesc = BlessTupleDesc(tupdesc); + + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(total_pages); + values[1] = Int32GetDatum(prewarmed_pages); + values[2] = Int32GetDatum(skipped_pages); + values[3] = Int32GetDatum(active_workers); + + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} + diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql new file mode 100644 index 0000000000..c05f0f87aa --- /dev/null +++ b/pgxn/neon/neon--1.5--1.6.sql @@ -0,0 +1,22 @@ +\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. 
\quit + +CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer) +RETURNS record +AS 'MODULE_PATHNAME', 'get_prewarm_info' +LANGUAGE C STRICT +PARALLEL SAFE; + +CREATE FUNCTION get_local_cache_state(max_chunks integer default null) +RETURNS bytea +AS 'MODULE_PATHNAME', 'get_local_cache_state' +LANGUAGE C +PARALLEL UNSAFE; + +CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1) +RETURNS void +AS 'MODULE_PATHNAME', 'prewarm_local_cache' +LANGUAGE C STRICT +PARALLEL UNSAFE; + + + diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql new file mode 100644 index 0000000000..57512980f5 --- /dev/null +++ b/pgxn/neon/neon--1.6--1.5.sql @@ -0,0 +1,7 @@ +DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer); + +DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer); + +DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1); + + diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py new file mode 100644 index 0000000000..dd0ae1921d --- /dev/null +++ b/test_runner/regress/test_lfc_prewarm.py @@ -0,0 +1,147 @@ +import random +import threading +import time + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv +from fixtures.utils import USE_LFC + + +def check_pinned_entries(cur): + # some LFC buffer can be temporary locked by autovacuum or background writer + for _ in range(10): + cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'") + n_pinned = cur.fetchall()[0][0] + if n_pinned == 0: + break + time.sleep(1) + assert n_pinned == 0 + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))") + cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + endpoint.stop() + endpoint.start() + + conn = endpoint.connect() + cur = conn.cursor() + time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version + cur.execute("alter extension neon update to '1.6'") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + + cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") + lfc_used_pages = cur.fetchall()[0][0] + log.info(f"Used LFC size: {lfc_used_pages}") + cur.execute("select * from get_prewarm_info()") + prewarm_info = cur.fetchall()[0] + log.info(f"Prewarm info: {prewarm_info}") + log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%") + + assert lfc_used_pages > 10000 + assert ( + prewarm_info[0] > 0 + and prewarm_info[1] > 0 + and prewarm_info[0] == prewarm_info[1] + prewarm_info[2] + ) + + cur.execute("select sum(pk) from t") + assert cur.fetchall()[0][0] == n_records * 
(n_records + 1) / 2 + + check_pinned_entries(cur) + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 10000 + n_threads = 4 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute( + "create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)" + ) + cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + running = True + + def workload(): + conn = endpoint.connect() + cur = conn.cursor() + n_transfers = 0 + while running: + src = random.randint(1, n_records) + dst = random.randint(1, n_records) + cur.execute("update accounts set balance=balance-100 where id=%s", (src,)) + cur.execute("update accounts set balance=balance+100 where id=%s", (dst,)) + n_transfers += 1 + log.info(f"Number of transfers: {n_transfers}") + + def prewarm(): + conn = endpoint.connect() + cur = conn.cursor() + n_prewarms = 0 + while running: + cur.execute("alter system set neon.file_cache_size_limit='1MB'") + cur.execute("select pg_reload_conf()") + cur.execute("alter system set neon.file_cache_size_limit='1GB'") + cur.execute("select pg_reload_conf()") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + n_prewarms += 1 + log.info(f"Number of prewarms: {n_prewarms}") + + workload_threads = [] + for _ in range(n_threads): + t = threading.Thread(target=workload) + workload_threads.append(t) + t.start() + + prewarm_thread = threading.Thread(target=prewarm) + prewarm_thread.start() + + time.sleep(20) + + running = False + for t in workload_threads: + t.join() + prewarm_thread.join() + + cur.execute("select sum(balance) from accounts") + total_balance = cur.fetchall()[0][0] + assert total_balance == 0 + + check_pinned_entries(cur) From d15f2ff57a3a9aefe0a10ea034d977e83b90504f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JC=20Gr=C3=BCnhage?= Date: Tue, 29 Apr 2025 10:56:44 +0200 Subject: [PATCH 10/15] fix(lint-release-pr): adjust lint and action to match (#11766) ## Problem The `lint-release-pr` workflow run for https://github.com/neondatabase/neon/pull/11763 failed, because the new action did not match the lint. ## Summary of changes Include time in expected merge message regex. --- .github/scripts/lint-release-pr.sh | 2 +- .github/workflows/release.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/scripts/lint-release-pr.sh b/.github/scripts/lint-release-pr.sh index 6dc5b99f0e..d3badf9562 100755 --- a/.github/scripts/lint-release-pr.sh +++ b/.github/scripts/lint-release-pr.sh @@ -41,7 +41,7 @@ echo "Merge base of ${MAIN_BRANCH} and ${RELEASE_BRANCH}: ${MERGE_BASE}" LAST_COMMIT=$(git rev-parse HEAD) MERGE_COMMIT_MESSAGE=$(git log -1 --format=%s "${LAST_COMMIT}") -EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2}$" +EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2} UTC$" if ! 
[[ "${MERGE_COMMIT_MESSAGE}" =~ ${EXPECTED_MESSAGE_REGEX} ]]; then report_error "Merge commit message does not match expected pattern: ' release YYYY-MM-DD' diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4b19d6aa3f..0f97cf7c87 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -60,7 +60,7 @@ jobs: git config user.email "41898282+github-actions[bot]@users.noreply.github.com" - name: Create release PR - uses: neondatabase/dev-actions/release-pr@02b41460646b70d12dd33e5f56ebc5af2384c993 + uses: neondatabase/dev-actions/release-pr@290dec821d86fa8a93f019e8c69720f5865b5677 with: component: ${{ inputs.component }} cherry-pick: ${{ inputs.cherry-pick }} From 7f8b1d79c015ba1158883c689d8ee230426194da Mon Sep 17 00:00:00 2001 From: Busra Kugler Date: Tue, 29 Apr 2025 11:02:01 +0200 Subject: [PATCH 11/15] Replace dorny/paths-filter with step-security maintained version (#11663) ## Problem Our CI/CD security tool StepSecurity maintains safer forks of popular GitHub Actions with low security scores. We're replacing dorny/paths-filter with the maintained step-security/paths-filter version to reduce risk of supply chain breaches and potential CVEs. ## Summary of changes replace ```uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 ``` with ```uses: step-security/paths-filter@v3``` This PR will fix: neondatabase/cloud#26141 --- .github/workflows/build_and_test.yml | 2 +- .github/workflows/neon_extra_builds.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 1791cddacc..6c025ad2a9 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -69,7 +69,7 @@ jobs: submodules: true - name: Check for file changes - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2 + uses: step-security/paths-filter@v3 id: files-changed with: token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index 79467c8f95..9c504eb5bf 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -53,7 +53,7 @@ jobs: submodules: true - name: Check for Postgres changes - uses: dorny/paths-filter@1441771bbfdd59dcd748680ee64ebd8faab1a242 #v3 + uses: step-security/paths-filter@v3 id: files_changed with: token: ${{ github.token }} From 498d852bde2c1b20761f5ca588bcc64c86fce282 Mon Sep 17 00:00:00 2001 From: a-masterov <72613290+a-masterov@users.noreply.github.com> Date: Tue, 29 Apr 2025 11:12:14 +0200 Subject: [PATCH 12/15] Fix the empty region if run on schedule (#11764) ## Problem When the workflow ran on a schedule, the `region_id` input was not set. As a result, an empty region value was used, which caused errors during execution. ## Summary of Changes - Added fallback logic to set a default region (`aws-us-east-2`) when `region_id` is not provided. - Ensures the workflow works correctly both when triggered manually (`workflow_dispatch`) and on schedule (`cron`). 
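For reference, the fix relies on the short-circuiting `||` operator in GitHub Actions expressions: on a scheduled run `inputs.region_id` is an empty string, which is falsy, so the expression falls through to the default region. Illustrative snippet only, mirroring the one-line change in the diff below:

```yaml
with:
  # Empty on cron-triggered runs, so the expression evaluates to 'aws-us-east-2'.
  region_id: ${{ inputs.region_id || 'aws-us-east-2' }}
```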
--- .github/workflows/cloud-extensions.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cloud-extensions.yml b/.github/workflows/cloud-extensions.yml index 7d60469f92..4114f0f9b4 100644 --- a/.github/workflows/cloud-extensions.yml +++ b/.github/workflows/cloud-extensions.yml @@ -68,7 +68,7 @@ jobs: id: create-neon-project uses: ./.github/actions/neon-project-create with: - region_id: ${{ inputs.region_id }} + region_id: ${{ inputs.region_id || 'aws-us-east-2' }} postgres_version: ${{ matrix.pg-version }} project_settings: ${{ steps.project-settings.outputs.settings }} # We need these settings to get the expected output results. From b3db7f66ac39f072502b41fd6723e7753a0c37e6 Mon Sep 17 00:00:00 2001 From: Ivan Efremov Date: Tue, 29 Apr 2025 14:49:16 +0300 Subject: [PATCH 13/15] fix(compute): Change the local_proxy log level (#11770) Related to the INC-496 --- compute/vm-image-spec-bookworm.yaml | 2 +- compute/vm-image-spec-bullseye.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/compute/vm-image-spec-bookworm.yaml b/compute/vm-image-spec-bookworm.yaml index ec24d73242..057099994a 100644 --- a/compute/vm-image-spec-bookworm.yaml +++ b/compute/vm-image-spec-bookworm.yaml @@ -22,7 +22,7 @@ commands: - name: local_proxy user: postgres sysvInitAction: respawn - shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' + shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' - name: postgres-exporter user: nobody sysvInitAction: respawn diff --git a/compute/vm-image-spec-bullseye.yaml b/compute/vm-image-spec-bullseye.yaml index b40bdecebc..d048e20b2e 100644 --- a/compute/vm-image-spec-bullseye.yaml +++ b/compute/vm-image-spec-bullseye.yaml @@ -22,7 +22,7 @@ commands: - name: local_proxy user: postgres sysvInitAction: respawn - shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' + shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' - name: postgres-exporter user: nobody sysvInitAction: respawn From 0b3592921179fee85ab28725a6601213d195ba53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 29 Apr 2025 14:46:15 +0200 Subject: [PATCH 14/15] Make SafekeeperReconciler parallel via semaphore (#11757) Right now we only support running one reconciliation per safekeeper. This is of course usually way below of what a safekeeper can do. Therefore, introduce a semaphore and spawn the tasks asynchronously as they come in. 
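As a rough standalone sketch of the pattern (not the storage controller code itself): each incoming request first acquires an owned permit from a shared semaphore and then moves it into the spawned task, so at most N reconciliations run concurrently per safekeeper. The limit of 32 below mirrors the `SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT` introduced in the diff that follows; everything else is assumed for illustration.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Concurrency cap; 32 mirrors the default added in this patch.
    let limiter = Arc::new(Semaphore::new(32));
    let mut handles = Vec::new();

    for req in 0..100u32 {
        // Waits here once 32 reconciliations are in flight; the owned permit
        // moves into the task and is released when the task finishes.
        let permit = limiter.clone().acquire_owned().await.expect("semaphore closed");
        handles.push(tokio::task::spawn(async move {
            let _permit = permit;
            // ... reconcile_one(req) would run here; a sleep stands in for the work ...
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            println!("reconciled request {req}");
        }));
    }

    for h in handles {
        h.await.expect("task panicked");
    }
}
```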
Part of #11670 --- storage_controller/src/main.rs | 10 +++- storage_controller/src/service.rs | 4 ++ .../src/service/safekeeper_reconciler.rs | 59 +++++++++++++------ 3 files changed, 55 insertions(+), 18 deletions(-) diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 71dde9e126..2eea2f9d10 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -19,7 +19,8 @@ use storage_controller::service::chaos_injector::ChaosInjector; use storage_controller::service::{ Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, - PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, Service, + PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, + SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT, Service, }; use tokio::signal::unix::SignalKind; use tokio_util::sync::CancellationToken; @@ -132,6 +133,10 @@ struct Cli { #[arg(long)] priority_reconciler_concurrency: Option, + /// Maximum number of safekeeper reconciliations that may run in parallel (per safekeeper) + #[arg(long)] + safekeeper_reconciler_concurrency: Option, + /// Tenant API rate limit, as requests per second per tenant. #[arg(long, default_value = "10")] tenant_rate_limit: NonZeroU32, @@ -403,6 +408,9 @@ async fn async_main() -> anyhow::Result<()> { priority_reconciler_concurrency: args .priority_reconciler_concurrency .unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT), + safekeeper_reconciler_concurrency: args + .safekeeper_reconciler_concurrency + .unwrap_or(SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT), tenant_rate_limit: args.tenant_rate_limit, split_threshold: args.split_threshold, max_split_shards: args.max_split_shards, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 0f71a87f13..50ce559cc0 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -194,6 +194,7 @@ pub(crate) enum LeadershipStatus { pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128; pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256; +pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32; // Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately. // This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly @@ -382,6 +383,9 @@ pub struct Config { /// How many high-priority Reconcilers may be spawned concurrently pub priority_reconciler_concurrency: usize, + /// How many safekeeper reconciles may happen concurrently (per safekeeper) + pub safekeeper_reconciler_concurrency: usize, + /// How many API requests per second to allow per tenant, across all /// tenant-scoped API endpoints. Further API requests queue until ready. 
pub tenant_rate_limit: NonZeroU32, diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index b15772a36c..74308cabff 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -3,7 +3,10 @@ use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use clashmap::{ClashMap, Entry}; use safekeeper_api::models::PullTimelineRequest; use safekeeper_client::mgmt_api; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::{ + Semaphore, + mpsc::{self, UnboundedReceiver, UnboundedSender}, +}; use tokio_util::sync::CancellationToken; use tracing::Instrument; use utils::{ @@ -206,18 +209,27 @@ impl ReconcilerHandle { } pub(crate) struct SafekeeperReconciler { - service: Arc, + inner: SafekeeperReconcilerInner, + concurrency_limiter: Arc, rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>, cancel: CancellationToken, } +/// Thin wrapper over `Service` to not clutter its inherent functions +#[derive(Clone)] +struct SafekeeperReconcilerInner { + service: Arc, +} + impl SafekeeperReconciler { fn spawn(cancel: CancellationToken, service: Arc) -> ReconcilerHandle { // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking. let (tx, rx) = mpsc::unbounded_channel(); + let concurrency = service.config.safekeeper_reconciler_concurrency; let mut reconciler = SafekeeperReconciler { - service, + inner: SafekeeperReconcilerInner { service }, rx, + concurrency_limiter: Arc::new(Semaphore::new(concurrency)), cancel: cancel.clone(), }; let handle = ReconcilerHandle { @@ -230,31 +242,44 @@ impl SafekeeperReconciler { } async fn run(&mut self) { loop { - // TODO add parallelism with semaphore here let req = tokio::select! { req = self.rx.recv() => req, _ = self.cancel.cancelled() => break, }; let Some((req, req_cancel)) = req else { break }; + + let permit_res = tokio::select! { + req = self.concurrency_limiter.clone().acquire_owned() => req, + _ = self.cancel.cancelled() => break, + }; + let Ok(_permit) = permit_res else { return }; + + let inner = self.inner.clone(); if req_cancel.is_cancelled() { continue; } - let kind = req.kind; - let tenant_id = req.tenant_id; - let timeline_id = req.timeline_id; - let node_id = req.safekeeper.skp.id; - self.reconcile_one(req, req_cancel) - .instrument(tracing::info_span!( - "reconcile_one", - ?kind, - %tenant_id, - ?timeline_id, - %node_id, - )) - .await; + tokio::task::spawn(async move { + let kind = req.kind; + let tenant_id = req.tenant_id; + let timeline_id = req.timeline_id; + let node_id = req.safekeeper.skp.id; + inner + .reconcile_one(req, req_cancel) + .instrument(tracing::info_span!( + "reconcile_one", + ?kind, + %tenant_id, + ?timeline_id, + %node_id, + )) + .await; + }); } } +} + +impl SafekeeperReconcilerInner { async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) { let req_host = req.safekeeper.skp.host.clone(); match req.kind { From 09247de8d508be4d65d74d1879ccdfa930ec4794 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Tue, 29 Apr 2025 13:11:24 +0000 Subject: [PATCH 15/15] proxy: Enable JSON logging by default (#11772) This does not affect local_proxy. 
--- proxy/README.md | 4 ++-- proxy/src/logging.rs | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/proxy/README.md b/proxy/README.md index 1156bfd352..583db36f28 100644 --- a/proxy/README.md +++ b/proxy/README.md @@ -32,7 +32,7 @@ To play with it locally one may start proxy over a local postgres installation (see end of this page on how to generate certs with openssl): ``` -./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444 +LOGFMT=text ./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444 ``` If both postgres and proxy are running you may send a SQL query: @@ -130,7 +130,7 @@ openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key Then we need to build proxy with 'testing' feature and run, e.g.: ```sh -RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key +RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key ``` Now from client you can start a new session: diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index b83b03bc4f..efa3c0b514 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -132,11 +132,10 @@ impl Drop for LoggingGuard { } } -// TODO: make JSON the default #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)] enum LogFormat { + Text, #[default] - Text = 1, Json, }
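
For context on the `logging.rs` change above, here is a minimal standalone illustration (not the proxy crate itself) of the `#[default]` variant attribute the new enum relies on: with the attribute moved to `Json`, `LogFormat::default()` now yields the JSON format, and the explicit `= 1` discriminant is no longer needed. At runtime the old behavior remains available by selecting the text format explicitly (e.g. `LOGFMT=text`, as the README changes above show).

```rust
// Standalone sketch of the derive behavior; not the actual proxy types.
#[allow(dead_code)]
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
enum LogFormat {
    Text,
    #[default]
    Json,
}

fn main() {
    // Default::default() now resolves to the JSON variant.
    assert_eq!(LogFormat::default(), LogFormat::Json);
    println!("default log format: {:?}", LogFormat::default());
}
```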