diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index f97402a90b..2daed90386 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -158,6 +158,8 @@ jobs: - name: Run cargo build run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR ${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests # Do install *before* running rust tests because they might recompile the @@ -215,6 +217,8 @@ jobs: env: NEXTEST_RETRIES: 3 run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR LD_LIBRARY_PATH=$(pwd)/pg_install/v17/lib export LD_LIBRARY_PATH diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 347a511e98..01d82a1ed2 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -235,7 +235,7 @@ jobs: echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV - name: Run cargo build (only for v17) - run: cargo build --all --release -j$(sysctl -n hw.ncpu) + run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release -j$(sysctl -n hw.ncpu) - name: Check that no warnings are produced (only for v17) run: ./run_clippy.sh diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index f077e04d1c..5b5910badf 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -114,7 +114,7 @@ jobs: run: make walproposer-lib -j$(nproc) - name: Produce the build stats - run: cargo build --all --release --timings -j$(nproc) + run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release --timings -j$(nproc) - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v4 diff --git a/Cargo.lock b/Cargo.lock index 9ba90355df..359f989a76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -941,18 +941,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bb8" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" -dependencies = [ - "async-trait", - "futures-util", - "parking_lot 0.12.1", - "tokio", -] - [[package]] name = "bcder" version = "0.7.4" @@ -1312,7 +1300,7 @@ dependencies = [ "tar", "thiserror 1.0.69", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-stream", "tokio-util", "tower 0.5.2", @@ -1421,7 +1409,7 @@ dependencies = [ "storage_broker", "thiserror 1.0.69", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-util", "toml", "toml_edit", @@ -1797,24 +1785,11 @@ dependencies = [ "chrono", "diesel_derives", "itoa", + "pq-sys", + "r2d2", "serde_json", ] -[[package]] -name = "diesel-async" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb" -dependencies = [ - "async-trait", - "bb8", - "diesel", - "futures-util", - "scoped-futures", - "tokio", - "tokio-postgres 0.7.12", -] - [[package]] name = "diesel_derives" version = "2.2.1" @@ -4060,8 +4035,8 @@ dependencies = [ "pageserver_compaction", "pin-project-lite", "postgres", - "postgres-protocol 0.6.6", - "postgres-types 0.2.6", + "postgres-protocol", + "postgres-types", "postgres_backend", "postgres_connection", "postgres_ffi", @@ -4092,7 +4067,7 @@ dependencies = [ "tokio", "tokio-epoll-uring", "tokio-io-timeout", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-stream", "tokio-tar", "tokio-util", @@ -4150,7 +4125,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-stream", "tokio-util", "utils", @@ -4456,7 +4431,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", ] [[package]] @@ -4477,24 +4452,6 @@ dependencies = [ "stringprep", ] -[[package]] -name = "postgres-protocol" -version = "0.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" -dependencies = [ - "base64 0.22.1", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "md-5", - "memchr", - "rand 0.8.5", - "sha2", - "stringprep", -] - [[package]] name = "postgres-protocol2" version = "0.1.0" @@ -4519,18 +4476,7 @@ dependencies = [ "bytes", "chrono", "fallible-iterator", - "postgres-protocol 0.6.6", -] - -[[package]] -name = "postgres-types" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f" -dependencies = [ - "bytes", - "fallible-iterator", - "postgres-protocol 0.6.7", + "postgres-protocol", ] [[package]] @@ -4555,7 +4501,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-postgres-rustls", "tokio-rustls 0.26.0", "tokio-util", @@ -4570,7 +4516,7 @@ dependencies = [ "itertools 0.10.5", "once_cell", "postgres", - "tokio-postgres 0.7.9", + "tokio-postgres", "url", ] @@ -4657,6 +4603,15 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pq-sys" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793" +dependencies = [ + "vcpkg", +] + [[package]] name = "pq_proto" version = "0.1.0" @@ -4664,7 +4619,7 @@ dependencies = [ "byteorder", "bytes", "itertools 0.10.5", - "postgres-protocol 0.6.6", + "postgres-protocol", "rand 0.8.5", "serde", "thiserror 1.0.69", @@ -4912,7 +4867,7 @@ dependencies = [ "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-postgres2", "tokio-rustls 0.26.0", "tokio-tungstenite 0.21.0", @@ -4969,6 +4924,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot 0.12.1", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.7.3" @@ -5700,7 +5666,7 @@ dependencies = [ "pageserver_api", "parking_lot 0.12.1", "postgres", - "postgres-protocol 0.6.6", + "postgres-protocol", "postgres_backend", "postgres_ffi", "pprof", @@ -5724,7 +5690,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "tokio-io-timeout", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-stream", "tokio-tar", "tokio-util", @@ -5783,12 +5749,12 @@ dependencies = [ ] [[package]] -name = "scoped-futures" -version = "0.1.4" +name = "scheduled-thread-pool" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" dependencies = [ - "pin-project-lite", + "parking_lot 0.12.1", ] [[package]] @@ -6323,7 +6289,6 @@ dependencies = [ "clap", "control_plane", "diesel", - "diesel-async", "diesel_migrations", "fail", "futures", @@ -6338,10 +6303,10 @@ dependencies = [ "pageserver_api", "pageserver_client", "postgres_connection", + "r2d2", "rand 0.8.5", "reqwest", "routerify", - "scoped-futures", "scopeguard", "serde", "serde_json", @@ -6394,7 +6359,7 @@ dependencies = [ "serde_json", "storage_controller_client", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-postgres-rustls", "tokio-stream", "tokio-util", @@ -6873,34 +6838,8 @@ dependencies = [ "percent-encoding", "phf", "pin-project-lite", - "postgres-protocol 0.6.6", - "postgres-types 0.2.6", - "rand 0.8.5", - "socket2", - "tokio", - "tokio-util", - "whoami", -] - -[[package]] -name = "tokio-postgres" -version = "0.7.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "fallible-iterator", - "futures-channel", - "futures-util", - "log", - "parking_lot 0.12.1", - "percent-encoding", - "phf", - "pin-project-lite", - "postgres-protocol 0.6.7", - "postgres-types 0.2.8", + "postgres-protocol", + "postgres-types", "rand 0.8.5", "socket2", "tokio", @@ -6917,7 +6856,7 @@ dependencies = [ "ring", "rustls 0.23.18", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-rustls 0.26.0", "x509-certificate", ] @@ -7576,6 +7515,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" @@ -7595,7 +7540,7 @@ dependencies = [ "serde_json", "sysinfo", "tokio", - "tokio-postgres 0.7.9", + "tokio-postgres", "tokio-util", "tracing", "tracing-subscriber", diff --git a/Dockerfile b/Dockerfile index 7ba54c8ca5..f80666529b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,7 +45,7 @@ COPY --chown=nonroot . . ARG ADDITIONAL_RUSTFLAGS RUN set -e \ - && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \ + && PQ_LIB_DIR=$(pwd)/pg_install/v${STABLE_PG_VERSION}/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \ --bin pg_sni_router \ --bin pageserver \ --bin pagectl \ diff --git a/Makefile b/Makefile index d1238caebf..22ebfea7d5 100644 --- a/Makefile +++ b/Makefile @@ -64,6 +64,8 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS)) CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+) # Force cargo not to print progress bar CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1 +# Set PQ_LIB_DIR to make sure `storage_controller` get linked with bundled libpq (through diesel) +CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib CACHEDIR_TAG_CONTENTS := "Signature: 8a477f597d28d172789f06886806bc55" diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 9860bd5d0e..caaa22d0a5 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -45,11 +45,12 @@ strum_macros.workspace = true diesel = { version = "2.2.6", features = [ "serde_json", + "postgres", + "r2d2", "chrono", ] } -diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] } diesel_migrations = { version = "2.2.0" } -scoped-futures = "0.1.4" +r2d2 = { version = "0.8.10" } utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 659c088d51..801409d612 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -308,7 +308,7 @@ async fn async_main() -> anyhow::Result<()> { // Validate that we can connect to the database Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?; - let persistence = Arc::new(Persistence::new(secrets.database_url).await); + let persistence = Arc::new(Persistence::new(secrets.database_url)); let service = Service::spawn(config, persistence.clone()).await?; diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 35eb15b297..37bfaf1139 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -5,12 +5,9 @@ use std::time::Duration; use std::time::Instant; use self::split_state::SplitState; +use diesel::pg::PgConnection; use diesel::prelude::*; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; -use diesel_async::pooled_connection::bb8::Pool; -use diesel_async::pooled_connection::AsyncDieselConnectionManager; -use diesel_async::RunQueryDsl; -use diesel_async::{AsyncConnection, AsyncPgConnection}; +use diesel::Connection; use itertools::Itertools; use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::controller_api::MetadataHealthRecord; @@ -23,7 +20,6 @@ use pageserver_api::shard::ShardConfigError; use pageserver_api::shard::ShardIdentity; use pageserver_api::shard::ShardStripeSize; use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId}; -use scoped_futures::ScopedBoxFuture; use serde::{Deserialize, Serialize}; use utils::generation::Generation; use utils::id::{NodeId, TenantId}; @@ -64,7 +60,7 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); /// updated, and reads of nodes are always from memory, not the database. We only require that /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. pub struct Persistence { - connection_pool: Pool, + connection_pool: diesel::r2d2::Pool>, } /// Legacy format, for use in JSON compat objects in test environment @@ -80,7 +76,7 @@ pub(crate) enum DatabaseError { #[error(transparent)] Connection(#[from] diesel::result::ConnectionError), #[error(transparent)] - ConnectionPool(#[from] diesel_async::pooled_connection::bb8::RunError), + ConnectionPool(#[from] r2d2::Error), #[error("Logical error: {0}")] Logical(String), #[error("Migration error: {0}")] @@ -128,7 +124,6 @@ pub(crate) enum AbortShardSplitStatus { pub(crate) type DatabaseResult = Result; /// Some methods can operate on either a whole tenant or a single shard -#[derive(Clone)] pub(crate) enum TenantFilter { Tenant(TenantId), Shard(TenantShardId), @@ -141,11 +136,6 @@ pub(crate) struct ShardGenerationState { pub(crate) generation_pageserver: Option, } -// A generous allowance for how many times we may retry serializable transactions -// before giving up. This is not expected to be hit: it is a defensive measure in case we -// somehow engineer a situation where duelling transactions might otherwise live-lock. -const MAX_RETRIES: usize = 128; - impl Persistence { // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect. @@ -155,12 +145,12 @@ impl Persistence { const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60); - pub async fn new(database_url: String) -> Self { - let manager = AsyncDieselConnectionManager::::new(database_url); + pub fn new(database_url: String) -> Self { + let manager = diesel::r2d2::ConnectionManager::::new(database_url); // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time // to execute queries (database queries are not generally on latency-sensitive paths). - let connection_pool = Pool::builder() + let connection_pool = diesel::r2d2::Pool::builder() .max_size(Self::MAX_CONNECTIONS) .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME)) .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT)) @@ -168,7 +158,6 @@ impl Persistence { .min_idle(Some(1)) .test_on_check_out(true) .build(manager) - .await .expect("Could not build connection pool"); Self { connection_pool } @@ -182,7 +171,7 @@ impl Persistence { ) -> Result<(), diesel::ConnectionError> { let started_at = Instant::now(); loop { - match AsyncPgConnection::establish(database_url).await { + match PgConnection::establish(database_url) { Ok(_) => { tracing::info!("Connected to database."); return Ok(()); @@ -203,22 +192,57 @@ impl Persistence { pub(crate) async fn migration_run(&self) -> DatabaseResult<()> { use diesel_migrations::{HarnessWithOutput, MigrationHarness}; - // Can't use self.with_conn here as we do spawn_blocking which requires static. - let conn = self - .connection_pool - .dedicated_connection() - .await - .map_err(|e| DatabaseError::Migration(e.to_string()))?; - let mut async_wrapper: AsyncConnectionWrapper = - AsyncConnectionWrapper::from(conn); - tokio::task::spawn_blocking(move || { + self.with_conn(move |conn| -> DatabaseResult<()> { + HarnessWithOutput::write_to_stdout(conn) + .run_pending_migrations(MIGRATIONS) + .map(|_| ()) + .map_err(|e| DatabaseError::Migration(e.to_string())) + }) + .await + } + + /// Wraps `with_conn` in order to collect latency and error metrics + async fn with_measured_conn(&self, op: DatabaseOperation, func: F) -> DatabaseResult + where + F: Fn(&mut PgConnection) -> DatabaseResult + Send + 'static, + R: Send + 'static, + { + let latency = &METRICS_REGISTRY + .metrics_group + .storage_controller_database_query_latency; + let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op }); + + let res = self.with_conn(func).await; + + if let Err(err) = &res { + let error_counter = &METRICS_REGISTRY + .metrics_group + .storage_controller_database_query_error; + error_counter.inc(DatabaseQueryErrorLabelGroup { + error_type: err.error_label(), + operation: op, + }) + } + + res + } + + /// Call the provided function in a tokio blocking thread, with a Diesel database connection. + async fn with_conn(&self, func: F) -> DatabaseResult + where + F: Fn(&mut PgConnection) -> DatabaseResult + Send + 'static, + R: Send + 'static, + { + // A generous allowance for how many times we may retry serializable transactions + // before giving up. This is not expected to be hit: it is a defensive measure in case we + // somehow engineer a situation where duelling transactions might otherwise live-lock. + const MAX_RETRIES: usize = 128; + + let mut conn = self.connection_pool.get()?; + tokio::task::spawn_blocking(move || -> DatabaseResult { let mut retry_count = 0; loop { - let result = HarnessWithOutput::write_to_stdout(&mut async_wrapper) - .run_pending_migrations(MIGRATIONS) - .map(|_| ()) - .map_err(|e| DatabaseError::Migration(e.to_string())); - match result { + match conn.build_transaction().serializable().run(|c| func(c)) { Ok(r) => break Ok(r), Err( err @ DatabaseError::Query(diesel::result::Error::DatabaseError( @@ -247,112 +271,33 @@ impl Persistence { } }) .await - .map_err(|e| DatabaseError::Migration(e.to_string()))??; - Ok(()) - } - - /// Wraps `with_conn` in order to collect latency and error metrics - async fn with_measured_conn<'a, 'b, F, R>( - &self, - op: DatabaseOperation, - func: F, - ) -> DatabaseResult - where - F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult> - + Send - + std::marker::Sync - + 'a, - R: Send + 'b, - { - let latency = &METRICS_REGISTRY - .metrics_group - .storage_controller_database_query_latency; - let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op }); - - let res = self.with_conn(func).await; - - if let Err(err) = &res { - let error_counter = &METRICS_REGISTRY - .metrics_group - .storage_controller_database_query_error; - error_counter.inc(DatabaseQueryErrorLabelGroup { - error_type: err.error_label(), - operation: op, - }) - } - - res - } - - /// Call the provided function with a Diesel database connection in a retry loop - async fn with_conn<'a, 'b, F, R>(&self, func: F) -> DatabaseResult - where - F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult> - + Send - + std::marker::Sync - + 'a, - R: Send + 'b, - { - let mut retry_count = 0; - loop { - let mut conn = self.connection_pool.get().await?; - match conn - .build_transaction() - .serializable() - .run(|c| func(c)) - .await - { - Ok(r) => break Ok(r), - Err( - err @ DatabaseError::Query(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::SerializationFailure, - _, - )), - ) => { - retry_count += 1; - if retry_count > MAX_RETRIES { - tracing::error!( - "Exceeded max retries on SerializationFailure errors: {err:?}" - ); - break Err(err); - } else { - // Retry on serialization errors: these are expected, because even though our - // transactions don't fight for the same rows, they will occasionally collide - // on index pages (e.g. increment_generation for unrelated shards can collide) - tracing::debug!("Retrying transaction on serialization failure {err:?}"); - continue; - } - } - Err(e) => break Err(e), - } - } + .expect("Task panic") } /// When a node is first registered, persist it before using it for anything pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> { - let np = &node.to_persistent(); - self.with_measured_conn(DatabaseOperation::InsertNode, move |conn| { - Box::pin(async move { + let np = node.to_persistent(); + self.with_measured_conn( + DatabaseOperation::InsertNode, + move |conn| -> DatabaseResult<()> { diesel::insert_into(crate::schema::nodes::table) - .values(np) - .execute(conn) - .await?; + .values(&np) + .execute(conn)?; Ok(()) - }) - }) + }, + ) .await } /// At startup, populate the list of nodes which our shards may be placed on pub(crate) async fn list_nodes(&self) -> DatabaseResult> { let nodes: Vec = self - .with_measured_conn(DatabaseOperation::ListNodes, move |conn| { - Box::pin(async move { - Ok(crate::schema::nodes::table - .load::(conn) - .await?) - }) - }) + .with_measured_conn( + DatabaseOperation::ListNodes, + move |conn| -> DatabaseResult<_> { + Ok(crate::schema::nodes::table.load::(conn)?) + }, + ) .await?; tracing::info!("list_nodes: loaded {} nodes", nodes.len()); @@ -368,14 +313,11 @@ impl Persistence { use crate::schema::nodes::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| { - Box::pin(async move { - let updated = diesel::update(nodes) - .filter(node_id.eq(input_node_id.0 as i64)) - .set((scheduling_policy.eq(String::from(input_scheduling)),)) - .execute(conn) - .await?; - Ok(updated) - }) + let updated = diesel::update(nodes) + .filter(node_id.eq(input_node_id.0 as i64)) + .set((scheduling_policy.eq(String::from(input_scheduling)),)) + .execute(conn)?; + Ok(updated) }) .await?; @@ -397,16 +339,17 @@ impl Persistence { &self, ) -> DatabaseResult> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn(DatabaseOperation::ListTenantShards, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::ListTenantShards, + move |conn| -> DatabaseResult<_> { let query = tenant_shards.filter( placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), ); - let result = query.load::(conn).await?; + let result = query.load::(conn)?; Ok(result) - }) - }) + }, + ) .await } @@ -416,14 +359,15 @@ impl Persistence { filter_tenant_id: TenantId, ) -> DatabaseResult> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn(DatabaseOperation::LoadTenant, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::LoadTenant, + move |conn| -> DatabaseResult<_> { let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string())); - let result = query.load::(conn).await?; + let result = query.load::(conn)?; Ok(result) - }) - }) + }, + ) .await } @@ -449,22 +393,19 @@ impl Persistence { }) .collect::>(); - let shards = &shards; - let metadata_health_records = &metadata_health_records; - self.with_measured_conn(DatabaseOperation::InsertTenantShards, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::InsertTenantShards, + move |conn| -> DatabaseResult<()> { diesel::insert_into(tenant_shards::table) - .values(shards) - .execute(conn) - .await?; + .values(&shards) + .execute(conn)?; diesel::insert_into(metadata_health::table) - .values(metadata_health_records) - .execute(conn) - .await?; + .values(&metadata_health_records) + .execute(conn)?; Ok(()) - }) - }) + }, + ) .await } @@ -472,31 +413,31 @@ impl Persistence { /// the tenant from memory on this server. pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::DeleteTenant, + move |conn| -> DatabaseResult<()> { // `metadata_health` status (if exists) is also deleted based on the cascade behavior. diesel::delete(tenant_shards) .filter(tenant_id.eq(del_tenant_id.to_string())) - .execute(conn) - .await?; + .execute(conn)?; Ok(()) - }) - }) + }, + ) .await } pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> { use crate::schema::nodes::dsl::*; - self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::DeleteNode, + move |conn| -> DatabaseResult<()> { diesel::delete(nodes) .filter(node_id.eq(del_node_id.0 as i64)) - .execute(conn) - .await?; + .execute(conn)?; Ok(()) - }) - }) + }, + ) .await } @@ -513,41 +454,34 @@ impl Persistence { use crate::schema::tenant_shards::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::ReAttach, move |conn| { - Box::pin(async move { - let rows_updated = diesel::update(tenant_shards) - .filter(generation_pageserver.eq(input_node_id.0 as i64)) - .set(generation.eq(generation + 1)) - .execute(conn) - .await?; + let rows_updated = diesel::update(tenant_shards) + .filter(generation_pageserver.eq(input_node_id.0 as i64)) + .set(generation.eq(generation + 1)) + .execute(conn)?; - tracing::info!("Incremented {} tenants' generations", rows_updated); + tracing::info!("Incremented {} tenants' generations", rows_updated); - // TODO: UPDATE+SELECT in one query + // TODO: UPDATE+SELECT in one query - let updated = tenant_shards - .filter(generation_pageserver.eq(input_node_id.0 as i64)) - .select(TenantShardPersistence::as_select()) - .load(conn) - .await?; + let updated = tenant_shards + .filter(generation_pageserver.eq(input_node_id.0 as i64)) + .select(TenantShardPersistence::as_select()) + .load(conn)?; - // If the node went through a drain and restart phase before re-attaching, - // then reset it's node scheduling policy to active. - diesel::update(nodes) - .filter(node_id.eq(input_node_id.0 as i64)) - .filter( - scheduling_policy - .eq(String::from(NodeSchedulingPolicy::PauseForRestart)) - .or(scheduling_policy - .eq(String::from(NodeSchedulingPolicy::Draining))) - .or(scheduling_policy - .eq(String::from(NodeSchedulingPolicy::Filling))), - ) - .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active))) - .execute(conn) - .await?; + // If the node went through a drain and restart phase before re-attaching, + // then reset it's node scheduling policy to active. + diesel::update(nodes) + .filter(node_id.eq(input_node_id.0 as i64)) + .filter( + scheduling_policy + .eq(String::from(NodeSchedulingPolicy::PauseForRestart)) + .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining))) + .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))), + ) + .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active))) + .execute(conn)?; - Ok(updated) - }) + Ok(updated) }) .await?; @@ -584,22 +518,19 @@ impl Persistence { use crate::schema::tenant_shards::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| { - Box::pin(async move { - let updated = diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(( - generation.eq(generation + 1), - generation_pageserver.eq(node_id.0 as i64), - )) - // TODO: only returning() the generation column - .returning(TenantShardPersistence::as_returning()) - .get_result(conn) - .await?; + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .set(( + generation.eq(generation + 1), + generation_pageserver.eq(node_id.0 as i64), + )) + // TODO: only returning() the generation column + .returning(TenantShardPersistence::as_returning()) + .get_result(conn)?; - Ok(updated) - }) + Ok(updated) }) .await?; @@ -631,15 +562,12 @@ impl Persistence { use crate::schema::tenant_shards::dsl::*; let rows = self .with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| { - Box::pin(async move { - let result = tenant_shards - .filter(tenant_id.eq(filter_tenant_id.to_string())) - .select(TenantShardPersistence::as_select()) - .order(shard_number) - .load(conn) - .await?; - Ok(result) - }) + let result = tenant_shards + .filter(tenant_id.eq(filter_tenant_id.to_string())) + .select(TenantShardPersistence::as_select()) + .order(shard_number) + .load(conn)?; + Ok(result) }) .await?; @@ -687,18 +615,15 @@ impl Persistence { break; } - let in_clause = &in_clause; let chunk_rows = self .with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| { - Box::pin(async move { - // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because - // the inputs are strongly typed and cannot carry any user-supplied raw string content. - let result : Vec = diesel::sql_query( - format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str() - ).load(conn).await?; + // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because + // the inputs are strongly typed and cannot carry any user-supplied raw string content. + let result : Vec = diesel::sql_query( + format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str() + ).load(conn)?; - Ok(result) - }) + Ok(result) }) .await?; rows.extend(chunk_rows.into_iter()) @@ -732,58 +657,51 @@ impl Persistence { ) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - let tenant = &tenant; - let input_placement_policy = &input_placement_policy; - let input_config = &input_config; - let input_generation = &input_generation; - let input_scheduling_policy = &input_scheduling_policy; self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| { - Box::pin(async move { - let query = match tenant { - TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .into_boxed(), - TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards) - .filter(tenant_id.eq(input_tenant_id.to_string())) - .into_boxed(), - }; + let query = match tenant { + TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .into_boxed(), + TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards) + .filter(tenant_id.eq(input_tenant_id.to_string())) + .into_boxed(), + }; - // Clear generation_pageserver if we are moving into a state where we won't have - // any attached pageservers. - let input_generation_pageserver = match input_placement_policy { - None | Some(PlacementPolicy::Attached(_)) => None, - Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None), - }; + // Clear generation_pageserver if we are moving into a state where we won't have + // any attached pageservers. + let input_generation_pageserver = match input_placement_policy { + None | Some(PlacementPolicy::Attached(_)) => None, + Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None), + }; - #[derive(AsChangeset)] - #[diesel(table_name = crate::schema::tenant_shards)] - struct ShardUpdate { - generation: Option, - placement_policy: Option, - config: Option, - scheduling_policy: Option, - generation_pageserver: Option>, - } + #[derive(AsChangeset)] + #[diesel(table_name = crate::schema::tenant_shards)] + struct ShardUpdate { + generation: Option, + placement_policy: Option, + config: Option, + scheduling_policy: Option, + generation_pageserver: Option>, + } - let update = ShardUpdate { - generation: input_generation.map(|g| g.into().unwrap() as i32), - placement_policy: input_placement_policy - .as_ref() - .map(|p| serde_json::to_string(&p).unwrap()), - config: input_config - .as_ref() - .map(|c| serde_json::to_string(&c).unwrap()), - scheduling_policy: input_scheduling_policy - .map(|p| serde_json::to_string(&p).unwrap()), - generation_pageserver: input_generation_pageserver, - }; + let update = ShardUpdate { + generation: input_generation.map(|g| g.into().unwrap() as i32), + placement_policy: input_placement_policy + .as_ref() + .map(|p| serde_json::to_string(&p).unwrap()), + config: input_config + .as_ref() + .map(|c| serde_json::to_string(&c).unwrap()), + scheduling_policy: input_scheduling_policy + .map(|p| serde_json::to_string(&p).unwrap()), + generation_pageserver: input_generation_pageserver, + }; - query.set(update).execute(conn).await?; + query.set(update).execute(conn)?; - Ok(()) - }) + Ok(()) }) .await?; @@ -797,27 +715,23 @@ impl Persistence { ) -> DatabaseResult)>> { use crate::schema::tenant_shards::dsl::*; - let preferred_azs = preferred_azs.as_slice(); self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| { - Box::pin(async move { - let mut shards_updated = Vec::default(); + let mut shards_updated = Vec::default(); - for (tenant_shard_id, preferred_az) in preferred_azs.iter() { - let updated = diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone()))) - .execute(conn) - .await?; + for (tenant_shard_id, preferred_az) in preferred_azs.iter() { + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone()))) + .execute(conn)?; - if updated == 1 { - shards_updated.push((*tenant_shard_id, preferred_az.clone())); - } + if updated == 1 { + shards_updated.push((*tenant_shard_id, preferred_az.clone())); } + } - Ok(shards_updated) - }) + Ok(shards_updated) }) .await } @@ -825,21 +739,17 @@ impl Persistence { pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> { use crate::schema::tenant_shards::dsl::*; self.with_measured_conn(DatabaseOperation::Detach, move |conn| { - Box::pin(async move { - let updated = diesel::update(tenant_shards) - .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) - .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) - .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(( - generation_pageserver.eq(Option::::None), - placement_policy - .eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), - )) - .execute(conn) - .await?; + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) + .set(( + generation_pageserver.eq(Option::::None), + placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), + )) + .execute(conn)?; - Ok(updated) - }) + Ok(updated) }) .await?; @@ -858,16 +768,14 @@ impl Persistence { parent_to_children: Vec<(TenantShardId, Vec)>, ) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - let parent_to_children = parent_to_children.as_slice(); - self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| { - Box::pin(async move { + self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> { // Mark parent shards as splitting let updated = diesel::update(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.eq(old_shard_count.literal() as i32)) .set((splitting.eq(1),)) - .execute(conn).await?; + .execute(conn)?; if u8::try_from(updated) .map_err(|_| DatabaseError::Logical( format!("Overflow existing shard count {} while splitting", updated)) @@ -880,7 +788,7 @@ impl Persistence { } // FIXME: spurious clone to sidestep closure move rules - let parent_to_children = parent_to_children.to_vec(); + let parent_to_children = parent_to_children.clone(); // Insert child shards for (parent_shard_id, children) in parent_to_children { @@ -888,7 +796,7 @@ impl Persistence { .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string())) .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32)) .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32)) - .load::(conn).await?; + .load::(conn)?; let parent = if parent.len() != 1 { return Err(DatabaseError::Logical(format!( "Parent shard {parent_shard_id} not found" @@ -903,13 +811,12 @@ impl Persistence { debug_assert!(shard.splitting == SplitState::Splitting); diesel::insert_into(tenant_shards) .values(shard) - .execute(conn).await?; + .execute(conn)?; } } Ok(()) }) - }) .await } @@ -921,26 +828,25 @@ impl Persistence { old_shard_count: ShardCount, ) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn(DatabaseOperation::CompleteShardSplit, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::CompleteShardSplit, + move |conn| -> DatabaseResult<()> { // Drop parent shards diesel::delete(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.eq(old_shard_count.literal() as i32)) - .execute(conn) - .await?; + .execute(conn)?; // Clear sharding flag let updated = diesel::update(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .set((splitting.eq(0),)) - .execute(conn) - .await?; + .execute(conn)?; debug_assert!(updated > 0); Ok(()) - }) - }) + }, + ) .await } @@ -952,15 +858,15 @@ impl Persistence { new_shard_count: ShardCount, ) -> DatabaseResult { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn(DatabaseOperation::AbortShardSplit, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::AbortShardSplit, + move |conn| -> DatabaseResult { // Clear the splitting state on parent shards let updated = diesel::update(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.ne(new_shard_count.literal() as i32)) .set((splitting.eq(0),)) - .execute(conn) - .await?; + .execute(conn)?; // Parent shards are already gone: we cannot abort. if updated == 0 { @@ -980,12 +886,11 @@ impl Persistence { diesel::delete(tenant_shards) .filter(tenant_id.eq(split_tenant_id.to_string())) .filter(shard_count.eq(new_shard_count.literal() as i32)) - .execute(conn) - .await?; + .execute(conn)?; Ok(AbortShardSplitStatus::Aborted) - }) - }) + }, + ) .await } @@ -1001,28 +906,25 @@ impl Persistence { ) -> DatabaseResult<()> { use crate::schema::metadata_health::dsl::*; - let healthy_records = healthy_records.as_slice(); - let unhealthy_records = unhealthy_records.as_slice(); - self.with_measured_conn(DatabaseOperation::UpdateMetadataHealth, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::UpdateMetadataHealth, + move |conn| -> DatabaseResult<_> { diesel::insert_into(metadata_health) - .values(healthy_records) + .values(&healthy_records) .on_conflict((tenant_id, shard_number, shard_count)) .do_update() .set((healthy.eq(true), last_scrubbed_at.eq(now))) - .execute(conn) - .await?; + .execute(conn)?; diesel::insert_into(metadata_health) - .values(unhealthy_records) + .values(&unhealthy_records) .on_conflict((tenant_id, shard_number, shard_count)) .do_update() .set((healthy.eq(false), last_scrubbed_at.eq(now))) - .execute(conn) - .await?; + .execute(conn)?; Ok(()) - }) - }) + }, + ) .await } @@ -1031,13 +933,15 @@ impl Persistence { pub(crate) async fn list_metadata_health_records( &self, ) -> DatabaseResult> { - self.with_measured_conn(DatabaseOperation::ListMetadataHealth, move |conn| { - Box::pin(async { - Ok(crate::schema::metadata_health::table - .load::(conn) - .await?) - }) - }) + self.with_measured_conn( + DatabaseOperation::ListMetadataHealth, + move |conn| -> DatabaseResult<_> { + Ok( + crate::schema::metadata_health::table + .load::(conn)?, + ) + }, + ) .await } @@ -1049,15 +953,10 @@ impl Persistence { use crate::schema::metadata_health::dsl::*; self.with_measured_conn( DatabaseOperation::ListMetadataHealthUnhealthy, - move |conn| { - Box::pin(async { - DatabaseResult::Ok( - crate::schema::metadata_health::table - .filter(healthy.eq(false)) - .load::(conn) - .await?, - ) - }) + move |conn| -> DatabaseResult<_> { + Ok(crate::schema::metadata_health::table + .filter(healthy.eq(false)) + .load::(conn)?) }, ) .await @@ -1071,14 +970,15 @@ impl Persistence { ) -> DatabaseResult> { use crate::schema::metadata_health::dsl::*; - self.with_measured_conn(DatabaseOperation::ListMetadataHealthOutdated, move |conn| { - Box::pin(async move { + self.with_measured_conn( + DatabaseOperation::ListMetadataHealthOutdated, + move |conn| -> DatabaseResult<_> { let query = metadata_health.filter(last_scrubbed_at.lt(earlier)); - let res = query.load::(conn).await?; + let res = query.load::(conn)?; Ok(res) - }) - }) + }, + ) .await } @@ -1086,13 +986,12 @@ impl Persistence { /// It is an error for the table to contain more than one entry. pub(crate) async fn get_leader(&self) -> DatabaseResult> { let mut leader: Vec = self - .with_measured_conn(DatabaseOperation::GetLeader, move |conn| { - Box::pin(async move { - Ok(crate::schema::controllers::table - .load::(conn) - .await?) - }) - }) + .with_measured_conn( + DatabaseOperation::GetLeader, + move |conn| -> DatabaseResult<_> { + Ok(crate::schema::controllers::table.load::(conn)?) + }, + ) .await?; if leader.len() > 1 { @@ -1115,33 +1014,26 @@ impl Persistence { use crate::schema::controllers::dsl::*; let updated = self - .with_measured_conn(DatabaseOperation::UpdateLeader, move |conn| { - let prev = prev.clone(); - let new = new.clone(); - Box::pin(async move { + .with_measured_conn( + DatabaseOperation::UpdateLeader, + move |conn| -> DatabaseResult { let updated = match &prev { - Some(prev) => { - diesel::update(controllers) - .filter(address.eq(prev.address.clone())) - .filter(started_at.eq(prev.started_at)) - .set(( - address.eq(new.address.clone()), - started_at.eq(new.started_at), - )) - .execute(conn) - .await? - } - None => { - diesel::insert_into(controllers) - .values(new.clone()) - .execute(conn) - .await? - } + Some(prev) => diesel::update(controllers) + .filter(address.eq(prev.address.clone())) + .filter(started_at.eq(prev.started_at)) + .set(( + address.eq(new.address.clone()), + started_at.eq(new.started_at), + )) + .execute(conn)?, + None => diesel::insert_into(controllers) + .values(new.clone()) + .execute(conn)?, }; Ok(updated) - }) - }) + }, + ) .await?; if updated == 0 { @@ -1156,13 +1048,12 @@ impl Persistence { /// At startup, populate the list of nodes which our shards may be placed on pub(crate) async fn list_safekeepers(&self) -> DatabaseResult> { let safekeepers: Vec = self - .with_measured_conn(DatabaseOperation::ListNodes, move |conn| { - Box::pin(async move { - Ok(crate::schema::safekeepers::table - .load::(conn) - .await?) - }) - }) + .with_measured_conn( + DatabaseOperation::ListNodes, + move |conn| -> DatabaseResult<_> { + Ok(crate::schema::safekeepers::table.load::(conn)?) + }, + ) .await?; tracing::info!("list_safekeepers: loaded {} nodes", safekeepers.len()); @@ -1175,14 +1066,11 @@ impl Persistence { id: i64, ) -> Result { use crate::schema::safekeepers::dsl::{id as id_column, safekeepers}; - self.with_conn(move |conn| { - Box::pin(async move { - Ok(safekeepers - .filter(id_column.eq(&id)) - .select(SafekeeperPersistence::as_select()) - .get_result(conn) - .await?) - }) + self.with_conn(move |conn| -> DatabaseResult { + Ok(safekeepers + .filter(id_column.eq(&id)) + .select(SafekeeperPersistence::as_select()) + .get_result(conn)?) }) .await } @@ -1193,30 +1081,26 @@ impl Persistence { ) -> Result<(), DatabaseError> { use crate::schema::safekeepers::dsl::*; - self.with_conn(move |conn| { - let record = record.clone(); - Box::pin(async move { - let bind = record - .as_insert_or_update() - .map_err(|e| DatabaseError::Logical(format!("{e}")))?; + self.with_conn(move |conn| -> DatabaseResult<()> { + let bind = record + .as_insert_or_update() + .map_err(|e| DatabaseError::Logical(format!("{e}")))?; - let inserted_updated = diesel::insert_into(safekeepers) - .values(&bind) - .on_conflict(id) - .do_update() - .set(&bind) - .execute(conn) - .await?; + let inserted_updated = diesel::insert_into(safekeepers) + .values(&bind) + .on_conflict(id) + .do_update() + .set(&bind) + .execute(conn)?; - if inserted_updated != 1 { - return Err(DatabaseError::Logical(format!( - "unexpected number of rows ({})", - inserted_updated - ))); - } + if inserted_updated != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + inserted_updated + ))); + } - Ok(()) - }) + Ok(()) }) .await } @@ -1228,29 +1112,26 @@ impl Persistence { ) -> Result<(), DatabaseError> { use crate::schema::safekeepers::dsl::*; - self.with_conn(move |conn| { - Box::pin(async move { - #[derive(Insertable, AsChangeset)] - #[diesel(table_name = crate::schema::safekeepers)] - struct UpdateSkSchedulingPolicy<'a> { - id: i64, - scheduling_policy: &'a str, - } - let scheduling_policy_ = String::from(scheduling_policy_); + self.with_conn(move |conn| -> DatabaseResult<()> { + #[derive(Insertable, AsChangeset)] + #[diesel(table_name = crate::schema::safekeepers)] + struct UpdateSkSchedulingPolicy<'a> { + id: i64, + scheduling_policy: &'a str, + } + let scheduling_policy_ = String::from(scheduling_policy_); - let rows_affected = diesel::update(safekeepers.filter(id.eq(id_))) - .set(scheduling_policy.eq(scheduling_policy_)) - .execute(conn) - .await?; + let rows_affected = diesel::update(safekeepers.filter(id.eq(id_))) + .set(scheduling_policy.eq(scheduling_policy_)) + .execute(conn)?; - if rows_affected != 1 { - return Err(DatabaseError::Logical(format!( - "unexpected number of rows ({rows_affected})", - ))); - } + if rows_affected != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({rows_affected})", + ))); + } - Ok(()) - }) + Ok(()) }) .await }