mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
Compare commits
9 Commits
tristan957
...
conrad/poo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce93770120 | ||
|
|
897cea978a | ||
|
|
86fb432ab2 | ||
|
|
e9a12d626d | ||
|
|
42e36ba5e8 | ||
|
|
cf05d4e4b2 | ||
|
|
cd2e1fbc7c | ||
|
|
5df4a747e6 | ||
|
|
cbf442292b |
12
.github/actions/run-python-test-set/action.yml
vendored
12
.github/actions/run-python-test-set/action.yml
vendored
@@ -133,6 +133,7 @@ runs:
|
||||
fi
|
||||
|
||||
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
|
||||
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
|
||||
rm -rf $PERF_REPORT_DIR
|
||||
|
||||
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
|
||||
@@ -209,11 +210,12 @@ runs:
|
||||
--verbose \
|
||||
-rA $TEST_SELECTION $EXTRA_PARAMS
|
||||
|
||||
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
|
||||
export REPORT_FROM="$PERF_REPORT_DIR"
|
||||
export REPORT_TO="$PLATFORM"
|
||||
scripts/generate_and_push_perf_report.sh
|
||||
fi
|
||||
- name: Upload performance report
|
||||
if: ${{ !cancelled() && inputs.save_perf_report == 'true' }}
|
||||
shell: bash -euxo pipefail {0}
|
||||
run: |
|
||||
export REPORT_FROM="${PERF_REPORT_DIR}"
|
||||
scripts/generate_and_push_perf_report.sh
|
||||
|
||||
- name: Upload compatibility snapshot
|
||||
# Note, that we use `github.base_ref` which is a target branch for a PR
|
||||
|
||||
@@ -1677,7 +1677,7 @@ RUN set -e \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
|
||||
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
|
||||
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
|
||||
RUN set -e \
|
||||
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
|
||||
&& cd pgbouncer \
|
||||
|
||||
@@ -1500,9 +1500,24 @@ impl ComputeNode {
|
||||
let mut conf = conf.as_ref().clone();
|
||||
conf.application_name("compute_ctl:migrations");
|
||||
|
||||
if let Err(e) = handle_migrations(conf).await {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
match conf.connect(NoTls).await {
|
||||
Ok((mut client, connection)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
if let Err(e) = handle_migrations(&mut client).await {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to connect to the compute for running migrations: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
|
||||
@@ -1,60 +1,29 @@
|
||||
use anyhow::{Context, Result};
|
||||
use fail::fail_point;
|
||||
use postgres::NoTls;
|
||||
use tokio_postgres::{Client, Config, Transaction};
|
||||
use tracing::{error, info, warn};
|
||||
use tokio_postgres::{Client, Transaction};
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::metrics::DB_MIGRATION_FAILED;
|
||||
|
||||
/// Runs a series of migrations on a target database
|
||||
use compute_api::spec::{Database, PgIdent};
|
||||
|
||||
use crate::pg_helpers::{Escaping, get_existing_dbs_async};
|
||||
|
||||
pub(crate) enum Migration<'m> {
|
||||
/// Cluster migrations are things like catalog updates, where they can be
|
||||
/// run in the default Postgres database, but affect every database in the
|
||||
/// cluster.
|
||||
Cluster(&'m str),
|
||||
|
||||
/// Per-database migrations will be run in every database of the cluster.
|
||||
/// The migration will not be marked as completed until after it has been
|
||||
/// run in every database. We will save the `postgres` database for last so
|
||||
/// that we can commit the transaction as applied in the
|
||||
/// neon_migration.migration_id table.
|
||||
///
|
||||
/// Please be aware of the race condition that exists for this type of
|
||||
/// migration. At the beginning of running the series of migrations, we get
|
||||
/// the current list of databases. However, we run migrations in a separate
|
||||
/// thread in order to not block connections to the compute. If after the
|
||||
/// time we have gotten the list of databases in the cluster, a user creates
|
||||
/// a new database, that database will not receive the migration, but we
|
||||
/// will have marked the migration as completed successfully, assuming all
|
||||
/// previous databases ran the migration to completion.
|
||||
PerDatabase(&'m str),
|
||||
}
|
||||
|
||||
pub(crate) struct MigrationRunner<'m> {
|
||||
/// Postgres client configuration.
|
||||
config: Config,
|
||||
|
||||
/// List of migrations to run.
|
||||
migrations: &'m [Migration<'m>],
|
||||
client: &'m mut Client,
|
||||
migrations: &'m [&'m str],
|
||||
}
|
||||
|
||||
impl<'m> MigrationRunner<'m> {
|
||||
/// Create a new migration runner
|
||||
pub fn new(config: Config, migrations: &'m [Migration<'m>]) -> Result<Self> {
|
||||
// The neon_migration.migration_id::id column is a bigint, which is
|
||||
// equivalent to an i64
|
||||
debug_assert!(migrations.len() + 1 < i64::MAX as usize);
|
||||
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
|
||||
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
|
||||
assert!(migrations.len() + 1 < i64::MAX as usize);
|
||||
|
||||
Ok(Self { config, migrations })
|
||||
Self { client, migrations }
|
||||
}
|
||||
|
||||
/// Get the current value neon_migration.migration_id
|
||||
async fn get_migration_id(client: &mut Client) -> Result<i64> {
|
||||
let row = client
|
||||
async fn get_migration_id(&mut self) -> Result<i64> {
|
||||
let row = self
|
||||
.client
|
||||
.query_one("SELECT id FROM neon_migration.migration_id", &[])
|
||||
.await?;
|
||||
|
||||
@@ -67,8 +36,9 @@ impl<'m> MigrationRunner<'m> {
|
||||
/// used if you would like to fail the application of a series of migrations
|
||||
/// at some point.
|
||||
async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
|
||||
// We use this fail point in order to check that failing in the middle
|
||||
// of applying a series of migrations fails in an expected manner
|
||||
// We use this fail point in order to check that failing in the
|
||||
// middle of applying a series of migrations fails in an expected
|
||||
// manner
|
||||
if cfg!(feature = "testing") {
|
||||
let fail = (|| {
|
||||
fail_point!("compute-migration", |fail_migration_id| {
|
||||
@@ -97,95 +67,48 @@ impl<'m> MigrationRunner<'m> {
|
||||
}
|
||||
|
||||
/// Prepare the migrations the target database for handling migrations
|
||||
async fn prepare_database(client: &mut Client) -> Result<()> {
|
||||
client
|
||||
async fn prepare_database(&mut self) -> Result<()> {
|
||||
self.client
|
||||
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
|
||||
.await?;
|
||||
client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
|
||||
client
|
||||
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
|
||||
self.client
|
||||
.simple_query(
|
||||
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.await?;
|
||||
client
|
||||
self.client
|
||||
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
|
||||
.await?;
|
||||
client
|
||||
self.client
|
||||
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function for allowing/disallowing connections to a Postgres
|
||||
/// database.
|
||||
async fn allow_connections_to_db(
|
||||
client: &mut Client,
|
||||
dbname: &PgIdent,
|
||||
allow: bool,
|
||||
) -> Result<()> {
|
||||
client
|
||||
.simple_query(
|
||||
format!(
|
||||
"ALTER DATABASE {} WITH ALLOW_CONNECTIONS {}",
|
||||
dbname.pg_quote(),
|
||||
allow
|
||||
)
|
||||
.as_str(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Connect to the configured Postgres database. Spawns a tokio task to
|
||||
/// handle the connection.
|
||||
async fn connect(config: &Config) -> Result<Client> {
|
||||
let (client, connection) = config.connect(NoTls).await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
async fn run_migration(
|
||||
client: &mut Client,
|
||||
db: &str,
|
||||
migration_id: i64,
|
||||
migration: &str,
|
||||
update_migration_id: bool,
|
||||
) -> Result<()> {
|
||||
/// Run an individual migration in a separate transaction block.
|
||||
async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
|
||||
let mut txn = client
|
||||
.transaction()
|
||||
.await
|
||||
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
|
||||
|
||||
if migration.starts_with("-- SKIP") {
|
||||
info!("Skipping migration id={} db=\"{}\"", migration_id, db);
|
||||
info!("Skipping migration id={}", migration_id);
|
||||
|
||||
// Even though we are skipping the migration, updating the
|
||||
// migration ID should help keep logic easy to understand when
|
||||
// trying to understand the state of a cluster.
|
||||
Self::update_migration_id(&mut txn, migration_id).await?;
|
||||
} else {
|
||||
info!(
|
||||
"Running migration id={} db=\"{}\":\n{}\n",
|
||||
migration_id, db, migration
|
||||
);
|
||||
info!("Running migration id={}:\n{}\n", migration_id, migration);
|
||||
|
||||
if let Err(e) = txn.simple_query(migration).await {
|
||||
error!("Failed to run the migration: {}", e);
|
||||
return Err(anyhow::anyhow!(e));
|
||||
}
|
||||
}
|
||||
txn.simple_query(migration)
|
||||
.await
|
||||
.with_context(|| format!("apply migration {migration_id}"))?;
|
||||
|
||||
if update_migration_id {
|
||||
if let Err(e) = Self::update_migration_id(&mut txn, migration_id).await {
|
||||
error!(
|
||||
"Failed to update the migration id to {}: {}",
|
||||
migration_id, e
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
Self::update_migration_id(&mut txn, migration_id).await?;
|
||||
}
|
||||
|
||||
txn.commit()
|
||||
@@ -195,201 +118,32 @@ impl<'m> MigrationRunner<'m> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the migration for the entire cluster. See [`Migration::Cluster`] for
|
||||
/// more information.
|
||||
async fn run_cluster_migration(
|
||||
client: &mut Client,
|
||||
db: &str,
|
||||
migration_id: i64,
|
||||
migration: &str,
|
||||
) -> Result<()> {
|
||||
Self::run_migration(client, db, migration_id, migration, true).await
|
||||
}
|
||||
|
||||
/// Run the migration in the specified database. See
|
||||
/// [`Migration::PerDatabase`] for more information.
|
||||
async fn run_database_migration(
|
||||
cluster_client: &mut Client,
|
||||
config: Config,
|
||||
db: &Database,
|
||||
migration_id: i64,
|
||||
migration: &str,
|
||||
) -> Result<()> {
|
||||
// There are 2 race conditions here. Migrations get ran in a separate
|
||||
// thread to not block the ability to connect to the compute. The race
|
||||
// conditions are as follow:
|
||||
//
|
||||
// 1. If between the time we have retrieved the list of databases in
|
||||
// the cluster and before we set ALLOW_CONNECTIONS back to false,
|
||||
// the user has changed allowed connections to the database, we
|
||||
// will have overwritten their change.
|
||||
//
|
||||
// This is not the end of the world, but an inconvenience,
|
||||
// nonetheless.
|
||||
//
|
||||
// 2. If between the time we have allowed connections to the database
|
||||
// and the time the migration is performed, the user disallows
|
||||
// connections to the database, we will fail to connect to the
|
||||
// database.
|
||||
//
|
||||
// This is not much of a problem since we will re-run the migration
|
||||
// the next time we run migrations.
|
||||
if db.restrict_conn {
|
||||
info!("Allowing connections to \"{}\" for migrations", db.name);
|
||||
|
||||
Self::allow_connections_to_db(cluster_client, &db.name, true)
|
||||
.await
|
||||
.context("Failed to allow connections to the database")?;
|
||||
}
|
||||
|
||||
let mut db_client = Self::connect(&config)
|
||||
/// Run the configured set of migrations
|
||||
pub async fn run_migrations(mut self) -> Result<()> {
|
||||
self.prepare_database()
|
||||
.await
|
||||
.context("Failed to connect to the database")?;
|
||||
|
||||
let result = Self::run_migration(&mut db_client, &db.name, migration_id, migration, false)
|
||||
.await
|
||||
.context("Failed to run the migration");
|
||||
|
||||
// Reset the connection restriction
|
||||
if db.restrict_conn {
|
||||
info!(
|
||||
"Disallowing connections to \"{}\" because migration {} is done",
|
||||
db.name, migration_id
|
||||
);
|
||||
|
||||
// Failing here is not the end of the world
|
||||
if let Err(e) = Self::allow_connections_to_db(cluster_client, &db.name, false).await {
|
||||
warn!(
|
||||
"failed to reset ALLOW_CONNECTIONS on \"{}\": {}",
|
||||
db.name, e
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Run the configured set of migrations.
|
||||
pub async fn run_migrations(self) -> Result<()> {
|
||||
// Owns the connection to the database containing the
|
||||
// neon_migration.migration_id table. In addition, all Cluster
|
||||
// migrations will be run on this connection.
|
||||
let mut cluster_client = Self::connect(&self.config)
|
||||
.await
|
||||
.context("failed to connect to cluster")?;
|
||||
|
||||
Self::prepare_database(&mut cluster_client)
|
||||
.await
|
||||
.context("failed to prepare database to handle migrations")?;
|
||||
|
||||
let mut current_migration = Self::get_migration_id(&mut cluster_client)
|
||||
.await
|
||||
.context("failed to get the current migration ID")?
|
||||
as usize;
|
||||
|
||||
// All databases within the cluster
|
||||
let dbs: Option<Vec<Database>> = {
|
||||
// Then check if we actually need to run any, and if so, check if
|
||||
// any need to run in each individual database
|
||||
if current_migration < self.migrations.len()
|
||||
&& self.migrations[current_migration..]
|
||||
.iter()
|
||||
.any(|m| matches!(m, Migration::PerDatabase(_)))
|
||||
{
|
||||
match get_existing_dbs_async(&cluster_client).await {
|
||||
Ok(dbs) => Some(
|
||||
// Filter out invalid database (datconnectivity = -2)
|
||||
dbs.into_values().filter(|d| !d.invalid).collect::<Vec<_>>(),
|
||||
),
|
||||
Err(e) => {
|
||||
error!("Failed to collect the existing databases: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None::<Vec<_>>
|
||||
}
|
||||
};
|
||||
|
||||
let admin_db = self.config.get_dbname().unwrap();
|
||||
.context("prepare database to handle migrations")?;
|
||||
|
||||
let mut current_migration = self.get_migration_id().await? as usize;
|
||||
while current_migration < self.migrations.len() {
|
||||
// The index lags the migration ID by 1, so the current migration
|
||||
// ID is also the next index
|
||||
let migration_id = (current_migration + 1) as i64;
|
||||
let migration = self.migrations[current_migration];
|
||||
|
||||
let result: Result<()> = match &self.migrations[current_migration] {
|
||||
Migration::Cluster(migration) => {
|
||||
Self::run_cluster_migration(
|
||||
&mut cluster_client,
|
||||
admin_db,
|
||||
migration_id,
|
||||
migration,
|
||||
)
|
||||
.await
|
||||
match Self::run_migration(self.client, migration_id, migration).await {
|
||||
Ok(_) => {
|
||||
info!("Finished migration id={}", migration_id);
|
||||
}
|
||||
Migration::PerDatabase(migration) => {
|
||||
let mut result: Result<()> = Ok(());
|
||||
for db in dbs.as_ref().unwrap() {
|
||||
// Once all the databases have run the migration, then we can run it in the
|
||||
// admin database to mark the migration as complete. See the run for the
|
||||
// admin database outside this loop.
|
||||
if db.name == admin_db {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut config = self.config.clone();
|
||||
config.dbname(&db.name);
|
||||
|
||||
// If we failed to run the migration in the current
|
||||
// database, stop trying to run this migration
|
||||
if let Err(e) = Self::run_database_migration(
|
||||
&mut cluster_client,
|
||||
config,
|
||||
db,
|
||||
migration_id,
|
||||
migration,
|
||||
)
|
||||
.await
|
||||
{
|
||||
result = Err(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
// Finally, run the migration for the admin database,
|
||||
// and update the migration ID
|
||||
Self::run_migration(
|
||||
&mut cluster_client,
|
||||
admin_db,
|
||||
migration_id,
|
||||
migration,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("failed to commit the per-database migration: {}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to run migration id={}: {:?}", migration_id, e);
|
||||
DB_MIGRATION_FAILED
|
||||
.with_label_values(&[migration_id.to_string().as_str()])
|
||||
.inc();
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
// If failed, mark the metric and return
|
||||
if let Err(e) = result {
|
||||
DB_MIGRATION_FAILED
|
||||
.with_label_values(&[migration_id.to_string().as_str()])
|
||||
.inc();
|
||||
|
||||
return Err(anyhow::anyhow!(format!(
|
||||
"failed at migration {migration_id}: {e}"
|
||||
)));
|
||||
}
|
||||
|
||||
info!("Finished migration id={}", migration_id);
|
||||
|
||||
current_migration += 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,235 +0,0 @@
|
||||
/*
|
||||
* fix-CVE-2024-4317.sql
|
||||
*
|
||||
* Copyright (c) 2024, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/backend/catalog/fix-CVE-2024-4317.sql
|
||||
*
|
||||
* This file should be run in every database in the cluster to address
|
||||
* CVE-2024-4317.
|
||||
*/
|
||||
|
||||
DO $$
|
||||
DECLARE
|
||||
server_version_num numeric;
|
||||
BEGIN
|
||||
SET search_path = pg_catalog;
|
||||
|
||||
SELECT setting::numeric FROM pg_settings INTO server_version_num WHERE name = 'server_version_num';
|
||||
|
||||
-- Everything after Postgres 17 will have the fix
|
||||
IF server_version_num >= 170000 THEN
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
-- pg_statistic_ext_data doesn't have the stxdinherit column in 14 and below
|
||||
IF server_version_num < 150000 THEN
|
||||
CREATE OR REPLACE VIEW pg_stats_ext WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
( SELECT array_agg(a.attname ORDER BY a.attnum)
|
||||
FROM unnest(s.stxkeys) k
|
||||
JOIN pg_attribute a
|
||||
ON (a.attrelid = s.stxrelid AND a.attnum = k)
|
||||
) AS attnames,
|
||||
pg_get_statisticsobjdef_expressions(s.oid) as exprs,
|
||||
s.stxkind AS kinds,
|
||||
sd.stxdndistinct AS n_distinct,
|
||||
sd.stxddependencies AS dependencies,
|
||||
m.most_common_vals,
|
||||
m.most_common_val_nulls,
|
||||
m.most_common_freqs,
|
||||
m.most_common_base_freqs
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
LEFT JOIN LATERAL
|
||||
( SELECT array_agg(values) AS most_common_vals,
|
||||
array_agg(nulls) AS most_common_val_nulls,
|
||||
array_agg(frequency) AS most_common_freqs,
|
||||
array_agg(base_frequency) AS most_common_base_freqs
|
||||
FROM pg_mcv_list_items(sd.stxdmcv)
|
||||
) m ON sd.stxdmcv IS NOT NULL
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
|
||||
CREATE OR REPLACE VIEW pg_stats_ext_exprs WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
stat.expr,
|
||||
(stat.a).stanullfrac AS null_frac,
|
||||
(stat.a).stawidth AS avg_width,
|
||||
(stat.a).stadistinct AS n_distinct,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stavalues5
|
||||
END) AS most_common_vals,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 2 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 2 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 2 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 2 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 2 THEN (stat.a).stavalues5
|
||||
END) AS histogram_bounds,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 3 THEN (stat.a).stanumbers1[1]
|
||||
WHEN (stat.a).stakind2 = 3 THEN (stat.a).stanumbers2[1]
|
||||
WHEN (stat.a).stakind3 = 3 THEN (stat.a).stanumbers3[1]
|
||||
WHEN (stat.a).stakind4 = 3 THEN (stat.a).stanumbers4[1]
|
||||
WHEN (stat.a).stakind5 = 3 THEN (stat.a).stanumbers5[1]
|
||||
END) correlation,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stavalues5
|
||||
END) AS most_common_elems,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_elem_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 5 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 5 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 5 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 5 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 5 THEN (stat.a).stanumbers5
|
||||
END) AS elem_count_histogram
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
LEFT JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
JOIN LATERAL (
|
||||
SELECT unnest(pg_get_statisticsobjdef_expressions(s.oid)) AS expr,
|
||||
unnest(sd.stxdexpr)::pg_statistic AS a
|
||||
) stat ON (stat.expr IS NOT NULL)
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
ELSE
|
||||
CREATE OR REPLACE VIEW pg_stats_ext WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
( SELECT array_agg(a.attname ORDER BY a.attnum)
|
||||
FROM unnest(s.stxkeys) k
|
||||
JOIN pg_attribute a
|
||||
ON (a.attrelid = s.stxrelid AND a.attnum = k)
|
||||
) AS attnames,
|
||||
pg_get_statisticsobjdef_expressions(s.oid) as exprs,
|
||||
s.stxkind AS kinds,
|
||||
sd.stxdinherit AS inherited,
|
||||
sd.stxdndistinct AS n_distinct,
|
||||
sd.stxddependencies AS dependencies,
|
||||
m.most_common_vals,
|
||||
m.most_common_val_nulls,
|
||||
m.most_common_freqs,
|
||||
m.most_common_base_freqs
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
LEFT JOIN LATERAL
|
||||
( SELECT array_agg(values) AS most_common_vals,
|
||||
array_agg(nulls) AS most_common_val_nulls,
|
||||
array_agg(frequency) AS most_common_freqs,
|
||||
array_agg(base_frequency) AS most_common_base_freqs
|
||||
FROM pg_mcv_list_items(sd.stxdmcv)
|
||||
) m ON sd.stxdmcv IS NOT NULL
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
|
||||
CREATE OR REPLACE VIEW pg_stats_ext_exprs WITH (security_barrier) AS
|
||||
SELECT cn.nspname AS schemaname,
|
||||
c.relname AS tablename,
|
||||
sn.nspname AS statistics_schemaname,
|
||||
s.stxname AS statistics_name,
|
||||
pg_get_userbyid(s.stxowner) AS statistics_owner,
|
||||
stat.expr,
|
||||
sd.stxdinherit AS inherited,
|
||||
(stat.a).stanullfrac AS null_frac,
|
||||
(stat.a).stawidth AS avg_width,
|
||||
(stat.a).stadistinct AS n_distinct,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stavalues5
|
||||
END) AS most_common_vals,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 1 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 1 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 1 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 1 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 1 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 2 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 2 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 2 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 2 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 2 THEN (stat.a).stavalues5
|
||||
END) AS histogram_bounds,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 3 THEN (stat.a).stanumbers1[1]
|
||||
WHEN (stat.a).stakind2 = 3 THEN (stat.a).stanumbers2[1]
|
||||
WHEN (stat.a).stakind3 = 3 THEN (stat.a).stanumbers3[1]
|
||||
WHEN (stat.a).stakind4 = 3 THEN (stat.a).stanumbers4[1]
|
||||
WHEN (stat.a).stakind5 = 3 THEN (stat.a).stanumbers5[1]
|
||||
END) correlation,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stavalues1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stavalues2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stavalues3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stavalues4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stavalues5
|
||||
END) AS most_common_elems,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 4 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 4 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 4 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 4 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 4 THEN (stat.a).stanumbers5
|
||||
END) AS most_common_elem_freqs,
|
||||
(CASE
|
||||
WHEN (stat.a).stakind1 = 5 THEN (stat.a).stanumbers1
|
||||
WHEN (stat.a).stakind2 = 5 THEN (stat.a).stanumbers2
|
||||
WHEN (stat.a).stakind3 = 5 THEN (stat.a).stanumbers3
|
||||
WHEN (stat.a).stakind4 = 5 THEN (stat.a).stanumbers4
|
||||
WHEN (stat.a).stakind5 = 5 THEN (stat.a).stanumbers5
|
||||
END) AS elem_count_histogram
|
||||
FROM pg_statistic_ext s JOIN pg_class c ON (c.oid = s.stxrelid)
|
||||
LEFT JOIN pg_statistic_ext_data sd ON (s.oid = sd.stxoid)
|
||||
LEFT JOIN pg_namespace cn ON (cn.oid = c.relnamespace)
|
||||
LEFT JOIN pg_namespace sn ON (sn.oid = s.stxnamespace)
|
||||
JOIN LATERAL (
|
||||
SELECT unnest(pg_get_statisticsobjdef_expressions(s.oid)) AS expr,
|
||||
unnest(sd.stxdexpr)::pg_statistic AS a
|
||||
) stat ON (stat.expr IS NOT NULL)
|
||||
WHERE pg_has_role(c.relowner, 'USAGE')
|
||||
AND (c.relrowsecurity = false OR NOT row_security_active(c.oid));
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -1,5 +0,0 @@
|
||||
-- Testing that this migration actually works would require spinning up a
|
||||
-- Postgres instance running on a vulnerable version. Let's trust that the
|
||||
-- Postgres community created a SQL fix that actually works.
|
||||
|
||||
SELECT 1;
|
||||
@@ -6,12 +6,12 @@ use compute_api::responses::{
|
||||
ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
|
||||
};
|
||||
use reqwest::StatusCode;
|
||||
use tokio_postgres::{Client, Config};
|
||||
use tokio_postgres::Client;
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use crate::config;
|
||||
use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
|
||||
use crate::migration::{Migration, MigrationRunner};
|
||||
use crate::migration::MigrationRunner;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
|
||||
// Do control plane request and return response if any. In case of error it
|
||||
@@ -169,7 +169,7 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn handle_migrations(config: Config) -> Result<()> {
|
||||
pub async fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
info!("handle migrations");
|
||||
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
@@ -178,50 +178,30 @@ pub async fn handle_migrations(config: Config) -> Result<()> {
|
||||
|
||||
// Add new migrations in numerical order.
|
||||
let migrations = [
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0001-neon_superuser_bypass_rls.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!("./migrations/0002-alter_roles.sql")),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
|
||||
include_str!("./migrations/0002-alter_roles.sql"),
|
||||
include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
|
||||
include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
|
||||
include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
|
||||
include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
|
||||
include_str!(
|
||||
"./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
),
|
||||
include_str!(
|
||||
"./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
"./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
),
|
||||
include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
|
||||
include_str!(
|
||||
"./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::Cluster(include_str!(
|
||||
),
|
||||
include_str!(
|
||||
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
|
||||
)),
|
||||
Migration::PerDatabase(include_str!("./migrations/0012-fix-CVE-2024-4317.sql")),
|
||||
),
|
||||
];
|
||||
|
||||
let runner = match MigrationRunner::new(config, &migrations) {
|
||||
Ok(runner) => runner,
|
||||
Err(e) => {
|
||||
error!("Failed to construct a migration runner: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
MigrationRunner::new(client, &migrations)
|
||||
.run_migrations()
|
||||
.await?;
|
||||
|
||||
runner.run_migrations().await.map_err(|e| {
|
||||
error!("Failed to run the migrations: {}", e);
|
||||
e
|
||||
})
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1285,6 +1285,10 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
if query.is_empty() {
|
||||
return Ok(BTreeMap::default());
|
||||
}
|
||||
|
||||
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
|
||||
Some(ReadPath::new(
|
||||
query.total_keyspace(),
|
||||
|
||||
@@ -16,9 +16,9 @@ use tracing::field::display;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use super::AsyncRW;
|
||||
use super::conn_pool::poll_client;
|
||||
use super::conn_pool::poll_client_generic;
|
||||
use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
|
||||
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
|
||||
use super::http_conn_pool::{self, HttpConnPool};
|
||||
use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
|
||||
use crate::auth::backend::local::StaticAuthRules;
|
||||
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
|
||||
@@ -42,10 +42,9 @@ use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
|
||||
|
||||
pub(crate) struct PoolingBackend {
|
||||
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
|
||||
pub(crate) http_conn_pool: Arc<GlobalConnPool<HttpConnPool>>,
|
||||
pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
|
||||
pub(crate) pool:
|
||||
Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
|
||||
pub(crate) pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,
|
||||
|
||||
pub(crate) config: &'static ProxyConfig,
|
||||
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
|
||||
@@ -212,7 +211,7 @@ impl PoolingBackend {
|
||||
None
|
||||
} else {
|
||||
debug!("pool: looking for an existing connection");
|
||||
self.pool.get(ctx, &conn_info)?
|
||||
self.pool.get(ctx, &conn_info)
|
||||
};
|
||||
|
||||
if let Some(client) = maybe_client {
|
||||
@@ -246,9 +245,9 @@ impl PoolingBackend {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
conn_info: ConnInfo,
|
||||
) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
|
||||
) -> Result<http_conn_pool::Client, HttpConnError> {
|
||||
debug!("pool: looking for an existing connection");
|
||||
if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
|
||||
if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) {
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
@@ -532,7 +531,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
|
||||
}
|
||||
|
||||
struct TokioMechanism {
|
||||
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
|
||||
pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,
|
||||
conn_info: ConnInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
|
||||
@@ -578,7 +577,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
|
||||
}
|
||||
|
||||
Ok(poll_client(
|
||||
Ok(poll_client_generic(
|
||||
self.pool.clone(),
|
||||
ctx,
|
||||
self.conn_info.clone(),
|
||||
@@ -593,7 +592,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
}
|
||||
|
||||
struct HyperMechanism {
|
||||
pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
|
||||
pool: Arc<GlobalConnPool<HttpConnPool>>,
|
||||
conn_info: ConnInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
|
||||
@@ -603,7 +602,7 @@ struct HyperMechanism {
|
||||
|
||||
#[async_trait]
|
||||
impl ConnectMechanism for HyperMechanism {
|
||||
type Connection = http_conn_pool::Client<Send>;
|
||||
type Connection = http_conn_pool::Client;
|
||||
type ConnectError = HttpConnError;
|
||||
type Error = HttpConnError;
|
||||
|
||||
@@ -639,15 +638,26 @@ impl ConnectMechanism for HyperMechanism {
|
||||
info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
|
||||
}
|
||||
|
||||
Ok(poll_http2_client(
|
||||
let client = poll_client_generic(
|
||||
self.pool.clone(),
|
||||
ctx,
|
||||
&self.conn_info,
|
||||
self.conn_info.clone(),
|
||||
client,
|
||||
connection,
|
||||
self.conn_id,
|
||||
node_info.aux.clone(),
|
||||
))
|
||||
);
|
||||
|
||||
// auth-broker -> local-proxy clients don't return to the pool, since
|
||||
// they are multiplexing and cloneable. So instead we insert it once here.
|
||||
if let Some(endpoint) = self.conn_info.endpoint_cache_key() {
|
||||
self.pool
|
||||
.get_or_create_endpoint_pool(&endpoint)
|
||||
.write()
|
||||
.register(&client);
|
||||
}
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
|
||||
|
||||
@@ -11,7 +11,7 @@ use smallvec::SmallVec;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, error, info, info_span, warn};
|
||||
use tracing::{error, info, info_span, warn};
|
||||
#[cfg(test)]
|
||||
use {
|
||||
super::conn_pool_lib::GlobalConnPoolOptions,
|
||||
@@ -20,8 +20,7 @@ use {
|
||||
};
|
||||
|
||||
use super::conn_pool_lib::{
|
||||
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
|
||||
GlobalConnPool,
|
||||
ClientDataEnum, ClientInnerCommon, ConnInfo, EndpointConnPoolExt, GlobalConnPool,
|
||||
};
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
@@ -29,6 +28,7 @@ use crate::metrics::Metrics;
|
||||
use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
|
||||
type TlsStream = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::Stream;
|
||||
pub(super) type Conn = postgres_client::Connection<TcpStream, TlsStream>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ConnInfoWithAuth {
|
||||
@@ -56,20 +56,20 @@ impl fmt::Display for ConnInfo {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
global_pool: Arc<GlobalConnPool<C, EndpointConnPool<C>>>,
|
||||
pub(crate) fn poll_client_generic<P: EndpointConnPoolExt>(
|
||||
global_pool: Arc<GlobalConnPool<P>>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: ConnInfo,
|
||||
client: C,
|
||||
mut connection: postgres_client::Connection<TcpStream, TlsStream>,
|
||||
client: P::ClientInner,
|
||||
connection: P::Connection,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
) -> Client<C> {
|
||||
) -> P::Client {
|
||||
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
|
||||
let mut session_id = ctx.session_id();
|
||||
let session_id = ctx.session_id();
|
||||
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
|
||||
|
||||
let span = info_span!(parent: None, "connection", %conn_id);
|
||||
let span = info_span!(parent: None, "connection", %conn_id, %session_id);
|
||||
let cold_start_info = ctx.cold_start_info();
|
||||
span.in_scope(|| {
|
||||
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
|
||||
@@ -85,27 +85,30 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
let cancel = CancellationToken::new();
|
||||
let cancelled = cancel.clone().cancelled_owned();
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
tokio::spawn(async move {
|
||||
let _conn_gauge = conn_gauge;
|
||||
let mut idle_timeout = pin!(tokio::time::sleep(idle));
|
||||
let mut cancelled = pin!(cancelled);
|
||||
let mut connection = pin!(P::spawn_conn(connection));
|
||||
|
||||
poll_fn(move |cx| {
|
||||
let _enter = span.enter();
|
||||
|
||||
if cancelled.as_mut().poll(cx).is_ready() {
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(())
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
match rx.has_changed() {
|
||||
Ok(true) => {
|
||||
session_id = *rx.borrow_and_update();
|
||||
info!(%session_id, "changed session");
|
||||
let session_id = *rx.borrow_and_update();
|
||||
span.record("session_id", tracing::field::display(session_id));
|
||||
info!("changed session");
|
||||
idle_timeout.as_mut().reset(Instant::now() + idle);
|
||||
}
|
||||
Err(_) => {
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(())
|
||||
return Poll::Ready(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@@ -117,48 +120,25 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
if let Some(pool) = pool.clone().upgrade() {
|
||||
// remove client from pool - should close the connection if it's idle.
|
||||
// does nothing if the client is currently checked-out and in-use
|
||||
if pool.write().remove_client(db_user.clone(), conn_id) {
|
||||
if pool.write().remove_conn(db_user.clone(), conn_id) {
|
||||
info!("idle connection removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session_id, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session_id, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session_id, "connection error: {}", e);
|
||||
break
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
ready!(connection.as_mut().poll(cx));
|
||||
|
||||
// remove from connection pool
|
||||
if let Some(pool) = pool.clone().upgrade() {
|
||||
if pool.write().remove_client(db_user.clone(), conn_id) {
|
||||
if pool.write().remove_conn(db_user.clone(), conn_id) {
|
||||
info!("closed connection removed");
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
}).await;
|
||||
|
||||
}
|
||||
.instrument(span));
|
||||
})
|
||||
.await;
|
||||
});
|
||||
let inner = ClientInnerCommon {
|
||||
inner: client,
|
||||
aux,
|
||||
@@ -169,7 +149,42 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
}),
|
||||
};
|
||||
|
||||
Client::new(inner, conn_info, pool_clone)
|
||||
P::wrap_client(inner, conn_info, pool_clone)
|
||||
}
|
||||
|
||||
pub async fn poll_tokio_postgres_conn_really(mut connection: Conn) {
|
||||
poll_fn(move |cx| {
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!("notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(
|
||||
pid = notif.process_id(),
|
||||
channel = notif.channel(),
|
||||
"notification received"
|
||||
);
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!("unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("connection error: {}", e);
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -179,11 +194,11 @@ pub(crate) struct ClientDataRemote {
|
||||
}
|
||||
|
||||
impl ClientDataRemote {
|
||||
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
|
||||
&mut self.session
|
||||
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
|
||||
&self.session
|
||||
}
|
||||
|
||||
pub fn cancel(&mut self) {
|
||||
pub fn cancel(&self) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
@@ -195,6 +210,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::cancel_set::CancelSet;
|
||||
use crate::serverless::conn_pool_lib::{Client, ClientInnerExt};
|
||||
use crate::types::{BranchId, EndpointId, ProjectId};
|
||||
|
||||
struct MockClient(Arc<AtomicBool>);
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{self, AtomicUsize};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -12,11 +11,10 @@ use rand::Rng;
|
||||
use smol_str::ToSmolStr;
|
||||
use tracing::{Span, debug, info};
|
||||
|
||||
use super::backend::HttpConnError;
|
||||
use super::conn_pool::ClientDataRemote;
|
||||
use super::http_conn_pool::ClientDataHttp;
|
||||
use super::conn_pool::{ClientDataRemote, poll_tokio_postgres_conn_really};
|
||||
use super::local_conn_pool::ClientDataLocal;
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::config::HttpConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
|
||||
@@ -51,7 +49,6 @@ impl ConnInfo {
|
||||
pub(crate) enum ClientDataEnum {
|
||||
Remote(ClientDataRemote),
|
||||
Local(ClientDataLocal),
|
||||
Http(ClientDataHttp),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -64,14 +61,9 @@ pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {
|
||||
|
||||
impl<C: ClientInnerExt> Drop for ClientInnerCommon<C> {
|
||||
fn drop(&mut self) {
|
||||
match &mut self.data {
|
||||
ClientDataEnum::Remote(remote_data) => {
|
||||
remote_data.cancel();
|
||||
}
|
||||
ClientDataEnum::Local(local_data) => {
|
||||
local_data.cancel();
|
||||
}
|
||||
ClientDataEnum::Http(_http_data) => (),
|
||||
match &self.data {
|
||||
ClientDataEnum::Remote(remote_data) => remote_data.cancel(),
|
||||
ClientDataEnum::Local(local_data) => local_data.cancel(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -81,8 +73,8 @@ impl<C: ClientInnerExt> ClientInnerCommon<C> {
|
||||
self.conn_id
|
||||
}
|
||||
|
||||
pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum {
|
||||
&mut self.data
|
||||
pub(crate) fn get_data(&self) -> &ClientDataEnum {
|
||||
&self.data
|
||||
}
|
||||
}
|
||||
|
||||
@@ -326,12 +318,70 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait EndpointConnPoolExt<C: ClientInnerExt> {
|
||||
pub(crate) trait EndpointConnPoolExt: Send + Sync + 'static {
|
||||
type Client;
|
||||
type ClientInner: ClientInnerExt;
|
||||
type Connection: Send + 'static;
|
||||
|
||||
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self;
|
||||
fn wrap_client(
|
||||
inner: ClientInnerCommon<Self::ClientInner>,
|
||||
conn_info: ConnInfo,
|
||||
pool: Weak<RwLock<Self>>,
|
||||
) -> Self::Client;
|
||||
|
||||
fn get_conn_entry(
|
||||
&mut self,
|
||||
db_user: (DbName, RoleName),
|
||||
) -> Option<ClientInnerCommon<Self::ClientInner>>;
|
||||
fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool;
|
||||
|
||||
fn spawn_conn(conn: Self::Connection) -> impl Future<Output = ()> + Send + 'static;
|
||||
|
||||
fn clear_closed(&mut self) -> usize;
|
||||
fn total_conns(&self) -> usize;
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
|
||||
impl<C: ClientInnerExt> EndpointConnPoolExt for EndpointConnPool<C> {
|
||||
type Client = Client<C>;
|
||||
type ClientInner = C;
|
||||
type Connection = super::conn_pool::Conn;
|
||||
|
||||
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
|
||||
EndpointConnPool {
|
||||
pools: HashMap::new(),
|
||||
total_conns: 0,
|
||||
max_conns: config.pool_options.max_conns_per_endpoint,
|
||||
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
|
||||
global_connections_count,
|
||||
global_pool_size_max_conns: config.pool_options.max_total_conns,
|
||||
pool_name: String::from("remote"),
|
||||
}
|
||||
}
|
||||
|
||||
fn wrap_client(
|
||||
client: ClientInnerCommon<Self::ClientInner>,
|
||||
conn_info: ConnInfo,
|
||||
pool: Weak<RwLock<Self>>,
|
||||
) -> Self::Client {
|
||||
Client::new(client, conn_info.clone(), pool)
|
||||
}
|
||||
|
||||
fn get_conn_entry(
|
||||
&mut self,
|
||||
db_user: (DbName, RoleName),
|
||||
) -> Option<ClientInnerCommon<Self::ClientInner>> {
|
||||
Some(self.get_conn_entry(db_user)?.conn)
|
||||
}
|
||||
|
||||
/// Trait entry point for removing a pooled connection: forwards both arguments
/// to `remove_client` and returns its result unchanged.
fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
|
||||
self.remove_client(db_user, conn_id)
|
||||
}
|
||||
|
||||
/// Drives the postgres connection by awaiting `poll_tokio_postgres_conn_really`
/// on it; completes when that future completes.
async fn spawn_conn(conn: Self::Connection) {
|
||||
poll_tokio_postgres_conn_really(conn).await;
|
||||
}
|
||||
|
||||
fn clear_closed(&mut self) -> usize {
|
||||
let mut clients_removed: usize = 0;
|
||||
for db_pool in self.pools.values_mut() {
|
||||
@@ -345,10 +395,9 @@ impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct GlobalConnPool<C, P>
|
||||
pub(crate) struct GlobalConnPool<P>
|
||||
where
|
||||
C: ClientInnerExt,
|
||||
P: EndpointConnPoolExt<C>,
|
||||
P: EndpointConnPoolExt,
|
||||
{
|
||||
// endpoint -> per-endpoint connection pool
|
||||
//
|
||||
@@ -367,8 +416,6 @@ where
|
||||
pub(crate) global_connections_count: Arc<AtomicUsize>,
|
||||
|
||||
pub(crate) config: &'static crate::config::HttpConfig,
|
||||
|
||||
_marker: PhantomData<C>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -391,10 +438,9 @@ pub struct GlobalConnPoolOptions {
|
||||
pub max_total_conns: usize,
|
||||
}
|
||||
|
||||
impl<C, P> GlobalConnPool<C, P>
|
||||
impl<P> GlobalConnPool<P>
|
||||
where
|
||||
C: ClientInnerExt,
|
||||
P: EndpointConnPoolExt<C>,
|
||||
P: EndpointConnPoolExt,
|
||||
{
|
||||
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
|
||||
let shards = config.pool_options.pool_shards;
|
||||
@@ -403,7 +449,6 @@ where
|
||||
global_pool_size: AtomicUsize::new(0),
|
||||
config,
|
||||
global_connections_count: Arc::new(AtomicUsize::new(0)),
|
||||
_marker: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -492,80 +537,72 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
|
||||
impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
|
||||
pub(crate) fn get(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: &ConnInfo,
|
||||
) -> Result<Option<Client<C>>, HttpConnError> {
|
||||
let mut client: Option<ClientInnerCommon<C>> = None;
|
||||
let Some(endpoint) = conn_info.endpoint_cache_key() else {
|
||||
return Ok(None);
|
||||
};
|
||||
) -> Option<P::Client> {
|
||||
let endpoint = conn_info.endpoint_cache_key()?;
|
||||
|
||||
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
|
||||
if let Some(entry) = endpoint_pool
|
||||
let endpoint_pool = self.get_endpoint_pool(&endpoint)?;
|
||||
let client = endpoint_pool
|
||||
.write()
|
||||
.get_conn_entry(conn_info.db_and_user())
|
||||
{
|
||||
client = Some(entry.conn);
|
||||
}
|
||||
.get_conn_entry(conn_info.db_and_user())?;
|
||||
|
||||
let endpoint_pool = Arc::downgrade(&endpoint_pool);
|
||||
|
||||
// ok return cached connection if found and establish a new one otherwise
|
||||
if let Some(mut client) = client {
|
||||
if client.inner.is_closed() {
|
||||
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return Ok(None);
|
||||
}
|
||||
tracing::Span::current()
|
||||
.record("conn_id", tracing::field::display(client.get_conn_id()));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
debug!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
|
||||
match client.get_data() {
|
||||
ClientDataEnum::Local(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
|
||||
ClientDataEnum::Remote(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
ClientDataEnum::Http(_) => (),
|
||||
}
|
||||
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
if client.inner.is_closed() {
|
||||
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return None;
|
||||
}
|
||||
Ok(None)
|
||||
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.get_conn_id()));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
debug!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
|
||||
match client.get_data() {
|
||||
ClientDataEnum::Local(data) => {
|
||||
data.session().send(ctx.session_id()).ok()?;
|
||||
}
|
||||
ClientDataEnum::Remote(data) => {
|
||||
data.session().send(ctx.session_id()).ok()?;
|
||||
}
|
||||
}
|
||||
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
Some(P::wrap_client(client, conn_info.clone(), endpoint_pool))
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
|
||||
pub(crate) fn get_endpoint_pool(
|
||||
self: &Arc<Self>,
|
||||
endpoint: &EndpointCacheKey,
|
||||
) -> Option<Arc<RwLock<P>>> {
|
||||
Some(self.global_pool.get(endpoint)?.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn get_or_create_endpoint_pool(
|
||||
self: &Arc<Self>,
|
||||
endpoint: &EndpointCacheKey,
|
||||
) -> Arc<RwLock<EndpointConnPool<C>>> {
|
||||
) -> Arc<RwLock<P>> {
|
||||
// fast path
|
||||
if let Some(pool) = self.global_pool.get(endpoint) {
|
||||
return pool.clone();
|
||||
}
|
||||
|
||||
// slow path
|
||||
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
|
||||
pools: HashMap::new(),
|
||||
total_conns: 0,
|
||||
max_conns: self.config.pool_options.max_conns_per_endpoint,
|
||||
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
|
||||
global_connections_count: self.global_connections_count.clone(),
|
||||
global_pool_size_max_conns: self.config.pool_options.max_total_conns,
|
||||
pool_name: String::from("remote"),
|
||||
}));
|
||||
let new_pool = Arc::new(RwLock::new(P::create(
|
||||
self.config,
|
||||
self.global_connections_count.clone(),
|
||||
)));
|
||||
|
||||
// find or create a pool for this endpoint
|
||||
let mut created = false;
|
||||
@@ -592,6 +629,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
|
||||
pool
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Client<C: ClientInnerExt> {
|
||||
span: Span,
|
||||
inner: Option<ClientInnerCommon<C>>,
|
||||
|
||||
@@ -4,32 +4,25 @@ use std::sync::{Arc, Weak};
|
||||
|
||||
use hyper::client::conn::http2;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use parking_lot::RwLock;
|
||||
use smol_str::ToSmolStr;
|
||||
use tracing::{Instrument, debug, error, info, info_span};
|
||||
use tracing::{error, info};
|
||||
|
||||
use super::AsyncRW;
|
||||
use super::backend::HttpConnError;
|
||||
use super::conn_pool_lib::{
|
||||
ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
|
||||
EndpointConnPoolExt, GlobalConnPool,
|
||||
ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, EndpointConnPoolExt,
|
||||
};
|
||||
use crate::config::HttpConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
|
||||
use crate::protocol2::ConnectionInfoExtra;
|
||||
use crate::types::EndpointCacheKey;
|
||||
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
|
||||
|
||||
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
|
||||
pub(crate) type Connect = http2::Connection<TokioIo<AsyncRW>, hyper::body::Incoming, TokioExecutor>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ClientDataHttp();
|
||||
|
||||
// Per-endpoint connection pool
|
||||
// Number of open connections is limited by the `max_conns_per_endpoint`.
|
||||
pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
|
||||
pub(crate) struct HttpConnPool {
|
||||
// TODO(conrad):
|
||||
// either we should open more connections depending on stream count
|
||||
// (not exposed by hyper, need our own counter)
|
||||
@@ -39,13 +32,13 @@ pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
|
||||
// seems somewhat redundant though.
|
||||
//
|
||||
// Probably we should run a semaphore and just the single conn. TBD.
|
||||
conns: VecDeque<ConnPoolEntry<C>>,
|
||||
conns: VecDeque<ConnPoolEntry<Send>>,
|
||||
_guard: HttpEndpointPoolsGuard<'static>,
|
||||
global_connections_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
|
||||
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
|
||||
impl HttpConnPool {
|
||||
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<Send>> {
|
||||
let Self { conns, .. } = self;
|
||||
|
||||
loop {
|
||||
@@ -83,9 +76,59 @@ impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
|
||||
}
|
||||
removed > 0
|
||||
}
|
||||
|
||||
pub fn register(&mut self, client: &Client) {
|
||||
self.conns.push_back(ConnPoolEntry {
|
||||
conn: client.inner.clone(),
|
||||
_last_access: std::time::Instant::now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
|
||||
impl EndpointConnPoolExt for HttpConnPool {
|
||||
type Client = Client;
|
||||
type ClientInner = Send;
|
||||
type Connection = Connect;
|
||||
|
||||
fn create(_config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
|
||||
HttpConnPool {
|
||||
conns: VecDeque::new(),
|
||||
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
|
||||
global_connections_count,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps a pooled inner client into the HTTP `Client` handle.
///
/// The connection info and the weak pool reference are ignored.
// NOTE(review): presumably because these clients are never handed back to the pool — confirm.
fn wrap_client(
|
||||
inner: ClientInnerCommon<Self::ClientInner>,
|
||||
_conn_info: ConnInfo,
|
||||
_pool: Weak<parking_lot::RwLock<Self>>,
|
||||
) -> Self::Client {
|
||||
Client::new(inner)
|
||||
}
|
||||
|
||||
fn get_conn_entry(
|
||||
&mut self,
|
||||
_db_user: (crate::types::DbName, crate::types::RoleName),
|
||||
) -> Option<ClientInnerCommon<Self::ClientInner>> {
|
||||
Some(self.get_conn_entry()?.conn)
|
||||
}
|
||||
|
||||
/// Trait entry point for removing a pooled connection by id.
///
/// The `(database, role)` pair is ignored; the call is forwarded to the
/// one-argument `remove_conn(conn_id)` and its result is returned unchanged.
fn remove_conn(
|
||||
&mut self,
|
||||
_db_user: (crate::types::DbName, crate::types::RoleName),
|
||||
conn_id: uuid::Uuid,
|
||||
) -> bool {
|
||||
self.remove_conn(conn_id)
|
||||
}
|
||||
|
||||
async fn spawn_conn(conn: Self::Connection) {
|
||||
let res = conn.await;
|
||||
match res {
|
||||
Ok(()) => info!("connection closed"),
|
||||
Err(e) => error!("connection error: {e:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_closed(&mut self) -> usize {
|
||||
let Self { conns, .. } = self;
|
||||
let old_len = conns.len();
|
||||
@@ -100,7 +143,7 @@ impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
|
||||
impl Drop for HttpConnPool {
|
||||
fn drop(&mut self) {
|
||||
if !self.conns.is_empty() {
|
||||
self.global_connections_count
|
||||
@@ -114,154 +157,12 @@ impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
|
||||
#[expect(unused_results)]
|
||||
pub(crate) fn get(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: &ConnInfo,
|
||||
) -> Result<Option<Client<C>>, HttpConnError> {
|
||||
let result: Result<Option<Client<C>>, HttpConnError>;
|
||||
let Some(endpoint) = conn_info.endpoint_cache_key() else {
|
||||
result = Ok(None);
|
||||
return result;
|
||||
};
|
||||
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
|
||||
let Some(client) = endpoint_pool.write().get_conn_entry() else {
|
||||
result = Ok(None);
|
||||
return result;
|
||||
};
|
||||
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
|
||||
debug!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
|
||||
Ok(Some(Client::new(client.conn.clone())))
|
||||
}
|
||||
|
||||
fn get_or_create_endpoint_pool(
|
||||
self: &Arc<Self>,
|
||||
endpoint: &EndpointCacheKey,
|
||||
) -> Arc<RwLock<HttpConnPool<C>>> {
|
||||
// fast path
|
||||
if let Some(pool) = self.global_pool.get(endpoint) {
|
||||
return pool.clone();
|
||||
}
|
||||
|
||||
// slow path
|
||||
let new_pool = Arc::new(RwLock::new(HttpConnPool {
|
||||
conns: VecDeque::new(),
|
||||
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
|
||||
global_connections_count: self.global_connections_count.clone(),
|
||||
}));
|
||||
|
||||
// find or create a pool for this endpoint
|
||||
let mut created = false;
|
||||
let pool = self
|
||||
.global_pool
|
||||
.entry(endpoint.clone())
|
||||
.or_insert_with(|| {
|
||||
created = true;
|
||||
new_pool
|
||||
})
|
||||
.clone();
|
||||
|
||||
// log new global pool size
|
||||
if created {
|
||||
let global_pool_size = self
|
||||
.global_pool_size
|
||||
.fetch_add(1, atomic::Ordering::Relaxed)
|
||||
+ 1;
|
||||
info!(
|
||||
"pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
|
||||
);
|
||||
}
|
||||
|
||||
pool
|
||||
}
|
||||
/// Caller-facing handle for a pooled HTTP connection.
///
/// Holds the shared per-connection state (`ClientInnerCommon`) whose inner
/// client is this module's `Send` alias (`http2::SendRequest<hyper::body::Incoming>`).
pub(crate) struct Client {
|
||||
pub(crate) inner: ClientInnerCommon<Send>,
|
||||
}
|
||||
|
||||
pub(crate) fn poll_http2_client(
|
||||
global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: &ConnInfo,
|
||||
client: Send,
|
||||
connection: Connect,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
) -> Client<Send> {
|
||||
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
|
||||
let session_id = ctx.session_id();
|
||||
|
||||
let span = info_span!(parent: None, "connection", %conn_id);
|
||||
let cold_start_info = ctx.cold_start_info();
|
||||
span.in_scope(|| {
|
||||
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
|
||||
});
|
||||
|
||||
let pool = match conn_info.endpoint_cache_key() {
|
||||
Some(endpoint) => {
|
||||
let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
|
||||
let client = ClientInnerCommon {
|
||||
inner: client.clone(),
|
||||
aux: aux.clone(),
|
||||
conn_id,
|
||||
data: ClientDataEnum::Http(ClientDataHttp()),
|
||||
};
|
||||
pool.write().conns.push_back(ConnPoolEntry {
|
||||
conn: client,
|
||||
_last_access: std::time::Instant::now(),
|
||||
});
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.inc();
|
||||
|
||||
Arc::downgrade(&pool)
|
||||
}
|
||||
None => Weak::new(),
|
||||
};
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let _conn_gauge = conn_gauge;
|
||||
let res = connection.await;
|
||||
match res {
|
||||
Ok(()) => info!("connection closed"),
|
||||
Err(e) => error!(%session_id, "connection error: {e:?}"),
|
||||
}
|
||||
|
||||
// remove from connection pool
|
||||
if let Some(pool) = pool.clone().upgrade() {
|
||||
if pool.write().remove_conn(conn_id) {
|
||||
info!("closed connection removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
|
||||
let client = ClientInnerCommon {
|
||||
inner: client,
|
||||
aux,
|
||||
conn_id,
|
||||
data: ClientDataEnum::Http(ClientDataHttp()),
|
||||
};
|
||||
|
||||
Client::new(client)
|
||||
}
|
||||
|
||||
pub(crate) struct Client<C: ClientInnerExt + Clone> {
|
||||
pub(crate) inner: ClientInnerCommon<C>,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> Client<C> {
|
||||
pub(self) fn new(inner: ClientInnerCommon<C>) -> Self {
|
||||
impl Client {
|
||||
pub(self) fn new(inner: ClientInnerCommon<Send>) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
|
||||
|
||||
@@ -53,11 +53,11 @@ pub(crate) struct ClientDataLocal {
|
||||
}
|
||||
|
||||
impl ClientDataLocal {
|
||||
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
|
||||
&mut self.session
|
||||
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
|
||||
&self.session
|
||||
}
|
||||
|
||||
pub fn cancel(&mut self) {
|
||||
pub fn cancel(&self) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
@@ -99,7 +99,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
|
||||
.map(|entry| entry.conn);
|
||||
|
||||
// ok return cached connection if found and establish a new one otherwise
|
||||
if let Some(mut client) = client {
|
||||
if let Some(client) = client {
|
||||
if client.inner.is_closed() {
|
||||
info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return Ok(None);
|
||||
@@ -120,11 +120,9 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
|
||||
ClientDataEnum::Local(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
|
||||
ClientDataEnum::Remote(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
ClientDataEnum::Http(_) => (),
|
||||
}
|
||||
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
|
||||
@@ -75,7 +75,6 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d
|
||||
pattern = rf"Running migration id={i}"
|
||||
|
||||
endpoint.log_contains(pattern)
|
||||
endpoint.log_contains(rf"Finished migration id={i}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
Reference in New Issue
Block a user