diff --git a/compute/patches/pg_repack.patch b/compute/patches/pg_repack.patch index 10ed1054ff..b8a057e222 100644 --- a/compute/patches/pg_repack.patch +++ b/compute/patches/pg_repack.patch @@ -1,5 +1,11 @@ +commit 5eb393810cf7c7bafa4e394dad2e349e2a8cb2cb +Author: Alexey Masterov +Date: Mon Jul 28 18:11:02 2025 +0200 + + Patch for pg_repack + diff --git a/regress/Makefile b/regress/Makefile -index bf6edcb..89b4c7f 100644 +index bf6edcb..110e734 100644 --- a/regress/Makefile +++ b/regress/Makefile @@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\} @@ -7,18 +13,36 @@ index bf6edcb..89b4c7f 100644 # -REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger -+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger ++REGRESS := init-extension noautovacuum repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger autovacuum USE_PGXS = 1 # use pgxs if not in contrib directory PGXS := $(shell $(PG_CONFIG) --pgxs) -diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out -index 9f2e171..f6e4f8d 100644 ---- a/regress/expected/init-extension.out -+++ b/regress/expected/init-extension.out -@@ -1,3 +1,2 @@ - SET client_min_messages = warning; - CREATE EXTENSION pg_repack; --RESET client_min_messages; +diff --git a/regress/expected/autovacuum.out b/regress/expected/autovacuum.out +new file mode 100644 +index 0000000..e7f2363 +--- /dev/null ++++ b/regress/expected/autovacuum.out +@@ -0,0 +1,7 @@ ++ALTER SYSTEM SET autovacuum='on'; ++SELECT pg_reload_conf(); ++ pg_reload_conf ++---------------- ++ t ++(1 row) ++ +diff --git a/regress/expected/noautovacuum.out b/regress/expected/noautovacuum.out +new file mode 100644 +index 0000000..fc7978e +--- 
/dev/null ++++ b/regress/expected/noautovacuum.out +@@ -0,0 +1,7 @@ ++ALTER SYSTEM SET autovacuum='off'; ++SELECT pg_reload_conf(); ++ pg_reload_conf ++---------------- ++ t ++(1 row) ++ diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out index 8d0a94e..63b68bf 100644 --- a/regress/expected/nosuper.out @@ -50,14 +74,22 @@ index 8d0a94e..63b68bf 100644 INFO: repacking table "public.tbl_cluster" ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block DETAIL: query was: RESET lock_timeout -diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql -index 9f2e171..f6e4f8d 100644 ---- a/regress/sql/init-extension.sql -+++ b/regress/sql/init-extension.sql -@@ -1,3 +1,2 @@ - SET client_min_messages = warning; - CREATE EXTENSION pg_repack; --RESET client_min_messages; +diff --git a/regress/sql/autovacuum.sql b/regress/sql/autovacuum.sql +new file mode 100644 +index 0000000..a8eda63 +--- /dev/null ++++ b/regress/sql/autovacuum.sql +@@ -0,0 +1,2 @@ ++ALTER SYSTEM SET autovacuum='on'; ++SELECT pg_reload_conf(); +diff --git a/regress/sql/noautovacuum.sql b/regress/sql/noautovacuum.sql +new file mode 100644 +index 0000000..13d4836 +--- /dev/null ++++ b/regress/sql/noautovacuum.sql +@@ -0,0 +1,2 @@ ++ALTER SYSTEM SET autovacuum='off'; ++SELECT pg_reload_conf(); diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql index 072f0fa..dbe60f8 100644 --- a/regress/sql/nosuper.sql diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 9c86aba531..2b4802f309 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -82,6 +82,15 @@ struct Cli { #[arg(long, default_value_t = 3081)] pub internal_http_port: u16, + /// Backwards-compatible --http-port for Hadron deployments. Functionally the + /// same as --external-http-port. 
+ #[arg( + long, + conflicts_with = "external_http_port", + conflicts_with = "internal_http_port" + )] + pub http_port: Option<u16>, + #[arg(short = 'D', long, value_name = "DATADIR")] pub pgdata: String, @@ -181,6 +190,26 @@ impl Cli { } } +// Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port` +// arg is used and acts the same as `--external-http-port`. The internal http port is defined +// to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so +// we need to be careful with the ports to choose. +fn get_external_http_port(cli: &Cli) -> u16 { + if cli.lakebase_mode { + return cli.http_port.unwrap_or(cli.external_http_port); + } + cli.external_http_port +} +fn get_internal_http_port(cli: &Cli) -> u16 { + if cli.lakebase_mode { + return cli + .http_port + .map(|p| p + 1) + .unwrap_or(cli.internal_http_port); + } + cli.internal_http_port +} + fn main() -> Result<()> { let cli = Cli::parse(); @@ -205,13 +234,18 @@ fn main() -> Result<()> { // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; - installed_extensions::initialize_metrics(); - hadron_metrics::initialize_metrics(); + if cli.lakebase_mode { + installed_extensions::initialize_metrics(); + hadron_metrics::initialize_metrics(); + } let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?; let config = get_config(&cli)?; + let external_http_port = get_external_http_port(&cli); + let internal_http_port = get_internal_http_port(&cli); + let compute_node = ComputeNode::new( ComputeNodeParams { compute_id: cli.compute_id, @@ -220,8 +254,8 @@ fn main() -> Result<()> { pgdata: cli.pgdata.clone(), pgbin: cli.pgbin.clone(), pgversion: get_pg_version_string(&cli.pgbin), - external_http_port: cli.external_http_port, - internal_http_port: cli.internal_http_port, + external_http_port, + internal_http_port, remote_ext_base_url: cli.remote_ext_base_url.clone(), 
resize_swap_on_bind: cli.resize_swap_on_bind, set_disk_quota_for_fs: cli.set_disk_quota_for_fs, diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 859bf03b03..a52e9d38f4 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -484,14 +484,27 @@ pub struct DatabricksEnvVars { /// Hostname of the Databricks workspace URL this compute instance belongs to. /// Used by postgres to verify Databricks PAT tokens. pub workspace_host: String, + + pub lakebase_mode: bool, } impl DatabricksEnvVars { - pub fn new(compute_spec: &ComputeSpec, compute_id: Option<&String>) -> Self { - // compute_id is a string format of "{endpoint_id}/{compute_idx}" - // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres. - // Panics if compute_id is not set or not in the expected format. - let endpoint_id = compute_id.unwrap().split('/').next().unwrap().to_string(); + pub fn new( + compute_spec: &ComputeSpec, + compute_id: Option<&String>, + instance_id: Option<String>, + lakebase_mode: bool, + ) -> Self { + let endpoint_id = if let Some(instance_id) = instance_id { + // Use instance_id as endpoint_id if it is set. This code path is for PuPr model. + instance_id + } else { + // Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model. + // compute_id is a string format of "{endpoint_id}/{compute_idx}" + // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres. + // Panics if compute_id is not set or not in the expected format. + compute_id.unwrap().split('/').next().unwrap().to_string() + }; let workspace_host = compute_spec .databricks_settings .as_ref() @@ -500,6 +513,7 @@ impl DatabricksEnvVars { Self { endpoint_id, workspace_host, + lakebase_mode, } } @@ -509,6 +523,10 @@ impl DatabricksEnvVars { /// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`. 
pub fn to_env_var_list(self) -> Vec<(String, String)> { + if !self.lakebase_mode { + // In neon env, we don't need to pass down the env vars to postgres. + return vec![]; + } vec![ ( Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(), @@ -558,7 +576,11 @@ impl ComputeNode { let mut new_state = ComputeState::new(); if let Some(spec) = config.spec { let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; - new_state.pspec = Some(pspec); + if params.lakebase_mode { + ComputeNode::set_spec(&params, &mut new_state, pspec); + } else { + new_state.pspec = Some(pspec); + } } Ok(ComputeNode { @@ -1159,7 +1181,14 @@ impl ComputeNode { // If it is something different then create_dir() will error out anyway. let pgdata = &self.params.pgdata; let _ok = fs::remove_dir_all(pgdata); - fs::create_dir(pgdata)?; + if self.params.lakebase_mode { + // Ignore creation errors if the directory already exists (e.g. mounting it ahead of time). + // If it is something different then PG startup will error out anyway. + let _ok = fs::create_dir(pgdata); + } else { + fs::create_dir(pgdata)?; + } + fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?; Ok(()) @@ -1638,7 +1667,7 @@ impl ComputeNode { // symlink doesn't affect anything. // // See https://github.com/neondatabase/autoscaling/issues/800 - std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?; + std::fs::remove_dir_all(pgdata_path.join("pg_dynshmem"))?; symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?; match spec.mode { @@ -1653,6 +1682,12 @@ impl ComputeNode { /// Start and stop a postgres process to warm up the VM for startup. pub fn prewarm_postgres_vm_memory(&self) -> Result<()> { + if self.params.lakebase_mode { + // We are running in Hadron mode. Disabling this prewarming step for now as it could run + // into dblet port conflicts and also doesn't add much value with our current infra. 
+ info!("Skipping postgres prewarming in Hadron mode"); + return Ok(()); + } info!("prewarming VM memory"); // Create pgdata @@ -1714,7 +1749,12 @@ impl ComputeNode { let databricks_env_vars = { let state = self.state.lock().unwrap(); let spec = &state.pspec.as_ref().unwrap().spec; - DatabricksEnvVars::new(spec, Some(&self.params.compute_id)) + DatabricksEnvVars::new( + spec, + Some(&self.params.compute_id), + self.params.instance_id.clone(), + self.params.lakebase_mode, + ) }; info!( @@ -1886,7 +1926,15 @@ impl ComputeNode { /// Do initial configuration of the already started Postgres. #[instrument(skip_all)] pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { - let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); + let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); + + if self.params.lakebase_mode { + // Set a 2-minute statement_timeout for the session applying config. The individual SQL statements + // used in apply_spec_sql() should not take long (they are just creating users and installing + // extensions). If any of them are stuck for an extended period of time it usually indicates a + // pageserver connectivity problem and we should bail out. + conf.options("-c statement_timeout=2min"); + } let conf = Arc::new(conf); let spec = Arc::new( @@ -2204,7 +2252,17 @@ impl ComputeNode { pub fn check_for_core_dumps(&self) -> Result<()> { let core_dump_dir = match std::env::consts::OS { "macos" => Path::new("/cores/"), - _ => Path::new(&self.params.pgdata), + // BEGIN HADRON + // NB: Read core dump files from a fixed location outside of + // the data directory since `compute_ctl` wipes the data directory + // across container restarts. 
+ _ => { + if self.params.lakebase_mode { + Path::new("/databricks/logs/brickstore") + } else { + Path::new(&self.params.pgdata) + } + } // END HADRON }; // Collect core dump paths if any @@ -2517,7 +2575,7 @@ LIMIT 100", if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { libs_vec = libs .split(&[',', '\'', ' ']) - .filter(|s| *s != "neon" && !s.is_empty()) + .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty()) .map(str::to_string) .collect(); } @@ -2536,7 +2594,7 @@ LIMIT 100", if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) { preload_libs_vec = libs .split(&[',', '\'', ' ']) - .filter(|s| *s != "neon" && !s.is_empty()) + .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty()) .map(str::to_string) .collect(); } diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index 5adf38f457..23fabeccd2 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -9,7 +9,6 @@ regex.workspace = true bytes.workspace = true anyhow.workspace = true crc32c.workspace = true -criterion.workspace = true once_cell.workspace = true pprof.workspace = true thiserror.workspace = true @@ -20,6 +19,7 @@ tracing.workspace = true postgres_versioninfo.workspace = true [dev-dependencies] +criterion.workspace = true env_logger.workspace = true postgres.workspace = true diff --git a/libs/proxy/tokio-postgres2/src/error/mod.rs b/libs/proxy/tokio-postgres2/src/error/mod.rs index 6e68b1e595..3fbb97f9bb 100644 --- a/libs/proxy/tokio-postgres2/src/error/mod.rs +++ b/libs/proxy/tokio-postgres2/src/error/mod.rs @@ -9,7 +9,7 @@ use postgres_protocol2::message::backend::{ErrorFields, ErrorResponseBody}; pub use self::sqlstate::*; #[allow(clippy::unreadable_literal)] -mod sqlstate; +pub mod sqlstate; /// The severity of a Postgres error or notice. 
#[derive(Debug, Copy, Clone, PartialEq, Eq)] diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index e6a90fb582..dad37ebe74 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -14,9 +14,9 @@ use utils::logging::warn_slow; use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; use crate::retry::Retry; -use crate::split::GetPageSplitter; use compute_api::spec::PageserverProtocol; use pageserver_page_api as page_api; +use pageserver_page_api::GetPageSplitter; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize}; @@ -230,16 +230,14 @@ impl PageserverClient { ) -> tonic::Result { // Fast path: request is for a single shard. if let Some(shard_id) = - GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size) - .map_err(|err| tonic::Status::internal(err.to_string()))? + GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)? { return Self::get_page_with_shard(req, shards.get(shard_id)?).await; } // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and // reassemble the responses. - let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)?; let mut shard_requests = FuturesUnordered::new(); for (shard_id, shard_req) in splitter.drain_requests() { @@ -249,14 +247,10 @@ impl PageserverClient { } while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? { - splitter - .add_response(shard_id, shard_response) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + splitter.add_response(shard_id, shard_response)?; } - splitter - .get_response() - .map_err(|err| tonic::Status::internal(err.to_string())) + Ok(splitter.collect_response()?) 
} /// Fetches pages on the given shard. Does not retry internally. diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 14fb3fbd5a..4999fd3d0a 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -1,6 +1,5 @@ mod client; mod pool; mod retry; -mod split; pub use client::{PageserverClient, ShardSpec}; diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index e78f6ce206..b9be6b8b91 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -19,7 +19,9 @@ pub mod proto { } mod client; -pub use client::Client; mod model; +mod split; +pub use client::Client; pub use model::*; +pub use split::{GetPageSplitter, SplitError}; diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/page_api/src/split.rs similarity index 73% rename from pageserver/client_grpc/src/split.rs rename to pageserver/page_api/src/split.rs index 8631638686..27c1c995e0 100644 --- a/pageserver/client_grpc/src/split.rs +++ b/pageserver/page_api/src/split.rs @@ -1,20 +1,19 @@ use std::collections::HashMap; -use anyhow::anyhow; use bytes::Bytes; +use crate::model::*; use pageserver_api::key::rel_block_to_key; use pageserver_api::shard::key_to_shard_number; -use pageserver_page_api as page_api; use utils::shard::{ShardCount, ShardIndex, ShardStripeSize}; /// Splits GetPageRequests that straddle shard boundaries and assembles the responses. /// TODO: add tests for this. pub struct GetPageSplitter { /// Split requests by shard index. - requests: HashMap, + requests: HashMap, /// The response being assembled. Preallocated with empty pages, to be filled in. - response: page_api::GetPageResponse, + response: GetPageResponse, /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used /// to assemble the response pages in the same order as the original request. 
block_shards: Vec, @@ -24,22 +23,22 @@ impl GetPageSplitter { /// Checks if the given request only touches a single shard, and returns the shard ID. This is /// the common case, so we check first in order to avoid unnecessary allocations and overhead. pub fn for_single_shard( - req: &page_api::GetPageRequest, + req: &GetPageRequest, count: ShardCount, stripe_size: Option, - ) -> anyhow::Result> { + ) -> Result, SplitError> { // Fast path: unsharded tenant. if count.is_unsharded() { return Ok(Some(ShardIndex::unsharded())); } let Some(stripe_size) = stripe_size else { - return Err(anyhow!("stripe size must be given for sharded tenants")); + return Err("stripe size must be given for sharded tenants".into()); }; // Find the first page's shard, for comparison. let Some(&first_page) = req.block_numbers.first() else { - return Err(anyhow!("no block numbers in request")); + return Err("no block numbers in request".into()); }; let key = rel_block_to_key(req.rel, first_page); let shard_number = key_to_shard_number(count, stripe_size, &key); @@ -57,10 +56,10 @@ impl GetPageSplitter { /// Splits the given request. pub fn split( - req: page_api::GetPageRequest, + req: GetPageRequest, count: ShardCount, stripe_size: Option, - ) -> anyhow::Result { + ) -> Result { // The caller should make sure we don't split requests unnecessarily. debug_assert!( Self::for_single_shard(&req, count, stripe_size)?.is_none(), @@ -68,10 +67,10 @@ impl GetPageSplitter { ); if count.is_unsharded() { - return Err(anyhow!("unsharded tenant, no point in splitting request")); + return Err("unsharded tenant, no point in splitting request".into()); } let Some(stripe_size) = stripe_size else { - return Err(anyhow!("stripe size must be given for sharded tenants")); + return Err("stripe size must be given for sharded tenants".into()); }; // Split the requests by shard index. 
@@ -84,7 +83,7 @@ impl GetPageSplitter { requests .entry(shard_id) - .or_insert_with(|| page_api::GetPageRequest { + .or_insert_with(|| GetPageRequest { request_id: req.request_id, request_class: req.request_class, rel: req.rel, @@ -98,16 +97,16 @@ impl GetPageSplitter { // Construct a response to be populated by shard responses. Preallocate empty page slots // with the expected block numbers. - let response = page_api::GetPageResponse { + let response = GetPageResponse { request_id: req.request_id, - status_code: page_api::GetPageStatusCode::Ok, + status_code: GetPageStatusCode::Ok, reason: None, rel: req.rel, pages: req .block_numbers .into_iter() .map(|block_number| { - page_api::Page { + Page { block_number, image: Bytes::new(), // empty page slot to be filled in } @@ -123,43 +122,38 @@ impl GetPageSplitter { } /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations. - pub fn drain_requests( - &mut self, - ) -> impl Iterator { + pub fn drain_requests(&mut self) -> impl Iterator { self.requests.drain() } /// Adds a response from the given shard. The response must match the request ID and have an OK /// status code. A response must not already exist for the given shard ID. - #[allow(clippy::result_large_err)] pub fn add_response( &mut self, shard_id: ShardIndex, - response: page_api::GetPageResponse, - ) -> anyhow::Result<()> { + response: GetPageResponse, + ) -> Result<(), SplitError> { // The caller should already have converted status codes into tonic::Status. 
- if response.status_code != page_api::GetPageStatusCode::Ok { - return Err(anyhow!( + if response.status_code != GetPageStatusCode::Ok { + return Err(SplitError(format!( "unexpected non-OK response for shard {shard_id}: {} {}", response.status_code, response.reason.unwrap_or_default() - )); + ))); } if response.request_id != self.response.request_id { - return Err(anyhow!( + return Err(SplitError(format!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.response.request_id, - response.request_id - )); + self.response.request_id, response.request_id + ))); } if response.request_id != self.response.request_id { - return Err(anyhow!( + return Err(SplitError(format!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.response.request_id, - response.request_id - )); + self.response.request_id, response.request_id + ))); } // Place the shard response pages into the assembled response, in request order. @@ -171,26 +165,27 @@ impl GetPageSplitter { } let Some(slot) = self.response.pages.get_mut(i) else { - return Err(anyhow!("no block_shards slot {i} for shard {shard_id}")); + return Err(SplitError(format!( + "no block_shards slot {i} for shard {shard_id}" + ))); }; let Some(page) = pages.next() else { - return Err(anyhow!( + return Err(SplitError(format!( "missing page {} in shard {shard_id} response", slot.block_number - )); + ))); }; if page.block_number != slot.block_number { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned wrong page at index {i}, expected {} got {}", - slot.block_number, - page.block_number - )); + slot.block_number, page.block_number + ))); } if !slot.image.is_empty() { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned duplicate page {} at index {i}", slot.block_number - )); + ))); } *slot = page; @@ -198,32 +193,54 @@ impl GetPageSplitter { // Make sure we've consumed all pages from the shard response. 
if let Some(extra_page) = pages.next() { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned extra page: {}", extra_page.block_number - )); + ))); } Ok(()) } - /// Fetches the final, assembled response. - #[allow(clippy::result_large_err)] - pub fn get_response(self) -> anyhow::Result { + /// Collects the final, assembled response. + pub fn collect_response(self) -> Result { // Check that the response is complete. for (i, page) in self.response.pages.iter().enumerate() { if page.image.is_empty() { - return Err(anyhow!( + return Err(SplitError(format!( "missing page {} for shard {}", page.block_number, self.block_shards .get(i) .map(|s| s.to_string()) .unwrap_or_else(|| "?".to_string()) - )); + ))); } } Ok(self.response) } } + +/// A GetPageSplitter error. +#[derive(Debug, thiserror::Error)] +#[error("{0}")] +pub struct SplitError(String); + +impl From<&str> for SplitError { + fn from(err: &str) -> Self { + SplitError(err.to_string()) + } +} + +impl From for SplitError { + fn from(err: String) -> Self { + SplitError(err) + } +} + +impl From for tonic::Status { + fn from(err: SplitError) -> Self { + tonic::Status::internal(err.0) + } +} diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 40d5d2f8f9..7e723778f5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -16,7 +16,8 @@ use anyhow::{Context as _, bail}; use bytes::{Buf as _, BufMut as _, BytesMut}; use chrono::Utc; use futures::future::BoxFuture; -use futures::{FutureExt, Stream}; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt as _}; use itertools::Itertools; use jsonwebtoken::TokenData; use once_cell::sync::OnceCell; @@ -35,8 +36,8 @@ use pageserver_api::pagestream_api::{ }; use pageserver_api::reltag::SlruKind; use pageserver_api::shard::TenantShardId; -use pageserver_page_api as page_api; use pageserver_page_api::proto; +use pageserver_page_api::{self as page_api, 
GetPageSplitter}; use postgres_backend::{ AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, }; @@ -3551,18 +3552,6 @@ impl GrpcPageServiceHandler { Ok(CancellableTask { task, cancel }) } - /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of - /// relations and their sizes, as well as SLRU segments and similar data. - #[allow(clippy::result_large_err)] - fn ensure_shard_zero(timeline: &Handle) -> Result<(), tonic::Status> { - match timeline.get_shard_index().shard_number.0 { - 0 => Ok(()), - shard => Err(tonic::Status::invalid_argument(format!( - "request must execute on shard zero (is shard {shard})", - ))), - } - } - /// Generates a PagestreamRequest header from a ReadLsn and request ID. fn make_hdr( read_lsn: page_api::ReadLsn, @@ -3577,30 +3566,72 @@ impl GrpcPageServiceHandler { } } - /// Acquires a timeline handle for the given request. + /// Acquires a timeline handle for the given request. The shard index must match a local shard. /// - /// TODO: during shard splits, the compute may still be sending requests to the parent shard - /// until the entire split is committed and the compute is notified. Consider installing a - /// temporary shard router from the parent to the children while the split is in progress. - /// - /// TODO: consider moving this to a middleware layer; all requests need it. Needs to manage - /// the TimelineHandles lifecycle. - /// - /// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to avoid - /// the unnecessary overhead. + /// NB: this will fail during shard splits, see comment on [`Self::maybe_split_get_page`]. 
async fn get_request_timeline( &self, req: &tonic::Request, ) -> Result, GetActiveTimelineError> { - let ttid = *extract::(req); + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); let shard_index = *extract::(req); - let shard_selector = ShardSelector::Known(shard_index); + // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to + // avoid the unnecessary overhead. TimelineHandles::new(self.tenant_manager.clone()) - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) .await } + /// Acquires a timeline handle for the given request, which must be for shard zero. Most + /// metadata requests are only valid on shard zero. + /// + /// NB: during an ongoing shard split, the compute will keep talking to the parent shard until + /// the split is committed, but the parent shard may have been removed in the meanwhile. In that + /// case, we reroute the request to the new child shard. See [`Self::maybe_split_get_page`]. + /// + /// TODO: revamp the split protocol to avoid this child routing. + async fn get_request_timeline_shard_zero( + &self, + req: &tonic::Request, + ) -> Result, tonic::Status> { + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); + let shard_index = *extract::(req); + + if shard_index.shard_number.0 != 0 { + return Err(tonic::Status::invalid_argument(format!( + "request only valid on shard zero (requested shard {shard_index})", + ))); + } + + // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to + // avoid the unnecessary overhead. + let mut handles = TimelineHandles::new(self.tenant_manager.clone()); + match handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await + { + Ok(timeline) => Ok(timeline), + Err(err) => { + // We may be in the middle of a shard split. Try to find a child shard 0. 
+ if let Ok(timeline) = handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .await + && timeline.get_shard_index().shard_count > shard_index.shard_count + { + return Ok(timeline); + } + Err(err.into()) + } + } + } + /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start. /// Only errors if the timeline is shutting down. /// @@ -3630,28 +3661,22 @@ impl GrpcPageServiceHandler { /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or /// split them up in the client or server. - #[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))] + #[instrument(skip_all, fields( + req_id = %req.request_id, + rel = %req.rel, + blkno = %req.block_numbers[0], + blks = %req.block_numbers.len(), + lsn = %req.read_lsn, + ))] async fn get_page( ctx: &RequestContext, - timeline: &WeakHandle, - req: proto::GetPageRequest, + timeline: Handle, + req: page_api::GetPageRequest, io_concurrency: IoConcurrency, - ) -> Result { - let received_at = Instant::now(); - let timeline = timeline.upgrade()?; + received_at: Instant, + ) -> Result { let ctx = ctx.with_scope_page_service_pagestream(&timeline); - // Validate the request, decorate the span, and convert it to a Pagestream request. - let req = page_api::GetPageRequest::try_from(req)?; - - span_record!( - req_id = %req.request_id, - rel = %req.rel, - blkno = %req.block_numbers[0], - blks = %req.block_numbers.len(), - lsn = %req.read_lsn, - ); - for &blkno in &req.block_numbers { let shard = timeline.get_shard_identity(); let key = rel_block_to_key(req.rel, blkno); @@ -3739,7 +3764,89 @@ impl GrpcPageServiceHandler { }; } - Ok(resp.into()) + Ok(resp) + } + + /// Processes a GetPage request when there is a potential shard split in progress. 
We have to + /// reroute the request to any local child shards, and split batch requests that straddle + /// multiple child shards. + /// + /// Parent shards are split and removed incrementally (there may be many parent shards when + /// splitting an already-sharded tenant), but the compute is only notified once the overall + /// split commits, which can take several minutes. In the meanwhile, the compute will be sending + /// requests to the parent shards. + /// + /// TODO: add test infrastructure to provoke this situation frequently and for long periods of + /// time, to properly exercise it. + /// + /// TODO: revamp the split protocol to avoid this, e.g.: + /// * Keep the parent shard until the split commits and the compute is notified. + /// * Notify the compute about each subsplit. + /// * Return an error that updates the compute's shard map. + #[instrument(skip_all)] + #[allow(clippy::too_many_arguments)] + async fn maybe_split_get_page( + ctx: &RequestContext, + handles: &mut TimelineHandles, + tenant_id: TenantId, + timeline_id: TimelineId, + parent: ShardIndex, + req: page_api::GetPageRequest, + io_concurrency: IoConcurrency, + received_at: Instant, + ) -> Result { + // Check the first page to see if we have any child shards at all. Otherwise, the compute is + // just talking to the wrong Pageserver. If the parent has been split, the shard now owning + // the page must have a higher shard count. + let timeline = handles + .get( + tenant_id, + timeline_id, + ShardSelector::Page(rel_block_to_key(req.rel, req.block_numbers[0])), + ) + .await?; + + let shard_id = timeline.get_shard_identity(); + if shard_id.count <= parent.shard_count { + return Err(HandleUpgradeError::ShutDown.into()); // emulate original error + } + + // Fast path: the request fits in a single shard. + if let Some(shard_index) = + GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))? + { + // We got the shard ID from the first page, so these must be equal. 
+ assert_eq!(shard_index.shard_number, shard_id.number); + assert_eq!(shard_index.shard_count, shard_id.count); + return Self::get_page(ctx, timeline, req, io_concurrency, received_at).await; + } + + // The request spans multiple shards; split it and dispatch parallel requests. All pages + // were originally in the parent shard, and during a split all children are local, so we + // expect to find local shards for all pages. + let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))?; + + let mut shard_requests = FuturesUnordered::new(); + for (shard_index, shard_req) in splitter.drain_requests() { + let timeline = handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await?; + let future = Self::get_page( + ctx, + timeline, + shard_req, + io_concurrency.clone(), + received_at, + ) + .map(move |result| result.map(|resp| (shard_index, resp))); + shard_requests.push(future); + } + + while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? { + splitter.add_response(shard_index, shard_response)?; + } + + Ok(splitter.collect_response()?) } } @@ -3768,11 +3875,10 @@ impl proto::PageService for GrpcPageServiceHandler { // to be the sweet spot where throughput is saturated. const CHUNK_SIZE: usize = 256 * 1024; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_timeline(&timeline); // Validate the request and decorate the span. 
- Self::ensure_shard_zero(&timeline)?; if timeline.is_archived() == Some(true) { return Err(tonic::Status::failed_precondition("timeline is archived")); } @@ -3888,11 +3994,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?; span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn); @@ -3921,14 +4026,29 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request>, ) -> Result, tonic::Status> { // Extract the timeline from the request and check that it exists. - let ttid = *extract::(&req); + // + // NB: during shard splits, the compute may still send requests to the parent shard. We'll + // reroute requests to the child shards below, but we also detect the common cases here + // where either the shard exists or no shards exist at all. If we have a child shard, we + // can't acquire a weak handle because we don't know which child shard to use yet. + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(&req); let shard_index = *extract::(&req); - let shard_selector = ShardSelector::Known(shard_index); let mut handles = TimelineHandles::new(self.tenant_manager.clone()); - handles - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) - .await?; + let timeline = match handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await + { + // The timeline shard exists. Keep a weak handle to reuse for each request. + Ok(timeline) => Some(timeline.downgrade()), + // The shard doesn't exist, but a child shard does. We'll reroute requests later. 
+ Err(_) if self.tenant_manager.has_child_shard(tenant_id, shard_index) => None, + // Failed to fetch the timeline, and no child shard exists. Error out. + Err(err) => return Err(err.into()), + }; // Spawn an IoConcurrency sidecar, if enabled. let gate_guard = self @@ -3945,11 +4065,9 @@ impl proto::PageService for GrpcPageServiceHandler { let mut reqs = req.into_inner(); let resps = async_stream::try_stream! { - let timeline = handles - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) - .await? - .downgrade(); loop { + // Wait for the next client request. + // // NB: Tonic considers the entire stream to be an in-flight request and will wait // for it to complete before shutting down. React to cancellation between requests. let req = tokio::select! { @@ -3962,16 +4080,44 @@ impl proto::PageService for GrpcPageServiceHandler { Err(err) => Err(err), }, }?; + + let received_at = Instant::now(); let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default(); - let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) + + // Process the request, using a closure to capture errors. + let process_request = async || { + let req = page_api::GetPageRequest::try_from(req)?; + + // Fast path: use the pre-acquired timeline handle. + if let Some(Ok(timeline)) = timeline.as_ref().map(|t| t.upgrade()) { + return Self::get_page(&ctx, timeline, req, io_concurrency.clone(), received_at) + .instrument(span.clone()) // propagate request span + .await + } + + // The timeline handle is stale. During shard splits, the compute may still be + // sending requests to the parent shard. Try to re-route requests to the child + // shards, and split any batch requests that straddle multiple child shards. 
+ Self::maybe_split_get_page( + &ctx, + &mut handles, + tenant_id, + timeline_id, + shard_index, + req, + io_concurrency.clone(), + received_at, + ) .instrument(span.clone()) // propagate request span - .await; - yield match result { - Ok(resp) => resp, - // Convert per-request errors to GetPageResponses as appropriate, or terminate - // the stream with a tonic::Status. Log the error regardless, since - // ObservabilityLayer can't automatically log stream errors. + .await + }; + + // Return the response. Convert per-request errors to GetPageResponses if + // appropriate, or terminate the stream with a tonic::Status. + yield match process_request().await { + Ok(resp) => resp.into(), Err(status) => { + // Log the error, since ObservabilityLayer won't see stream errors. // TODO: it would be nice if we could propagate the get_page() fields here. span.in_scope(|| { warn!("request failed with {:?}: {}", status.code(), status.message()); @@ -3991,11 +4137,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?; let allow_missing = req.allow_missing; @@ -4028,11 +4173,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. 
- Self::ensure_shard_zero(&timeline)?; let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?; span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); @@ -4062,6 +4206,10 @@ impl proto::PageService for GrpcPageServiceHandler { &self, req: tonic::Request, ) -> Result, tonic::Status> { + // TODO: this won't work during shard splits, as the request is directed at a specific shard + // but the parent shard is removed before the split commits and the compute is notified + // (which can take several minutes for large tenants). That's also the case for the libpq + // implementation, so we keep the behavior for now. let timeline = self.get_request_timeline(&req).await?; let ctx = self.ctx.with_scope_timeline(&timeline); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4432b4bba8..0feba5e9c8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -826,6 +826,18 @@ impl TenantManager { peek_slot.is_some() } + /// Returns whether a local shard exists that's a child of the given tenant shard. Note that + /// this just checks for any shard with a larger shard count, and it may not be a direct child + /// of the given shard (their keyspace may not overlap). 
+ pub(crate) fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool { + match &*self.tenants.read().unwrap() { + TenantsMap::Initializing => false, + TenantsMap::Open(slots) | TenantsMap::ShuttingDown(slots) => slots + .range(TenantShardId::tenant_range(tenant_id)) + .any(|(tsid, _)| tsid.shard_count > shard_index.shard_count), + } + } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, @@ -1524,9 +1536,10 @@ impl TenantManager { // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant // // TODO: keeping the parent as InProgress while spawning the children causes read - // unavailability, as we can't acquire a timeline handle for it. The parent should be - // available for reads until the children are ready -- potentially until *all* subsplits - // across all parent shards are complete and the compute has been notified. See: + // unavailability, as we can't acquire a new timeline handle for it (existing handles appear + // to still work though, even downgraded ones). The parent should be available for reads + // until the children are ready -- potentially until *all* subsplits across all parent + // shards are complete and the compute has been notified. See: // . 
drop(tenant); let mut parent_slot_guard = diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 1031f185a6..158118f860 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -1440,7 +1440,6 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } - void PagestoreShmemInit(void) { diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 07de696d7b..e831fca7f8 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -51,6 +51,7 @@ void _PG_init(void); bool lakebase_mode = false; static int running_xacts_overflow_policy; +static emit_log_hook_type prev_emit_log_hook; static bool monitor_query_exec_time = false; static ExecutorStart_hook_type prev_ExecutorStart = NULL; @@ -81,6 +82,8 @@ uint32 WAIT_EVENT_NEON_PS_READ; uint32 WAIT_EVENT_NEON_WAL_DL; #endif +int databricks_test_hook = 0; + enum RunningXactsOverflowPolicies { OP_IGNORE, OP_SKIP, @@ -445,6 +448,20 @@ ReportSearchPath(void) static int neon_pgstat_file_size_limit; #endif +static void DatabricksSqlErrorHookImpl(ErrorData *edata) { + if (prev_emit_log_hook != NULL) { + prev_emit_log_hook(edata); + } + + if (edata->sqlerrcode == ERRCODE_DATA_CORRUPTED) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->data_corruption_count, 1); + } else if (edata->sqlerrcode == ERRCODE_INDEX_CORRUPTED) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->index_corruption_count, 1); + } else if (edata->sqlerrcode == ERRCODE_INTERNAL_ERROR) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->internal_error_count, 1); + } +} + void _PG_init(void) { @@ -456,6 +473,11 @@ _PG_init(void) load_file("$libdir/neon_rmgr", false); #endif + if (lakebase_mode) { + prev_emit_log_hook = emit_log_hook; + emit_log_hook = DatabricksSqlErrorHookImpl; + } + /* * Initializing a pre-loaded Postgres extension happens in three stages: * @@ -594,6 +616,19 @@ _PG_init(void) 0, NULL, NULL, NULL); + // A test hook used in sql regress to trigger 
specific behaviors + // to test features easily. + DefineCustomIntVariable( + "databricks.test_hook", + "The test hook used in sql regress tests only", + NULL, + &databricks_test_hook, + 0, + 0, INT32_MAX, + PGC_SUSET, + 0, + NULL, NULL, NULL); + /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the @@ -816,6 +851,9 @@ neon_shmem_startup_hook(void) LfcShmemInit(); NeonPerfCountersShmemInit(); + if (lakebase_mode) { + DatabricksMetricsShmemInit(); + } PagestoreShmemInit(); RelsizeCacheShmemInit(); WalproposerShmemInit(); diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c index dd576e4e73..a38f876a0c 100644 --- a/pgxn/neon/neon_perf_counters.c +++ b/pgxn/neon/neon_perf_counters.c @@ -19,7 +19,35 @@ #include "neon.h" #include "neon_perf_counters.h" -#include "neon_pgversioncompat.h" +#include "walproposer.h" + +/* BEGIN_HADRON */ +databricks_metrics *databricks_metrics_shared; + +Size +DatabricksMetricsShmemSize(void) +{ + return sizeof(databricks_metrics); +} + +void +DatabricksMetricsShmemInit(void) +{ + bool found; + + databricks_metrics_shared = + ShmemInitStruct("Databricks counters", + DatabricksMetricsShmemSize(), + &found); + Assert(found == IsUnderPostmaster); + if (!found) + { + pg_atomic_init_u32(&databricks_metrics_shared->index_corruption_count, 0); + pg_atomic_init_u32(&databricks_metrics_shared->data_corruption_count, 0); + pg_atomic_init_u32(&databricks_metrics_shared->internal_error_count, 0); + } +} +/* END_HADRON */ neon_per_backend_counters *neon_per_backend_counters_shared; @@ -38,11 +66,12 @@ NeonPerfCountersShmemRequest(void) #else size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters)); #endif + if (lakebase_mode) { + size = add_size(size, DatabricksMetricsShmemSize()); + } RequestAddinShmemSpace(size); } - - void NeonPerfCountersShmemInit(void) { @@ -395,6 +424,33 @@ neon_get_perf_counters(PG_FUNCTION_ARGS) 
metric_to_datums(&metrics[i], &values[0], &nulls[0]); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } + + if (lakebase_mode) { + + if (databricks_test_hook == TestHookCorruption) { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("test corruption"))); + } + + // Not ideal but piggyback our databricks counters into the neon perf counters view + // so that we don't need to introduce neon--1.x+1.sql to add a new view. + { + metric_t databricks_metrics[] = { + {"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)}, + {"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)}, + {"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)}, + {NULL, false, 0, 0}, + }; + for (int i = 0; databricks_metrics[i].name != NULL; i++) + { + metric_to_datums(&databricks_metrics[i], &values[0], &nulls[0]); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + } + /* END_HADRON */ + } + pfree(metrics); return (Datum) 0; diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h index bc4efddee5..0196559806 100644 --- a/pgxn/neon/neon_perf_counters.h +++ b/pgxn/neon/neon_perf_counters.h @@ -177,5 +177,23 @@ extern void inc_query_time(uint64 elapsed); extern Size NeonPerfCountersShmemSize(void); extern void NeonPerfCountersShmemInit(void); +/* BEGIN_HADRON */ +typedef struct +{ + pg_atomic_uint32 index_corruption_count; + pg_atomic_uint32 data_corruption_count; + pg_atomic_uint32 internal_error_count; +} databricks_metrics; + +extern databricks_metrics *databricks_metrics_shared; + +extern Size DatabricksMetricsShmemSize(void); +extern void DatabricksMetricsShmemInit(void); + +extern int databricks_test_hook; + +static const int TestHookCorruption = 1; +/* END_HADRON */ + #endif /* NEON_PERF_COUNTERS_H */ diff --git 
a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs index ca784423ee..43cfe70206 100644 --- a/proxy/src/compute/mod.rs +++ b/proxy/src/compute/mod.rs @@ -8,6 +8,7 @@ use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use postgres_client::config::{AuthKeys, ChannelBinding, SslMode}; use postgres_client::connect_raw::StartupStream; +use postgres_client::error::SqlState; use postgres_client::maybe_tls_stream::MaybeTlsStream; use postgres_client::tls::MakeTlsConnect; use thiserror::Error; @@ -22,7 +23,7 @@ use crate::context::RequestContext; use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::WakeComputeError; use crate::control_plane::messages::MetricsAuxInfo; -use crate::error::{ReportableError, UserFacingError}; +use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::metrics::{Metrics, NumDbConnectionsGuard}; use crate::pqproto::StartupMessageParams; use crate::proxy::connect_compute::TlsNegotiation; @@ -65,12 +66,13 @@ impl UserFacingError for PostgresError { } impl ReportableError for PostgresError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { - PostgresError::Postgres(e) if e.as_db_error().is_some() => { - crate::error::ErrorKind::Postgres - } - PostgresError::Postgres(_) => crate::error::ErrorKind::Compute, + PostgresError::Postgres(err) => match err.as_db_error() { + Some(err) if err.code() == &SqlState::INVALID_CATALOG_NAME => ErrorKind::User, + Some(_) => ErrorKind::Postgres, + None => ErrorKind::Compute, + }, } } } @@ -110,9 +112,9 @@ impl UserFacingError for ConnectionError { } impl ReportableError for ConnectionError { - fn get_error_kind(&self) -> crate::error::ErrorKind { + fn get_error_kind(&self) -> ErrorKind { match self { - ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute, + ConnectionError::TlsError(_) => ErrorKind::Compute, ConnectionError::WakeComputeError(e) => e.get_error_kind(), 
ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(), #[cfg(test)] diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index eb879f98e7..511bdc4e42 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -4,6 +4,7 @@ use std::time::Duration; use ed25519_dalek::SigningKey; use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use jose_jwk::jose_b64; +use postgres_client::error::SqlState; use postgres_client::maybe_tls_stream::MaybeTlsStream; use rand_core::OsRng; use tracing::field::display; @@ -459,15 +460,14 @@ impl ReportableError for HttpConnError { match self { HttpConnError::ConnectError(_) => ErrorKind::Compute, HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute, - HttpConnError::PostgresConnectionError(p) => { - if p.as_db_error().is_some() { - // postgres rejected the connection - ErrorKind::Postgres - } else { - // couldn't even reach postgres - ErrorKind::Compute - } - } + HttpConnError::PostgresConnectionError(p) => match p.as_db_error() { + // user provided a wrong database name + Some(err) if err.code() == &SqlState::INVALID_CATALOG_NAME => ErrorKind::User, + // postgres rejected the connection + Some(_) => ErrorKind::Postgres, + // couldn't even reach postgres + None => ErrorKind::Compute, + }, HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute, HttpConnError::ComputeCtl(_) => ErrorKind::Service, HttpConnError::JwtPayloadError(_) => ErrorKind::User, diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 26f65379e7..c334e820d7 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -192,34 +192,29 @@ pub(crate) async fn handle( let line = get(db_error, |db| db.line().map(|l| l.to_string())); let routine = get(db_error, |db| db.routine()); - match &e { - SqlOverHttpError::Postgres(e) - if e.as_db_error().is_some() && error_kind == ErrorKind::User => - { - // this 
error contains too much info, and it's not an error we care about. - if tracing::enabled!(Level::DEBUG) { - tracing::debug!( - kind=error_kind.to_metric_label(), - error=%e, - msg=message, - "forwarding error to user" - ); - } else { - tracing::info!( - kind = error_kind.to_metric_label(), - error = "bad query", - "forwarding error to user" - ); - } - } - _ => { - tracing::info!( + if db_error.is_some() && error_kind == ErrorKind::User { + // this error contains too much info, and it's not an error we care about. + if tracing::enabled!(Level::DEBUG) { + debug!( kind=error_kind.to_metric_label(), error=%e, msg=message, "forwarding error to user" ); + } else { + info!( + kind = error_kind.to_metric_label(), + error = "bad query", + "forwarding error to user" + ); } + } else { + info!( + kind=error_kind.to_metric_label(), + error=%e, + msg=message, + "forwarding error to user" + ); } json_response(