From 30b57334efecddc0e0f9fc285416b62a861c196f Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Tue, 29 Jul 2025 10:47:39 +0100 Subject: [PATCH 01/15] test_lsn_lease_storcon: ignore ShardSplit warning in debug builds (#12770) ## Problem `test_lsn_lease_storcon` might fail in debug builds due to slow ShardSplit ## Summary of changes - Make `test_lsn_lease_storcon ` test to ignore `.*Exclusive lock by ShardSplit was held.*` warning in debug builds Ref: https://databricks.slack.com/archives/C09254R641L/p1753777051481029 --- test_runner/regress/test_tenant_size.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index 8b291b7cbe..5564d9342c 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING @@ -768,6 +769,14 @@ def test_lsn_lease_storcon(neon_env_builder: NeonEnvBuilder): "compaction_period": "0s", } env = neon_env_builder.init_start(initial_tenant_conf=conf) + # ShardSplit is slow in debug builds, so ignore the warning + if os.getenv("BUILD_TYPE", "debug") == "debug": + env.storage_controller.allowed_errors.extend( + [ + ".*Exclusive lock by ShardSplit was held.*", + ] + ) + with env.endpoints.create_start( "main", ) as ep: From 1ed72529507b9abc7ae2a2b80da025289e06a6bc Mon Sep 17 00:00:00 2001 From: a-masterov <72613290+a-masterov@users.noreply.github.com> Date: Tue, 29 Jul 2025 12:19:10 +0200 Subject: [PATCH 02/15] Add a workaround for the clickhouse 24.9+ problem causing an error (#12767) ## Problem We used ClickHouse v. 24.8, which is outdated, for logical replication testing. We could miss some problems. ## Summary of changes The version was updated to 25.6, with a workaround using the environment variable `PGSSLCERT`. Co-authored-by: Alexey Masterov --- .github/workflows/pg-clients.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pg-clients.yml b/.github/workflows/pg-clients.yml index b6b4eca2b8..40b2c51624 100644 --- a/.github/workflows/pg-clients.yml +++ b/.github/workflows/pg-clients.yml @@ -72,9 +72,10 @@ jobs: options: --init --user root services: clickhouse: - image: clickhouse/clickhouse-server:24.8 + image: clickhouse/clickhouse-server:25.6 env: CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }} + PGSSLCERT: /tmp/postgresql.crt ports: - 9000:9000 - 8123:8123 From 568927a8a01d1c49b2b19ad309f6219936bbdc13 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 29 Jul 2025 14:08:22 +0300 Subject: [PATCH 03/15] Remove unnecessary dependency to 'log' crate (#12763) We use 'tracing' everywhere. --- Cargo.lock | 1 - libs/postgres_ffi/Cargo.toml | 1 - libs/postgres_ffi/src/nonrelfile_utils.rs | 3 +-- libs/postgres_ffi/src/waldecoder_handler.rs | 3 +-- libs/postgres_ffi/src/xlog_utils.rs | 10 ++++------ 5 files changed, 6 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1919f8e99d..9a0cc9076a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5077,7 +5077,6 @@ dependencies = [ "crc32c", "criterion", "env_logger", - "log", "once_cell", "postgres", "postgres_ffi_types", diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index fca75b7bc1..5adf38f457 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -11,7 +11,6 @@ anyhow.workspace = true crc32c.workspace = true criterion.workspace = true once_cell.workspace = true -log.workspace = true pprof.workspace = true thiserror.workspace = true serde.workspace = true diff --git a/libs/postgres_ffi/src/nonrelfile_utils.rs b/libs/postgres_ffi/src/nonrelfile_utils.rs index e3e7133b94..f6693d4ec1 100644 --- a/libs/postgres_ffi/src/nonrelfile_utils.rs +++ b/libs/postgres_ffi/src/nonrelfile_utils.rs @@ -4,12 +4,11 @@ use crate::pg_constants; use crate::transaction_id_precedes; use bytes::BytesMut; -use log::*; use super::bindings::MultiXactId; pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) { - trace!( + tracing::trace!( "handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort, 3-sub_commit)", status ); diff --git a/libs/postgres_ffi/src/waldecoder_handler.rs b/libs/postgres_ffi/src/waldecoder_handler.rs index 9cd40645ec..563a3426a0 100644 --- a/libs/postgres_ffi/src/waldecoder_handler.rs +++ b/libs/postgres_ffi/src/waldecoder_handler.rs @@ -14,7 +14,6 @@ use super::xlog_utils::*; use crate::WAL_SEGMENT_SIZE; use bytes::{Buf, BufMut, Bytes, BytesMut}; use crc32c::*; -use log::*; use std::cmp::min; use std::num::NonZeroU32; use utils::lsn::Lsn; @@ -236,7 +235,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder { // XLOG_SWITCH records are special. If we see one, we need to skip // to the next WAL segment. let next_lsn = if xlogrec.is_xlog_switch_record() { - trace!("saw xlog switch record at {}", self.lsn); + tracing::trace!("saw xlog switch record at {}", self.lsn); self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64) } else { // Pad to an 8-byte boundary diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 134baf5ff7..913e6b453f 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -23,8 +23,6 @@ use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; use bytes::BytesMut; use bytes::{Buf, Bytes}; -use log::*; - use serde::Serialize; use std::ffi::{CString, OsStr}; use std::fs::File; @@ -235,7 +233,7 @@ pub fn find_end_of_wal( let mut curr_lsn = start_lsn; let mut buf = [0u8; XLOG_BLCKSZ]; let pg_version = MY_PGVERSION; - debug!("find_end_of_wal PG_VERSION: {}", pg_version); + tracing::debug!("find_end_of_wal PG_VERSION: {}", pg_version); let mut decoder = WalStreamDecoder::new(start_lsn, pg_version); @@ -247,7 +245,7 @@ pub fn find_end_of_wal( match open_wal_segment(&seg_file_path)? { None => { // no more segments - debug!( + tracing::debug!( "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist", result, seg_file_path ); @@ -260,7 +258,7 @@ pub fn find_end_of_wal( while curr_lsn.segment_number(wal_seg_size) == segno { let bytes_read = segment.read(&mut buf)?; if bytes_read == 0 { - debug!( + tracing::debug!( "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}", result, seg_file_path, @@ -276,7 +274,7 @@ pub fn find_end_of_wal( match decoder.poll_decode() { Ok(Some(record)) => result = record.0, Err(e) => { - debug!( + tracing::debug!( "find_end_of_wal reached end at {:?}, decode error: {:?}", result, e ); From 58327cbba8f39610f08413e4f20702b58d072d2a Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Tue, 29 Jul 2025 15:58:31 +0400 Subject: [PATCH 04/15] storcon: wait for the migration from the drained node in the draining loop (#12754) ## Problem We have seen some errors in staging when the shard migration was triggered by optimizations, and it was ongoing during draining the node it was migrating from. It happens because the node draining loop only waits for the migrations started by the drain loop itself. The ongoing migrations are ignored. Closes: https://databricks.atlassian.net/browse/LKB-1625 ## Summary of changes - Wait for the shard reconciliation during the drain if it is being migrated from the drained node. --- storage_controller/src/operation_utils.rs | 46 ++++++++++++++++++++--- storage_controller/src/service.rs | 39 ++++++++++--------- storage_controller/src/tenant_shard.rs | 2 - 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/storage_controller/src/operation_utils.rs b/storage_controller/src/operation_utils.rs index af86010ab7..1060c92832 100644 --- a/storage_controller/src/operation_utils.rs +++ b/storage_controller/src/operation_utils.rs @@ -46,11 +46,31 @@ impl TenantShardDrain { &self, tenants: &BTreeMap, scheduler: &Scheduler, - ) -> Option { - let tenant_shard = tenants.get(&self.tenant_shard_id)?; + ) -> TenantShardDrainAction { + let Some(tenant_shard) = tenants.get(&self.tenant_shard_id) else { + return TenantShardDrainAction::Skip; + }; if *tenant_shard.intent.get_attached() != Some(self.drained_node) { - return None; + // If the intent attached node is not the drained node, check the observed state + // of the shard on the drained node. If it is Attached*, it means the shard is + // beeing migrated from the drained node. The drain loop needs to wait for the + // reconciliation to complete for a smooth draining. + + use pageserver_api::models::LocationConfigMode::*; + + let attach_mode = tenant_shard + .observed + .locations + .get(&self.drained_node) + .and_then(|observed| observed.conf.as_ref().map(|conf| conf.mode)); + + return match (attach_mode, tenant_shard.intent.get_attached()) { + (Some(AttachedSingle | AttachedMulti | AttachedStale), Some(intent_node_id)) => { + TenantShardDrainAction::Reconcile(*intent_node_id) + } + _ => TenantShardDrainAction::Skip, + }; } // Only tenants with a normal (Active) scheduling policy are proactively moved @@ -63,19 +83,19 @@ impl TenantShardDrain { } ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => { // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain - return None; + return TenantShardDrainAction::Skip; } } match tenant_shard.preferred_secondary(scheduler) { - Some(node) => Some(node), + Some(node) => TenantShardDrainAction::RescheduleToSecondary(node), None => { tracing::warn!( tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), "No eligible secondary while draining {}", self.drained_node ); - None + TenantShardDrainAction::Skip } } } @@ -138,3 +158,17 @@ impl TenantShardDrain { } } } + +/// Action to take when draining a tenant shard. +pub(crate) enum TenantShardDrainAction { + /// The tenant shard is on the draining node. + /// Reschedule the tenant shard to a secondary location. + /// Holds a destination node id to reschedule to. + RescheduleToSecondary(NodeId), + /// The tenant shard is beeing migrated from the draining node. + /// Wait for the reconciliation to complete. + /// Holds the intent attached node id. + Reconcile(NodeId), + /// The tenant shard is not eligible for drainining, skip it. + Skip, +} diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 4c011033af..37380b8fbe 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -79,7 +79,7 @@ use crate::id_lock_map::{ use crate::leadership::Leadership; use crate::metrics; use crate::node::{AvailabilityTransition, Node}; -use crate::operation_utils::{self, TenantShardDrain}; +use crate::operation_utils::{self, TenantShardDrain, TenantShardDrainAction}; use crate::pageserver_client::PageserverClient; use crate::peer_client::GlobalObservedState; use crate::persistence::split_state::SplitState; @@ -1274,7 +1274,7 @@ impl Service { // Always attempt autosplits. Sharding is crucial for bulk ingest performance, so we // must be responsive when new projects begin ingesting and reach the threshold. self.autosplit_tenants().await; - } + }, _ = self.reconcilers_cancel.cancelled() => return } } @@ -8876,6 +8876,9 @@ impl Service { for (_tenant_id, schedule_context, shards) in TenantShardExclusiveIterator::new(tenants, ScheduleMode::Speculative) { + if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS { + break; + } for shard in shards { if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS { break; @@ -9640,16 +9643,16 @@ impl Service { tenant_shard_id: tid, }; - let dest_node_id = { + let drain_action = { let locked = self.inner.read().unwrap(); + tid_drain.tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler) + }; - match tid_drain - .tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler) - { - Some(node_id) => node_id, - None => { - continue; - } + let dest_node_id = match drain_action { + TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => dest_node_id, + TenantShardDrainAction::Reconcile(intent_node_id) => intent_node_id, + TenantShardDrainAction::Skip => { + continue; } }; @@ -9684,14 +9687,16 @@ impl Service { { let mut locked = self.inner.write().unwrap(); let (nodes, tenants, scheduler) = locked.parts_mut(); - let rescheduled = tid_drain.reschedule_to_secondary( - dest_node_id, - tenants, - scheduler, - nodes, - )?; - if let Some(tenant_shard) = rescheduled { + let tenant_shard = match drain_action { + TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => tid_drain + .reschedule_to_secondary(dest_node_id, tenants, scheduler, nodes)?, + TenantShardDrainAction::Reconcile(_) => tenants.get_mut(&tid), + // Note: Unreachable, handled above. + TenantShardDrainAction::Skip => None, + }; + + if let Some(tenant_shard) = tenant_shard { let waiter = self.maybe_configured_reconcile_shard( tenant_shard, nodes, diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 3eb54d714d..bf16c642af 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -812,8 +812,6 @@ impl TenantShard { /// if the swap is not possible and leaves the intent state in its original state. /// /// Arguments: - /// `attached_to`: the currently attached location matching the intent state (may be None if the - /// shard is not attached) /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask /// the scheduler to recommend a node pub(crate) fn reschedule_to_secondary( From e2411818ef8ec65ac14a24587927275c3bef9d7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JC=20Gr=C3=BCnhage?= Date: Tue, 29 Jul 2025 14:12:14 +0200 Subject: [PATCH 05/15] Add SBOMs and provenance attestations to container images (#12768) ## Problem Given a container image it is difficult to figure out dependencies and doesn't work automatically. ## Summary of changes - Build all rust binaries with `cargo auditable`, to allow sbom scanners to find it's dependencies. - Adjust `attests` for `docker/build-push-action`, so that buildkit creates sbom and provenance attestations. - Dropping `--locked` for `rustfilt`, because `rustfilt` can't build with locked dependencies[^5] ## Further details Building with `cargo auditable`[^1] embeds a dependency list into Linux, Windows, MacOS and WebAssembly artifacts. A bunch of tools support discovering dependencies from this, among them `syft`[^2], which is used by the BuildKit Syft scanner[^3] plugin. This BuildKit plugin is the default[^4] used in docker for generating sbom attestations, but we're making that default explicit by referencing the container image. [^1]: https://github.com/rust-secure-code/cargo-auditable [^2]: https://github.com/anchore/syft [^3]: https://github.com/docker/buildkit-syft-scanner [^4]: https://docs.docker.com/build/metadata/attestations/sbom/#sbom-generator [^5]: https://github.com/luser/rustfilt/issues/23 --- .github/workflows/build-build-tools-image.yml | 4 +++- .github/workflows/build_and_test.yml | 12 +++++++++--- Dockerfile | 2 +- build-tools/Dockerfile | 19 +++++++++++-------- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/.github/workflows/build-build-tools-image.yml b/.github/workflows/build-build-tools-image.yml index 24e4c8fa3d..5e53d8231f 100644 --- a/.github/workflows/build-build-tools-image.yml +++ b/.github/workflows/build-build-tools-image.yml @@ -146,7 +146,9 @@ jobs: with: file: build-tools/Dockerfile context: . - provenance: false + attests: | + type=provenance,mode=max + type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1 push: true pull: true build-args: | diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f237a991cc..0dcbd1c6dd 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -634,7 +634,9 @@ jobs: DEBIAN_VERSION=bookworm secrets: | SUBZERO_ACCESS_TOKEN=${{ secrets.CI_ACCESS_TOKEN }} - provenance: false + attests: | + type=provenance,mode=max + type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1 push: true pull: true file: Dockerfile @@ -747,7 +749,9 @@ jobs: PG_VERSION=${{ matrix.version.pg }} BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }} DEBIAN_VERSION=${{ matrix.version.debian }} - provenance: false + attests: | + type=provenance,mode=max + type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1 push: true pull: true file: compute/compute-node.Dockerfile @@ -766,7 +770,9 @@ jobs: PG_VERSION=${{ matrix.version.pg }} BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }} DEBIAN_VERSION=${{ matrix.version.debian }} - provenance: false + attests: | + type=provenance,mode=max + type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1 push: true pull: true file: compute/compute-node.Dockerfile diff --git a/Dockerfile b/Dockerfile index 654ae72e56..63cc954873 100644 --- a/Dockerfile +++ b/Dockerfile @@ -103,7 +103,7 @@ RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \ && if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \ export CARGO_FEATURES="rest_broker"; \ fi \ - && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \ + && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo auditable build \ --features $CARGO_FEATURES \ --bin pg_sni_router \ --bin pageserver \ diff --git a/build-tools/Dockerfile b/build-tools/Dockerfile index 87966591c1..c9760f610b 100644 --- a/build-tools/Dockerfile +++ b/build-tools/Dockerfile @@ -299,6 +299,7 @@ WORKDIR /home/nonroot ENV RUSTC_VERSION=1.88.0 ENV RUSTUP_HOME="/home/nonroot/.rustup" ENV PATH="/home/nonroot/.cargo/bin:${PATH}" +ARG CARGO_AUDITABLE_VERSION=0.7.0 ARG RUSTFILT_VERSION=0.2.1 ARG CARGO_HAKARI_VERSION=0.9.36 ARG CARGO_DENY_VERSION=0.18.2 @@ -314,14 +315,16 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux . "$HOME/.cargo/env" && \ cargo --version && rustup --version && \ rustup component add llvm-tools rustfmt clippy && \ - cargo install rustfilt --locked --version "${RUSTFILT_VERSION}" && \ - cargo install cargo-hakari --locked --version "${CARGO_HAKARI_VERSION}" && \ - cargo install cargo-deny --locked --version "${CARGO_DENY_VERSION}" && \ - cargo install cargo-hack --locked --version "${CARGO_HACK_VERSION}" && \ - cargo install cargo-nextest --locked --version "${CARGO_NEXTEST_VERSION}" && \ - cargo install cargo-chef --locked --version "${CARGO_CHEF_VERSION}" && \ - cargo install diesel_cli --locked --version "${CARGO_DIESEL_CLI_VERSION}" \ - --features postgres-bundled --no-default-features && \ + cargo install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" && \ + cargo auditable install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" --force && \ + cargo auditable install rustfilt --version "${RUSTFILT_VERSION}" && \ + cargo auditable install cargo-hakari --locked --version "${CARGO_HAKARI_VERSION}" && \ + cargo auditable install cargo-deny --locked --version "${CARGO_DENY_VERSION}" && \ + cargo auditable install cargo-hack --locked --version "${CARGO_HACK_VERSION}" && \ + cargo auditable install cargo-nextest --locked --version "${CARGO_NEXTEST_VERSION}" && \ + cargo auditable install cargo-chef --locked --version "${CARGO_CHEF_VERSION}" && \ + cargo auditable install diesel_cli --locked --version "${CARGO_DIESEL_CLI_VERSION}" \ + --features postgres-bundled --no-default-features && \ rm -rf /home/nonroot/.cargo/registry && \ rm -rf /home/nonroot/.cargo/git From 61f267d8f9bcb5c86013ecacdd8a588379337352 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 29 Jul 2025 14:33:02 +0200 Subject: [PATCH 06/15] pageserver: only retry `WaitForActiveTimeout` during shard resolution (#12772) ## Problem In https://github.com/neondatabase/neon/pull/12467, timeouts and retries were added to `Cache::get` tenant shard resolution to paper over an issue with read unavailability during shard splits. However, this retries _all_ errors, including irrecoverable errors like `NotFound`. This causes problems with gRPC child shard routing in #12702, which targets specific shards with `ShardSelector::Known` and relies on prompt `NotFound` errors to reroute requests to child shards. These retries introduce a 1s delay for all reads during child routing. The broader problem of read unavailability during shard splits is left as future work, see https://databricks.atlassian.net/browse/LKB-672. Touches #12702. Touches [LKB-191](https://databricks.atlassian.net/browse/LKB-191). ## Summary of changes * Change `TenantManager` to always return a concrete `GetActiveTimelineError`. * Only retry `WaitForActiveTimeout` errors. * Lots of code unindentation due to the simplified error handling. Out of caution, we do not gate the retries on `ShardSelector`, since this can trigger other races. Improvements here are left as future work. --- pageserver/src/page_service.rs | 9 -- pageserver/src/tenant/mgr.rs | 6 + pageserver/src/tenant/timeline/handle.rs | 157 +++++++++++------------ 3 files changed, 82 insertions(+), 90 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index a0998a7598..b3bc42a55e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -466,13 +466,6 @@ impl TimelineHandles { self.handles .get(timeline_id, shard_selector, &self.wrapper) .await - .map_err(|e| match e { - timeline::handle::GetError::TenantManager(e) => e, - timeline::handle::GetError::PerTimelineStateShutDown => { - trace!("per-timeline state shut down"); - GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) - } - }) } fn tenant_id(&self) -> Option { @@ -488,11 +481,9 @@ pub(crate) struct TenantManagerWrapper { tenant_id: once_cell::sync::OnceCell, } -#[derive(Debug)] pub(crate) struct TenantManagerTypes; impl timeline::handle::Types for TenantManagerTypes { - type TenantManagerError = GetActiveTimelineError; type TenantManager = TenantManagerWrapper; type Timeline = TenantManagerCacheItem; } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b47bab16d8..4432b4bba8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1522,6 +1522,12 @@ impl TenantManager { self.resources.deletion_queue_client.flush_advisory(); // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant + // + // TODO: keeping the parent as InProgress while spawning the children causes read + // unavailability, as we can't acquire a timeline handle for it. The parent should be + // available for reads until the children are ready -- potentially until *all* subsplits + // across all parent shards are complete and the compute has been notified. See: + // . drop(tenant); let mut parent_slot_guard = self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 3570cab301..537b9ff373 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -224,11 +224,11 @@ use tracing::{instrument, trace}; use utils::id::TimelineId; use utils::shard::{ShardIndex, ShardNumber}; -use crate::tenant::mgr::ShardSelector; +use crate::page_service::GetActiveTimelineError; +use crate::tenant::GetTimelineError; +use crate::tenant::mgr::{GetActiveTenantError, ShardSelector}; -/// The requirement for Debug is so that #[derive(Debug)] works in some places. -pub(crate) trait Types: Sized + std::fmt::Debug { - type TenantManagerError: Sized + std::fmt::Debug; +pub(crate) trait Types: Sized { type TenantManager: TenantManager + Sized; type Timeline: Timeline + Sized; } @@ -307,12 +307,11 @@ impl Default for PerTimelineState { /// Abstract view of [`crate::tenant::mgr`], for testability. pub(crate) trait TenantManager { /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`]. - /// Errors are returned as [`GetError::TenantManager`]. async fn resolve( &self, timeline_id: TimelineId, shard_selector: ShardSelector, - ) -> Result; + ) -> Result; } /// Abstract view of an [`Arc`], for testability. @@ -322,13 +321,6 @@ pub(crate) trait Timeline { fn per_timeline_state(&self) -> &PerTimelineState; } -/// Errors returned by [`Cache::get`]. -#[derive(Debug)] -pub(crate) enum GetError { - TenantManager(T::TenantManagerError), - PerTimelineStateShutDown, -} - /// Internal type used in [`Cache::get`]. enum RoutingResult { FastPath(Handle), @@ -345,7 +337,7 @@ impl Cache { timeline_id: TimelineId, shard_selector: ShardSelector, tenant_manager: &T::TenantManager, - ) -> Result, GetError> { + ) -> Result, GetActiveTimelineError> { const GET_MAX_RETRIES: usize = 10; const RETRY_BACKOFF: Duration = Duration::from_millis(100); let mut attempt = 0; @@ -356,7 +348,11 @@ impl Cache { .await { Ok(handle) => return Ok(handle), - Err(e) => { + Err( + e @ GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout { + .. + }), + ) => { // Retry on tenant manager error to handle tenant split more gracefully if attempt < GET_MAX_RETRIES { tokio::time::sleep(RETRY_BACKOFF).await; @@ -370,6 +366,7 @@ impl Cache { return Err(e); } } + Err(err) => return Err(err), } } } @@ -388,7 +385,7 @@ impl Cache { timeline_id: TimelineId, shard_selector: ShardSelector, tenant_manager: &T::TenantManager, - ) -> Result, GetError> { + ) -> Result, GetActiveTimelineError> { // terminates because when every iteration we remove an element from the map let miss: ShardSelector = loop { let routing_state = self.shard_routing(timeline_id, shard_selector); @@ -468,60 +465,50 @@ impl Cache { timeline_id: TimelineId, shard_selector: ShardSelector, tenant_manager: &T::TenantManager, - ) -> Result, GetError> { - match tenant_manager.resolve(timeline_id, shard_selector).await { - Ok(timeline) => { - let key = timeline.shard_timeline_id(); - match &shard_selector { - ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)), - ShardSelector::Page(_) => (), // gotta trust tenant_manager - ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index), - } - - trace!("creating new HandleInner"); - let timeline = Arc::new(timeline); - let handle_inner_arc = - Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline)))); - let handle_weak = WeakHandle { - inner: Arc::downgrade(&handle_inner_arc), - }; - let handle = handle_weak - .upgrade() - .ok() - .expect("we just created it and it's not linked anywhere yet"); - { - let mut lock_guard = timeline - .per_timeline_state() - .handles - .lock() - .expect("mutex poisoned"); - match &mut *lock_guard { - Some(per_timeline_state) => { - let replaced = - per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc)); - assert!(replaced.is_none(), "some earlier code left a stale handle"); - match self.map.entry(key) { - hash_map::Entry::Occupied(_o) => { - // This cannot not happen because - // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and - // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle - // while we were waiting for the tenant manager. - unreachable!() - } - hash_map::Entry::Vacant(v) => { - v.insert(handle_weak); - } - } - } - None => { - return Err(GetError::PerTimelineStateShutDown); - } - } - } - Ok(handle) - } - Err(e) => Err(GetError::TenantManager(e)), + ) -> Result, GetActiveTimelineError> { + let timeline = tenant_manager.resolve(timeline_id, shard_selector).await?; + let key = timeline.shard_timeline_id(); + match &shard_selector { + ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)), + ShardSelector::Page(_) => (), // gotta trust tenant_manager + ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index), } + + trace!("creating new HandleInner"); + let timeline = Arc::new(timeline); + let handle_inner_arc = Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline)))); + let handle_weak = WeakHandle { + inner: Arc::downgrade(&handle_inner_arc), + }; + let handle = handle_weak + .upgrade() + .ok() + .expect("we just created it and it's not linked anywhere yet"); + let mut lock_guard = timeline + .per_timeline_state() + .handles + .lock() + .expect("mutex poisoned"); + let Some(per_timeline_state) = &mut *lock_guard else { + return Err(GetActiveTimelineError::Timeline( + GetTimelineError::ShuttingDown, + )); + }; + let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc)); + assert!(replaced.is_none(), "some earlier code left a stale handle"); + match self.map.entry(key) { + hash_map::Entry::Occupied(_o) => { + // This cannot not happen because + // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and + // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle + // while we were waiting for the tenant manager. + unreachable!() + } + hash_map::Entry::Vacant(v) => { + v.insert(handle_weak); + } + } + Ok(handle) } } @@ -655,7 +642,8 @@ mod tests { use pageserver_api::models::ShardParameters; use pageserver_api::reltag::RelTag; use pageserver_api::shard::DEFAULT_STRIPE_SIZE; - use utils::shard::ShardCount; + use utils::id::TenantId; + use utils::shard::{ShardCount, TenantShardId}; use utils::sync::gate::GateGuard; use super::*; @@ -665,7 +653,6 @@ mod tests { #[derive(Debug)] struct TestTypes; impl Types for TestTypes { - type TenantManagerError = anyhow::Error; type TenantManager = StubManager; type Timeline = Entered; } @@ -716,40 +703,48 @@ mod tests { &self, timeline_id: TimelineId, shard_selector: ShardSelector, - ) -> anyhow::Result { + ) -> Result { + fn enter_gate( + timeline: &StubTimeline, + ) -> Result, GetActiveTimelineError> { + Ok(Arc::new(timeline.gate.enter().map_err(|_| { + GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) + })?)) + } + for timeline in &self.shards { if timeline.id == timeline_id { - let enter_gate = || { - let gate_guard = timeline.gate.enter()?; - let gate_guard = Arc::new(gate_guard); - anyhow::Ok(gate_guard) - }; match &shard_selector { ShardSelector::Zero if timeline.shard.is_shard_zero() => { return Ok(Entered { timeline: Arc::clone(timeline), - gate_guard: enter_gate()?, + gate_guard: enter_gate(timeline)?, }); } ShardSelector::Zero => continue, ShardSelector::Page(key) if timeline.shard.is_key_local(key) => { return Ok(Entered { timeline: Arc::clone(timeline), - gate_guard: enter_gate()?, + gate_guard: enter_gate(timeline)?, }); } ShardSelector::Page(_) => continue, ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => { return Ok(Entered { timeline: Arc::clone(timeline), - gate_guard: enter_gate()?, + gate_guard: enter_gate(timeline)?, }); } ShardSelector::Known(_) => continue, } } } - anyhow::bail!("not found") + Err(GetActiveTimelineError::Timeline( + GetTimelineError::NotFound { + tenant_id: TenantShardId::unsharded(TenantId::from([0; 16])), + timeline_id, + }, + )) } } From 5e3cb2ab070c77d8f836fa0f0008f8dd08f25875 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 29 Jul 2025 16:12:44 +0300 Subject: [PATCH 07/15] Refactor LFC stats functions (#12696) Split the functions into two parts: an internal function in file_cache.c which returns an array of structs representing the result set, and another function in neon.c with the glue code to expose it as a SQL function. This is in preparation for the new communicator, which needs to implement the same SQL functions, but getting the information from a different place. In the glue code, use the more modern Postgres way of building a result set using a tuplestore. --- pgxn/neon/file_cache.c | 386 +++++++++++------------------------------ pgxn/neon/file_cache.h | 20 +++ pgxn/neon/neon.c | 74 ++++++++ 3 files changed, 194 insertions(+), 286 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 88086689c8..3c680eab86 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -1832,125 +1832,46 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, LWLockRelease(lfc_lock); } -typedef struct +/* + * Return metrics about the LFC. + * + * The return format is a palloc'd array of LfcStatsEntrys. The size + * of the returned array is returned in *num_entries. + */ +LfcStatsEntry * +lfc_get_stats(size_t *num_entries) { - TupleDesc tupdesc; -} NeonGetStatsCtx; + LfcStatsEntry *entries; + size_t n = 0; -#define NUM_NEON_GET_STATS_COLS 2 +#define MAX_ENTRIES 10 + entries = palloc(sizeof(LfcStatsEntry) * MAX_ENTRIES); -PG_FUNCTION_INFO_V1(neon_get_lfc_stats); -Datum -neon_get_lfc_stats(PG_FUNCTION_ARGS) -{ - FuncCallContext *funcctx; - NeonGetStatsCtx *fctx; - MemoryContext oldcontext; - TupleDesc tupledesc; - Datum result; - HeapTuple tuple; - char const *key; - uint64 value = 0; - Datum values[NUM_NEON_GET_STATS_COLS]; - bool nulls[NUM_NEON_GET_STATS_COLS]; + entries[n++] = (LfcStatsEntry) {"file_cache_chunk_size_pages", lfc_ctl == NULL, + lfc_ctl ? lfc_blocks_per_chunk : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_misses", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->misses : 0}; + entries[n++] = (LfcStatsEntry) {"file_cache_hits", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->hits : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_used", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->used : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_writes", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->writes : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_size", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->size : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_used_pages", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->used_pages : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_evicted_pages", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->evicted_pages : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_limit", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->limit : 0 }; + entries[n++] = (LfcStatsEntry) {"file_cache_chunks_pinned", lfc_ctl == NULL, + lfc_ctl ? lfc_ctl->pinned : 0 }; + Assert(n <= MAX_ENTRIES); +#undef MAX_ENTRIES - if (SRF_IS_FIRSTCALL()) - { - funcctx = SRF_FIRSTCALL_INIT(); - - /* Switch context when allocating stuff to be used in later calls */ - oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - - /* Create a user function context for cross-call persistence */ - fctx = (NeonGetStatsCtx *) palloc(sizeof(NeonGetStatsCtx)); - - /* Construct a tuple descriptor for the result rows. */ - tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS); - - TupleDescInitEntry(tupledesc, (AttrNumber) 1, "lfc_key", - TEXTOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 2, "lfc_value", - INT8OID, -1, 0); - - fctx->tupdesc = BlessTupleDesc(tupledesc); - funcctx->user_fctx = fctx; - - /* Return to original context when allocating transient memory */ - MemoryContextSwitchTo(oldcontext); - } - - funcctx = SRF_PERCALL_SETUP(); - - /* Get the saved state */ - fctx = (NeonGetStatsCtx *) funcctx->user_fctx; - - switch (funcctx->call_cntr) - { - case 0: - key = "file_cache_misses"; - if (lfc_ctl) - value = lfc_ctl->misses; - break; - case 1: - key = "file_cache_hits"; - if (lfc_ctl) - value = lfc_ctl->hits; - break; - case 2: - key = "file_cache_used"; - if (lfc_ctl) - value = lfc_ctl->used; - break; - case 3: - key = "file_cache_writes"; - if (lfc_ctl) - value = lfc_ctl->writes; - break; - case 4: - key = "file_cache_size"; - if (lfc_ctl) - value = lfc_ctl->size; - break; - case 5: - key = "file_cache_used_pages"; - if (lfc_ctl) - value = lfc_ctl->used_pages; - break; - case 6: - key = "file_cache_evicted_pages"; - if (lfc_ctl) - value = lfc_ctl->evicted_pages; - break; - case 7: - key = "file_cache_limit"; - if (lfc_ctl) - value = lfc_ctl->limit; - break; - case 8: - key = "file_cache_chunk_size_pages"; - value = lfc_blocks_per_chunk; - break; - case 9: - key = "file_cache_chunks_pinned"; - if (lfc_ctl) - value = lfc_ctl->pinned; - break; - default: - SRF_RETURN_DONE(funcctx); - } - values[0] = PointerGetDatum(cstring_to_text(key)); - nulls[0] = false; - if (lfc_ctl) - { - nulls[1] = false; - values[1] = Int64GetDatum(value); - } - else - nulls[1] = true; - - tuple = heap_form_tuple(fctx->tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); - SRF_RETURN_NEXT(funcctx, result); + *num_entries = n; + return entries; } @@ -1958,193 +1879,86 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS) * Function returning data from the local file cache * relation node/tablespace/database/blocknum and access_counter */ -PG_FUNCTION_INFO_V1(local_cache_pages); - -/* - * Record structure holding the to be exposed cache data. - */ -typedef struct +LocalCachePagesRec * +lfc_local_cache_pages(size_t *num_entries) { - uint32 pageoffs; - Oid relfilenode; - Oid reltablespace; - Oid reldatabase; - ForkNumber forknum; - BlockNumber blocknum; - uint16 accesscount; -} LocalCachePagesRec; + HASH_SEQ_STATUS status; + FileCacheEntry *entry; + size_t n_pages; + size_t n; + LocalCachePagesRec *result; -/* - * Function context for data persisting over repeated calls. - */ -typedef struct -{ - TupleDesc tupdesc; - LocalCachePagesRec *record; -} LocalCachePagesContext; - - -#define NUM_LOCALCACHE_PAGES_ELEM 7 - -Datum -local_cache_pages(PG_FUNCTION_ARGS) -{ - FuncCallContext *funcctx; - Datum result; - MemoryContext oldcontext; - LocalCachePagesContext *fctx; /* User function context. */ - TupleDesc tupledesc; - TupleDesc expected_tupledesc; - HeapTuple tuple; - - if (SRF_IS_FIRSTCALL()) + if (!lfc_ctl) { - HASH_SEQ_STATUS status; - FileCacheEntry *entry; - uint32 n_pages = 0; + *num_entries = 0; + return NULL; + } - funcctx = SRF_FIRSTCALL_INIT(); + LWLockAcquire(lfc_lock, LW_SHARED); + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + *num_entries = 0; + return NULL; + } - /* Switch context when allocating stuff to be used in later calls */ - oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - - /* Create a user function context for cross-call persistence */ - fctx = (LocalCachePagesContext *) palloc(sizeof(LocalCachePagesContext)); - - /* - * To smoothly support upgrades from version 1.0 of this extension - * transparently handle the (non-)existence of the pinning_backends - * column. We unfortunately have to get the result type for that... - - * we can't use the result type determined by the function definition - * without potentially crashing when somebody uses the old (or even - * wrong) function definition though. - */ - if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) - neon_log(ERROR, "return type must be a row type"); - - if (expected_tupledesc->natts != NUM_LOCALCACHE_PAGES_ELEM) - neon_log(ERROR, "incorrect number of output arguments"); - - /* Construct a tuple descriptor for the result rows. */ - tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts); - TupleDescInitEntry(tupledesc, (AttrNumber) 1, "pageoffs", - INT8OID, -1, 0); -#if PG_MAJORVERSION_NUM < 16 - TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenode", - OIDOID, -1, 0); -#else - TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenumber", - OIDOID, -1, 0); -#endif - TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reltablespace", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 4, "reldatabase", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 5, "relforknumber", - INT2OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 6, "relblocknumber", - INT8OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 7, "accesscount", - INT4OID, -1, 0); - - fctx->tupdesc = BlessTupleDesc(tupledesc); - - if (lfc_ctl) + /* Count the pages first */ + n_pages = 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + /* Skip hole tags */ + if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0) { - LWLockAcquire(lfc_lock, LW_SHARED); + for (int i = 0; i < lfc_blocks_per_chunk; i++) + n_pages += GET_STATE(entry, i) == AVAILABLE; + } + } - if (LFC_ENABLED()) + if (n_pages == 0) + { + LWLockRelease(lfc_lock); + *num_entries = 0; + return NULL; + } + + result = (LocalCachePagesRec *) + MemoryContextAllocHuge(CurrentMemoryContext, + sizeof(LocalCachePagesRec) * n_pages); + + /* + * Scan through all the cache entries, saving the relevant fields + * in the result structure. + */ + n = 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + for (int i = 0; i < lfc_blocks_per_chunk; i++) + { + if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0) { - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (GET_STATE(entry, i) == AVAILABLE) { - /* Skip hole tags */ - if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0) - { - for (int i = 0; i < lfc_blocks_per_chunk; i++) - n_pages += GET_STATE(entry, i) == AVAILABLE; - } + result[n].pageoffs = entry->offset * lfc_blocks_per_chunk + i; + result[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); + result[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); + result[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); + result[n].forknum = entry->key.forkNum; + result[n].blocknum = entry->key.blockNum + i; + result[n].accesscount = entry->access_count; + n += 1; } } } - fctx->record = (LocalCachePagesRec *) - MemoryContextAllocHuge(CurrentMemoryContext, - sizeof(LocalCachePagesRec) * n_pages); - - /* Set max calls and remember the user function context. */ - funcctx->max_calls = n_pages; - funcctx->user_fctx = fctx; - - /* Return to original context when allocating transient memory */ - MemoryContextSwitchTo(oldcontext); - - if (n_pages != 0) - { - /* - * Scan through all the cache entries, saving the relevant fields - * in the fctx->record structure. - */ - uint32 n = 0; - - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) - { - for (int i = 0; i < lfc_blocks_per_chunk; i++) - { - if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0) - { - if (GET_STATE(entry, i) == AVAILABLE) - { - fctx->record[n].pageoffs = entry->offset * lfc_blocks_per_chunk + i; - fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n].forknum = entry->key.forkNum; - fctx->record[n].blocknum = entry->key.blockNum + i; - fctx->record[n].accesscount = entry->access_count; - n += 1; - } - } - } - } - Assert(n_pages == n); - } - if (lfc_ctl) - LWLockRelease(lfc_lock); } + Assert(n_pages == n); + LWLockRelease(lfc_lock); - funcctx = SRF_PERCALL_SETUP(); - - /* Get the saved state */ - fctx = funcctx->user_fctx; - - if (funcctx->call_cntr < funcctx->max_calls) - { - uint32 i = funcctx->call_cntr; - Datum values[NUM_LOCALCACHE_PAGES_ELEM]; - bool nulls[NUM_LOCALCACHE_PAGES_ELEM] = { - false, false, false, false, false, false, false - }; - - values[0] = Int64GetDatum((int64) fctx->record[i].pageoffs); - values[1] = ObjectIdGetDatum(fctx->record[i].relfilenode); - values[2] = ObjectIdGetDatum(fctx->record[i].reltablespace); - values[3] = ObjectIdGetDatum(fctx->record[i].reldatabase); - values[4] = ObjectIdGetDatum(fctx->record[i].forknum); - values[5] = Int64GetDatum((int64) fctx->record[i].blocknum); - values[6] = Int32GetDatum(fctx->record[i].accesscount); - - /* Build and return the tuple. */ - tuple = heap_form_tuple(fctx->tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); - - SRF_RETURN_NEXT(funcctx, result); - } - else - SRF_RETURN_DONE(funcctx); + *num_entries = n_pages; + return result; } - /* * Internal implementation of the approximate_working_set_size_seconds() * function. diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index 14e5d4f753..4145327942 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -47,6 +47,26 @@ extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blk extern FileCacheState* lfc_get_state(size_t max_entries); extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers); +typedef struct LfcStatsEntry +{ + const char *metric_name; + bool isnull; + uint64 value; +} LfcStatsEntry; +extern LfcStatsEntry *lfc_get_stats(size_t *num_entries); + +typedef struct +{ + uint32 pageoffs; + Oid relfilenode; + Oid reltablespace; + Oid reldatabase; + ForkNumber forknum; + BlockNumber blocknum; + uint16 accesscount; +} LocalCachePagesRec; +extern LocalCachePagesRec *lfc_local_cache_pages(size_t *num_entries); + extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset); diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 6cd21cce39..07de696d7b 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -625,11 +625,15 @@ _PG_init(void) ExecutorEnd_hook = neon_ExecutorEnd; } +/* Various functions exposed at SQL level */ + PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); PG_FUNCTION_INFO_V1(backpressure_throttling_time); PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds); PG_FUNCTION_INFO_V1(approximate_working_set_size); +PG_FUNCTION_INFO_V1(neon_get_lfc_stats); +PG_FUNCTION_INFO_V1(local_cache_pages); Datum pg_cluster_size(PG_FUNCTION_ARGS) @@ -704,6 +708,76 @@ approximate_working_set_size(PG_FUNCTION_ARGS) PG_RETURN_INT32(dc); } +Datum +neon_get_lfc_stats(PG_FUNCTION_ARGS) +{ +#define NUM_NEON_GET_STATS_COLS 2 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + LfcStatsEntry *entries; + size_t num_entries; + + InitMaterializedSRF(fcinfo, 0); + + /* lfc_get_stats() does all the heavy lifting */ + entries = lfc_get_stats(&num_entries); + + /* Convert the LfcStatsEntrys to a result set */ + for (size_t i = 0; i < num_entries; i++) + { + LfcStatsEntry *entry = &entries[i]; + Datum values[NUM_NEON_GET_STATS_COLS]; + bool nulls[NUM_NEON_GET_STATS_COLS]; + + values[0] = CStringGetTextDatum(entry->metric_name); + nulls[0] = false; + values[1] = Int64GetDatum(entry->isnull ? 0 : entry->value); + nulls[1] = entry->isnull; + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + PG_RETURN_VOID(); + +#undef NUM_NEON_GET_STATS_COLS +} + +Datum +local_cache_pages(PG_FUNCTION_ARGS) +{ +#define NUM_LOCALCACHE_PAGES_COLS 7 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + LocalCachePagesRec *entries; + size_t num_entries; + + InitMaterializedSRF(fcinfo, 0); + + /* lfc_local_cache_pages() does all the heavy lifting */ + entries = lfc_local_cache_pages(&num_entries); + + /* Convert the LocalCachePagesRec structs to a result set */ + for (size_t i = 0; i < num_entries; i++) + { + LocalCachePagesRec *entry = &entries[i]; + Datum values[NUM_LOCALCACHE_PAGES_COLS]; + bool nulls[NUM_LOCALCACHE_PAGES_COLS] = { + false, false, false, false, false, false, false + }; + + values[0] = Int64GetDatum((int64) entry->pageoffs); + values[1] = ObjectIdGetDatum(entry->relfilenode); + values[2] = ObjectIdGetDatum(entry->reltablespace); + values[3] = ObjectIdGetDatum(entry->reldatabase); + values[4] = ObjectIdGetDatum(entry->forknum); + values[5] = Int64GetDatum((int64) entry->blocknum); + values[6] = Int32GetDatum(entry->accesscount); + + /* Build and return the tuple. */ + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + + PG_RETURN_VOID(); + +#undef NUM_LOCALCACHE_PAGES_COLS +} + /* * Initialization stage 2: make requests for the amount of shared memory we * will need. From dbde37c53aadf66d58b55c1267001a64f720fdac Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 29 Jul 2025 10:20:43 -0400 Subject: [PATCH 08/15] fix(safekeeper): retry if open segment fail (#12757) ## Problem Fix LKB-2632. The safekeeper wal read path does not seem to retry at all. This would cause client read errors on the customer side. ## Summary of changes - Retry on `safekeeper::wal_backup::read_object`. - Note that this only retries on S3 HTTP connection errors. Subsequent reads could fail, and that needs more refactors to make the retry mechanism work across the path. --------- Signed-off-by: Alex Chi Z --- safekeeper/src/wal_backup.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 03c8f7e84a..191f8aacf1 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -12,7 +12,7 @@ use futures::stream::{self, FuturesOrdered}; use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr; use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo}; use remote_storage::{ - DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata, + DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata, }; use safekeeper_api::models::PeerInfo; use tokio::fs::File; @@ -607,6 +607,9 @@ pub(crate) async fn copy_partial_segment( storage.copy_object(source, destination, &cancel).await } +const WAL_READ_WARN_THRESHOLD: u32 = 2; +const WAL_READ_MAX_RETRIES: u32 = 3; + pub async fn read_object( storage: &GenericRemoteStorage, file_path: &RemotePath, @@ -620,12 +623,23 @@ pub async fn read_object( byte_start: std::ops::Bound::Included(offset), ..Default::default() }; - let download = storage - .download(file_path, &opts, &cancel) - .await - .with_context(|| { - format!("Failed to open WAL segment download stream for remote path {file_path:?}") - })?; + + // This retry only solves the connect errors: subsequent reads can still fail as this function returns + // a stream. + let download = backoff::retry( + || async { storage.download(file_path, &opts, &cancel).await }, + DownloadError::is_permanent, + WAL_READ_WARN_THRESHOLD, + WAL_READ_MAX_RETRIES, + "download WAL segment", + &cancel, + ) + .await + .ok_or_else(|| DownloadError::Cancelled) + .and_then(|x| x) + .with_context(|| { + format!("Failed to open WAL segment download stream for remote path {file_path:?}") + })?; let reader = tokio_util::io::StreamReader::new(download.download_stream); From 1bb434ab747ae500ef04e9aa671acb4f9eb9d956 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 29 Jul 2025 10:23:42 -0400 Subject: [PATCH 09/15] fix(test): test_readonly_node_gc compute needs time to acquire lease (#12747) ## Problem Part of LKB-2368. Compute fails to obtain LSN lease in this test case. There're many assumptions around how compute obtains the leases, and in this particular test case, as the LSN lease length is only 8s (which is shorter than the amount of time where pageserver can restart and compute can reconnect in terms of force stop), it sometimes cause issues. ## Summary of changes Add more sleeps around the test case to ensure it's stable at least. We need to find a more reliable way to test this in the future. --------- Signed-off-by: Alex Chi Z --- test_runner/regress/test_readonly_node.py | 28 +++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index 5612236250..e151b0ba13 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -129,7 +129,10 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): Test static endpoint is protected from GC by acquiring and renewing lsn leases. """ - LSN_LEASE_LENGTH = 8 + LSN_LEASE_LENGTH = ( + 14 # This value needs to be large enough for compute_ctl to send two lease requests. + ) + neon_env_builder.num_pageservers = 2 # GC is manual triggered. env = neon_env_builder.init_start( @@ -230,6 +233,15 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): log.info(f"`SELECT` query succeed after GC, {ctx=}") return offset + # It's not reliable to let the compute renew the lease in this test case as we have a very tight + # lease timeout. Therefore, the test case itself will renew the lease. + # + # This is a workaround to make the test case more deterministic. + def renew_lease(env: NeonEnv, lease_lsn: Lsn): + env.storage_controller.pageserver_api().timeline_lsn_lease( + env.initial_tenant, env.initial_timeline, lease_lsn + ) + # Insert some records on main branch with env.endpoints.create_start("main", config_lines=["shared_buffers=1MB"]) as ep_main: with ep_main.cursor() as cur: @@ -242,6 +254,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): XLOG_BLCKSZ = 8192 lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ) + # We need to mock the way cplane works: it gets a lease for a branch before starting the compute. + renew_lease(env, lsn) + with env.endpoints.create_start( branch_name="main", endpoint_id="static", @@ -251,9 +266,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): cur.execute("SELECT count(*) FROM t0") assert cur.fetchone() == (ROW_COUNT,) - # Wait for static compute to renew lease at least once. - time.sleep(LSN_LEASE_LENGTH / 2) - generate_updates_on_main(env, ep_main, 3, end=100) offset = trigger_gc_and_select( @@ -263,10 +275,10 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): # Trigger Pageserver restarts for ps in env.pageservers: ps.stop() - # Static compute should have at least one lease request failure due to connection. - time.sleep(LSN_LEASE_LENGTH / 2) ps.start() + renew_lease(env, lsn) + trigger_gc_and_select( env, ep_static, @@ -282,6 +294,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): ) env.storage_controller.reconcile_until_idle() + # Wait for static compute to renew lease on the new pageserver. + time.sleep(LSN_LEASE_LENGTH + 3) + trigger_gc_and_select( env, ep_static, @@ -292,7 +307,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): # Do some update so we can increment gc_cutoff generate_updates_on_main(env, ep_main, i, end=100) - # Wait for the existing lease to expire. time.sleep(LSN_LEASE_LENGTH + 1) # Now trigger GC again, layers should be removed. From 62d844e657b99dc5374b42333cb5413160bd7bec Mon Sep 17 00:00:00 2001 From: HaoyuHuang Date: Tue, 29 Jul 2025 08:22:04 -0700 Subject: [PATCH 10/15] Add changes in spec apply (#12759) ## Problem All changes are no-op. ## Summary of changes --- compute_tools/src/compute.rs | 63 +++++- compute_tools/src/spec_apply.rs | 205 +++++++++++++++++- .../alter_databricks_reader_roles_timeout.sql | 25 +++ .../src/sql/create_databricks_misc.sql | 15 ++ 4 files changed, 296 insertions(+), 12 deletions(-) create mode 100644 compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql create mode 100644 compute_tools/src/sql/create_databricks_misc.sql diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index f53adbb1df..8350a4c059 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -6,7 +6,8 @@ use compute_api::responses::{ LfcPrewarmState, PromoteState, TlsConfig, }; use compute_api::spec::{ - ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent, + ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, GenericOption, + PageserverProtocol, PgIdent, Role, }; use futures::StreamExt; use futures::future::join_all; @@ -413,6 +414,66 @@ struct StartVmMonitorResult { vm_monitor: Option>>, } +// BEGIN_HADRON +/// This function creates roles that are used by Databricks. +/// These roles are not needs to be botostrapped at PG Compute provisioning time. +/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository. +pub(crate) fn create_databricks_roles() -> Vec { + let roles = vec![ + // Role for prometheus_stats_exporter + Role { + name: "databricks_monitor".to_string(), + // This uses "local" connection and auth method for that is "trust", so no password is needed. + encrypted_password: None, + options: Some(vec![GenericOption { + name: "IN ROLE pg_monitor".to_string(), + value: None, + vartype: "string".to_string(), + }]), + }, + // Role for brickstore control plane + Role { + name: "databricks_control_plane".to_string(), + // Certificate user does not need password. + encrypted_password: None, + options: Some(vec![GenericOption { + name: "SUPERUSER".to_string(), + value: None, + vartype: "string".to_string(), + }]), + }, + // Role for brickstore httpgateway. + Role { + name: "databricks_gateway".to_string(), + // Certificate user does not need password. + encrypted_password: None, + options: None, + }, + ]; + + roles + .into_iter() + .map(|role| { + let query = format!( + r#" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}') + THEN + CREATE ROLE {} {}; + END IF; + END + $$;"#, + role.name, + role.name.pg_quote(), + role.to_pg_options(), + ); + query + }) + .collect() +} + /// Databricks-specific environment variables to be passed to the `postgres` sub-process. pub struct DatabricksEnvVars { /// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index 47bf61ae1b..2356078703 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -13,17 +13,19 @@ use tokio_postgres::Client; use tokio_postgres::error::SqlState; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; -use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState}; +use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState, create_databricks_roles}; +use crate::hadron_metrics::COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS; use crate::pg_helpers::{ DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async, get_existing_roles_async, }; use crate::spec_apply::ApplySpecPhase::{ - CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension, + AddDatabricksGrants, AlterDatabricksRoles, CreateAndAlterDatabases, CreateAndAlterRoles, + CreateAvailabilityCheck, CreateDatabricksMisc, CreateDatabricksRoles, CreatePgauditExtension, CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon, DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, - HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, - RunInEachDatabase, + HandleDatabricksAuthExtension, HandleNeonExtension, HandleOtherExtensions, + RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, }; use crate::spec_apply::PerDatabasePhase::{ ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, @@ -166,6 +168,7 @@ impl ComputeNode { concurrency_token.clone(), db, [DropLogicalSubscriptions].to_vec(), + self.params.lakebase_mode, ); Ok(tokio::spawn(fut)) @@ -186,15 +189,33 @@ impl ComputeNode { }; } - for phase in [ - CreatePrivilegedRole, + let phases = if self.params.lakebase_mode { + vec![ + CreatePrivilegedRole, + // BEGIN_HADRON + CreateDatabricksRoles, + AlterDatabricksRoles, + // END_HADRON DropInvalidDatabases, RenameRoles, CreateAndAlterRoles, RenameAndDeleteDatabases, CreateAndAlterDatabases, CreateSchemaNeon, - ] { + ] + } else { + vec![ + CreatePrivilegedRole, + DropInvalidDatabases, + RenameRoles, + CreateAndAlterRoles, + RenameAndDeleteDatabases, + CreateAndAlterDatabases, + CreateSchemaNeon, + ] + }; + + for phase in phases { info!("Applying phase {:?}", &phase); apply_operations( params.clone(), @@ -203,6 +224,7 @@ impl ComputeNode { jwks_roles.clone(), phase, || async { Ok(&client) }, + self.params.lakebase_mode, ) .await?; } @@ -254,6 +276,7 @@ impl ComputeNode { concurrency_token.clone(), db, phases, + self.params.lakebase_mode, ); Ok(tokio::spawn(fut)) @@ -265,12 +288,28 @@ impl ComputeNode { handle.await??; } - let mut phases = vec![ + let mut phases = if self.params.lakebase_mode { + vec![ + HandleOtherExtensions, + HandleNeonExtension, // This step depends on CreateSchemaNeon + // BEGIN_HADRON + HandleDatabricksAuthExtension, + // END_HADRON + CreateAvailabilityCheck, + DropRoles, + // BEGIN_HADRON + AddDatabricksGrants, + CreateDatabricksMisc, + // END_HADRON + ] + } else { + vec![ HandleOtherExtensions, HandleNeonExtension, // This step depends on CreateSchemaNeon CreateAvailabilityCheck, DropRoles, - ]; + ] + }; // This step depends on CreateSchemaNeon if spec.drop_subscriptions_before_start && !drop_subscriptions_done { @@ -303,6 +342,7 @@ impl ComputeNode { jwks_roles.clone(), phase, || async { Ok(&client) }, + self.params.lakebase_mode, ) .await?; } @@ -328,6 +368,7 @@ impl ComputeNode { concurrency_token: Arc, db: DB, subphases: Vec, + lakebase_mode: bool, ) -> Result<()> { let _permit = concurrency_token.acquire().await?; @@ -355,6 +396,7 @@ impl ComputeNode { let client = client_conn.as_ref().unwrap(); Ok(client) }, + lakebase_mode, ) .await?; } @@ -477,6 +519,10 @@ pub enum PerDatabasePhase { #[derive(Clone, Debug)] pub enum ApplySpecPhase { CreatePrivilegedRole, + // BEGIN_HADRON + CreateDatabricksRoles, + AlterDatabricksRoles, + // END_HADRON DropInvalidDatabases, RenameRoles, CreateAndAlterRoles, @@ -489,7 +535,14 @@ pub enum ApplySpecPhase { DisablePostgresDBPgAudit, HandleOtherExtensions, HandleNeonExtension, + // BEGIN_HADRON + HandleDatabricksAuthExtension, + // END_HADRON CreateAvailabilityCheck, + // BEGIN_HADRON + AddDatabricksGrants, + CreateDatabricksMisc, + // END_HADRON DropRoles, FinalizeDropLogicalSubscriptions, } @@ -525,6 +578,7 @@ pub async fn apply_operations<'a, Fut, F>( jwks_roles: Arc>, apply_spec_phase: ApplySpecPhase, client: F, + lakebase_mode: bool, ) -> Result<()> where F: FnOnce() -> Fut, @@ -571,6 +625,23 @@ where }, query ); + if !lakebase_mode { + return res; + } + // BEGIN HADRON + if let Err(e) = res.as_ref() { + if let Some(sql_state) = e.code() { + if sql_state.code() == "57014" { + // SQL State 57014 (ERRCODE_QUERY_CANCELED) is used for statement timeouts. + // Increment the counter whenever a statement timeout occurs. Timeouts on + // this configuration path can only occur due to PS connectivity problems that + // Postgres failed to recover from. + COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.inc(); + } + } + } + // END HADRON + res } .instrument(inspan) @@ -612,6 +683,35 @@ async fn get_operations<'a>( ), comment: None, }))), + // BEGIN_HADRON + // New Hadron phase + ApplySpecPhase::CreateDatabricksRoles => { + let queries = create_databricks_roles(); + let operations = queries.into_iter().map(|query| Operation { + query, + comment: None, + }); + Ok(Box::new(operations)) + } + + // Backfill existing databricks_reader_* roles with statement timeout from GUC + ApplySpecPhase::AlterDatabricksRoles => { + let query = String::from(include_str!( + "sql/alter_databricks_reader_roles_timeout.sql" + )); + + let operations = once(Operation { + query, + comment: Some( + "Backfill existing databricks_reader_* roles with statement timeout" + .to_string(), + ), + }); + + Ok(Box::new(operations)) + } + // End of new Hadron Phase + // END_HADRON ApplySpecPhase::DropInvalidDatabases => { let mut ctx = ctx.write().await; let databases = &mut ctx.dbs; @@ -981,7 +1081,10 @@ async fn get_operations<'a>( // N.B. this has to be properly dollar-escaped with `pg_quote_dollar()` role_name = escaped_role, outer_tag = outer_tag, - ), + ) + // HADRON change: + .replace("neon_superuser", ¶ms.privileged_role_name), + // HADRON change end , comment: None, }, // This now will only drop privileges of the role @@ -1017,7 +1120,8 @@ async fn get_operations<'a>( comment: None, }, Operation { - query: String::from(include_str!("sql/default_grants.sql")), + query: String::from(include_str!("sql/default_grants.sql")) + .replace("neon_superuser", ¶ms.privileged_role_name), comment: None, }, ] @@ -1086,6 +1190,28 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + // BEGIN_HADRON + // Note: we may want to version the extension someday, but for now we just drop it and recreate it. + ApplySpecPhase::HandleDatabricksAuthExtension => { + let operations = vec![ + Operation { + query: String::from("DROP EXTENSION IF EXISTS databricks_auth"), + comment: Some(String::from("dropping existing databricks_auth extension")), + }, + Operation { + query: String::from("CREATE EXTENSION databricks_auth"), + comment: Some(String::from("creating databricks_auth extension")), + }, + Operation { + query: String::from("GRANT SELECT ON databricks_auth_metrics TO pg_monitor"), + comment: Some(String::from("grant select on databricks auth counters")), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + // END_HADRON ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation { query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")), comment: None, @@ -1103,6 +1229,63 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + + // BEGIN_HADRON + // New Hadron phases + // + // Grants permissions to roles that are used by Databricks. + ApplySpecPhase::AddDatabricksGrants => { + let operations = vec![ + Operation { + query: String::from("GRANT USAGE ON SCHEMA neon TO databricks_monitor"), + comment: Some(String::from( + "Permissions needed to execute neon.* functions (in the postgres database)", + )), + }, + Operation { + query: String::from( + "GRANT SELECT, INSERT, UPDATE ON health_check TO databricks_monitor", + ), + comment: Some(String::from("Permissions needed for read and write probes")), + }, + Operation { + query: String::from( + "GRANT EXECUTE ON FUNCTION pg_ls_dir(text) TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to monitor .snap file counts", + )), + }, + Operation { + query: String::from( + "GRANT SELECT ON neon.neon_perf_counters TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to access neon performance counters view", + )), + }, + Operation { + query: String::from( + "GRANT EXECUTE ON FUNCTION neon.get_perf_counters() TO databricks_monitor", + ), + comment: Some(String::from( + "Permissions needed to execute the underlying performance counters function", + )), + }, + ] + .into_iter(); + + Ok(Box::new(operations)) + } + // Creates minor objects that are used by Databricks. + ApplySpecPhase::CreateDatabricksMisc => Ok(Box::new(once(Operation { + query: String::from(include_str!("sql/create_databricks_misc.sql")), + comment: Some(String::from( + "The function databricks_monitor uses to convert exception to 0 or 1", + )), + }))), + // End of new Hadron phases + // END_HADRON ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation { query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")), comment: None, diff --git a/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql b/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql new file mode 100644 index 0000000000..db16df3817 --- /dev/null +++ b/compute_tools/src/sql/alter_databricks_reader_roles_timeout.sql @@ -0,0 +1,25 @@ +DO $$ +DECLARE + reader_role RECORD; + timeout_value TEXT; +BEGIN + -- Get the current GUC setting for reader statement timeout + SELECT current_setting('databricks.reader_statement_timeout', true) INTO timeout_value; + + -- Only proceed if timeout_value is not null/empty and not '0' (disabled) + IF timeout_value IS NOT NULL AND timeout_value != '' AND timeout_value != '0' THEN + -- Find all databricks_reader_* roles and update their statement_timeout + FOR reader_role IN + SELECT r.rolname + FROM pg_roles r + WHERE r.rolname ~ '^databricks_reader_\d+$' + LOOP + -- Apply the timeout setting to the role (will overwrite existing setting) + EXECUTE format('ALTER ROLE %I SET statement_timeout = %L', + reader_role.rolname, timeout_value); + + RAISE LOG 'Updated statement_timeout = % for role %', timeout_value, reader_role.rolname; + END LOOP; + END IF; +END +$$; diff --git a/compute_tools/src/sql/create_databricks_misc.sql b/compute_tools/src/sql/create_databricks_misc.sql new file mode 100644 index 0000000000..a6dc379078 --- /dev/null +++ b/compute_tools/src/sql/create_databricks_misc.sql @@ -0,0 +1,15 @@ +ALTER ROLE databricks_monitor SET statement_timeout = '60s'; + +CREATE OR REPLACE FUNCTION health_check_write_succeeds() +RETURNS INTEGER AS $$ +BEGIN +INSERT INTO health_check VALUES (1, now()) +ON CONFLICT (id) DO UPDATE + SET updated_at = now(); + +RETURN 1; +EXCEPTION WHEN OTHERS THEN +RAISE EXCEPTION '[DATABRICKS_SMGR] health_check failed: [%] %', SQLSTATE, SQLERRM; +RETURN 0; +END; +$$ LANGUAGE plpgsql; From 0ffdc98e202f6573bdc07d78834e5c5f6423bf69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Szafra=C5=84ski?= Date: Tue, 29 Jul 2025 17:25:22 +0200 Subject: [PATCH 11/15] [proxy] Classify "database not found" errors as user errors (#12603) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem If a user provides a wrong database name in the connection string, it should be logged as a user error, not postgres error. I found 4 different places where we log such errors: 1. `proxy/src/stream.rs:193`, e.g.: ``` {"timestamp":"2025-07-15T11:33:35.660026Z","level":"INFO","message":"forwarding error to user","fields":{"kind":"postgres","msg":"database \"[redacted]\" does not exist"},"spans":{"connect_request#9":{"protocol":"tcp","session_id":"ce1f2c90-dfb5-44f7-b9e9-8b8535e8b9b8","conn_info":"[redacted]","ep":"[redacted]","role":"[redacted]"}},"thread_id":22,"task_id":"370407867","target":"proxy::stream","src":"proxy/src/stream.rs:193","extract":{"ep":"[redacted]","session_id":"ce1f2c90-dfb5-44f7-b9e9-8b8535e8b9b8"}} ``` 2. `proxy/src/pglb/mod.rs:137`, e.g.: ``` {"timestamp":"2025-07-15T11:37:44.340497Z","level":"WARN","message":"per-client task finished with an error: Couldn't connect to compute node: db error: FATAL: database \"[redacted]\" does not exist","spans":{"connect_request#8":{"protocol":"tcp","session_id":"763baaac-d039-4f4d-9446-c149e32660eb","conn_info":"[redacted]","ep":"[redacted]","role":"[redacted]"}},"thread_id":14,"task_id":"866658139","target":"proxy::pglb","src":"proxy/src/pglb/mod.rs:137","extract":{"ep":"[redacted]","session_id":"763baaac-d039-4f4d-9446-c149e32660eb"}} ``` 3. `proxy/src/serverless/mod.rs:451`, e.g. (note that the error is repeated 4 times — retries?): ``` {"timestamp":"2025-07-15T11:37:54.515891Z","level":"WARN","message":"error in websocket connection: Couldn't connect to compute node: db error: FATAL: database \"[redacted]\" does not exist: Couldn't connect to compute node: db error: FATAL: database \"[redacted]\" does not exist: db error: FATAL: database \"[redacted]\" does not exist: FATAL: database \"[redacted]\" does not exist","spans":{"http_conn#8":{"conn_id":"ec7780db-a145-4f0e-90df-0ba35f41b828"},"connect_request#9":{"protocol":"ws","session_id":"1eaaeeec-b671-4153-b1f4-247839e4b1c7","conn_info":"[redacted]","ep":"[redacted]","role":"[redacted]"}},"thread_id":10,"task_id":"366331699","target":"proxy::serverless","src":"proxy/src/serverless/mod.rs:451","extract":{"conn_id":"ec7780db-a145-4f0e-90df-0ba35f41b828","ep":"[redacted]","session_id":"1eaaeeec-b671-4153-b1f4-247839e4b1c7"}} ``` 4. `proxy/src/serverless/sql_over_http.rs:219`, e.g. ``` {"timestamp":"2025-07-15T10:32:34.866603Z","level":"INFO","message":"forwarding error to user","fields":{"kind":"postgres","error":"could not connect to postgres in compute","msg":"database \"[redacted]\" does not exist"},"spans":{"http_conn#19":{"conn_id":"7da08203-5dab-45e8-809f-503c9019ec6b"},"connect_request#5":{"protocol":"http","session_id":"68387f1c-cbc8-45b3-a7db-8bb1c55ca809","conn_info":"[redacted]","ep":"[redacted]","role":"[redacted]"}},"thread_id":17,"task_id":"16432250","target":"proxy::serverless::sql_over_http","src":"proxy/src/serverless/sql_over_http.rs:219","extract":{"conn_id":"7da08203-5dab-45e8-809f-503c9019ec6b","ep":"[redacted]","session_id":"68387f1c-cbc8-45b3-a7db-8bb1c55ca809"}} ``` This PR directly addresses 1 and 4. I _think_ it _should_ also help with 2 and 3, although in those places we don't seem to log `kind`, so I'm not quite sure. I'm also confused why in 3 the error is repeated multiple times. ## Summary of changes Resolves https://github.com/neondatabase/neon/issues/9440 --- libs/proxy/tokio-postgres2/src/error/mod.rs | 2 +- proxy/src/compute/mod.rs | 18 +++++----- proxy/src/serverless/backend.rs | 18 +++++----- proxy/src/serverless/sql_over_http.rs | 39 +++++++++------------ 4 files changed, 37 insertions(+), 40 deletions(-) diff --git a/libs/proxy/tokio-postgres2/src/error/mod.rs b/libs/proxy/tokio-postgres2/src/error/mod.rs index 6e68b1e595..3fbb97f9bb 100644 --- a/libs/proxy/tokio-postgres2/src/error/mod.rs +++ b/libs/proxy/tokio-postgres2/src/error/mod.rs @@ -9,7 +9,7 @@ use postgres_protocol2::message::backend::{ErrorFields, ErrorResponseBody}; pub use self::sqlstate::*; #[allow(clippy::unreadable_literal)] -mod sqlstate; +pub mod sqlstate; /// The severity of a Postgres error or notice. #[derive(Debug, Copy, Clone, PartialEq, Eq)] diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs index ca784423ee..43cfe70206 100644 --- a/proxy/src/compute/mod.rs +++ b/proxy/src/compute/mod.rs @@ -8,6 +8,7 @@ use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use postgres_client::config::{AuthKeys, ChannelBinding, SslMode}; use postgres_client::connect_raw::StartupStream; +use postgres_client::error::SqlState; use postgres_client::maybe_tls_stream::MaybeTlsStream; use postgres_client::tls::MakeTlsConnect; use thiserror::Error; @@ -22,7 +23,7 @@ use crate::context::RequestContext; use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::WakeComputeError; use crate::control_plane::messages::MetricsAuxInfo; -use crate::error::{ReportableError, UserFacingError}; +use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::metrics::{Metrics, NumDbConnectionsGuard}; use crate::pqproto::StartupMessageParams; use crate::proxy::connect_compute::TlsNegotiation; @@ -65,12 +66,13 @@ impl UserFacingError for PostgresError { } impl ReportableError for PostgresError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { - PostgresError::Postgres(e) if e.as_db_error().is_some() => { - crate::error::ErrorKind::Postgres - } - PostgresError::Postgres(_) => crate::error::ErrorKind::Compute, + PostgresError::Postgres(err) => match err.as_db_error() { + Some(err) if err.code() == &SqlState::INVALID_CATALOG_NAME => ErrorKind::User, + Some(_) => ErrorKind::Postgres, + None => ErrorKind::Compute, + }, } } } @@ -110,9 +112,9 @@ impl UserFacingError for ConnectionError { } impl ReportableError for ConnectionError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { - ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute, + ConnectionError::TlsError(_) => ErrorKind::Compute, ConnectionError::WakeComputeError(e) => e.get_error_kind(), ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(), #[cfg(test)] diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index eb879f98e7..511bdc4e42 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -4,6 +4,7 @@ use std::time::Duration; use ed25519_dalek::SigningKey; use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use jose_jwk::jose_b64; +use postgres_client::error::SqlState; use postgres_client::maybe_tls_stream::MaybeTlsStream; use rand_core::OsRng; use tracing::field::display; @@ -459,15 +460,14 @@ impl ReportableError for HttpConnError { match self { HttpConnError::ConnectError(_) => ErrorKind::Compute, HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute, - HttpConnError::PostgresConnectionError(p) => { - if p.as_db_error().is_some() { - // postgres rejected the connection - ErrorKind::Postgres - } else { - // couldn't even reach postgres - ErrorKind::Compute - } - } + HttpConnError::PostgresConnectionError(p) => match p.as_db_error() { + // user provided a wrong database name + Some(err) if err.code() == &SqlState::INVALID_CATALOG_NAME => ErrorKind::User, + // postgres rejected the connection + Some(_) => ErrorKind::Postgres, + // couldn't even reach postgres + None => ErrorKind::Compute, + }, HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute, HttpConnError::ComputeCtl(_) => ErrorKind::Service, HttpConnError::JwtPayloadError(_) => ErrorKind::User, diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 26f65379e7..c334e820d7 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -192,34 +192,29 @@ pub(crate) async fn handle( let line = get(db_error, |db| db.line().map(|l| l.to_string())); let routine = get(db_error, |db| db.routine()); - match &e { - SqlOverHttpError::Postgres(e) - if e.as_db_error().is_some() && error_kind == ErrorKind::User => - { - // this error contains too much info, and it's not an error we care about. - if tracing::enabled!(Level::DEBUG) { - tracing::debug!( - kind=error_kind.to_metric_label(), - error=%e, - msg=message, - "forwarding error to user" - ); - } else { - tracing::info!( - kind = error_kind.to_metric_label(), - error = "bad query", - "forwarding error to user" - ); - } - } - _ => { - tracing::info!( + if db_error.is_some() && error_kind == ErrorKind::User { + // this error contains too much info, and it's not an error we care about. + if tracing::enabled!(Level::DEBUG) { + debug!( kind=error_kind.to_metric_label(), error=%e, msg=message, "forwarding error to user" ); + } else { + info!( + kind = error_kind.to_metric_label(), + error = "bad query", + "forwarding error to user" + ); } + } else { + info!( + kind=error_kind.to_metric_label(), + error=%e, + msg=message, + "forwarding error to user" + ); } json_response( From 5585c32ceebb498b0d0085fc7d1306199e08264e Mon Sep 17 00:00:00 2001 From: a-masterov <72613290+a-masterov@users.noreply.github.com> Date: Tue, 29 Jul 2025 17:34:02 +0200 Subject: [PATCH 12/15] Disable autovacuum while running pg_repack test (#12755) ## Problem Sometimes, the regression test of `pg_repack` fails due to an extra line in the output. The most probable cause of this is autovacuum. https://databricks.atlassian.net/browse/LKB-2637 ## Summary of changes Autovacuum is disabled during the test. Co-authored-by: Alexey Masterov --- compute/patches/pg_repack.patch | 68 ++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/compute/patches/pg_repack.patch b/compute/patches/pg_repack.patch index 10ed1054ff..b8a057e222 100644 --- a/compute/patches/pg_repack.patch +++ b/compute/patches/pg_repack.patch @@ -1,5 +1,11 @@ +commit 5eb393810cf7c7bafa4e394dad2e349e2a8cb2cb +Author: Alexey Masterov +Date: Mon Jul 28 18:11:02 2025 +0200 + + Patch for pg_repack + diff --git a/regress/Makefile b/regress/Makefile -index bf6edcb..89b4c7f 100644 +index bf6edcb..110e734 100644 --- a/regress/Makefile +++ b/regress/Makefile @@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\} @@ -7,18 +13,36 @@ index bf6edcb..89b4c7f 100644 # -REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger -+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger ++REGRESS := init-extension noautovacuum repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger autovacuum USE_PGXS = 1 # use pgxs if not in contrib directory PGXS := $(shell $(PG_CONFIG) --pgxs) -diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out -index 9f2e171..f6e4f8d 100644 ---- a/regress/expected/init-extension.out -+++ b/regress/expected/init-extension.out -@@ -1,3 +1,2 @@ - SET client_min_messages = warning; - CREATE EXTENSION pg_repack; --RESET client_min_messages; +diff --git a/regress/expected/autovacuum.out b/regress/expected/autovacuum.out +new file mode 100644 +index 0000000..e7f2363 +--- /dev/null ++++ b/regress/expected/autovacuum.out +@@ -0,0 +1,7 @@ ++ALTER SYSTEM SET autovacuum='on'; ++SELECT pg_reload_conf(); ++ pg_reload_conf ++---------------- ++ t ++(1 row) ++ +diff --git a/regress/expected/noautovacuum.out b/regress/expected/noautovacuum.out +new file mode 100644 +index 0000000..fc7978e +--- /dev/null ++++ b/regress/expected/noautovacuum.out +@@ -0,0 +1,7 @@ ++ALTER SYSTEM SET autovacuum='off'; ++SELECT pg_reload_conf(); ++ pg_reload_conf ++---------------- ++ t ++(1 row) ++ diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out index 8d0a94e..63b68bf 100644 --- a/regress/expected/nosuper.out @@ -50,14 +74,22 @@ index 8d0a94e..63b68bf 100644 INFO: repacking table "public.tbl_cluster" ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block DETAIL: query was: RESET lock_timeout -diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql -index 9f2e171..f6e4f8d 100644 ---- a/regress/sql/init-extension.sql -+++ b/regress/sql/init-extension.sql -@@ -1,3 +1,2 @@ - SET client_min_messages = warning; - CREATE EXTENSION pg_repack; --RESET client_min_messages; +diff --git a/regress/sql/autovacuum.sql b/regress/sql/autovacuum.sql +new file mode 100644 +index 0000000..a8eda63 +--- /dev/null ++++ b/regress/sql/autovacuum.sql +@@ -0,0 +1,2 @@ ++ALTER SYSTEM SET autovacuum='on'; ++SELECT pg_reload_conf(); +diff --git a/regress/sql/noautovacuum.sql b/regress/sql/noautovacuum.sql +new file mode 100644 +index 0000000..13d4836 +--- /dev/null ++++ b/regress/sql/noautovacuum.sql +@@ -0,0 +1,2 @@ ++ALTER SYSTEM SET autovacuum='off'; ++SELECT pg_reload_conf(); diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql index 072f0fa..dbe60f8 100644 --- a/regress/sql/nosuper.sql From bb32f1b3d05fbac6ada499f1c5ec788e6adbb9c1 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 29 Jul 2025 18:35:00 +0300 Subject: [PATCH 13/15] Move 'criterion' to a dev-dependency (#12762) It is only used in micro-benchmarks. --- libs/postgres_ffi/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index 5adf38f457..23fabeccd2 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -9,7 +9,6 @@ regex.workspace = true bytes.workspace = true anyhow.workspace = true crc32c.workspace = true -criterion.workspace = true once_cell.workspace = true pprof.workspace = true thiserror.workspace = true @@ -20,6 +19,7 @@ tracing.workspace = true postgres_versioninfo.workspace = true [dev-dependencies] +criterion.workspace = true env_logger.workspace = true postgres.workspace = true From 16eb8dda3de9e2b6bc35cc150046cce936aa1746 Mon Sep 17 00:00:00 2001 From: Suhas Thalanki <54014218+thesuhas@users.noreply.github.com> Date: Tue, 29 Jul 2025 12:01:56 -0400 Subject: [PATCH 14/15] some compute ctl changes from hadron (#12760) Some compute ctl changes from hadron --- compute_tools/src/bin/compute_ctl.rs | 42 ++++++++++++-- compute_tools/src/compute.rs | 84 +++++++++++++++++++++++----- 2 files changed, 109 insertions(+), 17 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 9c86aba531..2b4802f309 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -82,6 +82,15 @@ struct Cli { #[arg(long, default_value_t = 3081)] pub internal_http_port: u16, + /// Backwards-compatible --http-port for Hadron deployments. Functionally the + /// same as --external-http-port. + #[arg( + long, + conflicts_with = "external_http_port", + conflicts_with = "internal_http_port" + )] + pub http_port: Option, + #[arg(short = 'D', long, value_name = "DATADIR")] pub pgdata: String, @@ -181,6 +190,26 @@ impl Cli { } } +// Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port` +// arg is used and acts the same as `--external-http-port`. The internal http port is defined +// to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so +// we need to be careful with the ports to choose. +fn get_external_http_port(cli: &Cli) -> u16 { + if cli.lakebase_mode { + return cli.http_port.unwrap_or(cli.external_http_port); + } + cli.external_http_port +} +fn get_internal_http_port(cli: &Cli) -> u16 { + if cli.lakebase_mode { + return cli + .http_port + .map(|p| p + 1) + .unwrap_or(cli.internal_http_port); + } + cli.internal_http_port +} + fn main() -> Result<()> { let cli = Cli::parse(); @@ -205,13 +234,18 @@ fn main() -> Result<()> { // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; - installed_extensions::initialize_metrics(); - hadron_metrics::initialize_metrics(); + if cli.lakebase_mode { + installed_extensions::initialize_metrics(); + hadron_metrics::initialize_metrics(); + } let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?; let config = get_config(&cli)?; + let external_http_port = get_external_http_port(&cli); + let internal_http_port = get_internal_http_port(&cli); + let compute_node = ComputeNode::new( ComputeNodeParams { compute_id: cli.compute_id, @@ -220,8 +254,8 @@ fn main() -> Result<()> { pgdata: cli.pgdata.clone(), pgbin: cli.pgbin.clone(), pgversion: get_pg_version_string(&cli.pgbin), - external_http_port: cli.external_http_port, - internal_http_port: cli.internal_http_port, + external_http_port, + internal_http_port, remote_ext_base_url: cli.remote_ext_base_url.clone(), resize_swap_on_bind: cli.resize_swap_on_bind, set_disk_quota_for_fs: cli.set_disk_quota_for_fs, diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 8350a4c059..27d33d8cd8 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -482,14 +482,27 @@ pub struct DatabricksEnvVars { /// Hostname of the Databricks workspace URL this compute instance belongs to. /// Used by postgres to verify Databricks PAT tokens. pub workspace_host: String, + + pub lakebase_mode: bool, } impl DatabricksEnvVars { - pub fn new(compute_spec: &ComputeSpec, compute_id: Option<&String>) -> Self { - // compute_id is a string format of "{endpoint_id}/{compute_idx}" - // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres. - // Panics if compute_id is not set or not in the expected format. - let endpoint_id = compute_id.unwrap().split('/').next().unwrap().to_string(); + pub fn new( + compute_spec: &ComputeSpec, + compute_id: Option<&String>, + instance_id: Option, + lakebase_mode: bool, + ) -> Self { + let endpoint_id = if let Some(instance_id) = instance_id { + // Use instance_id as endpoint_id if it is set. This code path is for PuPr model. + instance_id + } else { + // Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model. + // compute_id is a string format of "{endpoint_id}/{compute_idx}" + // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres. + // Panics if compute_id is not set or not in the expected format. + compute_id.unwrap().split('/').next().unwrap().to_string() + }; let workspace_host = compute_spec .databricks_settings .as_ref() @@ -498,6 +511,7 @@ impl DatabricksEnvVars { Self { endpoint_id, workspace_host, + lakebase_mode, } } @@ -507,6 +521,10 @@ impl DatabricksEnvVars { /// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`. pub fn to_env_var_list(self) -> Vec<(String, String)> { + if !self.lakebase_mode { + // In neon env, we don't need to pass down the env vars to postgres. + return vec![]; + } vec![ ( Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(), @@ -556,7 +574,11 @@ impl ComputeNode { let mut new_state = ComputeState::new(); if let Some(spec) = config.spec { let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; - new_state.pspec = Some(pspec); + if params.lakebase_mode { + ComputeNode::set_spec(¶ms, &mut new_state, pspec); + } else { + new_state.pspec = Some(pspec); + } } Ok(ComputeNode { @@ -1154,7 +1176,14 @@ impl ComputeNode { // If it is something different then create_dir() will error out anyway. let pgdata = &self.params.pgdata; let _ok = fs::remove_dir_all(pgdata); - fs::create_dir(pgdata)?; + if self.params.lakebase_mode { + // Ignore creation errors if the directory already exists (e.g. mounting it ahead of time). + // If it is something different then PG startup will error out anyway. + let _ok = fs::create_dir(pgdata); + } else { + fs::create_dir(pgdata)?; + } + fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?; Ok(()) @@ -1633,7 +1662,7 @@ impl ComputeNode { // symlink doesn't affect anything. // // See https://github.com/neondatabase/autoscaling/issues/800 - std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?; + std::fs::remove_dir_all(pgdata_path.join("pg_dynshmem"))?; symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?; match spec.mode { @@ -1648,6 +1677,12 @@ impl ComputeNode { /// Start and stop a postgres process to warm up the VM for startup. pub fn prewarm_postgres_vm_memory(&self) -> Result<()> { + if self.params.lakebase_mode { + // We are running in Hadron mode. Disabling this prewarming step for now as it could run + // into dblet port conflicts and also doesn't add much value with our current infra. + info!("Skipping postgres prewarming in Hadron mode"); + return Ok(()); + } info!("prewarming VM memory"); // Create pgdata @@ -1709,7 +1744,12 @@ impl ComputeNode { let databricks_env_vars = { let state = self.state.lock().unwrap(); let spec = &state.pspec.as_ref().unwrap().spec; - DatabricksEnvVars::new(spec, Some(&self.params.compute_id)) + DatabricksEnvVars::new( + spec, + Some(&self.params.compute_id), + self.params.instance_id.clone(), + self.params.lakebase_mode, + ) }; info!( @@ -1881,7 +1921,15 @@ impl ComputeNode { /// Do initial configuration of the already started Postgres. #[instrument(skip_all)] pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { - let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); + let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); + + if self.params.lakebase_mode { + // Set a 2-minute statement_timeout for the session applying config. The individual SQL statements + // used in apply_spec_sql() should not take long (they are just creating users and installing + // extensions). If any of them are stuck for an extended period of time it usually indicates a + // pageserver connectivity problem and we should bail out. + conf.options("-c statement_timeout=2min"); + } let conf = Arc::new(conf); let spec = Arc::new( @@ -2199,7 +2247,17 @@ impl ComputeNode { pub fn check_for_core_dumps(&self) -> Result<()> { let core_dump_dir = match std::env::consts::OS { "macos" => Path::new("/cores/"), - _ => Path::new(&self.params.pgdata), + // BEGIN HADRON + // NB: Read core dump files from a fixed location outside of + // the data directory since `compute_ctl` wipes the data directory + // across container restarts. + _ => { + if self.params.lakebase_mode { + Path::new("/databricks/logs/brickstore") + } else { + Path::new(&self.params.pgdata) + } + } // END HADRON }; // Collect core dump paths if any @@ -2512,7 +2570,7 @@ LIMIT 100", if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { libs_vec = libs .split(&[',', '\'', ' ']) - .filter(|s| *s != "neon" && !s.is_empty()) + .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty()) .map(str::to_string) .collect(); } @@ -2531,7 +2589,7 @@ LIMIT 100", if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) { preload_libs_vec = libs .split(&[',', '\'', ' ']) - .filter(|s| *s != "neon" && !s.is_empty()) + .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty()) .map(str::to_string) .collect(); } From 65d1be6e901a1a33ce31a06c5a5d63b481f24d14 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 29 Jul 2025 18:28:57 +0200 Subject: [PATCH 15/15] pageserver: route gRPC requests to child shards (#12702) ## Problem During shard splits, each parent shard is split and removed incrementally. Only when all parent shards have split is the split committed and the compute notified. This can take several minutes for large tenants. In the meanwhile, the compute will be sending requests to the (now-removed) parent shards. This was (mostly) not a problem for the libpq protocol, because it does shard routing on the server-side. The compute just sends requests to some Pageserver, and the server will figure out which local shard should serve it. It is a problem for the gRPC protocol, where the client explicitly says which shard it's talking to. Touches [LKB-191](https://databricks.atlassian.net/browse/LKB-191). Requires #12772. ## Summary of changes * Add server-side routing of gRPC requests to any local child shards if the parent does not exist. * Add server-side splitting of GetPage batch requests straddling multiple child shards. * Move the `GetPageSplitter` into `pageserver_page_api`. I really don't like this approach, but it avoids making changes to the split protocol. I could be convinced we should change the split protocol instead, e.g. to keep the parent shard alive until the split commits and the compute has been notified, but we can also do that as a later change without blocking the communicator on it. --- pageserver/client_grpc/src/client.rs | 2 +- pageserver/client_grpc/src/lib.rs | 1 - pageserver/page_api/src/lib.rs | 4 +- .../{client_grpc => page_api}/src/split.rs | 28 +- pageserver/src/page_service.rs | 292 +++++++++++++----- pageserver/src/tenant/mgr.rs | 19 +- 6 files changed, 256 insertions(+), 90 deletions(-) rename pageserver/{client_grpc => page_api}/src/split.rs (91%) diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index e6a90fb582..b8ee57bf9f 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -14,9 +14,9 @@ use utils::logging::warn_slow; use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; use crate::retry::Retry; -use crate::split::GetPageSplitter; use compute_api::spec::PageserverProtocol; use pageserver_page_api as page_api; +use pageserver_page_api::GetPageSplitter; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize}; diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 14fb3fbd5a..4999fd3d0a 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -1,6 +1,5 @@ mod client; mod pool; mod retry; -mod split; pub use client::{PageserverClient, ShardSpec}; diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index e78f6ce206..b44df6337f 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -19,7 +19,9 @@ pub mod proto { } mod client; -pub use client::Client; mod model; +mod split; +pub use client::Client; pub use model::*; +pub use split::GetPageSplitter; diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/page_api/src/split.rs similarity index 91% rename from pageserver/client_grpc/src/split.rs rename to pageserver/page_api/src/split.rs index 8631638686..5ecc90a166 100644 --- a/pageserver/client_grpc/src/split.rs +++ b/pageserver/page_api/src/split.rs @@ -3,18 +3,18 @@ use std::collections::HashMap; use anyhow::anyhow; use bytes::Bytes; +use crate::model::*; use pageserver_api::key::rel_block_to_key; use pageserver_api::shard::key_to_shard_number; -use pageserver_page_api as page_api; use utils::shard::{ShardCount, ShardIndex, ShardStripeSize}; /// Splits GetPageRequests that straddle shard boundaries and assembles the responses. /// TODO: add tests for this. pub struct GetPageSplitter { /// Split requests by shard index. - requests: HashMap, + requests: HashMap, /// The response being assembled. Preallocated with empty pages, to be filled in. - response: page_api::GetPageResponse, + response: GetPageResponse, /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used /// to assemble the response pages in the same order as the original request. block_shards: Vec, @@ -24,7 +24,7 @@ impl GetPageSplitter { /// Checks if the given request only touches a single shard, and returns the shard ID. This is /// the common case, so we check first in order to avoid unnecessary allocations and overhead. pub fn for_single_shard( - req: &page_api::GetPageRequest, + req: &GetPageRequest, count: ShardCount, stripe_size: Option, ) -> anyhow::Result> { @@ -57,7 +57,7 @@ impl GetPageSplitter { /// Splits the given request. pub fn split( - req: page_api::GetPageRequest, + req: GetPageRequest, count: ShardCount, stripe_size: Option, ) -> anyhow::Result { @@ -84,7 +84,7 @@ impl GetPageSplitter { requests .entry(shard_id) - .or_insert_with(|| page_api::GetPageRequest { + .or_insert_with(|| GetPageRequest { request_id: req.request_id, request_class: req.request_class, rel: req.rel, @@ -98,16 +98,16 @@ impl GetPageSplitter { // Construct a response to be populated by shard responses. Preallocate empty page slots // with the expected block numbers. - let response = page_api::GetPageResponse { + let response = GetPageResponse { request_id: req.request_id, - status_code: page_api::GetPageStatusCode::Ok, + status_code: GetPageStatusCode::Ok, reason: None, rel: req.rel, pages: req .block_numbers .into_iter() .map(|block_number| { - page_api::Page { + Page { block_number, image: Bytes::new(), // empty page slot to be filled in } @@ -123,9 +123,7 @@ impl GetPageSplitter { } /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations. - pub fn drain_requests( - &mut self, - ) -> impl Iterator { + pub fn drain_requests(&mut self) -> impl Iterator { self.requests.drain() } @@ -135,10 +133,10 @@ impl GetPageSplitter { pub fn add_response( &mut self, shard_id: ShardIndex, - response: page_api::GetPageResponse, + response: GetPageResponse, ) -> anyhow::Result<()> { // The caller should already have converted status codes into tonic::Status. - if response.status_code != page_api::GetPageStatusCode::Ok { + if response.status_code != GetPageStatusCode::Ok { return Err(anyhow!( "unexpected non-OK response for shard {shard_id}: {} {}", response.status_code, @@ -209,7 +207,7 @@ impl GetPageSplitter { /// Fetches the final, assembled response. #[allow(clippy::result_large_err)] - pub fn get_response(self) -> anyhow::Result { + pub fn get_response(self) -> anyhow::Result { // Check that the response is complete. for (i, page) in self.response.pages.iter().enumerate() { if page.image.is_empty() { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b3bc42a55e..f16046657a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -16,7 +16,8 @@ use anyhow::{Context as _, bail}; use bytes::{Buf as _, BufMut as _, BytesMut}; use chrono::Utc; use futures::future::BoxFuture; -use futures::{FutureExt, Stream}; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt as _}; use itertools::Itertools; use jsonwebtoken::TokenData; use once_cell::sync::OnceCell; @@ -35,8 +36,8 @@ use pageserver_api::pagestream_api::{ }; use pageserver_api::reltag::SlruKind; use pageserver_api::shard::TenantShardId; -use pageserver_page_api as page_api; use pageserver_page_api::proto; +use pageserver_page_api::{self as page_api, GetPageSplitter}; use postgres_backend::{ AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, }; @@ -3423,18 +3424,6 @@ impl GrpcPageServiceHandler { Ok(CancellableTask { task, cancel }) } - /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of - /// relations and their sizes, as well as SLRU segments and similar data. - #[allow(clippy::result_large_err)] - fn ensure_shard_zero(timeline: &Handle) -> Result<(), tonic::Status> { - match timeline.get_shard_index().shard_number.0 { - 0 => Ok(()), - shard => Err(tonic::Status::invalid_argument(format!( - "request must execute on shard zero (is shard {shard})", - ))), - } - } - /// Generates a PagestreamRequest header from a ReadLsn and request ID. fn make_hdr( read_lsn: page_api::ReadLsn, @@ -3449,30 +3438,72 @@ impl GrpcPageServiceHandler { } } - /// Acquires a timeline handle for the given request. + /// Acquires a timeline handle for the given request. The shard index must match a local shard. /// - /// TODO: during shard splits, the compute may still be sending requests to the parent shard - /// until the entire split is committed and the compute is notified. Consider installing a - /// temporary shard router from the parent to the children while the split is in progress. - /// - /// TODO: consider moving this to a middleware layer; all requests need it. Needs to manage - /// the TimelineHandles lifecycle. - /// - /// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to avoid - /// the unnecessary overhead. + /// NB: this will fail during shard splits, see comment on [`Self::maybe_split_get_page`]. async fn get_request_timeline( &self, req: &tonic::Request, ) -> Result, GetActiveTimelineError> { - let ttid = *extract::(req); + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); let shard_index = *extract::(req); - let shard_selector = ShardSelector::Known(shard_index); + // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to + // avoid the unnecessary overhead. TimelineHandles::new(self.tenant_manager.clone()) - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) .await } + /// Acquires a timeline handle for the given request, which must be for shard zero. Most + /// metadata requests are only valid on shard zero. + /// + /// NB: during an ongoing shard split, the compute will keep talking to the parent shard until + /// the split is committed, but the parent shard may have been removed in the meanwhile. In that + /// case, we reroute the request to the new child shard. See [`Self::maybe_split_get_page`]. + /// + /// TODO: revamp the split protocol to avoid this child routing. + async fn get_request_timeline_shard_zero( + &self, + req: &tonic::Request, + ) -> Result, tonic::Status> { + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); + let shard_index = *extract::(req); + + if shard_index.shard_number.0 != 0 { + return Err(tonic::Status::invalid_argument(format!( + "request only valid on shard zero (requested shard {shard_index})", + ))); + } + + // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to + // avoid the unnecessary overhead. + let mut handles = TimelineHandles::new(self.tenant_manager.clone()); + match handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await + { + Ok(timeline) => Ok(timeline), + Err(err) => { + // We may be in the middle of a shard split. Try to find a child shard 0. + if let Ok(timeline) = handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .await + && timeline.get_shard_index().shard_count > shard_index.shard_count + { + return Ok(timeline); + } + Err(err.into()) + } + } + } + /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start. /// Only errors if the timeline is shutting down. /// @@ -3502,28 +3533,22 @@ impl GrpcPageServiceHandler { /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or /// split them up in the client or server. - #[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))] + #[instrument(skip_all, fields( + req_id = %req.request_id, + rel = %req.rel, + blkno = %req.block_numbers[0], + blks = %req.block_numbers.len(), + lsn = %req.read_lsn, + ))] async fn get_page( ctx: &RequestContext, - timeline: &WeakHandle, - req: proto::GetPageRequest, + timeline: Handle, + req: page_api::GetPageRequest, io_concurrency: IoConcurrency, - ) -> Result { - let received_at = Instant::now(); - let timeline = timeline.upgrade()?; + received_at: Instant, + ) -> Result { let ctx = ctx.with_scope_page_service_pagestream(&timeline); - // Validate the request, decorate the span, and convert it to a Pagestream request. - let req = page_api::GetPageRequest::try_from(req)?; - - span_record!( - req_id = %req.request_id, - rel = %req.rel, - blkno = %req.block_numbers[0], - blks = %req.block_numbers.len(), - lsn = %req.read_lsn, - ); - for &blkno in &req.block_numbers { let shard = timeline.get_shard_identity(); let key = rel_block_to_key(req.rel, blkno); @@ -3611,7 +3636,95 @@ impl GrpcPageServiceHandler { }; } - Ok(resp.into()) + Ok(resp) + } + + /// Processes a GetPage request when there is a potential shard split in progress. We have to + /// reroute the request to any local child shards, and split batch requests that straddle + /// multiple child shards. + /// + /// Parent shards are split and removed incrementally (there may be many parent shards when + /// splitting an already-sharded tenant), but the compute is only notified once the overall + /// split commits, which can take several minutes. In the meanwhile, the compute will be sending + /// requests to the parent shards. + /// + /// TODO: add test infrastructure to provoke this situation frequently and for long periods of + /// time, to properly exercise it. + /// + /// TODO: revamp the split protocol to avoid this, e.g.: + /// * Keep the parent shard until the split commits and the compute is notified. + /// * Notify the compute about each subsplit. + /// * Return an error that updates the compute's shard map. + #[instrument(skip_all)] + #[allow(clippy::too_many_arguments)] + async fn maybe_split_get_page( + ctx: &RequestContext, + handles: &mut TimelineHandles, + tenant_id: TenantId, + timeline_id: TimelineId, + parent: ShardIndex, + req: page_api::GetPageRequest, + io_concurrency: IoConcurrency, + received_at: Instant, + ) -> Result { + // Check the first page to see if we have any child shards at all. Otherwise, the compute is + // just talking to the wrong Pageserver. If the parent has been split, the shard now owning + // the page must have a higher shard count. + let timeline = handles + .get( + tenant_id, + timeline_id, + ShardSelector::Page(rel_block_to_key(req.rel, req.block_numbers[0])), + ) + .await?; + + let shard_id = timeline.get_shard_identity(); + if shard_id.count <= parent.shard_count { + return Err(HandleUpgradeError::ShutDown.into()); // emulate original error + } + + // Fast path: the request fits in a single shard. + if let Some(shard_index) = + GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size)) + .map_err(|err| tonic::Status::internal(err.to_string()))? + { + // We got the shard ID from the first page, so these must be equal. + assert_eq!(shard_index.shard_number, shard_id.number); + assert_eq!(shard_index.shard_count, shard_id.count); + return Self::get_page(ctx, timeline, req, io_concurrency, received_at).await; + } + + // The request spans multiple shards; split it and dispatch parallel requests. All pages + // were originally in the parent shard, and during a split all children are local, so we + // expect to find local shards for all pages. + let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size)) + .map_err(|err| tonic::Status::internal(err.to_string()))?; + + let mut shard_requests = FuturesUnordered::new(); + for (shard_index, shard_req) in splitter.drain_requests() { + let timeline = handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await?; + let future = Self::get_page( + ctx, + timeline, + shard_req, + io_concurrency.clone(), + received_at, + ) + .map(move |result| result.map(|resp| (shard_index, resp))); + shard_requests.push(future); + } + + while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? { + splitter + .add_response(shard_index, shard_response) + .map_err(|err| tonic::Status::internal(err.to_string()))?; + } + + splitter + .get_response() + .map_err(|err| tonic::Status::internal(err.to_string())) } } @@ -3640,11 +3753,10 @@ impl proto::PageService for GrpcPageServiceHandler { // to be the sweet spot where throughput is saturated. const CHUNK_SIZE: usize = 256 * 1024; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_timeline(&timeline); // Validate the request and decorate the span. - Self::ensure_shard_zero(&timeline)?; if timeline.is_archived() == Some(true) { return Err(tonic::Status::failed_precondition("timeline is archived")); } @@ -3760,11 +3872,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?; span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn); @@ -3793,14 +3904,29 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request>, ) -> Result, tonic::Status> { // Extract the timeline from the request and check that it exists. - let ttid = *extract::(&req); + // + // NB: during shard splits, the compute may still send requests to the parent shard. We'll + // reroute requests to the child shards below, but we also detect the common cases here + // where either the shard exists or no shards exist at all. If we have a child shard, we + // can't acquire a weak handle because we don't know which child shard to use yet. + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(&req); let shard_index = *extract::(&req); - let shard_selector = ShardSelector::Known(shard_index); let mut handles = TimelineHandles::new(self.tenant_manager.clone()); - handles - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) - .await?; + let timeline = match handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await + { + // The timeline shard exists. Keep a weak handle to reuse for each request. + Ok(timeline) => Some(timeline.downgrade()), + // The shard doesn't exist, but a child shard does. We'll reroute requests later. + Err(_) if self.tenant_manager.has_child_shard(tenant_id, shard_index) => None, + // Failed to fetch the timeline, and no child shard exists. Error out. + Err(err) => return Err(err.into()), + }; // Spawn an IoConcurrency sidecar, if enabled. let gate_guard = self @@ -3817,11 +3943,9 @@ impl proto::PageService for GrpcPageServiceHandler { let mut reqs = req.into_inner(); let resps = async_stream::try_stream! { - let timeline = handles - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) - .await? - .downgrade(); loop { + // Wait for the next client request. + // // NB: Tonic considers the entire stream to be an in-flight request and will wait // for it to complete before shutting down. React to cancellation between requests. let req = tokio::select! { @@ -3834,16 +3958,44 @@ impl proto::PageService for GrpcPageServiceHandler { Err(err) => Err(err), }, }?; + + let received_at = Instant::now(); let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default(); - let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) + + // Process the request, using a closure to capture errors. + let process_request = async || { + let req = page_api::GetPageRequest::try_from(req)?; + + // Fast path: use the pre-acquired timeline handle. + if let Some(Ok(timeline)) = timeline.as_ref().map(|t| t.upgrade()) { + return Self::get_page(&ctx, timeline, req, io_concurrency.clone(), received_at) + .instrument(span.clone()) // propagate request span + .await + } + + // The timeline handle is stale. During shard splits, the compute may still be + // sending requests to the parent shard. Try to re-route requests to the child + // shards, and split any batch requests that straddle multiple child shards. + Self::maybe_split_get_page( + &ctx, + &mut handles, + tenant_id, + timeline_id, + shard_index, + req, + io_concurrency.clone(), + received_at, + ) .instrument(span.clone()) // propagate request span - .await; - yield match result { - Ok(resp) => resp, - // Convert per-request errors to GetPageResponses as appropriate, or terminate - // the stream with a tonic::Status. Log the error regardless, since - // ObservabilityLayer can't automatically log stream errors. + .await + }; + + // Return the response. Convert per-request errors to GetPageResponses if + // appropriate, or terminate the stream with a tonic::Status. + yield match process_request().await { + Ok(resp) => resp.into(), Err(status) => { + // Log the error, since ObservabilityLayer won't see stream errors. // TODO: it would be nice if we could propagate the get_page() fields here. span.in_scope(|| { warn!("request failed with {:?}: {}", status.code(), status.message()); @@ -3863,11 +4015,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?; let allow_missing = req.allow_missing; @@ -3900,11 +4051,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?; span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); @@ -3934,6 +4084,10 @@ impl proto::PageService for GrpcPageServiceHandler { &self, req: tonic::Request, ) -> Result, tonic::Status> { + // TODO: this won't work during shard splits, as the request is directed at a specific shard + // but the parent shard is removed before the split commits and the compute is notified + // (which can take several minutes for large tenants). That's also the case for the libpq + // implementation, so we keep the behavior for now. let timeline = self.get_request_timeline(&req).await?; let ctx = self.ctx.with_scope_timeline(&timeline); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4432b4bba8..0feba5e9c8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -826,6 +826,18 @@ impl TenantManager { peek_slot.is_some() } + /// Returns whether a local shard exists that's a child of the given tenant shard. Note that + /// this just checks for any shard with a larger shard count, and it may not be a direct child + /// of the given shard (their keyspace may not overlap). + pub(crate) fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool { + match &*self.tenants.read().unwrap() { + TenantsMap::Initializing => false, + TenantsMap::Open(slots) | TenantsMap::ShuttingDown(slots) => slots + .range(TenantShardId::tenant_range(tenant_id)) + .any(|(tsid, _)| tsid.shard_count > shard_index.shard_count), + } + } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, @@ -1524,9 +1536,10 @@ impl TenantManager { // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant // // TODO: keeping the parent as InProgress while spawning the children causes read - // unavailability, as we can't acquire a timeline handle for it. The parent should be - // available for reads until the children are ready -- potentially until *all* subsplits - // across all parent shards are complete and the compute has been notified. See: + // unavailability, as we can't acquire a new timeline handle for it (existing handles appear + // to still work though, even downgraded ones). The parent should be available for reads + // until the children are ready -- potentially until *all* subsplits across all parent + // shards are complete and the compute has been notified. See: // . drop(tenant); let mut parent_slot_guard =