Allow starting compute_ctl without spec

With this commit one can start a compute with something like:
```shell
cargo run --bin compute_ctl -- -i no-compute \
 -p http://localhost:9095 \
 -D compute_pgdata \
 -C "postgresql://cloud_admin@127.0.0.1:5434/postgres" \
 -b ./pg_install/v15/bin/postgres
```
and it will wait for a spec to arrive.
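
Note that `-p` points at the control-plane API, while the `curl` commands below talk to compute_ctl's own HTTP server on port 3080.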

Then send it a spec:
```shell
curl -d "$(cat ./compute-spec.json)" http://localhost:3080/spec
```
Postgres will be started and configured.
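
For reference, a minimal sketch of what `compute-spec.json` might contain, based only on the `ComputeSpec` and `Cluster` fields visible in this diff; the values are placeholders, and real specs carry more (roles, databases, and the `neon.*` settings consumed by the code below):
```shell
# Hypothetical minimal spec, inferred from the ComputeSpec/Cluster fields in
# this diff; values are placeholders. Real specs also populate roles,
# databases, and the neon.* settings (tenant/timeline/pageserver).
cat > compute-spec.json <<'EOF'
{
  "format_version": 1.0,
  "timestamp": "2023-03-31T17:00:00Z",
  "operation_uuid": "00000000-0000-0000-0000-000000000000",
  "cluster": {
    "cluster_id": "my-cluster",
    "name": "my-cluster",
    "roles": [],
    "databases": [],
    "settings": []
  },
  "delta_operations": []
}
EOF
```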

Then reconfigure it with:
```shell
curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec
```
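
The status guards added to the HTTP handlers below mean the node moves through explicit states (`WaitingSpec` → `Init` → `Running`). Assuming the pre-existing `GET /status` route (not part of this diff), you can watch that progression between requests:
```shell
# Assumes compute_ctl exposes GET /status; that route is not shown in this diff.
curl -s http://localhost:3080/status
```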

Most of the safeguards and comments are in place; some polishing, especially
around the HTTP API, is still needed.
Author: Alexey Kondratov
Date:   2023-03-31 19:38:49 +02:00
Parent: 17d6c568bf
Commit: f0359fdbc0

4 changed files with 187 additions and 95 deletions

@@ -64,7 +64,7 @@ fn main() -> Result<()> {
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec = matches.get_one::<String>("spec");
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let compute_id = matches.get_one::<String>("compute-id");
@@ -73,40 +73,81 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let spec: ComputeSpec = match spec {
let mut spec = Default::default();
let mut spec_set = false;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
Some(json) => serde_json::from_str(json)?,
Some(json) => {
spec = serde_json::from_str(json)?;
spec_set = true;
}
None => {
// Second, try to read it from the file if path is provided
if let Some(sp) = spec_path {
let path = Path::new(sp);
let file = File::open(path)?;
serde_json::from_reader(file)?
spec = serde_json::from_reader(file)?;
spec_set = true;
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = s;
spec_set = true;
}
} else {
panic!(
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
control_plane_uri, compute_id
);
panic!("must specify both --control-plane-uri and --compute-id or none");
}
} else {
panic!("compute spec should be provided via --spec or --spec-path argument");
panic!(
"compute spec should be provided by one of the following ways: \
--spec OR --spec-path OR --control-plane-uri and --compute-id"
);
}
}
};
// We have one configurator thread and an async http server, so generally we
// have a single consumer - multiple producers pattern here. That's why we use
// an `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of
// producers is limited to one due to code logic, but we still need to
// pass the `Sender` to several threads.
//
// Next, we use the async `hyper` + `tokio` http server, but all the other
// code is completely synchronous. So we need to send data from async to sync
// code, which is why we use `mpsc::unbounded_channel` here, not
// `mpsc::channel`. It doesn't make much sense to rewrite all the code to
// async now, but we can consider doing this in the future. "Unbounded"
// should not sound scary here, as again, only one unprocessed spec can be
// in the channel at any given moment.
let (spec_tx, mut spec_rx) = mpsc::unbounded_channel::<ComputeSpec>();
let compute_state = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
metrics: ComputeMetrics::default(),
state: RwLock::new(ComputeState::new()),
};
let compute = Arc::new(compute_state);
// Launch the http service first, so that we can serve control-plane
// requests while configuration is still in progress.
let _http_handle =
launch_http_server(&compute, spec_tx).expect("cannot launch http endpoint thread");
if !spec_set {
info!("no compute spec provided, waiting");
if let Some(s) = spec_rx.blocking_recv() {
spec = s;
} else {
panic!("no compute spec received");
}
}
// Extract OpenTelemetry context for the startup actions from the spec, and
// attach it to the current tracing context.
//
@@ -147,38 +188,17 @@ fn main() -> Result<()> {
.find("neon.timeline_id")
.expect("tenant id should be provided");
let compute_state = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
spec,
tenant,
timeline,
pageserver_connstr,
storage_auth_token,
metrics: ComputeMetrics::default(),
state: RwLock::new(ComputeState::new()),
};
let compute = Arc::new(compute_state);
// We got all we need, fill in the state.
let mut state = compute.state.write().unwrap();
state.spec = spec;
state.pageserver_connstr = pageserver_connstr;
state.storage_auth_token = storage_auth_token;
state.tenant = tenant;
state.timeline = timeline;
state.status = ComputeStatus::Init;
drop(state);
// We have one configurator thread and async http server, so generally we
// have a single consumer - multiple producers pattern here. That's why we use
// `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of
// producers is limited to one due to code logic, but we still need to
// pass `Sender` to several threads.
//
// Next, we use async `hyper` + `tokio` http server, but all the other code
// is completely synchronous. So we need to send data from async to sync,
// that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`.
// It doesn't make much sense to rewrite all code to async now, but we can
// consider doing this in the future.
let (spec_tx, spec_rx) = mpsc::unbounded_channel::<ComputeSpec>();
// Launch service threads first, so that we can serve availability
// requests while configuration is still in progress.
let _http_handle =
launch_http_server(&compute, spec_tx).expect("cannot launch http endpoint thread");
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute, spec_rx).expect("cannot launch configurator thread");
@@ -280,7 +300,7 @@ fn cli() -> clap::Command {
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE"),
.value_name("CONTROL_PLANE_API_BASE_URI"),
)
}
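
The channel comment in this hunk is the heart of the change: the async `hyper` handler produces specs and the synchronous main thread consumes them. A standalone sketch of that async-producer / sync-consumer pattern (not the commit's code; tokio with the `rt-multi-thread` and `sync` features is assumed):
```rust
// Sketch of the async-producer / sync-consumer pattern described above.
use tokio::sync::mpsc;

fn main() {
    let (spec_tx, mut spec_rx) = mpsc::unbounded_channel::<String>();

    // Stand-in for the hyper server: an async task that delivers one "spec".
    let rt = tokio::runtime::Runtime::new().unwrap();
    std::thread::spawn(move || {
        rt.block_on(async move {
            // In compute_ctl this send would happen in the POST /spec handler.
            spec_tx.send("spec payload".to_string()).unwrap();
        });
    });

    // Stand-in for the synchronous main thread: block until a spec arrives,
    // mirroring the `spec_rx.blocking_recv()` call in main() above.
    if let Some(spec) = spec_rx.blocking_recv() {
        println!("received: {spec}");
    }
}
```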

@@ -41,12 +41,15 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
pub metrics: ComputeMetrics,
// We only allow live re- / configuration of the compute node if
// it uses the 'pull model', i.e. it can go to the control-plane and fetch
// the latest configuration. Otherwise, there could be a case:
// - we start compute with some spec provided as an argument
// - we push a new spec and it does reconfiguration
// - but then something happens and the compute pod / VM is destroyed,
//   so the k8s controller starts it again with the **old** spec
pub live_config_allowed: bool,
/// Volatile part of the `ComputeNode` so should be used under `RwLock`
/// to allow HTTP API server to serve status requests, while configuration
/// is in progress.
@@ -60,7 +63,7 @@ where
x.to_rfc3339().serialize(s)
}
#[derive(Serialize)]
#[derive(Serialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
@@ -68,14 +71,27 @@ pub struct ComputeState {
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
#[serde(skip_serializing)]
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
#[serde(skip_serializing)]
pub pageserver_connstr: String,
#[serde(skip_serializing)]
pub storage_auth_token: Option<String>,
}
impl ComputeState {
pub fn new() -> Self {
Self {
status: ComputeStatus::Init,
status: ComputeStatus::WaitingSpec,
last_active: Utc::now(),
error: None,
spec: ComputeSpec::default(),
tenant: String::new(),
timeline: String::new(),
pageserver_connstr: String::new(),
storage_auth_token: None,
}
}
}
@@ -136,15 +152,15 @@ impl ComputeNode {
// Get a basebackup from the pageserver over a libpq connection using
// `connstr`, and unpack it into the `pgdata` directory, overwriting all of
// its previous content.
#[instrument(skip(self))]
fn get_basebackup(&self, lsn: &str) -> Result<()> {
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> {
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&self.pageserver_connstr)?;
let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
if let Some(storage_auth_token) = &self.storage_auth_token {
if let Some(storage_auth_token) = &compute_state.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token);
} else {
@@ -153,8 +169,14 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
"0/0" => format!("basebackup {} {}", &self.tenant, &self.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn),
"0/0" => format!(
"basebackup {} {}",
&compute_state.tenant, &compute_state.timeline
), // First start of the compute
_ => format!(
"basebackup {} {} {}",
&compute_state.tenant, &compute_state.timeline, lsn
),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -181,14 +203,14 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self))]
fn sync_safekeepers(&self) -> Result<String> {
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
.args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -229,9 +251,9 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self))]
pub fn prepare_pgdata(&self) -> Result<()> {
let spec = &self.spec;
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let spec = &compute_state.spec;
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
@@ -240,18 +262,18 @@ impl ComputeNode {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers()
.sync_safekeepers(compute_state.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
info!(
"getting basebackup@{} from pageserver {}",
lsn, &self.pageserver_connstr
lsn, &compute_state.pageserver_connstr
);
self.get_basebackup(&lsn).with_context(|| {
self.get_basebackup(compute_state, &lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &self.pageserver_connstr
lsn, &compute_state.pageserver_connstr
)
})?;
@@ -264,13 +286,16 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip(self))]
pub fn start_postgres(&self) -> Result<std::process::Child> {
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
) -> Result<std::process::Child> {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
let mut pg = Command::new(&self.pgbin)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -284,8 +309,8 @@ impl ComputeNode {
}
/// Do initial configuration of the already started Postgres.
#[instrument(skip(self))]
pub fn apply_config(&self) -> Result<()> {
#[instrument(skip(self, compute_state))]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
//
@@ -316,19 +341,19 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(&self.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&self.spec, self.connstr.as_str(), &mut client)?;
handle_roles(&compute_state.spec, &mut client)?;
handle_databases(&compute_state.spec, &mut client)?;
handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(&self.spec, &mut client)?;
handle_extensions(&compute_state.spec, &mut client)?;
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
self.spec.cluster.cluster_id
compute_state.spec.cluster.cluster_id
);
Ok(())
@@ -376,21 +401,22 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.read().unwrap().clone();
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
self.spec.cluster.cluster_id,
self.spec.operation_uuid.as_ref().unwrap(),
self.tenant,
self.timeline,
compute_state.spec.cluster.cluster_id,
compute_state.spec.operation_uuid.as_ref().unwrap(),
compute_state.tenant,
compute_state.timeline,
);
self.prepare_pgdata()?;
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
let pg = self.start_postgres()?;
let pg = self.start_postgres(compute_state.storage_auth_token.clone())?;
self.apply_config()?;
self.apply_config(&compute_state)?;
let startup_end_time = Utc::now();
self.metrics.config_ms.store(

@@ -44,12 +44,29 @@ async fn routes(
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!("compute is not running, current status: {:?}", status);
error!(msg);
return Response::new(Body::from(msg));
}
let insights = compute.collect_insights().await;
Response::new(Body::from(insights))
}
(&Method::POST, "/check_writability") => {
info!("serving /check_writability POST request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for check_writability request: {:?}",
status
);
error!(msg);
return Response::new(Body::from(msg));
}
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
@@ -76,9 +93,15 @@ async fn routes(
// watch compute state after a reconfiguration request and to cleanly
// restart in case of errors.
//
// TODO: Errors should be in JSON format
// TODO: Errors should be in JSON format with proper status codes.
(&Method::POST, "/spec") => {
info!("serving /spec POST request");
if !compute.live_config_allowed {
let msg = "live reconfiguration is not allowed for this compute node";
error!(msg);
return Response::new(Body::from(msg));
}
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(spec) = serde_json::from_str::<ComputeSpec>(&spec_raw) {
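
Once the compute reaches `Running`, the guarded endpoints respond normally; for example, the writability check from this hunk returns the literal string "true" on success:
```shell
# Returns "true" once the compute is in the Running state.
curl -s -X POST http://localhost:3080/check_writability
```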

@@ -14,7 +14,7 @@ use crate::pg_helpers::*;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Deserialize, Debug)]
#[derive(Clone, Deserialize, Debug, Default)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
@@ -30,7 +30,7 @@ pub struct ComputeSpec {
/// Cluster state seen from the perspective of the external tools
/// like Rails web console.
#[derive(Clone, Deserialize, Debug)]
#[derive(Clone, Deserialize, Debug, Default)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
@@ -53,6 +53,29 @@ pub struct DeltaOp {
pub new_name: Option<PgIdent>,
}
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
info!("getting spec from control plane: {}", cp_uri);
// TODO: check the response. We should distinguish cases when it's
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
Ok(spec)
}
/// It takes cluster specification and does the following:
/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
/// - Update `pg_hba.conf` to allow external connections.
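
Putting the pull model together: started with `--compute-id` and `--control-plane-uri`, compute_ctl runs this fetch at startup and, per the doc comment above, takes the JWT from the environment. A hedged end-to-end invocation reusing the flags from the example at the top (the token value is a placeholder):
```shell
# Token value is a placeholder; if NEON_CONSOLE_JWT is unset, an empty
# Authorization header is sent, matching get_spec_from_control_plane above.
export NEON_CONSOLE_JWT="<jwt>"
cargo run --bin compute_ctl -- -i no-compute \
 -p http://localhost:9095 \
 -D compute_pgdata \
 -C "postgresql://cloud_admin@127.0.0.1:5434/postgres" \
 -b ./pg_install/v15/bin/postgres
```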