diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index e9483492c9..1dec8106b4 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -158,8 +158,6 @@ jobs: - name: Run cargo build run: | - PQ_LIB_DIR=$(pwd)/pg_install/v16/lib - export PQ_LIB_DIR ${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests # Do install *before* running rust tests because they might recompile the @@ -217,8 +215,6 @@ jobs: env: NEXTEST_RETRIES: 3 run: | - PQ_LIB_DIR=$(pwd)/pg_install/v16/lib - export PQ_LIB_DIR LD_LIBRARY_PATH=$(pwd)/pg_install/v17/lib export LD_LIBRARY_PATH diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 01d82a1ed2..347a511e98 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -235,7 +235,7 @@ jobs: echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV - name: Run cargo build (only for v17) - run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release -j$(sysctl -n hw.ncpu) + run: cargo build --all --release -j$(sysctl -n hw.ncpu) - name: Check that no warnings are produced (only for v17) run: ./run_clippy.sh diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index 5b5910badf..f077e04d1c 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -114,7 +114,7 @@ jobs: run: make walproposer-lib -j$(nproc) - name: Produce the build stats - run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release --timings -j$(nproc) + run: cargo build --all --release --timings -j$(nproc) - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v4 diff --git a/Cargo.lock b/Cargo.lock index cdc620e485..0133c83564 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -932,6 +932,18 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bb8" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" +dependencies = [ + "async-trait", + "futures-util", + "parking_lot 0.12.1", + "tokio", +] + [[package]] name = "bcder" version = "0.7.4" @@ -1790,11 +1802,24 @@ dependencies = [ "chrono", "diesel_derives", "itoa", - "pq-sys", - "r2d2", "serde_json", ] +[[package]] +name = "diesel-async" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb" +dependencies = [ + "async-trait", + "bb8", + "diesel", + "futures-util", + "scoped-futures", + "tokio", + "tokio-postgres", +] + [[package]] name = "diesel_derives" version = "2.2.1" @@ -4645,15 +4670,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pq-sys" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793" -dependencies = [ - "vcpkg", -] - [[package]] name = "pq_proto" version = "0.1.0" @@ -4966,17 +4982,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "r2d2" -version = "0.8.10" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot 0.12.1", - "scheduled-thread-pool", -] - [[package]] name = "rand" version = "0.7.3" @@ -5797,12 +5802,12 @@ dependencies = [ ] [[package]] -name = "scheduled-thread-pool" -version = "0.2.7" +name = "scoped-futures" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" dependencies = [ - "parking_lot 0.12.1", + "pin-project-lite", ] [[package]] @@ -6337,6 +6342,7 @@ dependencies = [ "clap", "control_plane", "diesel", + "diesel-async", "diesel_migrations", "fail", "futures", @@ -6351,10 +6357,12 @@ dependencies = [ "pageserver_api", "pageserver_client", "postgres_connection", - "r2d2", "rand 0.8.5", "reqwest", "routerify", + "rustls 0.23.18", + "rustls-native-certs 0.8.0", + "scoped-futures", "scopeguard", "serde", "serde_json", @@ -6362,6 +6370,8 @@ dependencies = [ "strum_macros", "thiserror 1.0.69", "tokio", + "tokio-postgres", + "tokio-postgres-rustls", "tokio-util", "tracing", "utils", @@ -6604,7 +6614,7 @@ dependencies = [ "fastrand 2.2.0", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7562,12 +7572,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.4" diff --git a/Dockerfile b/Dockerfile index f80666529b..7ba54c8ca5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,7 +45,7 @@ COPY --chown=nonroot . . 
ARG ADDITIONAL_RUSTFLAGS RUN set -e \ - && PQ_LIB_DIR=$(pwd)/pg_install/v${STABLE_PG_VERSION}/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \ + && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \ --bin pg_sni_router \ --bin pageserver \ --bin pagectl \ diff --git a/Makefile b/Makefile index 22ebfea7d5..d1238caebf 100644 --- a/Makefile +++ b/Makefile @@ -64,8 +64,6 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS)) CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+) # Force cargo not to print progress bar CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1 -# Set PQ_LIB_DIR to make sure `storage_controller` get linked with bundled libpq (through diesel) -CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib CACHEDIR_TAG_CONTENTS := "Signature: 8a477f597d28d172789f06886806bc55" diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index caaa22d0a5..63f43cdf62 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -32,6 +32,7 @@ postgres_connection.workspace = true rand.workspace = true reqwest = { workspace = true, features = ["stream"] } routerify.workspace = true +rustls-native-certs.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true @@ -39,18 +40,20 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true measured.workspace = true +rustls.workspace = true scopeguard.workspace = true strum.workspace = true strum_macros.workspace = true +tokio-postgres.workspace = true +tokio-postgres-rustls.workspace = true diesel = { version = "2.2.6", features = [ "serde_json", - "postgres", - "r2d2", "chrono", ] } +diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] } diesel_migrations = { version = "2.2.0" } -r2d2 = { version = "0.8.10" } +scoped-futures = "0.1.4" utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 801409d612..659c088d51 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -308,7 +308,7 @@ async fn async_main() -> anyhow::Result<()> { // Validate that we can connect to the database Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?; - let persistence = Arc::new(Persistence::new(secrets.database_url)); + let persistence = Arc::new(Persistence::new(secrets.database_url).await); let service = Service::spawn(config, persistence.clone()).await?; diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 37bfaf1139..880f203064 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1,13 +1,20 @@ pub(crate) mod split_state; use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use self::split_state::SplitState; -use diesel::pg::PgConnection; use diesel::prelude::*; -use diesel::Connection; +use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::pooled_connection::bb8::Pool; +use diesel_async::pooled_connection::AsyncDieselConnectionManager; +use diesel_async::pooled_connection::ManagerConfig; +use diesel_async::AsyncPgConnection; +use diesel_async::RunQueryDsl; +use 
futures::future::BoxFuture; +use futures::FutureExt; use itertools::Itertools; use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::controller_api::MetadataHealthRecord; @@ -20,6 +27,8 @@ use pageserver_api::shard::ShardConfigError; use pageserver_api::shard::ShardIdentity; use pageserver_api::shard::ShardStripeSize; use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId}; +use rustls::crypto::ring; +use scoped_futures::ScopedBoxFuture; use serde::{Deserialize, Serialize}; use utils::generation::Generation; use utils::id::{NodeId, TenantId}; @@ -60,7 +69,7 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); /// updated, and reads of nodes are always from memory, not the database. We only require that /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. pub struct Persistence { - connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>, + connection_pool: Pool<AsyncPgConnection>, } /// Legacy format, for use in JSON compat objects in test environment @@ -76,7 +85,7 @@ pub(crate) enum DatabaseError { #[error(transparent)] Connection(#[from] diesel::result::ConnectionError), #[error(transparent)] - ConnectionPool(#[from] r2d2::Error), + ConnectionPool(#[from] diesel_async::pooled_connection::bb8::RunError), #[error("Logical error: {0}")] Logical(String), #[error("Migration error: {0}")] @@ -124,6 +133,7 @@ pub(crate) enum AbortShardSplitStatus { pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>; /// Some methods can operate on either a whole tenant or a single shard +#[derive(Clone)] pub(crate) enum TenantFilter { Tenant(TenantId), Shard(TenantShardId), @@ -136,6 +146,11 @@ pub(crate) struct ShardGenerationState { pub(crate) generation_pageserver: Option<NodeId>, } +// A generous allowance for how many times we may retry serializable transactions +// before giving up. This is not expected to be hit: it is a defensive measure in case we +// somehow engineer a situation where duelling transactions might otherwise live-lock. +const MAX_RETRIES: usize = 128; + impl Persistence { // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect. @@ -145,12 +160,18 @@ impl Persistence { const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60); - pub fn new(database_url: String) -> Self { - let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url); + pub async fn new(database_url: String) -> Self { + let mut mgr_config = ManagerConfig::default(); + mgr_config.custom_setup = Box::new(establish_connection_rustls); + + let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config( + database_url, + mgr_config, + ); // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time // to execute queries (database queries are not generally on latency-sensitive paths).
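The pool is built much as before, but the manager is now diesel-async's `AsyncDieselConnectionManager`, whose `ManagerConfig::custom_setup` hook routes every new connection through the rustls-based `establish_connection_rustls` added at the bottom of this file. A minimal sketch of that wiring in isolation, assuming diesel-async 0.5 with the `bb8` and `postgres` features (`build_pool` and the hard-coded pool size are illustrative, not part of the patch):

```rust
use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig};
use diesel_async::AsyncPgConnection;

// Hypothetical free function; the patch does this inside Persistence::new.
async fn build_pool(database_url: &str) -> Pool<AsyncPgConnection> {
    let mut config = ManagerConfig::default();
    // Dial every pooled connection through our own setup routine (the
    // rustls-based one defined later in this file) rather than the default
    // AsyncPgConnection::establish.
    config.custom_setup = Box::new(establish_connection_rustls);

    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
        database_url,
        config,
    );
    Pool::builder()
        .max_size(99) // one below the default postgres limit of 100
        .build(manager)
        .await
        .expect("could not build connection pool")
}
```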
- let connection_pool = diesel::r2d2::Pool::builder() + let connection_pool = Pool::builder() .max_size(Self::MAX_CONNECTIONS) .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME)) .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT)) @@ -158,6 +179,7 @@ impl Persistence { .min_idle(Some(1)) .test_on_check_out(true) .build(manager) + .await .expect("Could not build connection pool"); Self { connection_pool } @@ -171,7 +193,7 @@ impl Persistence { ) -> Result<(), diesel::ConnectionError> { let started_at = Instant::now(); loop { - match PgConnection::establish(database_url) { + match establish_connection_rustls(database_url).await { Ok(_) => { tracing::info!("Connected to database."); return Ok(()); @@ -192,57 +214,22 @@ impl Persistence { pub(crate) async fn migration_run(&self) -> DatabaseResult<()> { use diesel_migrations::{HarnessWithOutput, MigrationHarness}; - self.with_conn(move |conn| -> DatabaseResult<()> { - HarnessWithOutput::write_to_stdout(conn) - .run_pending_migrations(MIGRATIONS) - .map(|_| ()) - .map_err(|e| DatabaseError::Migration(e.to_string())) - }) - .await - } - - /// Wraps `with_conn` in order to collect latency and error metrics - async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R> - where - F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static, - R: Send + 'static, - { - let latency = &METRICS_REGISTRY - .metrics_group - .storage_controller_database_query_latency; - let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op }); - - let res = self.with_conn(func).await; - - if let Err(err) = &res { - let error_counter = &METRICS_REGISTRY - .metrics_group - .storage_controller_database_query_error; - error_counter.inc(DatabaseQueryErrorLabelGroup { - error_type: err.error_label(), - operation: op, - }) - } - - res - } - - /// Call the provided function in a tokio blocking thread, with a Diesel database connection. - async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R> - where - F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static, - R: Send + 'static, - { - // A generous allowance for how many times we may retry serializable transactions - // before giving up. This is not expected to be hit: it is a defensive measure in case we - // somehow engineer a situation where duelling transactions might otherwise live-lock. - const MAX_RETRIES: usize = 128; - - let mut conn = self.connection_pool.get()?; - tokio::task::spawn_blocking(move || -> DatabaseResult<R> { + // Can't use self.with_conn here as we do spawn_blocking, which requires 'static.
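The `diesel_migrations` harness is synchronous, which is why `migration_run` below takes a dedicated connection out of the pool, wraps it in `AsyncConnectionWrapper` (which presents a blocking diesel `Connection` interface over the async one), and hands it to `spawn_blocking`; the closure has to be `'static`, hence bypassing `with_conn`. A condensed sketch of the pattern, assuming the same embedded migrations directory as the patch:

```rust
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::AsyncPgConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

// Sketch: `conn` would come from `pool.dedicated_connection().await`.
async fn run_migrations(conn: AsyncPgConnection) -> anyhow::Result<()> {
    let mut wrapper: AsyncConnectionWrapper<AsyncPgConnection> =
        AsyncConnectionWrapper::from(conn);
    tokio::task::spawn_blocking(move || {
        // Inside spawn_blocking the wrapper behaves like a synchronous
        // diesel connection, which is what MigrationHarness expects.
        wrapper
            .run_pending_migrations(MIGRATIONS)
            .map(|_| ())
            .map_err(|e| anyhow::anyhow!(e.to_string()))
    })
    .await??;
    Ok(())
}
```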
+ let conn = self + .connection_pool + .dedicated_connection() + .await + .map_err(|e| DatabaseError::Migration(e.to_string()))?; + let mut async_wrapper: AsyncConnectionWrapper<AsyncPgConnection> = + AsyncConnectionWrapper::from(conn); + tokio::task::spawn_blocking(move || { let mut retry_count = 0; loop { - match conn.build_transaction().serializable().run(|c| func(c)) { + let result = HarnessWithOutput::write_to_stdout(&mut async_wrapper) + .run_pending_migrations(MIGRATIONS) + .map(|_| ()) + .map_err(|e| DatabaseError::Migration(e.to_string())); + match result { Ok(r) => break Ok(r), Err( err @ DatabaseError::Query(diesel::result::Error::DatabaseError( @@ -271,33 +258,112 @@ impl Persistence { } }) .await - .expect("Task panic") + .map_err(|e| DatabaseError::Migration(e.to_string()))??; + Ok(()) + } + + /// Wraps `with_conn` in order to collect latency and error metrics + async fn with_measured_conn<'a, 'b, F, R>( + &self, + op: DatabaseOperation, + func: F, + ) -> DatabaseResult<R> + where + F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult<R>> + + Send + + std::marker::Sync + + 'a, + R: Send + 'b, + { + let latency = &METRICS_REGISTRY + .metrics_group + .storage_controller_database_query_latency; + let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op }); + + let res = self.with_conn(func).await; + + if let Err(err) = &res { + let error_counter = &METRICS_REGISTRY + .metrics_group + .storage_controller_database_query_error; + error_counter.inc(DatabaseQueryErrorLabelGroup { + error_type: err.error_label(), + operation: op, + }) + } + + res + } + + /// Call the provided function with a Diesel database connection in a retry loop + async fn with_conn<'a, 'b, F, R>(&self, func: F) -> DatabaseResult<R> + where + F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult<R>> + + Send + + std::marker::Sync + + 'a, + R: Send + 'b, + { + let mut retry_count = 0; + loop { + let mut conn = self.connection_pool.get().await?; + match conn + .build_transaction() + .serializable() + .run(|c| func(c)) + .await + { + Ok(r) => break Ok(r), + Err( + err @ DatabaseError::Query(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::SerializationFailure, + _, + )), + ) => { + retry_count += 1; + if retry_count > MAX_RETRIES { + tracing::error!( + "Exceeded max retries on SerializationFailure errors: {err:?}" + ); + break Err(err); + } else { + // Retry on serialization errors: these are expected, because even though our + // transactions don't fight for the same rows, they will occasionally collide + // on index pages (e.g.
increment_generation for unrelated shards can collide) + tracing::debug!("Retrying transaction on serialization failure {err:?}"); + continue; + } + } + Err(e) => break Err(e), + } + } } /// When a node is first registered, persist it before using it for anything pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> { - let np = node.to_persistent(); - self.with_measured_conn( - DatabaseOperation::InsertNode, - move |conn| -> DatabaseResult<()> { + let np = &node.to_persistent(); + self.with_measured_conn(DatabaseOperation::InsertNode, move |conn| { + Box::pin(async move { diesel::insert_into(crate::schema::nodes::table) - .values(&np) - .execute(conn)?; + .values(np) + .execute(conn) + .await?; Ok(()) - }, - ) + }) + }) .await } /// At startup, populate the list of nodes which our shards may be placed on pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> { let nodes: Vec<NodePersistence> = self - .with_measured_conn( - DatabaseOperation::ListNodes, - move |conn| -> DatabaseResult<_> { - Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?) - }, - ) + .with_measured_conn(DatabaseOperation::ListNodes, move |conn| { + Box::pin(async move { + Ok(crate::schema::nodes::table + .load::<NodePersistence>(conn) + .await?) + }) + }) .await?; tracing::info!("list_nodes: loaded {} nodes", nodes.len()); @@ -313,11 +379,14 @@ impl Persistence { use crate::schema::nodes::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| { - let updated = diesel::update(nodes) - .filter(node_id.eq(input_node_id.0 as i64)) - .set((scheduling_policy.eq(String::from(input_scheduling)),)) - .execute(conn)?; - Ok(updated) + Box::pin(async move { + let updated = diesel::update(nodes) + .filter(node_id.eq(input_node_id.0 as i64)) + .set((scheduling_policy.eq(String::from(input_scheduling)),)) + .execute(conn) + .await?; + Ok(updated) + }) }) .await?; @@ -339,17 +408,16 @@ impl Persistence { &self, ) -> DatabaseResult<Vec<TenantShardPersistence>> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::ListTenantShards, - move |conn| -> DatabaseResult<_> { + self.with_measured_conn(DatabaseOperation::ListTenantShards, move |conn| { + Box::pin(async move { let query = tenant_shards.filter( placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), ); - let result = query.load::<TenantShardPersistence>(conn)?; + let result = query.load::<TenantShardPersistence>(conn).await?; Ok(result) - }, - ) + }) + }) .await } @@ -359,15 +427,14 @@ impl Persistence { filter_tenant_id: TenantId, ) -> DatabaseResult<Vec<TenantShardPersistence>> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::LoadTenant, - move |conn| -> DatabaseResult<_> { + self.with_measured_conn(DatabaseOperation::LoadTenant, move |conn| { + Box::pin(async move { let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string())); - let result = query.load::<TenantShardPersistence>(conn)?; + let result = query.load::<TenantShardPersistence>(conn).await?; Ok(result) - }, - ) + }) + }) .await } @@ -393,19 +460,22 @@ impl Persistence { }) .collect::<Vec<_>>(); - self.with_measured_conn( - DatabaseOperation::InsertTenantShards, - move |conn| -> DatabaseResult<()> { + let shards = &shards; + let metadata_health_records = &metadata_health_records; + self.with_measured_conn(DatabaseOperation::InsertTenantShards, move |conn| { + Box::pin(async move { diesel::insert_into(tenant_shards::table) - .values(&shards) - .execute(conn)?; + .values(shards) + .execute(conn) + .await?; diesel::insert_into(metadata_health::table) - .values(&metadata_health_records) - .execute(conn)?; + .values(metadata_health_records) +
.execute(conn) + .await?; Ok(()) - }, - ) + }) + }) .await } @@ -413,31 +483,31 @@ impl Persistence { /// the tenant from memory on this server. pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::DeleteTenant, - move |conn| -> DatabaseResult<()> { + self.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| { + Box::pin(async move { // `metadata_health` status (if exists) is also deleted based on the cascade behavior. diesel::delete(tenant_shards) .filter(tenant_id.eq(del_tenant_id.to_string())) - .execute(conn)?; + .execute(conn) + .await?; Ok(()) - }, - ) + }) + }) .await } pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> { use crate::schema::nodes::dsl::*; - self.with_measured_conn( - DatabaseOperation::DeleteNode, - move |conn| -> DatabaseResult<()> { + self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| { + Box::pin(async move { diesel::delete(nodes) .filter(node_id.eq(del_node_id.0 as i64)) - .execute(conn)?; + .execute(conn) + .await?; Ok(()) - }, - ) + }) + }) .await } @@ -454,34 +524,41 @@ impl Persistence { use crate::schema::tenant_shards::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::ReAttach, move |conn| { - let rows_updated = diesel::update(tenant_shards) - .filter(generation_pageserver.eq(input_node_id.0 as i64)) - .set(generation.eq(generation + 1)) - .execute(conn)?; + Box::pin(async move { + let rows_updated = diesel::update(tenant_shards) + .filter(generation_pageserver.eq(input_node_id.0 as i64)) + .set(generation.eq(generation + 1)) + .execute(conn) + .await?; - tracing::info!("Incremented {} tenants' generations", rows_updated); + tracing::info!("Incremented {} tenants' generations", rows_updated); - // TODO: UPDATE+SELECT in one query + // TODO: UPDATE+SELECT in one query - let updated = tenant_shards - .filter(generation_pageserver.eq(input_node_id.0 as i64)) - .select(TenantShardPersistence::as_select()) - .load(conn)?; + let updated = tenant_shards + .filter(generation_pageserver.eq(input_node_id.0 as i64)) + .select(TenantShardPersistence::as_select()) + .load(conn) + .await?; - // If the node went through a drain and restart phase before re-attaching, - // then reset it's node scheduling policy to active. - diesel::update(nodes) - .filter(node_id.eq(input_node_id.0 as i64)) - .filter( - scheduling_policy - .eq(String::from(NodeSchedulingPolicy::PauseForRestart)) - .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining))) - .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))), - ) - .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active))) - .execute(conn)?; + // If the node went through a drain and restart phase before re-attaching, + // then reset its node scheduling policy to active.
+ diesel::update(nodes) + .filter(node_id.eq(input_node_id.0 as i64)) + .filter( + scheduling_policy + .eq(String::from(NodeSchedulingPolicy::PauseForRestart)) + .or(scheduling_policy + .eq(String::from(NodeSchedulingPolicy::Draining))) + .or(scheduling_policy + .eq(String::from(NodeSchedulingPolicy::Filling))), + ) + .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active))) + .execute(conn) + .await?; - Ok(updated) + Ok(updated) + }) }) .await?; @@ -518,19 +595,22 @@ impl Persistence { use crate::schema::tenant_shards::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| { - let updated = diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(( - generation.eq(generation + 1), - generation_pageserver.eq(node_id.0 as i64), - )) - // TODO: only returning() the generation column - .returning(TenantShardPersistence::as_returning()) - .get_result(conn)?; + Box::pin(async move { + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .set(( + generation.eq(generation + 1), + generation_pageserver.eq(node_id.0 as i64), + )) + // TODO: only returning() the generation column + .returning(TenantShardPersistence::as_returning()) + .get_result(conn) + .await?; - Ok(updated) + Ok(updated) + }) }) .await?; @@ -562,12 +642,15 @@ impl Persistence { use crate::schema::tenant_shards::dsl::*; let rows = self .with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| { - let result = tenant_shards - .filter(tenant_id.eq(filter_tenant_id.to_string())) - .select(TenantShardPersistence::as_select()) - .order(shard_number) - .load(conn)?; - Ok(result) + Box::pin(async move { + let result = tenant_shards + .filter(tenant_id.eq(filter_tenant_id.to_string())) + .select(TenantShardPersistence::as_select()) + .order(shard_number) + .load(conn) + .await?; + Ok(result) + }) }) .await?; @@ -615,15 +698,18 @@ impl Persistence { break; } + let in_clause = &in_clause; let chunk_rows = self .with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| { - // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because - // the inputs are strongly typed and cannot carry any user-supplied raw string content. - let result : Vec<TenantShardPersistence> = diesel::sql_query( - format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str() - ).load(conn)?; + Box::pin(async move { + // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because + // the inputs are strongly typed and cannot carry any user-supplied raw string content.
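For reference, the `in_clause` interpolated below is assembled by the caller from typed tuple fields, so only hex tenant IDs and integers can reach the SQL text. A hypothetical helper showing the shape of that composition (the patch builds the string inline in the calling loop):

```rust
use pageserver_api::shard::TenantShardId;

// Hypothetical helper mirroring how the calling loop builds `in_clause`:
// every fragment renders from strongly typed fields, never raw user input.
fn compose_in_clause(chunk: &[TenantShardId]) -> String {
    chunk
        .iter()
        .map(|tsid| {
            format!(
                "('{}', {}, {})",
                tsid.tenant_id,
                tsid.shard_number.0,
                tsid.shard_count.literal()
            )
        })
        .collect::<Vec<_>>()
        .join(",")
}
```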
+ let result : Vec<TenantShardPersistence> = diesel::sql_query( + format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str() + ).load(conn).await?; - Ok(result) + Ok(result) + }) }) .await?; rows.extend(chunk_rows.into_iter()) @@ -657,51 +743,58 @@ impl Persistence { ) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; + let tenant = &tenant; + let input_placement_policy = &input_placement_policy; + let input_config = &input_config; + let input_generation = &input_generation; + let input_scheduling_policy = &input_scheduling_policy; self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| { - let query = match tenant { - TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .into_boxed(), - TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards) - .filter(tenant_id.eq(input_tenant_id.to_string())) - .into_boxed(), - }; + Box::pin(async move { + let query = match tenant { + TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .into_boxed(), + TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards) + .filter(tenant_id.eq(input_tenant_id.to_string())) + .into_boxed(), + }; - // Clear generation_pageserver if we are moving into a state where we won't have - // any attached pageservers. - let input_generation_pageserver = match input_placement_policy { - None | Some(PlacementPolicy::Attached(_)) => None, - Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None), - }; + // Clear generation_pageserver if we are moving into a state where we won't have + // any attached pageservers.
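The `Some(None)` produced below relies on diesel's `AsChangeset` convention for nested options: the outer `Option` decides whether the column appears in the UPDATE at all, and the inner one decides between a value and SQL NULL. A minimal sketch of the idea against a hypothetical table `t` with a nullable column:

```rust
use diesel::prelude::*;

diesel::table! {
    t (id) {
        id -> BigInt,
        col -> Nullable<BigInt>,
    }
}

#[derive(AsChangeset)]
#[diesel(table_name = t)]
struct Update {
    // None          => column left out of the UPDATE entirely
    // Some(None)    => column explicitly SET to NULL
    // Some(Some(v)) => column SET to v
    col: Option<Option<i64>>,
}
```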
+ let input_generation_pageserver = match input_placement_policy { + None | Some(PlacementPolicy::Attached(_)) => None, + Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None), + }; - #[derive(AsChangeset)] - #[diesel(table_name = crate::schema::tenant_shards)] - struct ShardUpdate { - generation: Option<i32>, - placement_policy: Option<String>, - config: Option<String>, - scheduling_policy: Option<String>, - generation_pageserver: Option<Option<i64>>, - } + #[derive(AsChangeset)] + #[diesel(table_name = crate::schema::tenant_shards)] + struct ShardUpdate { + generation: Option<i32>, + placement_policy: Option<String>, + config: Option<String>, + scheduling_policy: Option<String>, + generation_pageserver: Option<Option<i64>>, + } - let update = ShardUpdate { - generation: input_generation.map(|g| g.into().unwrap() as i32), - placement_policy: input_placement_policy - .as_ref() - .map(|p| serde_json::to_string(&p).unwrap()), - config: input_config - .as_ref() - .map(|c| serde_json::to_string(&c).unwrap()), - scheduling_policy: input_scheduling_policy - .map(|p| serde_json::to_string(&p).unwrap()), - generation_pageserver: input_generation_pageserver, - }; + let update = ShardUpdate { + generation: input_generation.map(|g| g.into().unwrap() as i32), + placement_policy: input_placement_policy + .as_ref() + .map(|p| serde_json::to_string(&p).unwrap()), + config: input_config + .as_ref() + .map(|c| serde_json::to_string(&c).unwrap()), + scheduling_policy: input_scheduling_policy + .map(|p| serde_json::to_string(&p).unwrap()), + generation_pageserver: input_generation_pageserver, + }; - query.set(update).execute(conn)?; + query.set(update).execute(conn).await?; - Ok(()) + Ok(()) + }) }) .await?; @@ -715,23 +808,27 @@ impl Persistence { ) -> DatabaseResult<Vec<(TenantShardId, Option<AvailabilityZone>)>> { use crate::schema::tenant_shards::dsl::*; + let preferred_azs = preferred_azs.as_slice(); self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| { - let mut shards_updated = Vec::default(); + Box::pin(async move { + let mut shards_updated = Vec::default(); - for (tenant_shard_id, preferred_az) in preferred_azs.iter() { - let updated = diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone()))) - .execute(conn)?; + for (tenant_shard_id, preferred_az) in preferred_azs.iter() { + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone()))) + .execute(conn) + .await?; - if updated == 1 { - shards_updated.push((*tenant_shard_id, preferred_az.clone())); + if updated == 1 { + shards_updated.push((*tenant_shard_id, preferred_az.clone())); + } } - } - Ok(shards_updated) + Ok(shards_updated) + }) }) .await } @@ -739,17 +836,21 @@ impl Persistence { pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> { use crate::schema::tenant_shards::dsl::*; self.with_measured_conn(DatabaseOperation::Detach, move |conn| { - let updated = diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(( -
generation_pageserver.eq(Option::<i64>::None), - placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), - )) - .execute(conn)?; + Box::pin(async move { + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .set(( + generation_pageserver.eq(Option::<i64>::None), + placement_policy + .eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), + )) + .execute(conn) + .await?; - Ok(updated) + Ok(updated) + }) }) .await?; @@ -768,14 +869,16 @@ impl Persistence { parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>, ) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> { + let parent_to_children = parent_to_children.as_slice(); + self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| { + Box::pin(async move { // Mark parent shards as splitting let updated = diesel::update(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.eq(old_shard_count.literal() as i32)) .set((splitting.eq(1),)) - .execute(conn)?; + .execute(conn).await?; if u8::try_from(updated) .map_err(|_| DatabaseError::Logical( format!("Overflow existing shard count {} while splitting", updated)) @@ -788,7 +891,7 @@ impl Persistence { } // FIXME: spurious clone to sidestep closure move rules - let parent_to_children = parent_to_children.clone(); + let parent_to_children = parent_to_children.to_vec(); // Insert child shards for (parent_shard_id, children) in parent_to_children { @@ -796,7 +899,7 @@ impl Persistence { .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string())) .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32)) .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32)) - .load::<TenantShardPersistence>(conn)?; + .load::<TenantShardPersistence>(conn).await?; let parent = if parent.len() != 1 { return Err(DatabaseError::Logical(format!( "Parent shard {parent_shard_id} not found" @@ -811,12 +914,13 @@ impl Persistence { debug_assert!(shard.splitting == SplitState::Splitting); diesel::insert_into(tenant_shards) .values(shard) - .execute(conn)?; + .execute(conn).await?; } } Ok(()) }) + }) .await } @@ -828,25 +932,26 @@ impl Persistence { old_shard_count: ShardCount, ) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::CompleteShardSplit, - move |conn| -> DatabaseResult<()> { + self.with_measured_conn(DatabaseOperation::CompleteShardSplit, move |conn| { + Box::pin(async move { // Drop parent shards diesel::delete(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.eq(old_shard_count.literal() as i32)) - .execute(conn)?; + .execute(conn) + .await?; // Clear sharding flag let updated = diesel::update(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .set((splitting.eq(0),)) - .execute(conn)?; + .execute(conn) + .await?; debug_assert!(updated > 0); Ok(()) - }, - ) + }) + }) .await } @@ -858,15 +963,15 @@ impl Persistence { new_shard_count: ShardCount, ) -> DatabaseResult<AbortShardSplitStatus> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::AbortShardSplit, - move |conn| -> DatabaseResult<AbortShardSplitStatus> { + self.with_measured_conn(DatabaseOperation::AbortShardSplit, move |conn| { + Box::pin(async move { // Clear the splitting state on parent shards let updated =
diesel::update(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.ne(new_shard_count.literal() as i32)) .set((splitting.eq(0),)) - .execute(conn)?; + .execute(conn) + .await?; // Parent shards are already gone: we cannot abort. if updated == 0 { @@ -886,11 +991,12 @@ impl Persistence { diesel::delete(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.eq(new_shard_count.literal() as i32)) - .execute(conn)?; + .execute(conn) + .await?; Ok(AbortShardSplitStatus::Aborted) - }, - ) + }) + }) .await } @@ -906,25 +1012,28 @@ impl Persistence { ) -> DatabaseResult<()> { use crate::schema::metadata_health::dsl::*; - self.with_measured_conn( - DatabaseOperation::UpdateMetadataHealth, - move |conn| -> DatabaseResult<_> { + let healthy_records = healthy_records.as_slice(); + let unhealthy_records = unhealthy_records.as_slice(); + self.with_measured_conn(DatabaseOperation::UpdateMetadataHealth, move |conn| { + Box::pin(async move { diesel::insert_into(metadata_health) - .values(&healthy_records) + .values(healthy_records) .on_conflict((tenant_id, shard_number, shard_count)) .do_update() .set((healthy.eq(true), last_scrubbed_at.eq(now))) - .execute(conn)?; + .execute(conn) + .await?; diesel::insert_into(metadata_health) - .values(&unhealthy_records) + .values(unhealthy_records) .on_conflict((tenant_id, shard_number, shard_count)) .do_update() .set((healthy.eq(false), last_scrubbed_at.eq(now))) - .execute(conn)?; + .execute(conn) + .await?; Ok(()) - }, - ) + }) + }) .await } @@ -933,15 +1042,13 @@ impl Persistence { pub(crate) async fn list_metadata_health_records( &self, ) -> DatabaseResult<Vec<MetadataHealthPersistence>> { - self.with_measured_conn( - DatabaseOperation::ListMetadataHealth, - move |conn| -> DatabaseResult<_> { - Ok( - crate::schema::metadata_health::table - .load::<MetadataHealthPersistence>(conn)?, - ) - }, - ) + self.with_measured_conn(DatabaseOperation::ListMetadataHealth, move |conn| { + Box::pin(async { + Ok(crate::schema::metadata_health::table + .load::<MetadataHealthPersistence>(conn) + .await?) + }) + }) .await } @@ -953,10 +1060,15 @@ impl Persistence { use crate::schema::metadata_health::dsl::*; self.with_measured_conn( DatabaseOperation::ListMetadataHealthUnhealthy, - move |conn| -> DatabaseResult<_> { - Ok(crate::schema::metadata_health::table - .filter(healthy.eq(false)) - .load::<MetadataHealthPersistence>(conn)?) + move |conn| { + Box::pin(async { + DatabaseResult::Ok( + crate::schema::metadata_health::table + .filter(healthy.eq(false)) + .load::<MetadataHealthPersistence>(conn) + .await?, + ) + }) }, ) .await @@ -970,15 +1082,14 @@ impl Persistence { ) -> DatabaseResult<Vec<MetadataHealthPersistence>> { use crate::schema::metadata_health::dsl::*; - self.with_measured_conn( - DatabaseOperation::ListMetadataHealthOutdated, - move |conn| -> DatabaseResult<_> { + self.with_measured_conn(DatabaseOperation::ListMetadataHealthOutdated, move |conn| { + Box::pin(async move { let query = metadata_health.filter(last_scrubbed_at.lt(earlier)); - let res = query.load::<MetadataHealthPersistence>(conn)?; + let res = query.load::<MetadataHealthPersistence>(conn).await?; Ok(res) - }, - ) + }) + }) .await } @@ -986,12 +1097,13 @@ impl Persistence { /// It is an error for the table to contain more than one entry. pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> { let mut leader: Vec<ControllerPersistence> = self - .with_measured_conn( - DatabaseOperation::GetLeader, - move |conn| -> DatabaseResult<_> { - Ok(crate::schema::controllers::table.load::<ControllerPersistence>(conn)?) - }, - ) + .with_measured_conn(DatabaseOperation::GetLeader, move |conn| { + Box::pin(async move { + Ok(crate::schema::controllers::table + .load::<ControllerPersistence>(conn) + .await?)
+ }) + }) .await?; if leader.len() > 1 { @@ -1014,26 +1126,33 @@ impl Persistence { use crate::schema::controllers::dsl::*; let updated = self - .with_measured_conn( - DatabaseOperation::UpdateLeader, - move |conn| -> DatabaseResult<usize> { + .with_measured_conn(DatabaseOperation::UpdateLeader, move |conn| { + let prev = prev.clone(); + let new = new.clone(); + Box::pin(async move { let updated = match &prev { - Some(prev) => diesel::update(controllers) - .filter(address.eq(prev.address.clone())) - .filter(started_at.eq(prev.started_at)) - .set(( - address.eq(new.address.clone()), - started_at.eq(new.started_at), - )) - .execute(conn)?, - None => diesel::insert_into(controllers) - .values(new.clone()) - .execute(conn)?, + Some(prev) => { + diesel::update(controllers) + .filter(address.eq(prev.address.clone())) + .filter(started_at.eq(prev.started_at)) + .set(( + address.eq(new.address.clone()), + started_at.eq(new.started_at), + )) + .execute(conn) + .await? + } + None => { + diesel::insert_into(controllers) + .values(new.clone()) + .execute(conn) + .await? + } }; Ok(updated) - }, - ) + }) + }) .await?; if updated == 0 { @@ -1048,12 +1167,13 @@ impl Persistence { /// At startup, populate the list of nodes which our shards may be placed on pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> { let safekeepers: Vec<SafekeeperPersistence> = self - .with_measured_conn( - DatabaseOperation::ListNodes, - move |conn| -> DatabaseResult<_> { - Ok(crate::schema::safekeepers::table.load::<SafekeeperPersistence>(conn)?) - }, - ) + .with_measured_conn(DatabaseOperation::ListNodes, move |conn| { + Box::pin(async move { + Ok(crate::schema::safekeepers::table + .load::<SafekeeperPersistence>(conn) + .await?) + }) + }) .await?; tracing::info!("list_safekeepers: loaded {} nodes", safekeepers.len()); @@ -1066,11 +1186,14 @@ impl Persistence { id: i64, ) -> Result<SafekeeperPersistence, DatabaseError> { use crate::schema::safekeepers::dsl::{id as id_column, safekeepers}; - self.with_conn(move |conn| -> DatabaseResult<SafekeeperPersistence> { - Ok(safekeepers - .filter(id_column.eq(&id)) - .select(SafekeeperPersistence::as_select()) - .get_result(conn)?) + self.with_conn(move |conn| { + Box::pin(async move { + Ok(safekeepers + .filter(id_column.eq(&id)) + .select(SafekeeperPersistence::as_select()) + .get_result(conn) + .await?)
+ }) }) .await } @@ -1081,26 +1204,30 @@ impl Persistence { ) -> Result<(), DatabaseError> { use crate::schema::safekeepers::dsl::*; - self.with_conn(move |conn| -> DatabaseResult<()> { - let bind = record - .as_insert_or_update() - .map_err(|e| DatabaseError::Logical(format!("{e}")))?; + self.with_conn(move |conn| { + let record = record.clone(); + Box::pin(async move { + let bind = record + .as_insert_or_update() + .map_err(|e| DatabaseError::Logical(format!("{e}")))?; - let inserted_updated = diesel::insert_into(safekeepers) - .values(&bind) - .on_conflict(id) - .do_update() - .set(&bind) - .execute(conn)?; + let inserted_updated = diesel::insert_into(safekeepers) + .values(&bind) + .on_conflict(id) + .do_update() + .set(&bind) + .execute(conn) + .await?; - if inserted_updated != 1 { - return Err(DatabaseError::Logical(format!( - "unexpected number of rows ({})", - inserted_updated - ))); - } + if inserted_updated != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + inserted_updated + ))); + } - Ok(()) + Ok(()) + }) }) .await } @@ -1112,31 +1239,73 @@ impl Persistence { ) -> Result<(), DatabaseError> { use crate::schema::safekeepers::dsl::*; - self.with_conn(move |conn| -> DatabaseResult<()> { - #[derive(Insertable, AsChangeset)] - #[diesel(table_name = crate::schema::safekeepers)] - struct UpdateSkSchedulingPolicy<'a> { - id: i64, - scheduling_policy: &'a str, - } - let scheduling_policy_ = String::from(scheduling_policy_); + self.with_conn(move |conn| { + Box::pin(async move { + #[derive(Insertable, AsChangeset)] + #[diesel(table_name = crate::schema::safekeepers)] + struct UpdateSkSchedulingPolicy<'a> { + id: i64, + scheduling_policy: &'a str, + } + let scheduling_policy_ = String::from(scheduling_policy_); - let rows_affected = diesel::update(safekeepers.filter(id.eq(id_))) - .set(scheduling_policy.eq(scheduling_policy_)) - .execute(conn)?; + let rows_affected = diesel::update(safekeepers.filter(id.eq(id_))) + .set(scheduling_policy.eq(scheduling_policy_)) + .execute(conn) + .await?; - if rows_affected != 1 { - return Err(DatabaseError::Logical(format!( - "unexpected number of rows ({rows_affected})", - ))); - } + if rows_affected != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({rows_affected})", + ))); + } - Ok(()) + Ok(()) + }) }) .await } } +pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> { + let der_certs = rustls_native_certs::load_native_certs(); + + if !der_certs.errors.is_empty() { + anyhow::bail!("could not parse certificates: {:?}", der_certs.errors); + } + + let mut store = rustls::RootCertStore::empty(); + store.add_parsable_certificates(der_certs.certs); + Ok(Arc::new(store)) +} + +/// Loads the root certificates and constructs a client config suitable for connecting. +/// This function is blocking. +pub fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> { + Ok( + rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider())) + .with_safe_default_protocol_versions() + .expect("ring should support the default protocol versions") + .with_root_certificates(load_certs()?) + .with_no_client_auth(), + ) +} + +fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConnection>> { + let fut = async { + // We first set up the way we want rustls to work.
+ let rustls_config = client_config_with_root_certs() + .map_err(|err| ConnectionError::BadConnection(format!("{err:?}")))?; + let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config); + let (client, conn) = tokio_postgres::connect(config, tls) + .await + .map_err(|e| ConnectionError::BadConnection(e.to_string()))?; + + AsyncPgConnection::try_from_client_and_connection(client, conn).await + }; + fut.boxed() +} + /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably #[derive( QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
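Taken together, the calling convention for every query method is now: pass a `Sync` closure that receives `&mut AsyncPgConnection` and returns a pinned future, which `with_conn` runs inside a serializable transaction with retry on `SerializationFailure`. A condensed sketch of a caller, using a hypothetical `count_nodes` method (and reusing an existing operation label for brevity), under the signatures above:

```rust
// Hypothetical method following the Box::pin(async move { ... }) convention;
// the pinned future coerces to the ScopedBoxFuture that with_measured_conn
// expects, and `conn` stays borrowed only for the closure body.
impl Persistence {
    pub(crate) async fn count_nodes(&self) -> DatabaseResult<i64> {
        self.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
            Box::pin(async move {
                Ok(crate::schema::nodes::table
                    .count()
                    .get_result(conn)
                    .await?)
            })
        })
        .await
    }
}
```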