Implement live reconfiguration in compute_ctl

Accept a spec in JSON format and request compute reconfiguration from
the configurator thread. If anything goes wrong after we set the
compute state to `ConfigurationPending` and/or send the spec to the
configurator thread, we leave the compute in a potentially wrong
state. That said, it's the control-plane's responsibility to watch the
compute state after a reconfiguration request and to cleanly restart
it in case of errors.
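
The failure mode described above can be sketched as follows. This is a minimal illustration, not the actual handler: `request_reconfiguration` and the trimmed-down `ComputeStatus` are hypothetical, and `std::sync::mpsc` stands in for the `tokio` channel so the example has no external dependencies.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;

// Trimmed-down stand-in for the real status enum (assumption).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ComputeStatus {
    Running,
    ConfigurationPending,
}

/// Hypothetical request path: mark the compute as pending first,
/// then hand the spec over to the configurator thread.
fn request_reconfiguration(
    status: &Mutex<ComputeStatus>,
    tx: &Sender<String>,
    spec_json: String,
) -> Result<(), String> {
    *status.lock().unwrap() = ComputeStatus::ConfigurationPending;
    // If the configurator is gone, this fails *after* the state change.
    tx.send(spec_json)
        .map_err(|e| format!("configurator unavailable: {e}"))
}

fn main() {
    let status = Mutex::new(ComputeStatus::Running);
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx); // simulate a dead configurator thread

    let result = request_reconfiguration(&status, &tx, "{}".to_string());
    assert!(result.is_err());
    // The state was never rolled back, so an outside observer
    // has to notice this and restart the compute.
    assert_eq!(
        *status.lock().unwrap(),
        ComputeStatus::ConfigurationPending
    );
}
```

Nothing in this sketch rolls the state back, which is why an external watcher is needed to detect the stuck status.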

It still lacks the ability to start up without a spec, as well as some
validations, e.g. that live reconfiguration should only be available
with the `--compute-id` and `--control-plane-uri` options.

Otherwise, it works fine and can be tested by running `compute_ctl`
locally, then sending it a new spec:
```shell
curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec
```

We have one configurator thread and an async http server, so generally
we have a single-consumer, multiple-producer pattern here. That's why
we use an `mpsc` channel, not `tokio::sync::watch`. In practice, the
code logic limits producer concurrency to one, but we still need the
ability to pass `Sender` to several threads.
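
As a rough illustration of the single-consumer, multiple-producer shape, here is a minimal sketch. It uses `std::sync::mpsc` as a stand-in for the `tokio` channel, and `collect_from_producers` is a hypothetical helper, not code from this commit. Each producer thread gets its own clone of `Sender`, and one consumer drains the queue.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawn `producers` threads that each send one message and
/// collect everything on a single consumer.
fn collect_from_producers(producers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    let mut handles = Vec::new();
    for id in 0..producers {
        // Each producer owns its own clone of the Sender.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(format!("spec from producer {id}")).unwrap();
        }));
    }
    // Drop the original Sender so the receiver's iterator ends
    // once every producer clone has been dropped.
    drop(tx);

    let mut received: Vec<String> = rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    received.sort(); // arrival order is not deterministic
    received
}

fn main() {
    let specs = collect_from_producers(3);
    assert_eq!(specs.len(), 3);
    println!("{specs:?}");
}
```

Unlike a queue, `tokio::sync::watch` retains only the most recently sent value, so a receiver may never see intermediate values.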

Next, we use an async `hyper` + `tokio` http server, but all the other
code is completely synchronous. So we need to send data from async to
sync code, which is why we use `mpsc::unbounded_channel` here, not
`mpsc::channel`. It doesn't make much sense to rewrite all the code to
async now, but we can consider doing this in the future.

I think that a combination of `Mutex` and `Condvar` would work just fine
too, but as we already have `tokio`, I decided to try something from it.
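
For comparison, the `Mutex` + `Condvar` alternative could look roughly like this. It is a sketch only: `SpecSlot` and its methods are hypothetical names, and the slot holds a single pending spec rather than a queue.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical one-slot mailbox: a producer stores a spec and the
/// configurator thread blocks until one is available.
struct SpecSlot {
    pending: Mutex<Option<String>>,
    ready: Condvar,
}

impl SpecSlot {
    fn new() -> Self {
        SpecSlot {
            pending: Mutex::new(None),
            ready: Condvar::new(),
        }
    }

    /// Store a new spec (overwriting any unprocessed one) and wake
    /// the waiting consumer.
    fn put(&self, spec: String) {
        *self.pending.lock().unwrap() = Some(spec);
        self.ready.notify_one();
    }

    /// Block until a spec is available, then take it out of the slot.
    fn take(&self) -> String {
        let mut guard = self.pending.lock().unwrap();
        loop {
            if let Some(spec) = guard.take() {
                return spec;
            }
            // Re-check in a loop to tolerate spurious wakeups.
            guard = self.ready.wait(guard).unwrap();
        }
    }
}

fn main() {
    let slot = Arc::new(SpecSlot::new());
    let configurator = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || slot.take())
    };
    slot.put("new-spec".to_string());
    assert_eq!(configurator.join().unwrap(), "new-spec");
}
```

A single slot like this keeps only the latest spec, whereas a channel queues every request.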
Alexey Kondratov
2023-03-31 15:52:58 +02:00
committed by Heikki Linnakangas
parent 1f2946af17
commit 66dd3f8ca5
8 changed files with 210 additions and 28 deletions

@@ -14,12 +14,24 @@ pub struct ComputeState {
pub error: Option<String>,
}
-#[derive(Serialize, Clone, Copy, PartialEq, Eq)]
+#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
+// Spec wasn't provided at start, waiting for it to be
+// provided by control-plane.
+WaitingSpec,
+// Compute node has initial spec and is starting up.
Init,
+// Compute is configured and running.
Running,
+// Either startup or configuration failed,
+// compute will exit soon or is waiting for
+// control-plane to terminate it.
Failed,
+// Control-plane requested reconfiguration.
+ConfigurationPending,
+// New spec is being applied.
+Reconfiguration,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>

@@ -12,7 +12,7 @@ pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
@@ -26,7 +26,7 @@ pub struct ComputeSpec {
pub startup_tracing_context: Option<HashMap<String, String>>,
}
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
@@ -42,7 +42,7 @@ pub struct Cluster {
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
@@ -51,7 +51,7 @@ pub struct DeltaOp {
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
@@ -60,7 +60,7 @@ pub struct Role {
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
@@ -70,7 +70,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,