Revert "storcon: switch to diesel-async and tokio-postgres (#10280)" (#10592)

There was a regression caused by #10280, tracked in
[#23583](https://github.com/neondatabase/cloud/issues/23583).

I have ideas for how to fix the issue, but we are too close to the release
cutoff, so revert #10280 for now. We can revert the revert later :).
Author:    Arpad Müller
Date:      2025-01-30 20:23:25 +01:00
Committer: GitHub
Parent:    bae0de643e
Commit:    4d2c2e9460

9 changed files with 397 additions and 564 deletions
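
A quick sketch of what this revert goes back to: instead of the async diesel-async/bb8 pool, the storage controller again uses a synchronous diesel `PgConnection` pool managed by r2d2 and runs each query on a blocking thread via `tokio::task::spawn_blocking`. A minimal illustration of that pattern (the function name and query are illustrative, and it assumes `tokio` and `anyhow` are available):

```rust
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, Pool};

// Illustrative helper: check a connection out of the synchronous pool, then
// move it onto a blocking thread, because diesel's PgConnection is not async.
async fn run_query(pool: Pool<ConnectionManager<PgConnection>>) -> anyhow::Result<usize> {
    let mut conn = pool.get()?; // r2d2 checkout on the async side
    let rows = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
        // Any diesel query works here; sql_query is just the simplest example.
        Ok(diesel::sql_query("SELECT 1").execute(&mut conn)?)
    })
    .await??; // outer ? = join error, inner ? = query error
    Ok(rows)
}
```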


@@ -158,6 +158,8 @@ jobs:
- name: Run cargo build
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
# Do install *before* running rust tests because they might recompile the
@@ -215,6 +217,8 @@ jobs:
env:
NEXTEST_RETRIES: 3
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v17/lib
export LD_LIBRARY_PATH


@@ -235,7 +235,7 @@ jobs:
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Run cargo build (only for v17)
run: cargo build --all --release -j$(sysctl -n hw.ncpu)
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release -j$(sysctl -n hw.ncpu)
- name: Check that no warnings are produced (only for v17)
run: ./run_clippy.sh


@@ -114,7 +114,7 @@ jobs:
run: make walproposer-lib -j$(nproc)
- name: Produce the build stats
run: cargo build --all --release --timings -j$(nproc)
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release --timings -j$(nproc)
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4

Cargo.lock

@@ -941,18 +941,6 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bb8"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
dependencies = [
"async-trait",
"futures-util",
"parking_lot 0.12.1",
"tokio",
]
[[package]]
name = "bcder"
version = "0.7.4"
@@ -1312,7 +1300,7 @@ dependencies = [
"tar",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tower 0.5.2",
@@ -1421,7 +1409,7 @@ dependencies = [
"storage_broker",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-util",
"toml",
"toml_edit",
@@ -1797,24 +1785,11 @@ dependencies = [
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
"r2d2",
"serde_json",
]
[[package]]
name = "diesel-async"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb"
dependencies = [
"async-trait",
"bb8",
"diesel",
"futures-util",
"scoped-futures",
"tokio",
"tokio-postgres 0.7.12",
]
[[package]]
name = "diesel_derives"
version = "2.2.1"
@@ -4060,8 +4035,8 @@ dependencies = [
"pageserver_compaction",
"pin-project-lite",
"postgres",
"postgres-protocol 0.6.6",
"postgres-types 0.2.6",
"postgres-protocol",
"postgres-types",
"postgres_backend",
"postgres_connection",
"postgres_ffi",
@@ -4092,7 +4067,7 @@ dependencies = [
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
@@ -4150,7 +4125,7 @@ dependencies = [
"serde",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-stream",
"tokio-util",
"utils",
@@ -4456,7 +4431,7 @@ dependencies = [
"futures-util",
"log",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
]
[[package]]
@@ -4477,24 +4452,6 @@ dependencies = [
"stringprep",
]
[[package]]
name = "postgres-protocol"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23"
dependencies = [
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand 0.8.5",
"sha2",
"stringprep",
]
[[package]]
name = "postgres-protocol2"
version = "0.1.0"
@@ -4519,18 +4476,7 @@ dependencies = [
"bytes",
"chrono",
"fallible-iterator",
"postgres-protocol 0.6.6",
]
[[package]]
name = "postgres-types"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol 0.6.7",
"postgres-protocol",
]
[[package]]
@@ -4555,7 +4501,7 @@ dependencies = [
"serde",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.26.0",
"tokio-util",
@@ -4570,7 +4516,7 @@ dependencies = [
"itertools 0.10.5",
"once_cell",
"postgres",
"tokio-postgres 0.7.9",
"tokio-postgres",
"url",
]
@@ -4657,6 +4603,15 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pq-sys"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
dependencies = [
"vcpkg",
]
[[package]]
name = "pq_proto"
version = "0.1.0"
@@ -4664,7 +4619,7 @@ dependencies = [
"byteorder",
"bytes",
"itertools 0.10.5",
"postgres-protocol 0.6.6",
"postgres-protocol",
"rand 0.8.5",
"serde",
"thiserror 1.0.69",
@@ -4912,7 +4867,7 @@ dependencies = [
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-postgres2",
"tokio-rustls 0.26.0",
"tokio-tungstenite 0.21.0",
@@ -4969,6 +4924,17 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.7.3"
@@ -5700,7 +5666,7 @@ dependencies = [
"pageserver_api",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol 0.6.6",
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"pprof",
@@ -5724,7 +5690,7 @@ dependencies = [
"tikv-jemallocator",
"tokio",
"tokio-io-timeout",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
@@ -5783,12 +5749,12 @@ dependencies = [
]
[[package]]
name = "scoped-futures"
version = "0.1.4"
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"pin-project-lite",
"parking_lot 0.12.1",
]
[[package]]
@@ -6323,7 +6289,6 @@ dependencies = [
"clap",
"control_plane",
"diesel",
"diesel-async",
"diesel_migrations",
"fail",
"futures",
@@ -6338,10 +6303,10 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"r2d2",
"rand 0.8.5",
"reqwest",
"routerify",
"scoped-futures",
"scopeguard",
"serde",
"serde_json",
@@ -6394,7 +6359,7 @@ dependencies = [
"serde_json",
"storage_controller_client",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-stream",
"tokio-util",
@@ -6873,34 +6838,8 @@ dependencies = [
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol 0.6.6",
"postgres-types 0.2.6",
"rand 0.8.5",
"socket2",
"tokio",
"tokio-util",
"whoami",
]
[[package]]
name = "tokio-postgres"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot 0.12.1",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol 0.6.7",
"postgres-types 0.2.8",
"postgres-protocol",
"postgres-types",
"rand 0.8.5",
"socket2",
"tokio",
@@ -6917,7 +6856,7 @@ dependencies = [
"ring",
"rustls 0.23.18",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-rustls 0.26.0",
"x509-certificate",
]
@@ -7576,6 +7515,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@@ -7595,7 +7540,7 @@ dependencies = [
"serde_json",
"sysinfo",
"tokio",
"tokio-postgres 0.7.9",
"tokio-postgres",
"tokio-util",
"tracing",
"tracing-subscriber",


@@ -45,7 +45,7 @@ COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
&& PQ_LIB_DIR=$(pwd)/pg_install/v${STABLE_PG_VERSION}/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \


@@ -64,6 +64,8 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS))
CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
# Force cargo not to print progress bar
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
# Set PQ_LIB_DIR to make sure `storage_controller` gets linked with bundled libpq (through diesel)
CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib
CACHEDIR_TAG_CONTENTS := "Signature: 8a477f597d28d172789f06886806bc55"


@@ -45,11 +45,12 @@ strum_macros.workspace = true
diesel = { version = "2.2.6", features = [
"serde_json",
"postgres",
"r2d2",
"chrono",
] }
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] }
diesel_migrations = { version = "2.2.0" }
scoped-futures = "0.1.4"
r2d2 = { version = "0.8.10" }
utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
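
The hunk above reinstates diesel's `r2d2` feature and a direct `r2d2` dependency (the direct crate is referenced for its error type, see `ConnectionPool(#[from] r2d2::Error)` in the persistence diff), while dropping `diesel-async` and `scoped-futures`. A hedged sketch of the pool construction these dependencies enable, mirroring the constants visible in `Persistence::new` below (99 connections, 60 s max lifetime, 10 s idle timeout):

```rust
use std::time::Duration;

use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool};

// Sketch only: values mirror the constants in the persistence diff.
fn build_pool(database_url: &str) -> Result<Pool<ConnectionManager<PgConnection>>, r2d2::Error> {
    let manager = ConnectionManager::<PgConnection>::new(database_url);
    Pool::builder()
        .max_size(99)                                // stay under postgres' default limit of 100
        .max_lifetime(Some(Duration::from_secs(60))) // recycle connections regularly
        .idle_timeout(Some(Duration::from_secs(10))) // drop idle connections quickly
        .min_idle(Some(1))                           // keep one warm connection
        .test_on_check_out(true)                     // validate connections before use
        .build(manager)
}
```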


@@ -308,7 +308,7 @@ async fn async_main() -> anyhow::Result<()> {
// Validate that we can connect to the database
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
let persistence = Arc::new(Persistence::new(secrets.database_url).await);
let persistence = Arc::new(Persistence::new(secrets.database_url));
let service = Service::spawn(config, persistence.clone()).await?;


@@ -5,12 +5,9 @@ use std::time::Duration;
use std::time::Instant;
use self::split_state::SplitState;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::RunQueryDsl;
use diesel_async::{AsyncConnection, AsyncPgConnection};
use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
@@ -23,7 +20,6 @@ use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};
@@ -64,7 +60,7 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
connection_pool: Pool<AsyncPgConnection>,
connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
}
/// Legacy format, for use in JSON compat objects in test environment
@@ -80,7 +76,7 @@ pub(crate) enum DatabaseError {
#[error(transparent)]
Connection(#[from] diesel::result::ConnectionError),
#[error(transparent)]
ConnectionPool(#[from] diesel_async::pooled_connection::bb8::RunError),
ConnectionPool(#[from] r2d2::Error),
#[error("Logical error: {0}")]
Logical(String),
#[error("Migration error: {0}")]
@@ -128,7 +124,6 @@ pub(crate) enum AbortShardSplitStatus {
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
/// Some methods can operate on either a whole tenant or a single shard
#[derive(Clone)]
pub(crate) enum TenantFilter {
Tenant(TenantId),
Shard(TenantShardId),
@@ -141,11 +136,6 @@ pub(crate) struct ShardGenerationState {
pub(crate) generation_pageserver: Option<NodeId>,
}
// A generous allowance for how many times we may retry serializable transactions
// before giving up. This is not expected to be hit: it is a defensive measure in case we
// somehow engineer a situation where duelling transactions might otherwise live-lock.
const MAX_RETRIES: usize = 128;
impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
@@ -155,12 +145,12 @@ impl Persistence {
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
pub async fn new(database_url: String) -> Self {
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
pub fn new(database_url: String) -> Self {
let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
// to execute queries (database queries are not generally on latency-sensitive paths).
let connection_pool = Pool::builder()
let connection_pool = diesel::r2d2::Pool::builder()
.max_size(Self::MAX_CONNECTIONS)
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
@@ -168,7 +158,6 @@ impl Persistence {
.min_idle(Some(1))
.test_on_check_out(true)
.build(manager)
.await
.expect("Could not build connection pool");
Self { connection_pool }
@@ -182,7 +171,7 @@ impl Persistence {
) -> Result<(), diesel::ConnectionError> {
let started_at = Instant::now();
loop {
match AsyncPgConnection::establish(database_url).await {
match PgConnection::establish(database_url) {
Ok(_) => {
tracing::info!("Connected to database.");
return Ok(());
@@ -203,22 +192,57 @@ impl Persistence {
pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
// Can't use self.with_conn here as we do spawn_blocking which requires static.
let conn = self
.connection_pool
.dedicated_connection()
.await
.map_err(|e| DatabaseError::Migration(e.to_string()))?;
let mut async_wrapper: AsyncConnectionWrapper<AsyncPgConnection> =
AsyncConnectionWrapper::from(conn);
tokio::task::spawn_blocking(move || {
let mut retry_count = 0;
loop {
let result = HarnessWithOutput::write_to_stdout(&mut async_wrapper)
self.with_conn(move |conn| -> DatabaseResult<()> {
HarnessWithOutput::write_to_stdout(conn)
.run_pending_migrations(MIGRATIONS)
.map(|_| ())
.map_err(|e| DatabaseError::Migration(e.to_string()));
match result {
.map_err(|e| DatabaseError::Migration(e.to_string()))
})
.await
}
/// Wraps `with_conn` in order to collect latency and error metrics
async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
where
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_database_query_latency;
let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });
let res = self.with_conn(func).await;
if let Err(err) = &res {
let error_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_database_query_error;
error_counter.inc(DatabaseQueryErrorLabelGroup {
error_type: err.error_label(),
operation: op,
})
}
res
}
/// Call the provided function in a tokio blocking thread, with a Diesel database connection.
async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
where
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
// A generous allowance for how many times we may retry serializable transactions
// before giving up. This is not expected to be hit: it is a defensive measure in case we
// somehow engineer a situation where duelling transactions might otherwise live-lock.
const MAX_RETRIES: usize = 128;
let mut conn = self.connection_pool.get()?;
tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
let mut retry_count = 0;
loop {
match conn.build_transaction().serializable().run(|c| func(c)) {
Ok(r) => break Ok(r),
Err(
err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
@@ -247,112 +271,33 @@ impl Persistence {
}
})
.await
.map_err(|e| DatabaseError::Migration(e.to_string()))??;
Ok(())
}
/// Wraps `with_conn` in order to collect latency and error metrics
async fn with_measured_conn<'a, 'b, F, R>(
&self,
op: DatabaseOperation,
func: F,
) -> DatabaseResult<R>
where
F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult<R>>
+ Send
+ std::marker::Sync
+ 'a,
R: Send + 'b,
{
let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_database_query_latency;
let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });
let res = self.with_conn(func).await;
if let Err(err) = &res {
let error_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_database_query_error;
error_counter.inc(DatabaseQueryErrorLabelGroup {
error_type: err.error_label(),
operation: op,
})
}
res
}
/// Call the provided function with a Diesel database connection in a retry loop
async fn with_conn<'a, 'b, F, R>(&self, func: F) -> DatabaseResult<R>
where
F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult<R>>
+ Send
+ std::marker::Sync
+ 'a,
R: Send + 'b,
{
let mut retry_count = 0;
loop {
let mut conn = self.connection_pool.get().await?;
match conn
.build_transaction()
.serializable()
.run(|c| func(c))
.await
{
Ok(r) => break Ok(r),
Err(
err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::SerializationFailure,
_,
)),
) => {
retry_count += 1;
if retry_count > MAX_RETRIES {
tracing::error!(
"Exceeded max retries on SerializationFailure errors: {err:?}"
);
break Err(err);
} else {
// Retry on serialization errors: these are expected, because even though our
// transactions don't fight for the same rows, they will occasionally collide
// on index pages (e.g. increment_generation for unrelated shards can collide)
tracing::debug!("Retrying transaction on serialization failure {err:?}");
continue;
}
}
Err(e) => break Err(e),
}
}
.expect("Task panic")
}
/// When a node is first registered, persist it before using it for anything
pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
let np = &node.to_persistent();
self.with_measured_conn(DatabaseOperation::InsertNode, move |conn| {
Box::pin(async move {
let np = node.to_persistent();
self.with_measured_conn(
DatabaseOperation::InsertNode,
move |conn| -> DatabaseResult<()> {
diesel::insert_into(crate::schema::nodes::table)
.values(np)
.execute(conn)
.await?;
.values(&np)
.execute(conn)?;
Ok(())
})
})
},
)
.await
}
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
let nodes: Vec<NodePersistence> = self
.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
Box::pin(async move {
Ok(crate::schema::nodes::table
.load::<NodePersistence>(conn)
.await?)
})
})
.with_measured_conn(
DatabaseOperation::ListNodes,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
},
)
.await?;
tracing::info!("list_nodes: loaded {} nodes", nodes.len());
@@ -368,15 +313,12 @@ impl Persistence {
use crate::schema::nodes::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
Box::pin(async move {
let updated = diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.set((scheduling_policy.eq(String::from(input_scheduling)),))
.execute(conn)
.await?;
.execute(conn)?;
Ok(updated)
})
})
.await?;
if updated != 1 {
@@ -397,16 +339,17 @@ impl Persistence {
&self,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::ListTenantShards, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::ListTenantShards,
move |conn| -> DatabaseResult<_> {
let query = tenant_shards.filter(
placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
);
let result = query.load::<TenantShardPersistence>(conn).await?;
let result = query.load::<TenantShardPersistence>(conn)?;
Ok(result)
})
})
},
)
.await
}
@@ -416,14 +359,15 @@ impl Persistence {
filter_tenant_id: TenantId,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::LoadTenant, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::LoadTenant,
move |conn| -> DatabaseResult<_> {
let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string()));
let result = query.load::<TenantShardPersistence>(conn).await?;
let result = query.load::<TenantShardPersistence>(conn)?;
Ok(result)
})
})
},
)
.await
}
@@ -449,22 +393,19 @@ impl Persistence {
})
.collect::<Vec<_>>();
let shards = &shards;
let metadata_health_records = &metadata_health_records;
self.with_measured_conn(DatabaseOperation::InsertTenantShards, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::InsertTenantShards,
move |conn| -> DatabaseResult<()> {
diesel::insert_into(tenant_shards::table)
.values(shards)
.execute(conn)
.await?;
.values(&shards)
.execute(conn)?;
diesel::insert_into(metadata_health::table)
.values(metadata_health_records)
.execute(conn)
.await?;
.values(&metadata_health_records)
.execute(conn)?;
Ok(())
})
})
},
)
.await
}
@@ -472,31 +413,31 @@ impl Persistence {
/// the tenant from memory on this server.
pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::DeleteTenant,
move |conn| -> DatabaseResult<()> {
// `metadata_health` status (if exists) is also deleted based on the cascade behavior.
diesel::delete(tenant_shards)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.execute(conn)
.await?;
.execute(conn)?;
Ok(())
})
})
},
)
.await
}
pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::DeleteNode,
move |conn| -> DatabaseResult<()> {
diesel::delete(nodes)
.filter(node_id.eq(del_node_id.0 as i64))
.execute(conn)
.await?;
.execute(conn)?;
Ok(())
})
})
},
)
.await
}
@@ -513,12 +454,10 @@ impl Persistence {
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
Box::pin(async move {
let rows_updated = diesel::update(tenant_shards)
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.set(generation.eq(generation + 1))
.execute(conn)
.await?;
.execute(conn)?;
tracing::info!("Incremented {} tenants' generations", rows_updated);
@@ -527,8 +466,7 @@ impl Persistence {
let updated = tenant_shards
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.select(TenantShardPersistence::as_select())
.load(conn)
.await?;
.load(conn)?;
// If the node went through a drain and restart phase before re-attaching,
// then reset its node scheduling policy to active.
@@ -537,18 +475,14 @@ impl Persistence {
.filter(
scheduling_policy
.eq(String::from(NodeSchedulingPolicy::PauseForRestart))
.or(scheduling_policy
.eq(String::from(NodeSchedulingPolicy::Draining)))
.or(scheduling_policy
.eq(String::from(NodeSchedulingPolicy::Filling))),
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
)
.set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
.execute(conn)
.await?;
.execute(conn)?;
Ok(updated)
})
})
.await?;
let mut result = HashMap::new();
@@ -584,7 +518,6 @@ impl Persistence {
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
Box::pin(async move {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
@@ -595,12 +528,10 @@ impl Persistence {
))
// TODO: only returning() the generation column
.returning(TenantShardPersistence::as_returning())
.get_result(conn)
.await?;
.get_result(conn)?;
Ok(updated)
})
})
.await?;
// Generation is always non-null in the result: if the generation column had been NULL, then we
@@ -631,16 +562,13 @@ impl Persistence {
use crate::schema::tenant_shards::dsl::*;
let rows = self
.with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| {
Box::pin(async move {
let result = tenant_shards
.filter(tenant_id.eq(filter_tenant_id.to_string()))
.select(TenantShardPersistence::as_select())
.order(shard_number)
.load(conn)
.await?;
.load(conn)?;
Ok(result)
})
})
.await?;
Ok(rows
@@ -687,19 +615,16 @@ impl Persistence {
break;
}
let in_clause = &in_clause;
let chunk_rows = self
.with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| {
Box::pin(async move {
// diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
// the inputs are strongly typed and cannot carry any user-supplied raw string content.
let result : Vec<TenantShardPersistence> = diesel::sql_query(
format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
).load(conn).await?;
).load(conn)?;
Ok(result)
})
})
.await?;
rows.extend(chunk_rows.into_iter())
}
@@ -732,13 +657,7 @@ impl Persistence {
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
let tenant = &tenant;
let input_placement_policy = &input_placement_policy;
let input_config = &input_config;
let input_generation = &input_generation;
let input_scheduling_policy = &input_scheduling_policy;
self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
Box::pin(async move {
let query = match tenant {
TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
@@ -780,11 +699,10 @@ impl Persistence {
generation_pageserver: input_generation_pageserver,
};
query.set(update).execute(conn).await?;
query.set(update).execute(conn)?;
Ok(())
})
})
.await?;
Ok(())
@@ -797,9 +715,7 @@ impl Persistence {
) -> DatabaseResult<Vec<(TenantShardId, Option<AvailabilityZone>)>> {
use crate::schema::tenant_shards::dsl::*;
let preferred_azs = preferred_azs.as_slice();
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
Box::pin(async move {
let mut shards_updated = Vec::default();
for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
@@ -808,8 +724,7 @@ impl Persistence {
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone())))
.execute(conn)
.await?;
.execute(conn)?;
if updated == 1 {
shards_updated.push((*tenant_shard_id, preferred_az.clone()));
@@ -818,29 +733,24 @@ impl Persistence {
Ok(shards_updated)
})
})
.await
}
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
Box::pin(async move {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set((
generation_pageserver.eq(Option::<i64>::None),
placement_policy
.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
))
.execute(conn)
.await?;
.execute(conn)?;
Ok(updated)
})
})
.await?;
Ok(())
@@ -858,16 +768,14 @@ impl Persistence {
parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
let parent_to_children = parent_to_children.as_slice();
self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| {
Box::pin(async move {
self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
// Mark parent shards as splitting
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.literal() as i32))
.set((splitting.eq(1),))
.execute(conn).await?;
.execute(conn)?;
if u8::try_from(updated)
.map_err(|_| DatabaseError::Logical(
format!("Overflow existing shard count {} while splitting", updated))
@@ -880,7 +788,7 @@ impl Persistence {
}
// FIXME: spurious clone to sidestep closure move rules
let parent_to_children = parent_to_children.to_vec();
let parent_to_children = parent_to_children.clone();
// Insert child shards
for (parent_shard_id, children) in parent_to_children {
@@ -888,7 +796,7 @@ impl Persistence {
.filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
.load::<TenantShardPersistence>(conn).await?;
.load::<TenantShardPersistence>(conn)?;
let parent = if parent.len() != 1 {
return Err(DatabaseError::Logical(format!(
"Parent shard {parent_shard_id} not found"
@@ -903,13 +811,12 @@ impl Persistence {
debug_assert!(shard.splitting == SplitState::Splitting);
diesel::insert_into(tenant_shards)
.values(shard)
.execute(conn).await?;
.execute(conn)?;
}
}
Ok(())
})
})
.await
}
@@ -921,26 +828,25 @@ impl Persistence {
old_shard_count: ShardCount,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::CompleteShardSplit, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::CompleteShardSplit,
move |conn| -> DatabaseResult<()> {
// Drop parent shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.literal() as i32))
.execute(conn)
.await?;
.execute(conn)?;
// Clear sharding flag
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.set((splitting.eq(0),))
.execute(conn)
.await?;
.execute(conn)?;
debug_assert!(updated > 0);
Ok(())
})
})
},
)
.await
}
@@ -952,15 +858,15 @@ impl Persistence {
new_shard_count: ShardCount,
) -> DatabaseResult<AbortShardSplitStatus> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::AbortShardSplit, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::AbortShardSplit,
move |conn| -> DatabaseResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)
.await?;
.execute(conn)?;
// Parent shards are already gone: we cannot abort.
if updated == 0 {
@@ -980,12 +886,11 @@ impl Persistence {
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)
.await?;
.execute(conn)?;
Ok(AbortShardSplitStatus::Aborted)
})
})
},
)
.await
}
@@ -1001,28 +906,25 @@ impl Persistence {
) -> DatabaseResult<()> {
use crate::schema::metadata_health::dsl::*;
let healthy_records = healthy_records.as_slice();
let unhealthy_records = unhealthy_records.as_slice();
self.with_measured_conn(DatabaseOperation::UpdateMetadataHealth, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::UpdateMetadataHealth,
move |conn| -> DatabaseResult<_> {
diesel::insert_into(metadata_health)
.values(healthy_records)
.values(&healthy_records)
.on_conflict((tenant_id, shard_number, shard_count))
.do_update()
.set((healthy.eq(true), last_scrubbed_at.eq(now)))
.execute(conn)
.await?;
.execute(conn)?;
diesel::insert_into(metadata_health)
.values(unhealthy_records)
.values(&unhealthy_records)
.on_conflict((tenant_id, shard_number, shard_count))
.do_update()
.set((healthy.eq(false), last_scrubbed_at.eq(now)))
.execute(conn)
.await?;
.execute(conn)?;
Ok(())
})
})
},
)
.await
}
@@ -1031,13 +933,15 @@ impl Persistence {
pub(crate) async fn list_metadata_health_records(
&self,
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
self.with_measured_conn(DatabaseOperation::ListMetadataHealth, move |conn| {
Box::pin(async {
Ok(crate::schema::metadata_health::table
.load::<MetadataHealthPersistence>(conn)
.await?)
})
})
self.with_measured_conn(
DatabaseOperation::ListMetadataHealth,
move |conn| -> DatabaseResult<_> {
Ok(
crate::schema::metadata_health::table
.load::<MetadataHealthPersistence>(conn)?,
)
},
)
.await
}
@@ -1049,15 +953,10 @@ impl Persistence {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(
DatabaseOperation::ListMetadataHealthUnhealthy,
move |conn| {
Box::pin(async {
DatabaseResult::Ok(
crate::schema::metadata_health::table
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::metadata_health::table
.filter(healthy.eq(false))
.load::<MetadataHealthPersistence>(conn)
.await?,
)
})
.load::<MetadataHealthPersistence>(conn)?)
},
)
.await
@@ -1071,14 +970,15 @@ impl Persistence {
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(DatabaseOperation::ListMetadataHealthOutdated, move |conn| {
Box::pin(async move {
self.with_measured_conn(
DatabaseOperation::ListMetadataHealthOutdated,
move |conn| -> DatabaseResult<_> {
let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
let res = query.load::<MetadataHealthPersistence>(conn).await?;
let res = query.load::<MetadataHealthPersistence>(conn)?;
Ok(res)
})
})
},
)
.await
}
@@ -1086,13 +986,12 @@ impl Persistence {
/// It is an error for the table to contain more than one entry.
pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
let mut leader: Vec<ControllerPersistence> = self
.with_measured_conn(DatabaseOperation::GetLeader, move |conn| {
Box::pin(async move {
Ok(crate::schema::controllers::table
.load::<ControllerPersistence>(conn)
.await?)
})
})
.with_measured_conn(
DatabaseOperation::GetLeader,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::controllers::table.load::<ControllerPersistence>(conn)?)
},
)
.await?;
if leader.len() > 1 {
@@ -1115,33 +1014,26 @@ impl Persistence {
use crate::schema::controllers::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::UpdateLeader, move |conn| {
let prev = prev.clone();
let new = new.clone();
Box::pin(async move {
.with_measured_conn(
DatabaseOperation::UpdateLeader,
move |conn| -> DatabaseResult<usize> {
let updated = match &prev {
Some(prev) => {
diesel::update(controllers)
Some(prev) => diesel::update(controllers)
.filter(address.eq(prev.address.clone()))
.filter(started_at.eq(prev.started_at))
.set((
address.eq(new.address.clone()),
started_at.eq(new.started_at),
))
.execute(conn)
.await?
}
None => {
diesel::insert_into(controllers)
.execute(conn)?,
None => diesel::insert_into(controllers)
.values(new.clone())
.execute(conn)
.await?
}
.execute(conn)?,
};
Ok(updated)
})
})
},
)
.await?;
if updated == 0 {
@@ -1156,13 +1048,12 @@ impl Persistence {
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> {
let safekeepers: Vec<SafekeeperPersistence> = self
.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
Box::pin(async move {
Ok(crate::schema::safekeepers::table
.load::<SafekeeperPersistence>(conn)
.await?)
})
})
.with_measured_conn(
DatabaseOperation::ListNodes,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::safekeepers::table.load::<SafekeeperPersistence>(conn)?)
},
)
.await?;
tracing::info!("list_safekeepers: loaded {} nodes", safekeepers.len());
@@ -1175,14 +1066,11 @@ impl Persistence {
id: i64,
) -> Result<SafekeeperPersistence, DatabaseError> {
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
self.with_conn(move |conn| {
Box::pin(async move {
self.with_conn(move |conn| -> DatabaseResult<SafekeeperPersistence> {
Ok(safekeepers
.filter(id_column.eq(&id))
.select(SafekeeperPersistence::as_select())
.get_result(conn)
.await?)
})
.get_result(conn)?)
})
.await
}
@@ -1193,9 +1081,7 @@ impl Persistence {
) -> Result<(), DatabaseError> {
use crate::schema::safekeepers::dsl::*;
self.with_conn(move |conn| {
let record = record.clone();
Box::pin(async move {
self.with_conn(move |conn| -> DatabaseResult<()> {
let bind = record
.as_insert_or_update()
.map_err(|e| DatabaseError::Logical(format!("{e}")))?;
@@ -1205,8 +1091,7 @@ impl Persistence {
.on_conflict(id)
.do_update()
.set(&bind)
.execute(conn)
.await?;
.execute(conn)?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
@@ -1217,7 +1102,6 @@ impl Persistence {
Ok(())
})
})
.await
}
@@ -1228,8 +1112,7 @@ impl Persistence {
) -> Result<(), DatabaseError> {
use crate::schema::safekeepers::dsl::*;
self.with_conn(move |conn| {
Box::pin(async move {
self.with_conn(move |conn| -> DatabaseResult<()> {
#[derive(Insertable, AsChangeset)]
#[diesel(table_name = crate::schema::safekeepers)]
struct UpdateSkSchedulingPolicy<'a> {
@@ -1240,8 +1123,7 @@ impl Persistence {
let rows_affected = diesel::update(safekeepers.filter(id.eq(id_)))
.set(scheduling_policy.eq(scheduling_policy_))
.execute(conn)
.await?;
.execute(conn)?;
if rows_affected != 1 {
return Err(DatabaseError::Logical(format!(
@@ -1251,7 +1133,6 @@ impl Persistence {
Ok(())
})
})
.await
}
}
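
The retry comments restored above ("A generous allowance for how many times we may retry serializable transactions...", "Retry on serialization errors: these are expected...") describe the transaction loop at the heart of `with_conn`. A standalone sketch of that pattern, assuming diesel 2.x and using illustrative names:

```rust
use diesel::pg::PgConnection;
use diesel::result::{DatabaseErrorKind, Error as DieselError};

// Sketch only: run `f` inside a SERIALIZABLE transaction and retry on
// serialization failures, which are expected under contention (transactions
// can collide on index pages even when they touch disjoint rows).
fn run_serializable<R>(
    conn: &mut PgConnection,
    f: impl Fn(&mut PgConnection) -> Result<R, DieselError>,
) -> Result<R, DieselError> {
    const MAX_RETRIES: usize = 128; // mirrors the constant in the diff
    let mut retry_count = 0;
    loop {
        match conn.build_transaction().serializable().run(|c| f(c)) {
            Ok(r) => return Ok(r),
            Err(DieselError::DatabaseError(DatabaseErrorKind::SerializationFailure, _))
                if retry_count < MAX_RETRIES =>
            {
                retry_count += 1; // retry the whole transaction
            }
            Err(e) => return Err(e),
        }
    }
}
```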