diff --git a/.github/scripts/lint-release-pr.sh b/.github/scripts/lint-release-pr.sh index 6dc5b99f0e..d3badf9562 100755 --- a/.github/scripts/lint-release-pr.sh +++ b/.github/scripts/lint-release-pr.sh @@ -41,7 +41,7 @@ echo "Merge base of ${MAIN_BRANCH} and ${RELEASE_BRANCH}: ${MERGE_BASE}" LAST_COMMIT=$(git rev-parse HEAD) MERGE_COMMIT_MESSAGE=$(git log -1 --format=%s "${LAST_COMMIT}") -EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2}$" +EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2} UTC$" if ! [[ "${MERGE_COMMIT_MESSAGE}" =~ ${EXPECTED_MESSAGE_REGEX} ]]; then report_error "Merge commit message does not match expected pattern: ' release YYYY-MM-DD' diff --git a/.github/workflows/_create-release-pr.yml b/.github/workflows/_create-release-pr.yml deleted file mode 100644 index f96ed7d69b..0000000000 --- a/.github/workflows/_create-release-pr.yml +++ /dev/null @@ -1,103 +0,0 @@ -name: Create Release PR - -on: - workflow_call: - inputs: - component-name: - description: 'Component name' - required: true - type: string - source-branch: - description: 'Source branch' - required: true - type: string - secrets: - ci-access-token: - description: 'CI access token' - required: true - -defaults: - run: - shell: bash -euo pipefail {0} - -permissions: - contents: read - -jobs: - create-release-branch: - runs-on: ubuntu-22.04 - - permissions: - contents: write # for `git push` - - steps: - - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 - with: - egress-policy: audit - - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - with: - ref: ${{ inputs.source-branch }} - fetch-depth: 0 - - - name: Set variables - id: vars - env: - COMPONENT_NAME: ${{ inputs.component-name }} - RELEASE_BRANCH: >- - ${{ - false - || inputs.component-name == 'Storage' && 'release' - || inputs.component-name == 'Proxy' && 'release-proxy' - || inputs.component-name == 'Compute' && 'release-compute' - }} - run: | - now_date=$(date -u +'%Y-%m-%d') - now_time=$(date -u +'%H-%M-%Z') - { - echo "title=${COMPONENT_NAME} release ${now_date}" - echo "rc-branch=rc/${RELEASE_BRANCH}/${now_date}_${now_time}" - echo "release-branch=${RELEASE_BRANCH}" - } | tee -a ${GITHUB_OUTPUT} - - - name: Configure git - run: | - git config user.name "github-actions[bot]" - git config user.email "41898282+github-actions[bot]@users.noreply.github.com" - - - name: Create RC branch - env: - RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }} - RC_BRANCH: ${{ steps.vars.outputs.rc-branch }} - TITLE: ${{ steps.vars.outputs.title }} - run: | - git switch -c "${RC_BRANCH}" - - # Manually create a merge commit on the current branch, keeping the - # tree and setting the parents to the current HEAD and the HEAD of the - # release branch. This commit is what we'll fast-forward the release - # branch to when merging the release branch. - # For details on why, look at - # https://docs.neon.build/overview/repositories/neon.html#background-on-commit-history-of-release-prs - current_tree=$(git rev-parse 'HEAD^{tree}') - release_head=$(git rev-parse "origin/${RELEASE_BRANCH}") - current_head=$(git rev-parse HEAD) - merge_commit=$(git commit-tree -p "${current_head}" -p "${release_head}" -m "${TITLE}" "${current_tree}") - - # Fast-forward the current branch to the newly created merge_commit - git merge --ff-only ${merge_commit} - - git push origin "${RC_BRANCH}" - - - name: Create a PR into ${{ steps.vars.outputs.release-branch }} - env: - GH_TOKEN: ${{ secrets.ci-access-token }} - RC_BRANCH: ${{ steps.vars.outputs.rc-branch }} - RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }} - TITLE: ${{ steps.vars.outputs.title }} - run: | - gh pr create --title "${TITLE}" \ - --body "" \ - --head "${RC_BRANCH}" \ - --base "${RELEASE_BRANCH}" diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 1791cddacc..6c025ad2a9 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -69,7 +69,7 @@ jobs: submodules: true - name: Check for file changes - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2 + uses: step-security/paths-filter@v3 id: files-changed with: token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/cloud-extensions.yml b/.github/workflows/cloud-extensions.yml index 7d60469f92..4114f0f9b4 100644 --- a/.github/workflows/cloud-extensions.yml +++ b/.github/workflows/cloud-extensions.yml @@ -68,7 +68,7 @@ jobs: id: create-neon-project uses: ./.github/actions/neon-project-create with: - region_id: ${{ inputs.region_id }} + region_id: ${{ inputs.region_id || 'aws-us-east-2' }} postgres_version: ${{ matrix.pg-version }} project_settings: ${{ steps.project-settings.outputs.settings }} # We need these settings to get the expected output results. diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index 79467c8f95..9c504eb5bf 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -53,7 +53,7 @@ jobs: submodules: true - name: Check for Postgres changes - uses: dorny/paths-filter@1441771bbfdd59dcd748680ee64ebd8faab1a242 #v3 + uses: step-security/paths-filter@v3 id: files_changed with: token: ${{ github.token }} diff --git a/.github/workflows/release-compute.yml b/.github/workflows/release-compute.yml new file mode 100644 index 0000000000..f123dd2f44 --- /dev/null +++ b/.github/workflows/release-compute.yml @@ -0,0 +1,12 @@ +name: Create compute release PR + +on: + schedule: + - cron: '0 7 * * FRI' + +jobs: + create-release-pr: + uses: ./.github/workflows/release.yml + with: + component: compute + secrets: inherit diff --git a/.github/workflows/release-proxy.yml b/.github/workflows/release-proxy.yml new file mode 100644 index 0000000000..d9055984d2 --- /dev/null +++ b/.github/workflows/release-proxy.yml @@ -0,0 +1,12 @@ +name: Create proxy release PR + +on: + schedule: + - cron: '0 6 * * TUE' + +jobs: + create-release-pr: + uses: ./.github/workflows/release.yml + with: + component: proxy + secrets: inherit diff --git a/.github/workflows/release-storage.yml b/.github/workflows/release-storage.yml new file mode 100644 index 0000000000..91f02fddda --- /dev/null +++ b/.github/workflows/release-storage.yml @@ -0,0 +1,12 @@ +name: Create storage release PR + +on: + schedule: + - cron: '0 6 * * FRI' + +jobs: + create-release-pr: + uses: ./.github/workflows/release.yml + with: + component: storage + secrets: inherit diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4068eafb95..0f97cf7c87 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,25 +1,34 @@ -name: Create Release Branch +name: Create release PR on: - schedule: - # It should be kept in sync with if-condition in jobs - - cron: '0 6 * * TUE' # Proxy release - - cron: '0 6 * * FRI' # Storage release - - cron: '0 7 * * FRI' # Compute release workflow_dispatch: inputs: - create-storage-release-branch: - type: boolean - description: 'Create Storage release PR' + component: + description: "Component to release" + required: true + type: choice + options: + - compute + - proxy + - storage + cherry-pick: + description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)" required: false - create-proxy-release-branch: - type: boolean - description: 'Create Proxy release PR' - required: false - create-compute-release-branch: - type: boolean - description: 'Create Compute release PR' + type: string + default: '' + + workflow_call: + inputs: + component: + description: "Component to release" + required: true + type: string + cherry-pick: + description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)" required: false + type: string + default: '' + # No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job. permissions: {} @@ -29,41 +38,31 @@ defaults: shell: bash -euo pipefail {0} jobs: - create-storage-release-branch: - if: ${{ github.event.schedule == '0 6 * * FRI' || inputs.create-storage-release-branch }} + create-release-pr: + runs-on: ubuntu-22.04 permissions: contents: write - uses: ./.github/workflows/_create-release-pr.yml - with: - component-name: 'Storage' - source-branch: ${{ github.ref_name }} - secrets: - ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }} + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit - create-proxy-release-branch: - if: ${{ github.event.schedule == '0 6 * * TUE' || inputs.create-proxy-release-branch }} + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + fetch-depth: 0 - permissions: - contents: write + - name: Configure git + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" - uses: ./.github/workflows/_create-release-pr.yml - with: - component-name: 'Proxy' - source-branch: ${{ github.ref_name }} - secrets: - ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }} - - create-compute-release-branch: - if: ${{ github.event.schedule == '0 7 * * FRI' || inputs.create-compute-release-branch }} - - permissions: - contents: write - - uses: ./.github/workflows/_create-release-pr.yml - with: - component-name: 'Compute' - source-branch: ${{ github.ref_name }} - secrets: - ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }} + - name: Create release PR + uses: neondatabase/dev-actions/release-pr@290dec821d86fa8a93f019e8c69720f5865b5677 + with: + component: ${{ inputs.component }} + cherry-pick: ${{ inputs.cherry-pick }} + env: + GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }} diff --git a/compute/vm-image-spec-bookworm.yaml b/compute/vm-image-spec-bookworm.yaml index ec24d73242..057099994a 100644 --- a/compute/vm-image-spec-bookworm.yaml +++ b/compute/vm-image-spec-bookworm.yaml @@ -22,7 +22,7 @@ commands: - name: local_proxy user: postgres sysvInitAction: respawn - shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' + shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' - name: postgres-exporter user: nobody sysvInitAction: respawn diff --git a/compute/vm-image-spec-bullseye.yaml b/compute/vm-image-spec-bullseye.yaml index b40bdecebc..d048e20b2e 100644 --- a/compute/vm-image-spec-bullseye.yaml +++ b/compute/vm-image-spec-bullseye.yaml @@ -22,7 +22,7 @@ commands: - name: local_proxy user: postgres sysvInitAction: respawn - shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' + shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' - name: postgres-exporter user: nobody sysvInitAction: respawn diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 948e3c8c93..eec2c997e6 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -112,7 +112,7 @@ impl SafekeeperNode { } /// Initializes a safekeeper node by creating all necessary files, - /// e.g. SSL certificates. + /// e.g. SSL certificates and JWT token file. pub fn initialize(&self) -> anyhow::Result<()> { if self.env.generate_local_ssl_certs { self.env.generate_ssl_cert( @@ -120,6 +120,17 @@ impl SafekeeperNode { &self.datadir_path().join("server.key"), )?; } + + // Generate a token file for authentication with other safekeepers + if self.conf.auth_enabled { + let token = self + .env + .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?; + + let token_path = self.datadir_path().join("peer_jwt_token"); + std::fs::write(token_path, token)?; + } + Ok(()) } @@ -218,14 +229,26 @@ impl SafekeeperNode { args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap())); } + if self.conf.auth_enabled { + let token_path = self.datadir_path().join("peer_jwt_token"); + let token_path_str = token_path + .to_str() + .with_context(|| { + format!("Token path {token_path:?} cannot be represented as a unicode string") + })? + .to_owned(); + args.extend(["--auth-token-path".to_owned(), token_path_str]); + } + args.extend_from_slice(extra_opts); + let env_variables = Vec::new(); background_process::start_process( &format!("safekeeper-{id}"), &datadir, &self.env.safekeeper_bin(), &args, - self.safekeeper_env_variables()?, + env_variables, background_process::InitialPidFile::Expect(self.pid_file()), retry_timeout, || async { @@ -239,18 +262,6 @@ impl SafekeeperNode { .await } - fn safekeeper_env_variables(&self) -> anyhow::Result> { - // Generate a token to connect from safekeeper to peers - if self.conf.auth_enabled { - let token = self - .env - .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?; - Ok(vec![("SAFEKEEPER_AUTH_TOKEN".to_owned(), token)]) - } else { - Ok(Vec::new()) - } - } - /// /// Stop the server. /// diff --git a/docker-compose/pageserver_config/pageserver.toml b/docker-compose/pageserver_config/pageserver.toml index 76935453b6..7d603b6c65 100644 --- a/docker-compose/pageserver_config/pageserver.toml +++ b/docker-compose/pageserver_config/pageserver.toml @@ -3,3 +3,5 @@ pg_distrib_dir='/usr/local/' listen_pg_addr='0.0.0.0:6400' listen_http_addr='0.0.0.0:9898' remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' } +control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address +control_plane_emergency_mode=true diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 6cfaec955b..4c2572a577 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -504,7 +504,7 @@ fn start_pageserver( // Set up deletion queue let (deletion_queue, deletion_workers) = DeletionQueue::new( remote_storage.clone(), - StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?, + StorageControllerUpcallClient::new(conf, &shutdown_pageserver), conf, ); deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 95143e58b7..ded2805602 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -150,7 +150,7 @@ pub struct PageServerConf { /// not terrible. pub background_task_maximum_delay: Duration, - pub control_plane_api: Option, + pub control_plane_api: Url, /// JWT token for use with the control plane API. pub control_plane_api_token: Option, @@ -438,7 +438,8 @@ impl PageServerConf { test_remote_failures, ondemand_download_behavior_treat_error_as_warn, background_task_maximum_delay, - control_plane_api, + control_plane_api: control_plane_api + .ok_or_else(|| anyhow::anyhow!("`control_plane_api` must be set"))?, control_plane_emergency_mode, heatmap_upload_concurrency, secondary_download_concurrency, @@ -573,6 +574,7 @@ impl PageServerConf { background_task_maximum_delay: Duration::ZERO, load_previous_heatmap: Some(true), generate_unarchival_heatmap: Some(true), + control_plane_api: Some(Url::parse("http://localhost:6666").unwrap()), ..Default::default() }; PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap() @@ -641,9 +643,12 @@ mod tests { use super::PageServerConf; #[test] - fn test_empty_config_toml_is_valid() { - // we use Default impl of everything in this situation + fn test_minimal_config_toml_is_valid() { + // The minimal valid config for running a pageserver: + // - control_plane_api is mandatory, as pageservers cannot run in isolation + // - we use Default impl of everything else in this situation let input = r#" + control_plane_api = "http://localhost:6666" "#; let config_toml = toml_edit::de::from_str::(input) .expect("empty config is valid"); diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 59c94f1549..468e5463b0 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -58,14 +58,8 @@ pub trait StorageControllerUpcallApi { impl StorageControllerUpcallClient { /// A None return value indicates that the input `conf` object does not have control /// plane API enabled. - pub fn new( - conf: &'static PageServerConf, - cancel: &CancellationToken, - ) -> Result, reqwest::Error> { - let mut url = match conf.control_plane_api.as_ref() { - Some(u) => u.clone(), - None => return Ok(None), - }; + pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self { + let mut url = conf.control_plane_api.clone(); if let Ok(mut segs) = url.path_segments_mut() { // This ensures that `url` ends with a slash if it doesn't already. @@ -85,15 +79,17 @@ impl StorageControllerUpcallClient { } for cert in &conf.ssl_ca_certs { - client = client.add_root_certificate(Certificate::from_der(cert.contents())?); + client = client.add_root_certificate( + Certificate::from_der(cert.contents()).expect("Invalid certificate in config"), + ); } - Ok(Some(Self { - http_client: client.build()?, + Self { + http_client: client.build().expect("Failed to construct HTTP client"), base_url: url, node_id: conf.id, cancel: cancel.clone(), - })) + } } #[tracing::instrument(skip_all)] diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 6dd7d741c1..4d62bc4ab5 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -585,7 +585,7 @@ impl DeletionQueue { /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice. pub fn new( remote_storage: GenericRemoteStorage, - controller_upcall_client: Option, + controller_upcall_client: C, conf: &'static PageServerConf, ) -> (Self, DeletionQueueWorkers) where @@ -701,7 +701,7 @@ mod test { async fn restart(&mut self) { let (deletion_queue, workers) = DeletionQueue::new( self.storage.clone(), - Some(self.mock_control_plane.clone()), + self.mock_control_plane.clone(), self.harness.conf, ); @@ -821,11 +821,8 @@ mod test { let mock_control_plane = MockStorageController::new(); - let (deletion_queue, worker) = DeletionQueue::new( - storage.clone(), - Some(mock_control_plane.clone()), - harness.conf, - ); + let (deletion_queue, worker) = + DeletionQueue::new(storage.clone(), mock_control_plane.clone(), harness.conf); let worker_join = worker.spawn_with(&tokio::runtime::Handle::current()); diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index 4e775f15eb..363b1427f5 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -53,7 +53,7 @@ where tx: tokio::sync::mpsc::Sender, // Client for calling into control plane API for validation of deletes - controller_upcall_client: Option, + controller_upcall_client: C, // DeletionLists which are waiting generation validation. Not safe to // execute until [`validate`] has processed them. @@ -86,7 +86,7 @@ where conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, - controller_upcall_client: Option, + controller_upcall_client: C, lsn_table: Arc>, cancel: CancellationToken, ) -> Self { @@ -137,20 +137,16 @@ where return Ok(()); } - let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client { - match controller_upcall_client - .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect()) - .await - { - Ok(tenants) => tenants, - Err(RetryForeverError::ShuttingDown) => { - // The only way a validation call returns an error is when the cancellation token fires - return Err(DeletionQueueError::ShuttingDown); - } + let tenants_valid = match self + .controller_upcall_client + .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect()) + .await + { + Ok(tenants) => tenants, + Err(RetryForeverError::ShuttingDown) => { + // The only way a validation call returns an error is when the cancellation token fires + return Err(DeletionQueueError::ShuttingDown); } - } else { - // Control plane API disabled. In legacy mode we consider everything valid. - tenant_generations.keys().map(|k| (*k, true)).collect() }; let mut validated_sequence: Option = None; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 81e548a095..ccb48d8bc1 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1084,8 +1084,17 @@ impl Timeline { let mut result = HashMap::new(); for (k, v) in kv { let v = v?; + if v.is_empty() { + // This is a tombstone -- we can skip it. + // Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it part of + // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone, + // we also need to consider that. Such tombstones might be written on the detach ancestor code path to + // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.) + continue; + } let origin_id = k.field6 as RepOriginId; - let origin_lsn = Lsn::des(&v).unwrap(); + let origin_lsn = Lsn::des(&v) + .with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?; if origin_lsn != Lsn::INVALID { result.insert(origin_id, origin_lsn); } @@ -2578,6 +2587,11 @@ impl DatadirModification<'_> { } } + #[cfg(test)] + pub fn put_for_unit_test(&mut self, key: Key, val: Value) { + self.put(key, val); + } + fn put(&mut self, key: Key, val: Value) { if Self::is_data_key(&key) { self.put_data(key.to_compact(), val) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 698579e8fb..e59db74479 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4254,9 +4254,7 @@ impl TenantShard { deletion_queue_client: DeletionQueueClient, l0_flush_global_state: L0FlushGlobalState, ) -> TenantShard { - debug_assert!( - !attached_conf.location.generation.is_none() || conf.control_plane_api.is_none() - ); + assert!(!attached_conf.location.generation.is_none()); let (state, mut rx) = watch::channel(state); @@ -5949,7 +5947,9 @@ mod tests { use itertools::Itertools; #[cfg(feature = "testing")] use models::CompactLsnRange; - use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX}; + use pageserver_api::key::{ + AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX, repl_origin_key, + }; use pageserver_api::keyspace::KeySpace; #[cfg(feature = "testing")] use pageserver_api::keyspace::KeySpaceRandomAccum; @@ -8185,6 +8185,54 @@ mod tests { assert_eq!(files.get("pg_logical/mappings/test2"), None); } + #[tokio::test] + async fn test_repl_origin_tombstones() { + let harness = TenantHarness::create("test_repl_origin_tombstones") + .await + .unwrap(); + + let (tenant, ctx) = harness.load().await; + let io_concurrency = IoConcurrency::spawn_for_test(); + + let mut lsn = Lsn(0x08); + + let tline: Arc = tenant + .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let repl_lsn = Lsn(0x10); + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification.put_for_unit_test(repl_origin_key(2), Value::Image(Bytes::new())); + modification.set_replorigin(1, repl_lsn).await.unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + // we can read everything from the storage + let repl_origins = tline + .get_replorigins(lsn, &ctx, io_concurrency.clone()) + .await + .unwrap(); + assert_eq!(repl_origins.len(), 1); + assert_eq!(repl_origins[&1], lsn); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification.put_for_unit_test( + repl_origin_key(3), + Value::Image(Bytes::copy_from_slice(b"cannot_decode_this")), + ); + modification.commit(&ctx).await.unwrap(); + } + let result = tline + .get_replorigins(lsn, &ctx, io_concurrency.clone()) + .await; + assert!(result.is_err()); + } + #[tokio::test] async fn test_metadata_image_creation() -> anyhow::Result<()> { let harness = TenantHarness::create("test_metadata_image_creation").await?; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 2ae7e1e875..86aef9b42c 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -346,7 +346,8 @@ async fn init_load_generations( "Emergency mode! Tenants will be attached unsafely using their last known generation" ); emergency_generations(tenant_confs) - } else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? { + } else { + let client = StorageControllerUpcallClient::new(conf, cancel); info!("Calling {} API to re-attach tenants", client.base_url()); // If we are configured to use the control plane API, then it is the source of truth for what tenants to load. match client.re_attach(conf).await { @@ -360,9 +361,6 @@ async fn init_load_generations( anyhow::bail!("Shut down while waiting for control plane re-attach response") } } - } else { - info!("Control plane API not configured, tenant generations are disabled"); - return Ok(None); }; // The deletion queue needs to know about the startup attachment state to decide which (if any) stored @@ -1153,17 +1151,8 @@ impl TenantManager { // Testing hack: if we are configured with no control plane, then drop the generation // from upserts. This enables creating generation-less tenants even though neon_local // always uses generations when calling the location conf API. - let attached_conf = if cfg!(feature = "testing") { - let mut conf = AttachedTenantConf::try_from(new_location_config) - .map_err(UpsertLocationError::BadRequest)?; - if self.conf.control_plane_api.is_none() { - conf.location.generation = Generation::none(); - } - conf - } else { - AttachedTenantConf::try_from(new_location_config) - .map_err(UpsertLocationError::BadRequest)? - }; + let attached_conf = AttachedTenantConf::try_from(new_location_config) + .map_err(UpsertLocationError::BadRequest)?; let tenant = tenant_spawn( self.conf, diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 8e95c3a8ff..649b33e294 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -178,7 +178,7 @@ impl Attempt { } } -async fn generate_tombstone_image_layer( +pub(crate) async fn generate_tombstone_image_layer( detached: &Arc, ancestor: &Arc, ancestor_lsn: Lsn, diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index b917fdbfd8..6ab6b90cb6 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -163,8 +163,7 @@ pub async fn doit( // Ensure at-least-once delivery of the upcall to storage controller // before we mark the task as done and never come here again. // - let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)? - .expect("storcon configured"); + let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); storcon_client .put_timeline_import_status( timeline.tenant_shard_id, diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 426b176af9..8bcc6bf924 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -36,6 +36,8 @@ DATA = \ neon--1.2--1.3.sql \ neon--1.3--1.4.sql \ neon--1.4--1.5.sql \ + neon--1.5--1.6.sql \ + neon--1.6--1.5.sql \ neon--1.5--1.4.sql \ neon--1.4--1.3.sql \ neon--1.3--1.2.sql \ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index e2c1f7682f..ecc55bb540 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -98,7 +98,6 @@ #define MB ((uint64)1024*1024) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log)) - #define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1)) /* @@ -135,6 +134,15 @@ typedef struct FileCacheEntry #define N_COND_VARS 64 #define CV_WAIT_TIMEOUT 10 +#define MAX_PREWARM_WORKERS 8 + +typedef struct PrewarmWorkerState +{ + uint32 prewarmed_pages; + uint32 skipped_pages; + TimestampTz completed; +} PrewarmWorkerState; + typedef struct FileCacheControl { uint64 generation; /* generation is needed to handle correct hash @@ -156,25 +164,43 @@ typedef struct FileCacheControl dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ + PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; + size_t n_prewarm_workers; + size_t n_prewarm_entries; + size_t total_prewarm_pages; + size_t prewarm_batch; + bool prewarm_active; + bool prewarm_canceled; + dsm_handle prewarm_lfc_state_handle; } FileCacheControl; -bool lfc_store_prefetch_result; +#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc + +#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks]) +#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8) +#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8) static HTAB *lfc_hash; static int lfc_desc = -1; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; +static int lfc_prewarm_limit; +static int lfc_prewarm_batch; static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG; static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK; static char *lfc_path; static uint64 lfc_generation; static FileCacheControl *lfc_ctl; +static bool lfc_do_prewarm; static shmem_startup_hook_type prev_shmem_startup_hook; #if PG_VERSION_NUM>=150000 static shmem_request_hook_type prev_shmem_request_hook; #endif +bool lfc_store_prefetch_result; +bool lfc_prewarm_update_ws_estimation; + #define LFC_ENABLED() (lfc_ctl->limit != 0) /* @@ -500,6 +526,17 @@ lfc_init(void) NULL, NULL); + DefineCustomBoolVariable("neon.prewarm_update_ws_estimation", + "Consider prewarmed pages for working set estimation", + NULL, + &lfc_prewarm_update_ws_estimation, + true, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + DefineCustomIntVariable("neon.max_file_cache_size", "Maximal size of Neon local file cache", NULL, @@ -550,6 +587,32 @@ lfc_init(void) lfc_change_chunk_size, NULL); + DefineCustomIntVariable("neon.file_cache_prewarm_limit", + "Maximal number of prewarmed chunks", + NULL, + &lfc_prewarm_limit, + INT_MAX, /* no limit by default */ + 0, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("neon.file_cache_prewarm_batch", + "Number of pages retrivied by prewarm from page server", + NULL, + &lfc_prewarm_batch, + 64, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + if (lfc_max_size == 0) return; @@ -563,6 +626,317 @@ lfc_init(void) #endif } +FileCacheState* +lfc_get_state(size_t max_entries) +{ + FileCacheState* fcs = NULL; + + if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */ + return NULL; + + LWLockAcquire(lfc_lock, LW_SHARED); + + if (LFC_ENABLED()) + { + dlist_iter iter; + size_t i = 0; + uint8* bitmap; + size_t n_pages = 0; + size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned); + size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries); + fcs = (FileCacheState*)palloc0(state_size); + SET_VARSIZE(fcs, state_size); + fcs->magic = FILE_CACHE_STATE_MAGIC; + fcs->chunk_size_log = lfc_chunk_size_log; + fcs->n_chunks = n_entries; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + dlist_reverse_foreach(iter, &lfc_ctl->lru) + { + FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur); + fcs->chunks[i] = entry->key; + for (int j = 0; j < lfc_blocks_per_chunk; j++) + { + if (GET_STATE(entry, j) != UNAVAILABLE) + { + BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j); + n_pages += 1; + } + } + if (++i == n_entries) + break; + } + Assert(i == n_entries); + fcs->n_pages = n_pages; + Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages); + elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages); + } + + LWLockRelease(lfc_lock); + + return fcs; +} + +/* + * Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock + * and avoid race conditions with other backends. + */ +void +lfc_prewarm(FileCacheState* fcs, uint32 n_workers) +{ + size_t fcs_chunk_size_log; + size_t n_entries; + size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size); + size_t fcs_size; + dsm_segment *seg; + BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS]; + + + if (!lfc_ensure_opened()) + return; + + if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0) + { + elog(LOG, "LFC: prewarm is disabled"); + return; + } + + if (n_workers > MAX_PREWARM_WORKERS) + { + elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS); + } + + if (fcs == NULL || fcs->n_chunks == 0) + { + elog(LOG, "LFC: nothing to prewarm"); + return; + } + + if (fcs->magic != FILE_CACHE_STATE_MAGIC) + { + elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic); + } + + fcs_size = VARSIZE(fcs); + if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size) + { + elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs)); + } + + fcs_chunk_size_log = fcs->chunk_size_log; + if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG) + { + elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log); + } + + n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); + Assert(n_entries != 0); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + /* Do not prewarm more entries than LFC limit */ + if (lfc_ctl->limit <= lfc_ctl->size) + { + elog(LOG, "LFC: skip prewarm because LFC is already filled"); + LWLockRelease(lfc_lock); + return; + } + + if (lfc_ctl->prewarm_active) + { + LWLockRelease(lfc_lock); + elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); + } + lfc_ctl->n_prewarm_entries = n_entries; + lfc_ctl->n_prewarm_workers = n_workers; + lfc_ctl->prewarm_active = true; + lfc_ctl->prewarm_canceled = false; + lfc_ctl->prewarm_batch = prewarm_batch; + memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState)); + + LWLockRelease(lfc_lock); + + /* Calculate total number of pages to be prewarmed */ + lfc_ctl->total_prewarm_pages = fcs->n_pages; + + seg = dsm_create(fcs_size, 0); + memcpy(dsm_segment_address(seg), fcs, fcs_size); + lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg); + + /* Spawn background workers */ + for (uint32 i = 0; i < n_workers; i++) + { + BackgroundWorker worker = {0}; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy(worker.bgw_library_name, "neon"); + strcpy(worker.bgw_function_name, "lfc_prewarm_main"); + snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1); + strcpy(worker.bgw_type, "LFC prewarm worker"); + worker.bgw_main_arg = Int32GetDatum(i); + /* must set notify PID to wait for shutdown */ + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i])) + { + ereport(LOG, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("LFC: registering dynamic bgworker prewarm failed"), + errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes"))); + n_workers = i; + lfc_ctl->prewarm_canceled = true; + break; + } + } + + for (uint32 i = 0; i < n_workers; i++) + { + bool interrupted; + do + { + interrupted = false; + PG_TRY(); + { + BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]); + if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED) + { + elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status); + } + } + PG_CATCH(); + { + elog(LOG, "LFC: cancel prewarm"); + lfc_ctl->prewarm_canceled = true; + interrupted = true; + } + PG_END_TRY(); + } while (interrupted); + + if (!lfc_ctl->prewarm_workers[i].completed) + { + /* Background worker doesn't set completion time: it means that it was abnormally terminated */ + elog(LOG, "LFC: prewarm worker %d failed", i+1); + /* Set completion time to prevent get_prewarm_info from considering this worker as active */ + lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp(); + } + } + dsm_detach(seg); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_ctl->prewarm_active = false; + LWLockRelease(lfc_lock); +} + +void +lfc_prewarm_main(Datum main_arg) +{ + size_t snd_idx = 0, rcv_idx = 0; + size_t n_sent = 0, n_received = 0; + size_t fcs_chunk_size_log; + size_t max_prefetch_pages; + size_t prewarm_batch; + size_t n_workers; + dsm_segment *seg; + FileCacheState* fcs; + uint8* bitmap; + BufferTag tag; + PrewarmWorkerState* ws; + uint32 worker_id = DatumGetInt32(main_arg); + + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + fcs = (FileCacheState*) dsm_segment_address(seg); + prewarm_batch = lfc_ctl->prewarm_batch; + fcs_chunk_size_log = fcs->chunk_size_log; + n_workers = lfc_ctl->n_prewarm_workers; + max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log; + ws = &lfc_ctl->prewarm_workers[worker_id]; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + /* enable prefetch in LFC */ + lfc_store_prefetch_result = true; + lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */ + + elog(LOG, "LFC: worker %d start prewarming", worker_id); + while (!lfc_ctl->prewarm_canceled) + { + if (snd_idx < max_prefetch_pages) + { + if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* If there are multiple workers, split chunks between them */ + snd_idx += 1 << fcs_chunk_size_log; + } + else + { + if (BITMAP_ISSET(bitmap, snd_idx)) + { + tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; + tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); + if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum)) + { + (void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); + n_sent += 1; + } + else + { + ws->skipped_pages += 1; + BITMAP_CLR(bitmap, snd_idx); + } + } + snd_idx += 1; + } + } + if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages) + { + if (n_received == n_sent && snd_idx == max_prefetch_pages) + { + break; + } + if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* Skip chunks processed by other workers */ + rcv_idx += 1 << fcs_chunk_size_log; + continue; + } + + /* Locate next block to prefetch */ + while (!BITMAP_ISSET(bitmap, rcv_idx)) + { + rcv_idx += 1; + } + tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; + tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); + if (communicator_prefetch_receive(tag)) + { + ws->prewarmed_pages += 1; + } + else + { + ws->skipped_pages += 1; + } + rcv_idx += 1; + n_received += 1; + } + } + /* No need to perform prefetch cleanup here because prewarm worker will be terminated and + * connection to PS dropped just after return from this function. + */ + Assert(n_sent == n_received || lfc_ctl->prewarm_canceled); + elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received); + lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp(); +} + + /* * Check if page is present in the cache. * Returns true if page is found in local cache. @@ -1001,8 +1375,11 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) * If we can't (e.g. because all other slots are being accessed) * then we will remove this entry from the hash and continue * on to the next chunk, as we may not exceed the limit. + * + * While prewarming LFC we do not want to replace existed entries, + * so we just stop prewarm is LFC cache is full. */ - else if (!dlist_is_empty(&lfc_ctl->lru)) + else if (!dlist_is_empty(&lfc_ctl->lru) && !lfc_do_prewarm) { /* Cache overflow: evict least recently used chunk */ FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, @@ -1026,6 +1403,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) /* Can't add this chunk - we don't have the space for it */ hash_search_with_hash_value(lfc_hash, &entry->key, hash, HASH_REMOVE, NULL); + lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */ return false; } @@ -1112,9 +1490,11 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); - tag.blockNum = blkno; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - + if (lfc_prewarm_update_ws_estimation) + { + tag.blockNum = blkno; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } if (found) { state = GET_STATE(entry, chunk_offs); @@ -1748,3 +2128,82 @@ approximate_working_set_size(PG_FUNCTION_ARGS) } PG_RETURN_NULL(); } + +PG_FUNCTION_INFO_V1(get_local_cache_state); + +Datum +get_local_cache_state(PG_FUNCTION_ARGS) +{ + size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); + FileCacheState* fcs = lfc_get_state(max_entries); + if (fcs != NULL) + PG_RETURN_BYTEA_P((bytea*)fcs); + else + PG_RETURN_NULL(); +} + +PG_FUNCTION_INFO_V1(prewarm_local_cache); + +Datum +prewarm_local_cache(PG_FUNCTION_ARGS) +{ + bytea* state = PG_GETARG_BYTEA_PP(0); + uint32 n_workers = PG_GETARG_INT32(1); + FileCacheState* fcs = (FileCacheState*)state; + + lfc_prewarm(fcs, n_workers); + + PG_RETURN_NULL(); +} + +PG_FUNCTION_INFO_V1(get_prewarm_info); + +Datum +get_prewarm_info(PG_FUNCTION_ARGS) +{ + Datum values[4]; + bool nulls[4]; + TupleDesc tupdesc; + uint32 prewarmed_pages = 0; + uint32 skipped_pages = 0; + uint32 active_workers = 0; + uint32 total_pages; + size_t n_workers; + + if (lfc_size_limit == 0) + PG_RETURN_NULL(); + + LWLockAcquire(lfc_lock, LW_SHARED); + if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0) + { + LWLockRelease(lfc_lock); + PG_RETURN_NULL(); + } + n_workers = lfc_ctl->n_prewarm_workers; + total_pages = lfc_ctl->total_prewarm_pages; + for (size_t i = 0; i < n_workers; i++) + { + PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i]; + prewarmed_pages += ws->prewarmed_pages; + skipped_pages += ws->skipped_pages; + active_workers += ws->completed != 0; + } + LWLockRelease(lfc_lock); + + tupdesc = CreateTemplateTupleDesc(4); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0); + tupdesc = BlessTupleDesc(tupdesc); + + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(total_pages); + values[1] = Int32GetDatum(prewarmed_pages); + values[2] = Int32GetDatum(skipped_pages); + values[3] = Int32GetDatum(active_workers); + + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} + diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index 849558b83d..c7b6b09f72 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -13,6 +13,17 @@ #include "neon_pgversioncompat.h" +typedef struct FileCacheState +{ + int32 vl_len_; /* varlena header (do not touch directly!) */ + uint32 magic; + uint32 n_chunks; + uint32 n_pages; + uint16 chunk_size_log; + BufferTag chunks[FLEXIBLE_ARRAY_MEMBER]; + /* followed by bitmap */ +} FileCacheState; + /* GUCs */ extern bool lfc_store_prefetch_result; @@ -32,7 +43,10 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, extern void lfc_init(void); extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, const void* buffer, XLogRecPtr lsn); +extern FileCacheState* lfc_get_state(size_t max_entries); +extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers); +PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); static inline bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index ccb072d6f9..5287c12a84 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -736,8 +736,8 @@ pageserver_connect(shardno_t shard_no, int elevel) default: neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state); } - /* This shouldn't be hit */ - Assert(false); + + pg_unreachable(); } static void diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql new file mode 100644 index 0000000000..c05f0f87aa --- /dev/null +++ b/pgxn/neon/neon--1.5--1.6.sql @@ -0,0 +1,22 @@ +\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit + +CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer) +RETURNS record +AS 'MODULE_PATHNAME', 'get_prewarm_info' +LANGUAGE C STRICT +PARALLEL SAFE; + +CREATE FUNCTION get_local_cache_state(max_chunks integer default null) +RETURNS bytea +AS 'MODULE_PATHNAME', 'get_local_cache_state' +LANGUAGE C +PARALLEL UNSAFE; + +CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1) +RETURNS void +AS 'MODULE_PATHNAME', 'prewarm_local_cache' +LANGUAGE C STRICT +PARALLEL UNSAFE; + + + diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql new file mode 100644 index 0000000000..57512980f5 --- /dev/null +++ b/pgxn/neon/neon--1.6--1.5.sql @@ -0,0 +1,7 @@ +DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer); + +DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer); + +DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1); + + diff --git a/proxy/README.md b/proxy/README.md index 1156bfd352..583db36f28 100644 --- a/proxy/README.md +++ b/proxy/README.md @@ -32,7 +32,7 @@ To play with it locally one may start proxy over a local postgres installation (see end of this page on how to generate certs with openssl): ``` -./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444 +LOGFMT=text ./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444 ``` If both postgres and proxy are running you may send a SQL query: @@ -130,7 +130,7 @@ openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key Then we need to build proxy with 'testing' feature and run, e.g.: ```sh -RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key +RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key ``` Now from client you can start a new session: diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index b83b03bc4f..efa3c0b514 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -132,11 +132,10 @@ impl Drop for LoggingGuard { } } -// TODO: make JSON the default #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)] enum LogFormat { + Text, #[default] - Text = 1, Json, } diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index dd71420efb..c267a55cb6 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -1,7 +1,6 @@ // // Main entry point for the safekeeper executable // -use std::env::{VarError, var}; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::str::FromStr; @@ -354,29 +353,13 @@ async fn main() -> anyhow::Result<()> { }; // Load JWT auth token to connect to other safekeepers for pull_timeline. - // First check if the env var is present, then check the arg with the path. - // We want to deprecate and remove the env var method in the future. - let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") { - Ok(v) => { - info!("loaded JWT token for authentication with safekeepers"); - Some(SecretString::from(v)) - } - Err(VarError::NotPresent) => { - if let Some(auth_token_path) = args.auth_token_path.as_ref() { - info!( - "loading JWT token for authentication with safekeepers from {auth_token_path}" - ); - let auth_token = tokio::fs::read_to_string(auth_token_path).await?; - Some(SecretString::from(auth_token.trim().to_owned())) - } else { - info!("no JWT token for authentication with safekeepers detected"); - None - } - } - Err(_) => { - warn!("JWT token for authentication with safekeepers is not unicode"); - None - } + let sk_auth_token = if let Some(auth_token_path) = args.auth_token_path.as_ref() { + info!("loading JWT token for authentication with safekeepers from {auth_token_path}"); + let auth_token = tokio::fs::read_to_string(auth_token_path).await?; + Some(SecretString::from(auth_token.trim().to_owned())) + } else { + info!("no JWT token for authentication with safekeepers detected"); + None }; let ssl_ca_certs = match args.ssl_ca_file.as_ref() { diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 71dde9e126..2eea2f9d10 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -19,7 +19,8 @@ use storage_controller::service::chaos_injector::ChaosInjector; use storage_controller::service::{ Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, - PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, Service, + PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, + SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT, Service, }; use tokio::signal::unix::SignalKind; use tokio_util::sync::CancellationToken; @@ -132,6 +133,10 @@ struct Cli { #[arg(long)] priority_reconciler_concurrency: Option, + /// Maximum number of safekeeper reconciliations that may run in parallel (per safekeeper) + #[arg(long)] + safekeeper_reconciler_concurrency: Option, + /// Tenant API rate limit, as requests per second per tenant. #[arg(long, default_value = "10")] tenant_rate_limit: NonZeroU32, @@ -403,6 +408,9 @@ async fn async_main() -> anyhow::Result<()> { priority_reconciler_concurrency: args .priority_reconciler_concurrency .unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT), + safekeeper_reconciler_concurrency: args + .safekeeper_reconciler_concurrency + .unwrap_or(SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT), tenant_rate_limit: args.tenant_rate_limit, split_threshold: args.split_threshold, max_split_shards: args.max_split_shards, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ca9b911c4d..50ce559cc0 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -194,6 +194,7 @@ pub(crate) enum LeadershipStatus { pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128; pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256; +pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32; // Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately. // This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly @@ -382,6 +383,9 @@ pub struct Config { /// How many high-priority Reconcilers may be spawned concurrently pub priority_reconciler_concurrency: usize, + /// How many safekeeper reconciles may happen concurrently (per safekeeper) + pub safekeeper_reconciler_concurrency: usize, + /// How many API requests per second to allow per tenant, across all /// tenant-scoped API endpoints. Further API requests queue until ready. pub tenant_rate_limit: NonZeroU32, @@ -3720,6 +3724,10 @@ impl Service { // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard // that will get the first creation request, and propagate the LSN to all the >0 shards. + // + // This also enables non-zero shards to use the initdb that shard 0 generated and uploaded to S3, rather than + // independently generating their own initdb. This guarantees that shards cannot end up with different initial + // states if e.g. they have different postgres binary versions. let timeline_info = create_one( shard_zero_tid, shard_zero_locations, @@ -3729,11 +3737,16 @@ impl Service { ) .await?; - // Propagate the LSN that shard zero picked, if caller didn't provide one + // Update the create request for shards >= 0 match &mut create_req.mode { models::TimelineCreateRequestMode::Branch { ancestor_start_lsn, .. } if ancestor_start_lsn.is_none() => { + // Propagate the LSN that shard zero picked, if caller didn't provide one *ancestor_start_lsn = timeline_info.ancestor_lsn; }, + models::TimelineCreateRequestMode::Bootstrap { existing_initdb_timeline_id, .. } => { + // For shards >= 0, do not run initdb: use the one that shard 0 uploaded to S3 + *existing_initdb_timeline_id = Some(create_req.new_timeline_id) + } _ => {} } diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index b15772a36c..74308cabff 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -3,7 +3,10 @@ use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use clashmap::{ClashMap, Entry}; use safekeeper_api::models::PullTimelineRequest; use safekeeper_client::mgmt_api; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::{ + Semaphore, + mpsc::{self, UnboundedReceiver, UnboundedSender}, +}; use tokio_util::sync::CancellationToken; use tracing::Instrument; use utils::{ @@ -206,18 +209,27 @@ impl ReconcilerHandle { } pub(crate) struct SafekeeperReconciler { - service: Arc, + inner: SafekeeperReconcilerInner, + concurrency_limiter: Arc, rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>, cancel: CancellationToken, } +/// Thin wrapper over `Service` to not clutter its inherent functions +#[derive(Clone)] +struct SafekeeperReconcilerInner { + service: Arc, +} + impl SafekeeperReconciler { fn spawn(cancel: CancellationToken, service: Arc) -> ReconcilerHandle { // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking. let (tx, rx) = mpsc::unbounded_channel(); + let concurrency = service.config.safekeeper_reconciler_concurrency; let mut reconciler = SafekeeperReconciler { - service, + inner: SafekeeperReconcilerInner { service }, rx, + concurrency_limiter: Arc::new(Semaphore::new(concurrency)), cancel: cancel.clone(), }; let handle = ReconcilerHandle { @@ -230,31 +242,44 @@ impl SafekeeperReconciler { } async fn run(&mut self) { loop { - // TODO add parallelism with semaphore here let req = tokio::select! { req = self.rx.recv() => req, _ = self.cancel.cancelled() => break, }; let Some((req, req_cancel)) = req else { break }; + + let permit_res = tokio::select! { + req = self.concurrency_limiter.clone().acquire_owned() => req, + _ = self.cancel.cancelled() => break, + }; + let Ok(_permit) = permit_res else { return }; + + let inner = self.inner.clone(); if req_cancel.is_cancelled() { continue; } - let kind = req.kind; - let tenant_id = req.tenant_id; - let timeline_id = req.timeline_id; - let node_id = req.safekeeper.skp.id; - self.reconcile_one(req, req_cancel) - .instrument(tracing::info_span!( - "reconcile_one", - ?kind, - %tenant_id, - ?timeline_id, - %node_id, - )) - .await; + tokio::task::spawn(async move { + let kind = req.kind; + let tenant_id = req.tenant_id; + let timeline_id = req.timeline_id; + let node_id = req.safekeeper.skp.id; + inner + .reconcile_one(req, req_cancel) + .instrument(tracing::info_span!( + "reconcile_one", + ?kind, + %tenant_id, + ?timeline_id, + %node_id, + )) + .await; + }); } } +} + +impl SafekeeperReconcilerInner { async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) { let req_host = req.safekeeper.skp.host.clone(); match req.kind { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 1d668d4b2d..b93df4ede4 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1194,8 +1194,7 @@ class NeonEnv: else: cfg["broker"]["listen_addr"] = self.broker.listen_addr() - if self.control_plane_api is not None: - cfg["control_plane_api"] = self.control_plane_api + cfg["control_plane_api"] = self.control_plane_api if self.control_plane_hooks_api is not None: cfg["control_plane_hooks_api"] = self.control_plane_hooks_api diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index 3b6c94a268..d28240c722 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -14,7 +14,7 @@ from fixtures.log_helper import log from fixtures.metrics import parse_metrics from fixtures.paths import BASE_DIR from fixtures.pg_config import PgConfigKey -from fixtures.utils import subprocess_capture +from fixtures.utils import WITH_SANITIZERS, subprocess_capture from werkzeug.wrappers.response import Response if TYPE_CHECKING: @@ -148,6 +148,15 @@ def test_remote_extensions( pg_config: PgConfig, extension: RemoteExtension, ): + if WITH_SANITIZERS and extension is RemoteExtension.WITH_LIB: + pytest.skip( + """ + For this test to work with sanitizers enabled, we would need to + compile the dummy Postgres extension with the same CFLAGS that we + compile Postgres and the neon extension with to link the sanitizers. + """ + ) + # Setup a mock nginx S3 gateway which will return our test extension. (host, port) = httpserver_listen_address extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway" diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py new file mode 100644 index 0000000000..dd0ae1921d --- /dev/null +++ b/test_runner/regress/test_lfc_prewarm.py @@ -0,0 +1,147 @@ +import random +import threading +import time + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv +from fixtures.utils import USE_LFC + + +def check_pinned_entries(cur): + # some LFC buffer can be temporary locked by autovacuum or background writer + for _ in range(10): + cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'") + n_pinned = cur.fetchall()[0][0] + if n_pinned == 0: + break + time.sleep(1) + assert n_pinned == 0 + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))") + cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + endpoint.stop() + endpoint.start() + + conn = endpoint.connect() + cur = conn.cursor() + time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version + cur.execute("alter extension neon update to '1.6'") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + + cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") + lfc_used_pages = cur.fetchall()[0][0] + log.info(f"Used LFC size: {lfc_used_pages}") + cur.execute("select * from get_prewarm_info()") + prewarm_info = cur.fetchall()[0] + log.info(f"Prewarm info: {prewarm_info}") + log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%") + + assert lfc_used_pages > 10000 + assert ( + prewarm_info[0] > 0 + and prewarm_info[1] > 0 + and prewarm_info[0] == prewarm_info[1] + prewarm_info[2] + ) + + cur.execute("select sum(pk) from t") + assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 + + check_pinned_entries(cur) + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 10000 + n_threads = 4 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute( + "create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)" + ) + cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + running = True + + def workload(): + conn = endpoint.connect() + cur = conn.cursor() + n_transfers = 0 + while running: + src = random.randint(1, n_records) + dst = random.randint(1, n_records) + cur.execute("update accounts set balance=balance-100 where id=%s", (src,)) + cur.execute("update accounts set balance=balance+100 where id=%s", (dst,)) + n_transfers += 1 + log.info(f"Number of transfers: {n_transfers}") + + def prewarm(): + conn = endpoint.connect() + cur = conn.cursor() + n_prewarms = 0 + while running: + cur.execute("alter system set neon.file_cache_size_limit='1MB'") + cur.execute("select pg_reload_conf()") + cur.execute("alter system set neon.file_cache_size_limit='1GB'") + cur.execute("select pg_reload_conf()") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + n_prewarms += 1 + log.info(f"Number of prewarms: {n_prewarms}") + + workload_threads = [] + for _ in range(n_threads): + t = threading.Thread(target=workload) + workload_threads.append(t) + t.start() + + prewarm_thread = threading.Thread(target=prewarm) + prewarm_thread.start() + + time.sleep(20) + + running = False + for t in workload_threads: + t.join() + prewarm_thread.join() + + cur.execute("select sum(balance) from accounts") + total_balance = cur.fetchall()[0][0] + assert total_balance == 0 + + check_pinned_entries(cur) diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index fa1cd61206..e3f9982486 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -3,7 +3,7 @@ Tests in this module exercise the pageserver's behavior around generation numbers, as defined in docs/rfcs/025-generation-numbers.md. Briefly, the behaviors we require of the pageserver are: -- Do not start a tenant without a generation number if control_plane_api is set +- Do not start a tenant without a generation number - Remote objects must be suffixed with generation - Deletions may only be executed after validating generation - Updates to remote_consistent_lsn may only be made visible after validating generation