mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 03:20:36 +00:00
storcon: use the SchedulingPolicy enum in SafekeeperPersistence (#10897)
We don't want to serialize to/from string all the time, so use `SchedulingPolicy` in `SafekeeperPersistence` via the use of a wrapper. Stacked atop #10891
This commit is contained in:
@@ -4,6 +4,8 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use diesel::deserialize::{FromSql, FromSqlRow};
|
||||
use diesel::pg::Pg;
|
||||
use diesel::prelude::*;
|
||||
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
|
||||
use diesel_async::pooled_connection::bb8::Pool;
|
||||
@@ -1556,7 +1558,33 @@ pub(crate) struct SafekeeperPersistence {
|
||||
pub(crate) port: i32,
|
||||
pub(crate) http_port: i32,
|
||||
pub(crate) availability_zone_id: String,
|
||||
pub(crate) scheduling_policy: String,
|
||||
pub(crate) scheduling_policy: SkSchedulingPolicyFromSql,
|
||||
}
|
||||
|
||||
/// Wrapper struct around [`SkSchedulingPolicy`] because both it and [`FromSql`] are from foreign crates,
|
||||
/// and we don't want to make [`safekeeper_api`] depend on [`diesel`].
|
||||
#[derive(Serialize, Deserialize, FromSqlRow, Eq, PartialEq, Debug, Copy, Clone)]
|
||||
pub(crate) struct SkSchedulingPolicyFromSql(pub(crate) SkSchedulingPolicy);
|
||||
|
||||
impl From<SkSchedulingPolicy> for SkSchedulingPolicyFromSql {
|
||||
fn from(value: SkSchedulingPolicy) -> Self {
|
||||
SkSchedulingPolicyFromSql(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSql<diesel::sql_types::VarChar, Pg> for SkSchedulingPolicyFromSql {
|
||||
fn from_sql(
|
||||
bytes: <Pg as diesel::backend::Backend>::RawValue<'_>,
|
||||
) -> diesel::deserialize::Result<Self> {
|
||||
let bytes = bytes.as_bytes();
|
||||
match core::str::from_utf8(bytes) {
|
||||
Ok(s) => match SkSchedulingPolicy::from_str(s) {
|
||||
Ok(policy) => Ok(SkSchedulingPolicyFromSql(policy)),
|
||||
Err(e) => Err(format!("can't parse: {e}").into()),
|
||||
},
|
||||
Err(e) => Err(format!("invalid UTF-8 for scheduling policy: {e}").into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SafekeeperPersistence {
|
||||
@@ -1572,14 +1600,10 @@ impl SafekeeperPersistence {
|
||||
port: upsert.port,
|
||||
http_port: upsert.http_port,
|
||||
availability_zone_id: upsert.availability_zone_id,
|
||||
scheduling_policy: String::from(scheduling_policy),
|
||||
scheduling_policy: SkSchedulingPolicyFromSql(scheduling_policy),
|
||||
}
|
||||
}
|
||||
pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
let scheduling_policy =
|
||||
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {
|
||||
DatabaseError::Logical(format!("can't construct SkSchedulingPolicy: {e:?}"))
|
||||
})?;
|
||||
Ok(SafekeeperDescribeResponse {
|
||||
id: NodeId(self.id as u64),
|
||||
region_id: self.region_id.clone(),
|
||||
@@ -1588,7 +1612,7 @@ impl SafekeeperPersistence {
|
||||
port: self.port,
|
||||
http_port: self.http_port,
|
||||
availability_zone_id: self.availability_zone_id.clone(),
|
||||
scheduling_policy,
|
||||
scheduling_policy: self.scheduling_policy.0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
|
||||
@@ -26,7 +25,7 @@ pub struct Safekeeper {
|
||||
|
||||
impl Safekeeper {
|
||||
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
|
||||
let scheduling_policy = SkSchedulingPolicy::from_str(&skp.scheduling_policy).unwrap();
|
||||
let scheduling_policy = skp.scheduling_policy.0;
|
||||
Self {
|
||||
cancel,
|
||||
listen_http_addr: skp.host.clone(),
|
||||
@@ -55,7 +54,7 @@ impl Safekeeper {
|
||||
}
|
||||
pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
|
||||
self.scheduling_policy = scheduling_policy;
|
||||
self.skp.scheduling_policy = String::from(scheduling_policy);
|
||||
self.skp.scheduling_policy = scheduling_policy.into();
|
||||
}
|
||||
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
|
||||
pub(crate) async fn with_client_retries<T, O, F>(
|
||||
|
||||
Reference in New Issue
Block a user