mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 03:20:36 +00:00
WIP diesel for persistence
This commit is contained in:
@@ -20,6 +20,8 @@ tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
diesel = { version = "2.1.4", features = ["sqlite", "serde_json"] }
|
||||
|
||||
utils = { path = "../../libs/utils/" }
|
||||
control_plane = { path = ".." }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
0
control_plane/attachment_service/migrations/.keep
Normal file
0
control_plane/attachment_service/migrations/.keep
Normal file
@@ -0,0 +1 @@
|
||||
DROP TABLE tenant_shards;
|
||||
@@ -0,0 +1,11 @@
|
||||
CREATE TABLE tenant_shards (
|
||||
id INTEGER PRIMARY KEY NOT NULL,
|
||||
tenant_id VARCHAR NOT NULL,
|
||||
shard_number INTEGER NOT NULL,
|
||||
shard_count INTEGER NOT NULL,
|
||||
shard_stripe_size INTEGER NOT NULL,
|
||||
generation INTEGER NOT NULL,
|
||||
placement_policy VARCHAR NOT NULL,
|
||||
-- config is JSON encoded, opaque to the database.
|
||||
config TEXT NOT NULL
|
||||
);
|
||||
@@ -0,0 +1 @@
|
||||
DROP TABLE nodes;
|
||||
@@ -0,0 +1,10 @@
|
||||
CREATE TABLE nodes (
|
||||
node_id BIGINT PRIMARY KEY NOT NULL,
|
||||
|
||||
scheduling_policy VARCHAR NOT NULL,
|
||||
|
||||
listen_http_addr VARCHAR NOT NULL,
|
||||
listen_http_port INTEGER NOT NULL,
|
||||
listen_pg_addr VARCHAR NOT NULL,
|
||||
listen_pg_port INTEGER NOT NULL
|
||||
);
|
||||
@@ -3,8 +3,10 @@ use utils::seqwait::MonotonicCounter;
|
||||
mod compute_hook;
|
||||
pub mod http;
|
||||
mod node;
|
||||
mod persistence;
|
||||
mod reconciler;
|
||||
mod scheduler;
|
||||
mod schema;
|
||||
pub mod service;
|
||||
mod tenant_state;
|
||||
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
|
||||
use control_plane::attachment_service::NodeAvailability;
|
||||
use diesel::expression::AsExpression;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::NodeId;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -15,6 +17,22 @@ pub(crate) struct Node {
|
||||
pub(crate) listen_pg_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Debug, AsExpression)]
|
||||
#[diesel(sql_type = diesel::sql_types::VarChar)]
|
||||
pub enum NodeSchedulingPolicy {
|
||||
// Normal, happy state
|
||||
Active,
|
||||
|
||||
// A newly added node: gradually move some work here.
|
||||
Filling,
|
||||
|
||||
// Do not schedule new work here, but leave configured locations in place.
|
||||
Pause,
|
||||
|
||||
// Do not schedule work here. Gracefully move work away, as resources allow.
|
||||
Draining,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub(crate) fn base_url(&self) -> String {
|
||||
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
|
||||
|
||||
@@ -1,13 +1,72 @@
|
||||
use std::env;
|
||||
|
||||
use anyhow::Context;
|
||||
use diesel::prelude::*;
|
||||
use diesel::{Connection, SqliteConnection};
|
||||
use utils::generation::Generation;
|
||||
|
||||
use crate::node::NodeSchedulingPolicy;
|
||||
use crate::PlacementPolicy;
|
||||
|
||||
/// The attachment service does not store most of its state durably.
|
||||
///
|
||||
/// The essential things to store durably are:
|
||||
/// - generation numbers, as these must always advance monotonically to ensure data safety.
|
||||
/// - PlacementPolicy and TenantConfig, as these are set externally.
|
||||
struct Persistence {}
|
||||
|
||||
/// Parts of TenantState that are stored durably
|
||||
struct TenantPersistence {
|
||||
pub(crate) generation: Generation,
|
||||
pub(crate) policy: PlacementPolicy,
|
||||
pub(crate) config: TenantConfig,
|
||||
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
|
||||
/// - Node's scheduling policies, as the source of truth for these is something external.
|
||||
///
|
||||
/// Other things we store durably as an implementation detail:
|
||||
/// - Node's host/port: this could be avoided it we made nodes emit a self-registering heartbeat,
|
||||
/// but it is operationally simpler to make this service the authority for which nodes
|
||||
/// it talks to.
|
||||
struct Persistence {
|
||||
database_url: String,
|
||||
}
|
||||
|
||||
impl Persistence {
|
||||
fn new() -> Self {
|
||||
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
Self { database_url }
|
||||
}
|
||||
|
||||
async fn insert_node(&self, node: NodePersistence) -> anyhow::Result<()> {
|
||||
let conn = SqliteConnection::establish(&self.database_url).context("Opening database")?;
|
||||
diesel::insert_into(crate::schema::nodes::table)
|
||||
.values(&node)
|
||||
.get_result(&mut conn)
|
||||
.into()
|
||||
}
|
||||
|
||||
async fn list_nodes(&self) -> anyhow::Result<Vec<NodePersistence>> {
|
||||
let mut conn =
|
||||
SqliteConnection::establish(&self.database_url).context("Opening database")?;
|
||||
|
||||
crate::schema::nodes::dsl::nodes
|
||||
.select(NodePersistence::as_select())
|
||||
.load(&mut conn)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
|
||||
#[derive(Selectable)]
|
||||
#[diesel(table_name = crate::schema::tenant_shards)]
|
||||
pub(crate) struct TenantShardPersistence {
|
||||
pub(crate) generation: Generation,
|
||||
pub(crate) placement_policy: PlacementPolicy,
|
||||
pub(crate) config: serde_json::Value,
|
||||
}
|
||||
|
||||
/// Parts of [`crate::node::Node`] that are stored durably
|
||||
#[derive(Selectable, Insertable)]
|
||||
#[diesel(table_name = crate::schema::nodes)]
|
||||
pub(crate) struct NodePersistence {
|
||||
pub(crate) node_id: i64,
|
||||
|
||||
pub(crate) scheduling_policy: NodeSchedulingPolicy,
|
||||
pub(crate) listen_http_addr: String,
|
||||
pub(crate) listen_http_port: i32,
|
||||
|
||||
pub(crate) listen_pg_addr: String,
|
||||
pub(crate) listen_pg_port: i32,
|
||||
}
|
||||
|
||||
27
control_plane/attachment_service/src/schema.rs
Normal file
27
control_plane/attachment_service/src/schema.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
// @generated automatically by Diesel CLI.
|
||||
|
||||
diesel::table! {
|
||||
nodes (node_id) {
|
||||
node_id -> BigInt,
|
||||
scheduling_policy -> Text,
|
||||
listen_http_addr -> Text,
|
||||
listen_http_port -> Integer,
|
||||
listen_pg_addr -> Text,
|
||||
listen_pg_port -> Integer,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
tenant_shards (id) {
|
||||
id -> Integer,
|
||||
tenant_id -> Text,
|
||||
shard_number -> Integer,
|
||||
shard_count -> Integer,
|
||||
shard_stripe_size -> Integer,
|
||||
generation -> Integer,
|
||||
placement_policy -> Text,
|
||||
config -> Text,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::allow_tables_to_appear_in_same_query!(nodes, tenant_shards,);
|
||||
@@ -26,6 +26,9 @@ pub(crate) struct TenantState {
|
||||
|
||||
pub(crate) shard: ShardIdentity,
|
||||
|
||||
// Runtime only: sequence used to coordinate when updating this object while
|
||||
// with background reconcilers may be running. A reconciler runs to a particular
|
||||
// sequence.
|
||||
pub(crate) sequence: Sequence,
|
||||
|
||||
// Latest generation number: next time we attach, increment this
|
||||
@@ -45,6 +48,8 @@ pub(crate) struct TenantState {
|
||||
// with `Self::reconcile`.
|
||||
pub(crate) observed: ObservedState,
|
||||
|
||||
// Tenant configuration, passed through opaquely to the pageserver. Identical
|
||||
// for all shards in a tenant.
|
||||
pub(crate) config: TenantConfig,
|
||||
|
||||
/// If a reconcile task is currently in flight, it may be joined here (it is
|
||||
|
||||
@@ -127,18 +127,13 @@ impl FromStr for NodeAvailability {
|
||||
}
|
||||
}
|
||||
|
||||
/// FIXME: this is a duplicate of the type in the attachment_service crate, because the
|
||||
/// type needs to be defined with diesel traits in there.
|
||||
#[derive(Serialize, Deserialize, Clone, Copy)]
|
||||
pub enum NodeSchedulingPolicy {
|
||||
// Normal, happy state
|
||||
Active,
|
||||
|
||||
// A newly added node: gradually move some work here.
|
||||
Filling,
|
||||
|
||||
// Do not schedule new work here, but leave configured locations in place.
|
||||
Pause,
|
||||
|
||||
// Do not schedule work here. Gracefully move work away, as resources allow.
|
||||
Draining,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user