Compare commits

..

9 Commits

Author SHA1 Message Date
Conrad Ludgate
ce93770120 fix missing registration 2025-04-22 12:25:00 +01:00
Conrad Ludgate
897cea978a make connection polling generic 2025-04-22 12:24:57 +01:00
Conrad Ludgate
86fb432ab2 fully abstract global conn pool 2025-04-22 12:17:13 +01:00
Conrad Ludgate
e9a12d626d more abstractions 2025-04-22 12:17:13 +01:00
Conrad Ludgate
42e36ba5e8 simplify generics more 2025-04-22 12:17:13 +01:00
Conrad Ludgate
cf05d4e4b2 simplify pool generics 2025-04-22 12:17:13 +01:00
Alexander Bayandin
cd2e1fbc7c CI(benchmarks): upload perf results for passed tests (#11649)
## Problem

We run benchmarks in batches (five parallel jobs on different runners).
If any test in a batch fails, we won’t upload any results for that
batch, even for the tests that passed.

## Summary of changes
- Move the results upload to a separate step in the run-python-test-set
action, and execute this step even if tests fail.
2025-04-22 09:41:28 +00:00
Tristan Partin
5df4a747e6 Update pgbouncer in compute images to 1.24.1 (#11651)
Fixes CVE-2025-2291.

Link:
https://www.postgresql.org/about/news/pgbouncer-1241-released-fixes-cve-2025-2291-3059/

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-21 17:49:17 +00:00
Vlad Lazar
cbf442292b pageserver: handle empty get vectored queries (#11652)
## Problem

If all batched requests are excluded from the query by
`Timeline::get_rel_page_at_lsn_batched` (e.g. because they are past the
end of the relation), the read path would panic since it doesn't expect
empty queries. This is a change in behaviour that was introduced with
the scattered query implementation.

## Summary of Changes

Handle empty queries explicitly.
2025-04-21 17:45:16 +00:00
14 changed files with 377 additions and 900 deletions

View File

@@ -133,6 +133,7 @@ runs:
fi
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
rm -rf $PERF_REPORT_DIR
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
@@ -209,11 +210,12 @@ runs:
--verbose \
-rA $TEST_SELECTION $EXTRA_PARAMS
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO="$PLATFORM"
scripts/generate_and_push_perf_report.sh
fi
- name: Upload performance report
if: ${{ !cancelled() && inputs.save_perf_report == 'true' }}
shell: bash -euxo pipefail {0}
run: |
export REPORT_FROM="${PERF_REPORT_DIR}"
scripts/generate_and_push_perf_report.sh
- name: Upload compatibility snapshot
# Note, that we use `github.base_ref` which is a target branch for a PR

View File

@@ -1677,7 +1677,7 @@ RUN set -e \
&& apt clean && rm -rf /var/lib/apt/lists/*
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \

View File

@@ -1500,9 +1500,24 @@ impl ComputeNode {
let mut conf = conf.as_ref().clone();
conf.application_name("compute_ctl:migrations");
if let Err(e) = handle_migrations(conf).await {
error!("Failed to run migrations: {}", e);
}
match conf.connect(NoTls).await {
Ok((mut client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
if let Err(e) = handle_migrations(&mut client).await {
error!("Failed to run migrations: {}", e);
}
}
Err(e) => {
error!(
"Failed to connect to the compute for running migrations: {}",
e
);
}
};
});
Ok::<(), anyhow::Error>(())

View File

@@ -1,60 +1,29 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::NoTls;
use tokio_postgres::{Client, Config, Transaction};
use tracing::{error, info, warn};
use tokio_postgres::{Client, Transaction};
use tracing::{error, info};
use crate::metrics::DB_MIGRATION_FAILED;
/// Runs a series of migrations on a target database
use compute_api::spec::{Database, PgIdent};
use crate::pg_helpers::{Escaping, get_existing_dbs_async};
/// A single SQL migration, tagged with the scope in which it has to be
/// applied.
///
/// Both variants wrap the SQL text of the migration, borrowed for `'m`.
pub(crate) enum Migration<'m> {
/// Cluster migrations are things like catalog updates, where they can be
/// run in the default Postgres database, but affect every database in the
/// cluster.
Cluster(&'m str),
/// Per-database migrations will be run in every database of the cluster.
/// The migration will not be marked as completed until after it has been
/// run in every database. We will save the `postgres` database for last so
/// that we can commit the transaction as applied in the
/// neon_migration.migration_id table.
///
/// Please be aware of the race condition that exists for this type of
/// migration. At the beginning of running the series of migrations, we get
/// the current list of databases. However, we run migrations in a separate
/// thread in order to not block connections to the compute. If after the
/// time we have gotten the list of databases in the cluster, a user creates
/// a new database, that database will not receive the migration, but we
/// will have marked the migration as completed successfully, assuming all
/// previous databases ran the migration to completion.
PerDatabase(&'m str),
}
pub(crate) struct MigrationRunner<'m> {
/// Postgres client configuration.
config: Config,
/// List of migrations to run.
migrations: &'m [Migration<'m>],
client: &'m mut Client,
migrations: &'m [&'m str],
}
impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(config: Config, migrations: &'m [Migration<'m>]) -> Result<Self> {
// The neon_migration.migration_id::id column is a bigint, which is
// equivalent to an i64
debug_assert!(migrations.len() + 1 < i64::MAX as usize);
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
Ok(Self { config, migrations })
Self { client, migrations }
}
/// Get the current value of neon_migration.migration_id
async fn get_migration_id(client: &mut Client) -> Result<i64> {
let row = client
async fn get_migration_id(&mut self) -> Result<i64> {
let row = self
.client
.query_one("SELECT id FROM neon_migration.migration_id", &[])
.await?;
@@ -67,8 +36,9 @@ impl<'m> MigrationRunner<'m> {
/// used if you would like to fail the application of a series of migrations
/// at some point.
async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
// We use this fail point in order to check that failing in the middle
// of applying a series of migrations fails in an expected manner
// We use this fail point in order to check that failing in the
// middle of applying a series of migrations fails in an expected
// manner
if cfg!(feature = "testing") {
let fail = (|| {
fail_point!("compute-migration", |fail_migration_id| {
@@ -97,95 +67,48 @@ impl<'m> MigrationRunner<'m> {
}
/// Prepare the target database for handling migrations
async fn prepare_database(client: &mut Client) -> Result<()> {
client
async fn prepare_database(&mut self) -> Result<()> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
.await?;
client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
client
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
self.client
.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
)
.await?;
client
self.client
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
.await?;
client
self.client
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
.await?;
Ok(())
}
/// Toggle whether connections to the database `dbname` are allowed.
///
/// Sends `ALTER DATABASE <dbname> WITH ALLOW_CONNECTIONS <allow>` over
/// `client`; the database name is passed through `pg_quote()` before being
/// placed into the statement. Any error from the query is returned to the
/// caller.
async fn allow_connections_to_db(
    client: &mut Client,
    dbname: &PgIdent,
    allow: bool,
) -> Result<()> {
    let quoted_name = dbname.pg_quote();
    let statement = format!("ALTER DATABASE {quoted_name} WITH ALLOW_CONNECTIONS {allow}");

    client.simple_query(&statement).await?;

    Ok(())
}
/// Connect to the configured Postgres database. Spawns a tokio task to
/// handle the connection.
async fn connect(config: &Config) -> Result<Client> {
let (client, connection) = config.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
Ok(client)
}
async fn run_migration(
client: &mut Client,
db: &str,
migration_id: i64,
migration: &str,
update_migration_id: bool,
) -> Result<()> {
/// Run an individual migration in a separate transaction block.
async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
let mut txn = client
.transaction()
.await
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={} db=\"{}\"", migration_id, db);
info!("Skipping migration id={}", migration_id);
// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
Self::update_migration_id(&mut txn, migration_id).await?;
} else {
info!(
"Running migration id={} db=\"{}\":\n{}\n",
migration_id, db, migration
);
info!("Running migration id={}:\n{}\n", migration_id, migration);
if let Err(e) = txn.simple_query(migration).await {
error!("Failed to run the migration: {}", e);
return Err(anyhow::anyhow!(e));
}
}
txn.simple_query(migration)
.await
.with_context(|| format!("apply migration {migration_id}"))?;
if update_migration_id {
if let Err(e) = Self::update_migration_id(&mut txn, migration_id).await {
error!(
"Failed to update the migration id to {}: {}",
migration_id, e
);
return Err(e);
}
Self::update_migration_id(&mut txn, migration_id).await?;
}
txn.commit()
@@ -195,201 +118,32 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}
/// Apply a cluster-wide migration through `client`. See
/// [`Migration::Cluster`] for what "cluster-wide" means.
///
/// This forwards to `run_migration` with the flag that asks it to update
/// the stored migration ID as well.
async fn run_cluster_migration(
    client: &mut Client,
    db: &str,
    migration_id: i64,
    migration: &str,
) -> Result<()> {
    // Cluster migrations always record the new migration ID.
    let update_migration_id = true;

    Self::run_migration(client, db, migration_id, migration, update_migration_id).await
}
/// Run the migration in the specified database. See
/// [`Migration::PerDatabase`] for more information.
async fn run_database_migration(
cluster_client: &mut Client,
config: Config,
db: &Database,
migration_id: i64,
migration: &str,
) -> Result<()> {
// There are 2 race conditions here. Migrations are run in a separate
// thread to not block the ability to connect to the compute. The race
// conditions are as follows:
//
// 1. If between the time we have retrieved the list of databases in
// the cluster and before we set ALLOW_CONNECTIONS back to false,
// the user has changed allowed connections to the database, we
// will have overwritten their change.
//
// This is not the end of the world, but an inconvenience,
// nonetheless.
//
// 2. If between the time we have allowed connections to the database
// and the time the migration is performed, the user disallows
// connections to the database, we will fail to connect to the
// database.
//
// This is not much of a problem since we will re-run the migration
// the next time we run migrations.
if db.restrict_conn {
info!("Allowing connections to \"{}\" for migrations", db.name);
Self::allow_connections_to_db(cluster_client, &db.name, true)
.await
.context("Failed to allow connections to the database")?;
}
let mut db_client = Self::connect(&config)
/// Run the configured set of migrations
pub async fn run_migrations(mut self) -> Result<()> {
self.prepare_database()
.await
.context("Failed to connect to the database")?;
let result = Self::run_migration(&mut db_client, &db.name, migration_id, migration, false)
.await
.context("Failed to run the migration");
// Reset the connection restriction
if db.restrict_conn {
info!(
"Disallowing connections to \"{}\" because migration {} is done",
db.name, migration_id
);
// Failing here is not the end of the world
if let Err(e) = Self::allow_connections_to_db(cluster_client, &db.name, false).await {
warn!(
"failed to reset ALLOW_CONNECTIONS on \"{}\": {}",
db.name, e
)
}
}
result
}
/// Run the configured set of migrations.
pub async fn run_migrations(self) -> Result<()> {
// Owns the connection to the database containing the
// neon_migration.migration_id table. In addition, all Cluster
// migrations will be run on this connection.
let mut cluster_client = Self::connect(&self.config)
.await
.context("failed to connect to cluster")?;
Self::prepare_database(&mut cluster_client)
.await
.context("failed to prepare database to handle migrations")?;
let mut current_migration = Self::get_migration_id(&mut cluster_client)
.await
.context("failed to get the current migration ID")?
as usize;
// All databases within the cluster
let dbs: Option<Vec<Database>> = {
// Then check if we actually need to run any, and if so, check if
// any need to run in each individual database
if current_migration < self.migrations.len()
&& self.migrations[current_migration..]
.iter()
.any(|m| matches!(m, Migration::PerDatabase(_)))
{
match get_existing_dbs_async(&cluster_client).await {
Ok(dbs) => Some(
// Filter out invalid database (datconnectivity = -2)
dbs.into_values().filter(|d| !d.invalid).collect::<Vec<_>>(),
),
Err(e) => {
error!("Failed to collect the existing databases: {}", e);
return Err(e);
}
}
} else {
None::<Vec<_>>
}
};
let admin_db = self.config.get_dbname().unwrap();
.context("prepare database to handle migrations")?;
let mut current_migration = self.get_migration_id().await? as usize;
while current_migration < self.migrations.len() {
// The index lags the migration ID by 1, so the current migration
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let result: Result<()> = match &self.migrations[current_migration] {
Migration::Cluster(migration) => {
Self::run_cluster_migration(
&mut cluster_client,
admin_db,
migration_id,
migration,
)
.await
match Self::run_migration(self.client, migration_id, migration).await {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}
Migration::PerDatabase(migration) => {
let mut result: Result<()> = Ok(());
for db in dbs.as_ref().unwrap() {
// Once all the databases have run the migration, then we can run it in the
// admin database to mark the migration as complete. See the run for the
// admin database outside this loop.
if db.name == admin_db {
continue;
}
let mut config = self.config.clone();
config.dbname(&db.name);
// If we failed to run the migration in the current
// database, stop trying to run this migration
if let Err(e) = Self::run_database_migration(
&mut cluster_client,
config,
db,
migration_id,
migration,
)
.await
{
result = Err(e);
break;
}
}
match result {
Ok(_) => {
// Finally, run the migration for the admin database,
// and update the migration ID
Self::run_migration(
&mut cluster_client,
admin_db,
migration_id,
migration,
true,
)
.await
.map_err(|e| {
error!("failed to commit the per-database migration: {}", e);
e
})
}
Err(e) => Err(e),
}
Err(e) => {
error!("Failed to run migration id={}: {:?}", migration_id, e);
DB_MIGRATION_FAILED
.with_label_values(&[migration_id.to_string().as_str()])
.inc();
return Err(e);
}
};
// If failed, mark the metric and return
if let Err(e) = result {
DB_MIGRATION_FAILED
.with_label_values(&[migration_id.to_string().as_str()])
.inc();
return Err(anyhow::anyhow!(format!(
"failed at migration {migration_id}: {e}"
)));
}
info!("Finished migration id={}", migration_id);
current_migration += 1;
}

View File

@@ -1,235 +0,0 @@
/*
* fix-CVE-2024-4317.sql
*
* Copyright (c) 2024, PostgreSQL Global Development Group
*
* src/backend/catalog/fix-CVE-2024-4317.sql
*
* This file should be run in every database in the cluster to address
* CVE-2024-4317.
*
* It re-creates the pg_stats_ext and pg_stats_ext_exprs views, choosing the
* view definitions according to server_version_num.
*/
DO $$
DECLARE
-- Value of the server_version_num setting, read from pg_settings below
server_version_num numeric;
BEGIN
-- Point search_path at pg_catalog before (re)creating the views
SET search_path = pg_catalog;
SELECT setting::numeric FROM pg_settings INTO server_version_num WHERE name = 'server_version_num';
-- Postgres 17 and later will have the fix, so there is nothing to do
IF server_version_num >= 170000 THEN
RETURN;
END IF;
-- pg_statistic_ext_data doesn't have the stxdinherit column in 14 and below
IF server_version_num < 150000 THEN
-- Variant of pg_stats_ext without the "inherited" column. Rows are only
-- returned when pg_has_role(c.relowner, 'USAGE') holds and row security
-- is either disabled on the table or not active for it.
CREATE OR REPLACE VIEW pg_stats_ext WITH (security_barrier) AS
SELECT cn.nspname AS schemaname,
c.relname AS tablename,
sn.nspname AS statistics_schemaname,
s.stxname AS statistics_name,
pg_get_userbyid(s.stxowner) AS statistics_owner,
( SELECT array_agg(a.attname ORDER BY a.attnum)
FROM unnest(s.stxkeys) k
JOIN pg_attribute a
ON (a.attrelid = s.stxrelid AND a.attnum = k)
) AS attnames,
pg_get_statisticsobjdef_expressions(s.oid) as exprs,
s.stxkind AS kinds,
sd.stxdndistinct AS n_distinct,
sd.stxddependencies AS dependencies,
m.most_common_vals,
m.most_common_val_nulls,
m.most_common_freqs,
m.most_common_base_freqs
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
LEFT JOIN LATERAL
( SELECT array_agg(values) AS most_common_vals,
array_agg(nulls) AS most_common_val_nulls,
array_agg(frequency) AS most_common_freqs,
array_agg(base_frequency) AS most_common_base_freqs
FROM pg_mcv_list_items(sd.stxdmcv)
) m ON sd.stxdmcv IS NOT NULL
WHERE pg_has_role(c.relowner, 'USAGE')
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
-- Variant of pg_stats_ext_exprs without the "inherited" column, using the
-- same WHERE filter as pg_stats_ext above.
CREATE OR REPLACE VIEW pg_stats_ext_exprs WITH (security_barrier) AS
SELECT cn.nspname AS schemaname,
c.relname AS tablename,
sn.nspname AS statistics_schemaname,
s.stxname AS statistics_name,
pg_get_userbyid(s.stxowner) AS statistics_owner,
stat.expr,
(stat.a).stanullfrac AS null_frac,
(stat.a).stawidth AS avg_width,
(stat.a).stadistinct AS n_distinct,
(CASE
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stavalues1
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stavalues2
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stavalues3
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stavalues4
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stavalues5
END) AS most_common_vals,
(CASE
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stanumbers1
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stanumbers2
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stanumbers3
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stanumbers4
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stanumbers5
END) AS most_common_freqs,
(CASE
WHEN (stat.a).stakind1 = 2 THEN (stat.a).stavalues1
WHEN (stat.a).stakind2 = 2 THEN (stat.a).stavalues2
WHEN (stat.a).stakind3 = 2 THEN (stat.a).stavalues3
WHEN (stat.a).stakind4 = 2 THEN (stat.a).stavalues4
WHEN (stat.a).stakind5 = 2 THEN (stat.a).stavalues5
END) AS histogram_bounds,
(CASE
WHEN (stat.a).stakind1 = 3 THEN (stat.a).stanumbers1[1]
WHEN (stat.a).stakind2 = 3 THEN (stat.a).stanumbers2[1]
WHEN (stat.a).stakind3 = 3 THEN (stat.a).stanumbers3[1]
WHEN (stat.a).stakind4 = 3 THEN (stat.a).stanumbers4[1]
WHEN (stat.a).stakind5 = 3 THEN (stat.a).stanumbers5[1]
END) correlation,
(CASE
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stavalues1
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stavalues2
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stavalues3
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stavalues4
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stavalues5
END) AS most_common_elems,
(CASE
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stanumbers1
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stanumbers2
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stanumbers3
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stanumbers4
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stanumbers5
END) AS most_common_elem_freqs,
(CASE
WHEN (stat.a).stakind1 = 5 THEN (stat.a).stanumbers1
WHEN (stat.a).stakind2 = 5 THEN (stat.a).stanumbers2
WHEN (stat.a).stakind3 = 5 THEN (stat.a).stanumbers3
WHEN (stat.a).stakind4 = 5 THEN (stat.a).stanumbers4
WHEN (stat.a).stakind5 = 5 THEN (stat.a).stanumbers5
END) AS elem_count_histogram
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
LEFT JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
JOIN LATERAL (
SELECT unnest(pg_get_statisticsobjdef_expressions(s.oid)) AS expr,
unnest(sd.stxdexpr)::pg_statistic AS a
) stat ON (stat.expr IS NOT NULL)
WHERE pg_has_role(c.relowner, 'USAGE')
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
ELSE
-- Remaining case: server_version_num is at least 150000 and below 170000.
-- Same two views as in the branch above, with an additional "inherited"
-- column taken from sd.stxdinherit.
CREATE OR REPLACE VIEW pg_stats_ext WITH (security_barrier) AS
SELECT cn.nspname AS schemaname,
c.relname AS tablename,
sn.nspname AS statistics_schemaname,
s.stxname AS statistics_name,
pg_get_userbyid(s.stxowner) AS statistics_owner,
( SELECT array_agg(a.attname ORDER BY a.attnum)
FROM unnest(s.stxkeys) k
JOIN pg_attribute a
ON (a.attrelid = s.stxrelid AND a.attnum = k)
) AS attnames,
pg_get_statisticsobjdef_expressions(s.oid) as exprs,
s.stxkind AS kinds,
sd.stxdinherit AS inherited,
sd.stxdndistinct AS n_distinct,
sd.stxddependencies AS dependencies,
m.most_common_vals,
m.most_common_val_nulls,
m.most_common_freqs,
m.most_common_base_freqs
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
LEFT JOIN LATERAL
( SELECT array_agg(values) AS most_common_vals,
array_agg(nulls) AS most_common_val_nulls,
array_agg(frequency) AS most_common_freqs,
array_agg(base_frequency) AS most_common_base_freqs
FROM pg_mcv_list_items(sd.stxdmcv)
) m ON sd.stxdmcv IS NOT NULL
WHERE pg_has_role(c.relowner, 'USAGE')
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
-- pg_stats_ext_exprs with the "inherited" column
CREATE OR REPLACE VIEW pg_stats_ext_exprs WITH (security_barrier) AS
SELECT cn.nspname AS schemaname,
c.relname AS tablename,
sn.nspname AS statistics_schemaname,
s.stxname AS statistics_name,
pg_get_userbyid(s.stxowner) AS statistics_owner,
stat.expr,
sd.stxdinherit AS inherited,
(stat.a).stanullfrac AS null_frac,
(stat.a).stawidth AS avg_width,
(stat.a).stadistinct AS n_distinct,
(CASE
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stavalues1
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stavalues2
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stavalues3
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stavalues4
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stavalues5
END) AS most_common_vals,
(CASE
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stanumbers1
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stanumbers2
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stanumbers3
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stanumbers4
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stanumbers5
END) AS most_common_freqs,
(CASE
WHEN (stat.a).stakind1 = 2 THEN (stat.a).stavalues1
WHEN (stat.a).stakind2 = 2 THEN (stat.a).stavalues2
WHEN (stat.a).stakind3 = 2 THEN (stat.a).stavalues3
WHEN (stat.a).stakind4 = 2 THEN (stat.a).stavalues4
WHEN (stat.a).stakind5 = 2 THEN (stat.a).stavalues5
END) AS histogram_bounds,
(CASE
WHEN (stat.a).stakind1 = 3 THEN (stat.a).stanumbers1[1]
WHEN (stat.a).stakind2 = 3 THEN (stat.a).stanumbers2[1]
WHEN (stat.a).stakind3 = 3 THEN (stat.a).stanumbers3[1]
WHEN (stat.a).stakind4 = 3 THEN (stat.a).stanumbers4[1]
WHEN (stat.a).stakind5 = 3 THEN (stat.a).stanumbers5[1]
END) correlation,
(CASE
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stavalues1
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stavalues2
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stavalues3
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stavalues4
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stavalues5
END) AS most_common_elems,
(CASE
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stanumbers1
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stanumbers2
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stanumbers3
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stanumbers4
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stanumbers5
END) AS most_common_elem_freqs,
(CASE
WHEN (stat.a).stakind1 = 5 THEN (stat.a).stanumbers1
WHEN (stat.a).stakind2 = 5 THEN (stat.a).stanumbers2
WHEN (stat.a).stakind3 = 5 THEN (stat.a).stanumbers3
WHEN (stat.a).stakind4 = 5 THEN (stat.a).stanumbers4
WHEN (stat.a).stakind5 = 5 THEN (stat.a).stanumbers5
END) AS elem_count_histogram
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
LEFT JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
JOIN LATERAL (
SELECT unnest(pg_get_statisticsobjdef_expressions(s.oid)) AS expr,
unnest(sd.stxdexpr)::pg_statistic AS a
) stat ON (stat.expr IS NOT NULL)
WHERE pg_has_role(c.relowner, 'USAGE')
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
END IF;
END $$;

View File

@@ -1,5 +0,0 @@
-- Testing that this migration actually works would require spinning up a
-- Postgres instance running on a vulnerable version. Let's trust that the
-- Postgres community created a SQL fix that actually works.
SELECT 1;

View File

@@ -6,12 +6,12 @@ use compute_api::responses::{
ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
};
use reqwest::StatusCode;
use tokio_postgres::{Client, Config};
use tokio_postgres::Client;
use tracing::{error, info, instrument};
use crate::config;
use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
use crate::migration::{Migration, MigrationRunner};
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
// Do control plane request and return response if any. In case of error it
@@ -169,7 +169,7 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
}
#[instrument(skip_all)]
pub async fn handle_migrations(config: Config) -> Result<()> {
pub async fn handle_migrations(client: &mut Client) -> Result<()> {
info!("handle migrations");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -178,50 +178,30 @@ pub async fn handle_migrations(config: Config) -> Result<()> {
// Add new migrations in numerical order.
let migrations = [
Migration::Cluster(include_str!(
"./migrations/0001-neon_superuser_bypass_rls.sql"
)),
Migration::Cluster(include_str!("./migrations/0002-alter_roles.sql")),
Migration::Cluster(include_str!(
"./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"
)),
Migration::Cluster(include_str!(
"./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"
)),
Migration::Cluster(include_str!(
"./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"
)),
Migration::Cluster(include_str!(
"./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"
)),
Migration::Cluster(include_str!(
include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
include_str!("./migrations/0002-alter_roles.sql"),
include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
include_str!(
"./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
)),
Migration::Cluster(include_str!(
),
include_str!(
"./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
)),
Migration::Cluster(include_str!(
"./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"
)),
Migration::Cluster(include_str!(
),
include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
include_str!(
"./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
)),
Migration::Cluster(include_str!(
),
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
)),
Migration::PerDatabase(include_str!("./migrations/0012-fix-CVE-2024-4317.sql")),
),
];
let runner = match MigrationRunner::new(config, &migrations) {
Ok(runner) => runner,
Err(e) => {
error!("Failed to construct a migration runner: {}", e);
return Err(e);
}
};
MigrationRunner::new(client, &migrations)
.run_migrations()
.await?;
runner.run_migrations().await.map_err(|e| {
error!("Failed to run the migrations: {}", e);
e
})
Ok(())
}

View File

@@ -1285,6 +1285,10 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),

View File

@@ -16,9 +16,9 @@ use tracing::field::display;
use tracing::{debug, info};
use super::AsyncRW;
use super::conn_pool::poll_client;
use super::conn_pool::poll_client_generic;
use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
use super::http_conn_pool::{self, HttpConnPool};
use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
@@ -42,10 +42,9 @@ use crate::rate_limiter::EndpointRateLimiter;
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pub(crate) http_conn_pool: Arc<GlobalConnPool<HttpConnPool>>,
pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
pub(crate) pool:
Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
pub(crate) pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
@@ -212,7 +211,7 @@ impl PoolingBackend {
None
} else {
debug!("pool: looking for an existing connection");
self.pool.get(ctx, &conn_info)?
self.pool.get(ctx, &conn_info)
};
if let Some(client) = maybe_client {
@@ -246,9 +245,9 @@ impl PoolingBackend {
&self,
ctx: &RequestContext,
conn_info: ConnInfo,
) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
) -> Result<http_conn_pool::Client, HttpConnError> {
debug!("pool: looking for an existing connection");
if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) {
return Ok(client);
}
@@ -532,7 +531,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
}
struct TokioMechanism {
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -578,7 +577,7 @@ impl ConnectMechanism for TokioMechanism {
info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
}
Ok(poll_client(
Ok(poll_client_generic(
self.pool.clone(),
ctx,
self.conn_info.clone(),
@@ -593,7 +592,7 @@ impl ConnectMechanism for TokioMechanism {
}
struct HyperMechanism {
pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pool: Arc<GlobalConnPool<HttpConnPool>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -603,7 +602,7 @@ struct HyperMechanism {
#[async_trait]
impl ConnectMechanism for HyperMechanism {
type Connection = http_conn_pool::Client<Send>;
type Connection = http_conn_pool::Client;
type ConnectError = HttpConnError;
type Error = HttpConnError;
@@ -639,15 +638,26 @@ impl ConnectMechanism for HyperMechanism {
info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
}
Ok(poll_http2_client(
let client = poll_client_generic(
self.pool.clone(),
ctx,
&self.conn_info,
self.conn_info.clone(),
client,
connection,
self.conn_id,
node_info.aux.clone(),
))
);
// auth-broker -> local-proxy clients don't return to the pool, since
// they are multiplexing and cloneable. So instead we insert it once here.
if let Some(endpoint) = self.conn_info.endpoint_cache_key() {
self.pool
.get_or_create_endpoint_pool(&endpoint)
.write()
.register(&client);
}
Ok(client)
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}

View File

@@ -11,7 +11,7 @@ use smallvec::SmallVec;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, info_span, warn};
use tracing::{error, info, info_span, warn};
#[cfg(test)]
use {
super::conn_pool_lib::GlobalConnPoolOptions,
@@ -20,8 +20,7 @@ use {
};
use super::conn_pool_lib::{
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
GlobalConnPool,
ClientDataEnum, ClientInnerCommon, ConnInfo, EndpointConnPoolExt, GlobalConnPool,
};
use crate::context::RequestContext;
use crate::control_plane::messages::MetricsAuxInfo;
@@ -29,6 +28,7 @@ use crate::metrics::Metrics;
use crate::tls::postgres_rustls::MakeRustlsConnect;
type TlsStream = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::Stream;
pub(super) type Conn = postgres_client::Connection<TcpStream, TlsStream>;
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {
@@ -56,20 +56,20 @@ impl fmt::Display for ConnInfo {
}
}
pub(crate) fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C, EndpointConnPool<C>>>,
pub(crate) fn poll_client_generic<P: EndpointConnPoolExt>(
global_pool: Arc<GlobalConnPool<P>>,
ctx: &RequestContext,
conn_info: ConnInfo,
client: C,
mut connection: postgres_client::Connection<TcpStream, TlsStream>,
client: P::ClientInner,
connection: P::Connection,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<C> {
) -> P::Client {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let mut session_id = ctx.session_id();
let session_id = ctx.session_id();
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
let span = info_span!(parent: None, "connection", %conn_id);
let span = info_span!(parent: None, "connection", %conn_id, %session_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
@@ -85,27 +85,30 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::spawn(
async move {
tokio::spawn(async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
let mut cancelled = pin!(cancelled);
let mut connection = pin!(P::spawn_conn(connection));
poll_fn(move |cx| {
let _enter = span.enter();
if cancelled.as_mut().poll(cx).is_ready() {
info!("connection dropped");
return Poll::Ready(())
return Poll::Ready(());
}
match rx.has_changed() {
Ok(true) => {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
let session_id = *rx.borrow_and_update();
span.record("session_id", tracing::field::display(session_id));
info!("changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
Err(_) => {
info!("connection dropped");
return Poll::Ready(())
return Poll::Ready(());
}
_ => {}
}
@@ -117,48 +120,25 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
if let Some(pool) = pool.clone().upgrade() {
// remove client from pool - should close the connection if it's idle.
// does nothing if the client is currently checked-out and in-use
if pool.write().remove_client(db_user.clone(), conn_id) {
if pool.write().remove_conn(db_user.clone(), conn_id) {
info!("idle connection removed");
}
}
}
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session_id, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session_id, "unknown message");
}
Some(Err(e)) => {
error!(%session_id, "connection error: {}", e);
break
}
None => {
info!("connection closed");
break
}
}
}
ready!(connection.as_mut().poll(cx));
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.write().remove_client(db_user.clone(), conn_id) {
if pool.write().remove_conn(db_user.clone(), conn_id) {
info!("closed connection removed");
}
}
Poll::Ready(())
}).await;
}
.instrument(span));
})
.await;
});
let inner = ClientInnerCommon {
inner: client,
aux,
@@ -169,7 +149,42 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
}),
};
Client::new(inner, conn_info, pool_clone)
P::wrap_client(inner, conn_info, pool_clone)
}
pub async fn poll_tokio_postgres_conn_really(mut connection: Conn) {
poll_fn(move |cx| {
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!("notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(
pid = notif.process_id(),
channel = notif.channel(),
"notification received"
);
}
Some(Ok(_)) => {
warn!("unknown message");
}
Some(Err(e)) => {
error!("connection error: {}", e);
break;
}
None => {
info!("connection closed");
break;
}
}
}
Poll::Ready(())
})
.await;
}
#[derive(Clone)]
@@ -179,11 +194,11 @@ pub(crate) struct ClientDataRemote {
}
impl ClientDataRemote {
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
&mut self.session
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
&self.session
}
pub fn cancel(&mut self) {
pub fn cancel(&self) {
self.cancel.cancel();
}
}
@@ -195,6 +210,7 @@ mod tests {
use super::*;
use crate::proxy::NeonOptions;
use crate::serverless::cancel_set::CancelSet;
use crate::serverless::conn_pool_lib::{Client, ClientInnerExt};
use crate::types::{BranchId, EndpointId, ProjectId};
struct MockClient(Arc<AtomicBool>);

View File

@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
@@ -12,11 +11,10 @@ use rand::Rng;
use smol_str::ToSmolStr;
use tracing::{Span, debug, info};
use super::backend::HttpConnError;
use super::conn_pool::ClientDataRemote;
use super::http_conn_pool::ClientDataHttp;
use super::conn_pool::{ClientDataRemote, poll_tokio_postgres_conn_really};
use super::local_conn_pool::ClientDataLocal;
use crate::auth::backend::ComputeUserInfo;
use crate::config::HttpConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
@@ -51,7 +49,6 @@ impl ConnInfo {
pub(crate) enum ClientDataEnum {
Remote(ClientDataRemote),
Local(ClientDataLocal),
Http(ClientDataHttp),
}
#[derive(Clone)]
@@ -64,14 +61,9 @@ pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {
impl<C: ClientInnerExt> Drop for ClientInnerCommon<C> {
fn drop(&mut self) {
match &mut self.data {
ClientDataEnum::Remote(remote_data) => {
remote_data.cancel();
}
ClientDataEnum::Local(local_data) => {
local_data.cancel();
}
ClientDataEnum::Http(_http_data) => (),
match &self.data {
ClientDataEnum::Remote(remote_data) => remote_data.cancel(),
ClientDataEnum::Local(local_data) => local_data.cancel(),
}
}
}
@@ -81,8 +73,8 @@ impl<C: ClientInnerExt> ClientInnerCommon<C> {
self.conn_id
}
pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum {
&mut self.data
pub(crate) fn get_data(&self) -> &ClientDataEnum {
&self.data
}
}
@@ -326,12 +318,70 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
}
}
pub(crate) trait EndpointConnPoolExt<C: ClientInnerExt> {
/// Per-endpoint connection pool abstraction that `GlobalConnPool<P>` and
/// `poll_client_generic` are generic over.
///
/// The associated types tie together the raw client stored in the pool, the
/// wrapper handed to callers, and the connection object that has to be driven
/// in the background.
pub(crate) trait EndpointConnPoolExt: Send + Sync + 'static {
/// Client handle returned to callers (see `wrap_client`).
type Client;
/// Raw client stored inside `ClientInnerCommon`.
type ClientInner: ClientInnerExt;
/// Connection object consumed by `spawn_conn`.
type Connection: Send + 'static;
/// Builds a new pool for an endpoint from the HTTP config and the shared
/// global connection counter.
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self;
/// Wraps a raw client into `Self::Client`; `pool` is a weak reference to the
/// endpoint pool the client is associated with.
fn wrap_client(
inner: ClientInnerCommon<Self::ClientInner>,
conn_info: ConnInfo,
pool: Weak<RwLock<Self>>,
) -> Self::Client;
/// Fetches a cached connection for `db_user`, or `None` when the pool has
/// nothing to offer.
fn get_conn_entry(
&mut self,
db_user: (DbName, RoleName),
) -> Option<ClientInnerCommon<Self::ClientInner>>;
/// Removes the connection identified by `conn_id`; callers treat `true` as
/// "a connection was removed".
fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool;
/// Returns the future that drives `conn`; `poll_client_generic` polls it in
/// a spawned task and removes the connection from the pool once it finishes.
fn spawn_conn(conn: Self::Connection) -> impl Future<Output = ()> + Send + 'static;
/// Clears closed connections from the pool.
/// NOTE(review): the return value looks like the number of clients removed — confirm against each impl.
fn clear_closed(&mut self) -> usize;
/// Reports the pool's connection count.
fn total_conns(&self) -> usize;
}
impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
impl<C: ClientInnerExt> EndpointConnPoolExt for EndpointConnPool<C> {
type Client = Client<C>;
type ClientInner = C;
type Connection = super::conn_pool::Conn;
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count,
global_pool_size_max_conns: config.pool_options.max_total_conns,
pool_name: String::from("remote"),
}
}
/// Wraps a raw pooled client into the `Client` handle given to callers.
///
/// `conn_info` is received by value and moved straight into `Client::new`,
/// so no clone is needed. `pool` is the weak reference to the endpoint pool
/// that `Client::new` receives.
fn wrap_client(
    client: ClientInnerCommon<Self::ClientInner>,
    conn_info: ConnInfo,
    pool: Weak<RwLock<Self>>,
) -> Self::Client {
    Client::new(client, conn_info, pool)
}
/// Takes a cached connection for `db_user`, if the pool has one.
///
/// Delegates to the pool's inherent `get_conn_entry` and unwraps the returned
/// entry down to its `conn` field.
fn get_conn_entry(
    &mut self,
    db_user: (DbName, RoleName),
) -> Option<ClientInnerCommon<Self::ClientInner>> {
    // Method-call syntax resolves to the inherent method of the same name,
    // which yields an entry carrying `conn`; this is not a recursive call.
    self.get_conn_entry(db_user).map(|entry| entry.conn)
}
/// Removes the connection `conn_id` for `db_user` by delegating to
/// `remove_client`; its boolean result is passed through unchanged.
fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
self.remove_client(db_user, conn_id)
}
/// Drives the postgres connection by awaiting
/// `poll_tokio_postgres_conn_really` until that future resolves.
async fn spawn_conn(conn: Self::Connection) {
poll_tokio_postgres_conn_really(conn).await;
}
fn clear_closed(&mut self) -> usize {
let mut clients_removed: usize = 0;
for db_pool in self.pools.values_mut() {
@@ -345,10 +395,9 @@ impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
}
}
pub(crate) struct GlobalConnPool<C, P>
pub(crate) struct GlobalConnPool<P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
P: EndpointConnPoolExt,
{
// endpoint -> per-endpoint connection pool
//
@@ -367,8 +416,6 @@ where
pub(crate) global_connections_count: Arc<AtomicUsize>,
pub(crate) config: &'static crate::config::HttpConfig,
_marker: PhantomData<C>,
}
#[derive(Debug, Clone, Copy)]
@@ -391,10 +438,9 @@ pub struct GlobalConnPoolOptions {
pub max_total_conns: usize,
}
impl<C, P> GlobalConnPool<C, P>
impl<P> GlobalConnPool<P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
P: EndpointConnPoolExt,
{
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
@@ -403,7 +449,6 @@ where
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
_marker: PhantomData,
})
}
@@ -492,80 +537,72 @@ where
}
}
impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};
) -> Option<P::Client> {
let endpoint = conn_info.endpoint_cache_key()?;
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
let endpoint_pool = self.get_endpoint_pool(&endpoint)?;
let client = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
.get_conn_entry(conn_info.db_and_user())?;
let endpoint_pool = Arc::downgrade(&endpoint_pool);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return None;
}
Ok(None)
tracing::Span::current().record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id()).ok()?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id()).ok()?;
}
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
Some(P::wrap_client(client, conn_info.clone(), endpoint_pool))
}
}
impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
/// Looks up the pool already registered for `endpoint` without creating one.
///
/// Returns `None` when the global map holds no entry for this endpoint;
/// otherwise returns a new `Arc` handle to the existing pool.
pub(crate) fn get_endpoint_pool(
    self: &Arc<Self>,
    endpoint: &EndpointCacheKey,
) -> Option<Arc<RwLock<P>>> {
    let pool = self.global_pool.get(endpoint)?;
    Some(pool.clone())
}
pub(crate) fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<EndpointConnPool<C>>> {
) -> Arc<RwLock<P>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: self.config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
global_pool_size_max_conns: self.config.pool_options.max_total_conns,
pool_name: String::from("remote"),
}));
let new_pool = Arc::new(RwLock::new(P::create(
self.config,
self.global_connections_count.clone(),
)));
// find or create a pool for this endpoint
let mut created = false;
@@ -592,6 +629,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
pool
}
}
pub(crate) struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInnerCommon<C>>,

View File

@@ -4,32 +4,25 @@ use std::sync::{Arc, Weak};
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use smol_str::ToSmolStr;
use tracing::{Instrument, debug, error, info, info_span};
use tracing::{error, info};
use super::AsyncRW;
use super::backend::HttpConnError;
use super::conn_pool_lib::{
ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
EndpointConnPoolExt, GlobalConnPool,
ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, EndpointConnPoolExt,
};
use crate::config::HttpConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::protocol2::ConnectionInfoExtra;
use crate::types::EndpointCacheKey;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect = http2::Connection<TokioIo<AsyncRW>, hyper::body::Incoming, TokioExecutor>;
#[derive(Clone)]
pub(crate) struct ClientDataHttp();
// Per-endpoint connection pool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
pub(crate) struct HttpConnPool {
// TODO(conrad):
// either we should open more connections depending on stream count
// (not exposed by hyper, need our own counter)
@@ -39,13 +32,13 @@ pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
// seems somewhat redundant though.
//
// Probably we should run a semaphore and just the single conn. TBD.
conns: VecDeque<ConnPoolEntry<C>>,
conns: VecDeque<ConnPoolEntry<Send>>,
_guard: HttpEndpointPoolsGuard<'static>,
global_connections_count: Arc<AtomicUsize>,
}
impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
impl HttpConnPool {
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<Send>> {
let Self { conns, .. } = self;
loop {
@@ -83,9 +76,59 @@ impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
}
removed > 0
}
pub fn register(&mut self, client: &Client) {
self.conns.push_back(ConnPoolEntry {
conn: client.inner.clone(),
_last_access: std::time::Instant::now(),
});
}
}
impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
impl EndpointConnPoolExt for HttpConnPool {
type Client = Client;
type ClientInner = Send;
type Connection = Connect;
fn create(_config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
HttpConnPool {
conns: VecDeque::new(),
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count,
}
}
/// Wraps a raw HTTP client into the `Client` handle.
///
/// `_conn_info` and `_pool` are unused: the wrapper is built from the inner
/// client alone.
fn wrap_client(
inner: ClientInnerCommon<Self::ClientInner>,
_conn_info: ConnInfo,
_pool: Weak<parking_lot::RwLock<Self>>,
) -> Self::Client {
Client::new(inner)
}
/// Takes a pooled HTTP connection, ignoring `_db_user`.
///
/// Delegates to the pool's inherent zero-argument `get_conn_entry` and
/// unwraps the returned entry down to its `conn` field.
fn get_conn_entry(
    &mut self,
    _db_user: (crate::types::DbName, crate::types::RoleName),
) -> Option<ClientInnerCommon<Self::ClientInner>> {
    // Resolves to the inherent method of the same name, not to this one.
    self.get_conn_entry().map(|entry| entry.conn)
}
/// Removes the connection identified by `conn_id`, ignoring `_db_user`.
///
/// Delegates to the pool's inherent single-argument `remove_conn` and passes
/// its boolean result through unchanged.
fn remove_conn(
&mut self,
_db_user: (crate::types::DbName, crate::types::RoleName),
conn_id: uuid::Uuid,
) -> bool {
self.remove_conn(conn_id)
}
async fn spawn_conn(conn: Self::Connection) {
let res = conn.await;
match res {
Ok(()) => info!("connection closed"),
Err(e) => error!("connection error: {e:?}"),
}
}
fn clear_closed(&mut self) -> usize {
let Self { conns, .. } = self;
let old_len = conns.len();
@@ -100,7 +143,7 @@ impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
}
}
impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
impl Drop for HttpConnPool {
fn drop(&mut self) {
if !self.conns.is_empty() {
self.global_connections_count
@@ -114,154 +157,12 @@ impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
}
}
impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
#[expect(unused_results)]
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let result: Result<Option<Client<C>>, HttpConnError>;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
result = Ok(None);
return result;
};
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
let Some(client) = endpoint_pool.write().get_conn_entry() else {
result = Ok(None);
return result;
};
tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
Ok(Some(Client::new(client.conn.clone())))
}
fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<HttpConnPool<C>>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(HttpConnPool {
conns: VecDeque::new(),
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
}));
// find or create a pool for this endpoint
let mut created = false;
let pool = self
.global_pool
.entry(endpoint.clone())
.or_insert_with(|| {
created = true;
new_pool
})
.clone();
// log new global pool size
if created {
let global_pool_size = self
.global_pool_size
.fetch_add(1, atomic::Ordering::Relaxed)
+ 1;
info!(
"pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
);
}
pool
}
pub(crate) struct Client {
pub(crate) inner: ClientInnerCommon<Send>,
}
pub(crate) fn poll_http2_client(
global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
ctx: &RequestContext,
conn_info: &ConnInfo,
client: Send,
connection: Connect,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<Send> {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let session_id = ctx.session_id();
let span = info_span!(parent: None, "connection", %conn_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
});
let pool = match conn_info.endpoint_cache_key() {
Some(endpoint) => {
let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
let client = ClientInnerCommon {
inner: client.clone(),
aux: aux.clone(),
conn_id,
data: ClientDataEnum::Http(ClientDataHttp()),
};
pool.write().conns.push_back(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.inc();
Arc::downgrade(&pool)
}
None => Weak::new(),
};
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let res = connection.await;
match res {
Ok(()) => info!("connection closed"),
Err(e) => error!(%session_id, "connection error: {e:?}"),
}
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.write().remove_conn(conn_id) {
info!("closed connection removed");
}
}
}
.instrument(span),
);
let client = ClientInnerCommon {
inner: client,
aux,
conn_id,
data: ClientDataEnum::Http(ClientDataHttp()),
};
Client::new(client)
}
pub(crate) struct Client<C: ClientInnerExt + Clone> {
pub(crate) inner: ClientInnerCommon<C>,
}
impl<C: ClientInnerExt + Clone> Client<C> {
pub(self) fn new(inner: ClientInnerCommon<C>) -> Self {
impl Client {
pub(self) fn new(inner: ClientInnerCommon<Send>) -> Self {
Self { inner }
}

View File

@@ -53,11 +53,11 @@ pub(crate) struct ClientDataLocal {
}
impl ClientDataLocal {
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
&mut self.session
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
&self.session
}
pub fn cancel(&mut self) {
pub fn cancel(&self) {
self.cancel.cancel();
}
}
@@ -99,7 +99,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
.map(|entry| entry.conn);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if let Some(client) = client {
if client.inner.is_closed() {
info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
@@ -120,11 +120,9 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);

View File

@@ -75,7 +75,6 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d
pattern = rf"Running migration id={i}"
endpoint.log_contains(pattern)
endpoint.log_contains(rf"Finished migration id={i}")
@pytest.mark.parametrize(