Compare commits

..

1 Commits

Author SHA1 Message Date
Vlad Lazar
89231e3f99 storcon: squash all migrations into one
Problem

Neon and Hadron deployments have the same database schema, but different
migration histories. Some transactions have different identifiers too.
If we don't do anything about it, then the storage controller would fail
to apply the merged set of transactions.

Summary of Changes

We squash all migrations into a single one. If the schema already
matches, then the new transaction gets applie without doing anything.
For new regions, this migration will bootstrap the database schema.

This should be merged in both neon and hadron codebases.

Note that after deploying this change, the `__diesel_schema_migrations`
table will still contain entries for the old pre-squash transactions.
This is fine because diesel only considers the tranasactions embedded
in the repo for application. Once we are certain that we are not going
to roll back, we can clean up the `__diesel_schema_migrations` tables in
prod, but this is manual and error prone, so I'd skip it.

Rolling back

Rolling back to a previous deployment which embeds the non-squashed
transactions is safe. The old transactions are still present in
`__diesel_schema_migrations` (i.e. considered applied), so no migrations
will be run, so no migrations will be run.

Note that this assumes that all transactions are applied and squashed into
the new migration before deployment.
2025-07-31 13:39:27 +01:00
67 changed files with 658 additions and 1031 deletions

View File

@@ -300,9 +300,7 @@ jobs:
benchmarks:
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `deploy` in PRs
# if: github.ref_name == 'main' || (contains(github.event.pull_request.labels.*.name, 'run-benchmarks') && !failure() && !cancelled())
# moved to another repo
if: false
if: github.ref_name == 'main' || (contains(github.event.pull_request.labels.*.name, 'run-benchmarks') && !failure() && !cancelled())
needs: [ check-permissions, build-build-tools-image, get-benchmarks-durations, deploy ]
permissions:
id-token: write # aws-actions/configure-aws-credentials

View File

@@ -2780,7 +2780,7 @@ LIMIT 100",
// 4. We start again and try to prewarm with the state from 2. instead of the previous complete state
if matches!(
prewarm_state,
LfcPrewarmState::Completed { .. }
LfcPrewarmState::Completed
| LfcPrewarmState::NotPrewarmed
| LfcPrewarmState::Skipped
) {

View File

@@ -7,11 +7,19 @@ use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use std::time::Instant;
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
#[serde(flatten)]
base: LfcPrewarmState,
total: i32,
prewarmed: i32,
skipped: i32,
}
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
url: String,
@@ -20,7 +28,7 @@ struct EndpointStoragePair {
const KEY: &str = "lfc_state";
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
@@ -46,8 +54,36 @@ impl EndpointStoragePair {
}
impl ComputeNode {
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
self.state.lock().unwrap().lfc_prewarm_state.clone()
// If prewarm failed, we want to get overall number of segments as well as done ones.
// However, this function should be reliable even if querying postgres failed.
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
info!("requesting LFC prewarm state from postgres");
let mut state = LfcPrewarmStateWithProgress::default();
{
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
}
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
Ok(client) => client,
Err(err) => {
error!(%err, "connecting to postgres");
return state;
}
};
let row = match client
.query_one("select * from neon.get_prewarm_info()", &[])
.await
{
Ok(row) => row,
Err(err) => {
error!(%err, "querying LFC prewarm status");
return state;
}
};
state.total = row.try_get(0).unwrap_or_default();
state.prewarmed = row.try_get(1).unwrap_or_default();
state.skipped = row.try_get(2).unwrap_or_default();
state
}
pub fn lfc_offload_state(&self) -> LfcOffloadState {
@@ -97,6 +133,7 @@ impl ComputeNode {
}
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(
&self,
from_endpoint: Option<String>,
@@ -111,7 +148,6 @@ impl ComputeNode {
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
info!(%url, "requesting LFC state from endpoint storage");
let mut now = Instant::now();
let request = Client::new().get(&url).bearer_auth(storage_token);
let response = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
@@ -124,8 +160,6 @@ impl ComputeNode {
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
status => bail!("{status} querying endpoint storage"),
}
let state_download_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let mut uncompressed = Vec::new();
let lfc_state = select! {
@@ -140,8 +174,6 @@ impl ComputeNode {
read = decoder.read_to_end(&mut uncompressed) => read
}
.context("decoding LFC state")?;
let uncompress_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
@@ -164,34 +196,15 @@ impl ComputeNode {
}
.context("loading LFC state into postgres")
.map(|_| ())?;
let prewarm_time_ms = now.elapsed().as_millis() as u32;
let row = client
.query_one("select * from neon.get_prewarm_info()", &[])
.await
.context("querying prewarm info")?;
let total = row.try_get(0).unwrap_or_default();
let prewarmed = row.try_get(1).unwrap_or_default();
let skipped = row.try_get(2).unwrap_or_default();
Ok(LfcPrewarmState::Completed {
total,
prewarmed,
skipped,
state_download_time_ms,
uncompress_time_ms,
prewarm_time_ms,
})
Ok(LfcPrewarmState::Completed)
}
/// If offload request is ongoing, return false, true otherwise
pub fn offload_lfc(self: &Arc<Self>) -> bool {
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if matches!(
replace(state, LfcOffloadState::Offloading),
LfcOffloadState::Offloading
) {
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
return false;
}
}
@@ -203,10 +216,7 @@ impl ComputeNode {
pub async fn offload_lfc_async(self: &Arc<Self>) {
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if matches!(
replace(state, LfcOffloadState::Offloading),
LfcOffloadState::Offloading
) {
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
return;
}
}
@@ -224,6 +234,7 @@ impl ComputeNode {
LfcOffloadState::Failed { error }
}
};
self.state.lock().unwrap().lfc_offload_state = state;
}
@@ -231,7 +242,6 @@ impl ComputeNode {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
let mut now = Instant::now();
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
@@ -245,36 +255,25 @@ impl ComputeNode {
info!(%url, "empty LFC state, not exporting");
return Ok(LfcOffloadState::Skipped);
};
let state_query_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let mut compressed = Vec::new();
ZstdEncoder::new(state)
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compress_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}");
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
let response = request
.send()
.await
.context("writing to endpoint storage")?;
let state_upload_time_ms = now.elapsed().as_millis() as u32;
let status = response.status();
if status != StatusCode::OK {
bail!("request to endpoint storage failed: {status}");
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
),
Err(err) => Err(err).context("writing to endpoint storage"),
}
Ok(LfcOffloadState::Completed {
compress_time_ms,
state_query_time_ms,
state_upload_time_ms,
})
}
pub fn cancel_prewarm(self: &Arc<Self>) {

View File

@@ -1,24 +1,32 @@
use crate::compute::ComputeNode;
use anyhow::{Context, bail};
use anyhow::{Context, Result, bail};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use std::time::Instant;
use compute_api::spec::ComputeMode;
use itertools::Itertools;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::info;
use utils::lsn::Lsn;
impl ComputeNode {
/// Returns only when promote fails or succeeds. If http client calling this function
/// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
/// Returns only when promote fails or succeeds. If a network error occurs
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let this = self.clone();
let promote_fn = async move || match this.promote_impl(cfg).await {
Ok(state) => state,
Err(err) => {
tracing::error!(%err, "promoting replica");
let error = format!("{err:#}");
PromoteState::Failed { error }
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let cloned = self.clone();
let promote_fn = async move || {
let Err(err) = cloned.promote_impl(cfg).await else {
return PromoteState::Completed;
};
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: format!("{err:#}"),
}
};
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move { tx.send(promote_fn().await) });
@@ -26,31 +34,36 @@ impl ComputeNode {
};
let mut task;
// promote_impl locks self.state so we need to unlock it before calling task.changed()
// self.state is unlocked after block ends so we lock it in promote_impl
// and task.changed() is reached
{
let promote_state = &mut self.state.lock().unwrap().promote_state;
task = promote_state.get_or_insert_with(start_promotion).clone()
}
if task.changed().await.is_err() {
let error = "promote sender dropped".to_string();
return PromoteState::Failed { error };
task = self
.state
.lock()
.unwrap()
.promote_state
.get_or_insert_with(start_promotion)
.clone()
}
task.changed().await.expect("promote sender dropped");
task.borrow().clone()
}
async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
if *mode != compute_api::spec::ComputeMode::Replica {
bail!("compute mode \"{}\" is not replica", mode.to_type_str());
if *mode != ComputeMode::Replica {
bail!("{} is not replica", mode.to_type_str());
}
// we don't need to query Postgres so not self.lfc_prewarm_state()
match &state.lfc_prewarm_state {
status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
bail!("compute {status}")
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
bail!("prewarm not requested or pending")
}
LfcPrewarmState::Failed { error } => {
tracing::warn!(%error, "compute prewarm failed")
tracing::warn!(%error, "replica prewarm failed")
}
_ => {}
}
@@ -59,10 +72,9 @@ impl ComputeNode {
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let mut now = Instant::now();
let primary_lsn = cfg.wal_flush_lsn;
let mut standby_lsn = utils::lsn::Lsn::INVALID;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
@@ -70,18 +82,16 @@ impl ComputeNode {
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
standby_lsn = lsn.into();
if standby_lsn >= primary_lsn {
last_wal_replay_lsn = lsn.into();
if last_wal_replay_lsn >= primary_lsn {
break;
}
info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if standby_lsn < primary_lsn {
if last_wal_replay_lsn < primary_lsn {
bail!("didn't catch up with primary in {RETRIES} retries");
}
let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
@@ -92,33 +102,27 @@ impl ComputeNode {
.query(&safekeepers_sql, &[])
.await
.context("setting safekeepers")?;
client
.query(
"ALTER SYSTEM SET synchronous_standby_names=walproposer",
&[],
)
.await
.context("setting synchronous_standby_names")?;
client
.query("SELECT pg_catalog.pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-promotion", |_| bail!(
"compute-promotion failpoint"
));
fail::fail_point!("compute-promotion", |_| {
bail!("promotion configured to fail because of a failpoint")
});
let row = client
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {
bail!("pg_promote() failed");
bail!("pg_promote() returned false");
}
let pg_promote_time_ms = now.elapsed().as_millis() as u32;
let now = Instant::now();
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let row = client
.query_one("SHOW transaction_read_only", &[])
.await
@@ -127,47 +131,36 @@ impl ComputeNode {
bail!("replica in read only mode after promotion");
}
// Already checked validity in http handler
#[allow(unused_mut)]
let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
{
let mut state = self.state.lock().unwrap();
// Local setup has different ports for pg process (port=) for primary and secondary.
// Primary is stopped so we need secondary's "port" value
#[cfg(feature = "testing")]
{
let old_spec = &state.pspec.as_ref().unwrap().spec;
let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
};
let set: std::collections::HashMap<&str, &str> = old_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
};
new_conf.push_str(&format!("port={}\n", set["port"]));
}
tracing::debug!("applied spec: {:#?}", new_pspec.spec);
if self.params.lakebase_mode {
ComputeNode::set_spec(&self.params, &mut state, new_pspec);
} else {
state.pspec = Some(new_pspec);
}
let spec = &mut state.pspec.as_mut().unwrap().spec;
spec.mode = ComputeMode::Primary;
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
Self::merge_spec(new_conf, existing_conf);
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()?;
let reconfigure_time_ms = now.elapsed().as_millis() as u32;
self.reconfigure()
}
Ok(PromoteState::Completed {
lsn_wait_time_ms,
pg_promote_time_ms,
reconfigure_time_ms,
})
/// Merge old and new Postgres conf specs to apply on secondary.
/// Change new spec's port and safekeepers since they are supplied
/// differenly
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
let mut new_conf_set: HashMap<&str, &str> = new_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.remove("neon.safekeepers");
let existing_conf_set: HashMap<&str, &str> = existing_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.insert("port", existing_conf_set["port"]);
*new_conf = new_conf_set
.iter()
.map(|(k, v)| format!("{k}={v}"))
.join("\n");
}
}

View File

@@ -65,19 +65,14 @@ pub fn write_postgres_conf(
writeln!(file, "{conf}")?;
}
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
writeln!(file)?;
if let Some(conninfo) = &spec.pageserver_connection_info {
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = conninfo.stripe_size {
writeln!(
file,
"# from compute spec's pageserver_connection_info.stripe_size field"
)?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
let mut libpq_urls: Option<Vec<String>> = Some(Vec::new());
let num_shards = if conninfo.shard_count.0 == 0 {
1 // unsharded, treat it as a single shard
@@ -115,7 +110,7 @@ pub fn write_postgres_conf(
if let Some(libpq_urls) = libpq_urls {
writeln!(
file,
"# derived from compute spec's pageserver_connection_info field"
"# derived from compute spec's pageserver_conninfo field"
)?;
writeln!(
file,
@@ -125,16 +120,24 @@ pub fn write_postgres_conf(
} else {
writeln!(file, "# no neon.pageserver_connstring")?;
}
} else {
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "# from compute spec's shard_stripe_size field")?;
if let Some(stripe_size) = conninfo.stripe_size {
writeln!(
file,
"# from compute spec's pageserver_conninfo.stripe_size field"
)?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
} else {
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "# from compute spec's pageserver_connstring field")?;
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "# from compute spec's shard_stripe_size field")?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
}
if !spec.safekeeper_connstrings.is_empty() {

View File

@@ -617,6 +617,9 @@ components:
type: object
required:
- status
- total
- prewarmed
- skipped
properties:
status:
description: LFC prewarm status
@@ -634,15 +637,6 @@ components:
skipped:
description: Pages processed but not prewarmed
type: integer
state_download_time_ms:
description: Time it takes to download LFC state to compute
type: integer
uncompress_time_ms:
description: Time it takes to uncompress LFC state
type: integer
prewarm_time_ms:
description: Time it takes to prewarm LFC state in Postgres
type: integer
LfcOffloadState:
type: object
@@ -656,16 +650,6 @@ components:
error:
description: LFC offload error, if any
type: string
state_query_time_ms:
description: Time it takes to get LFC state from Postgres
type: integer
compress_time_ms:
description: Time it takes to compress LFC state
type: integer
state_upload_time_ms:
description: Time it takes to upload LFC state to endpoint storage
type: integer
PromoteState:
type: object
@@ -679,15 +663,6 @@ components:
error:
description: Promote error, if any
type: string
lsn_wait_time_ms:
description: Time it takes for secondary to catch up with primary WAL flush LSN
type: integer
pg_promote_time_ms:
description: Time it takes to call pg_promote on secondary
type: integer
reconfigure_time_ms:
description: Time it takes to reconfigure promoted secondary
type: integer
SetRoleGrantsRequest:
type: object

View File

@@ -1,11 +1,12 @@
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{LfcOffloadState, LfcPrewarmState};
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmState> {
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
Json(compute.lfc_prewarm_state().await)
}

View File

@@ -1,22 +1,11 @@
use crate::http::JsonResponse;
use axum::extract::Json;
use compute_api::responses::PromoteConfig;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Json(cfg): Json<PromoteConfig>,
Json(cfg): Json<compute_api::responses::PromoteConfig>,
) -> axum::response::Response {
// Return early at the cost of extra parsing spec
let pspec = match crate::compute::ParsedSpec::try_from(cfg.spec) {
Ok(p) => p,
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
};
let cfg = PromoteConfig {
spec: pspec.spec,
wal_flush_lsn: cfg.wal_flush_lsn,
};
let state = compute.promote(cfg).await;
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);

View File

@@ -71,9 +71,8 @@ const DEFAULT_PG_VERSION_NUM: &str = "17";
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
/// Neon CLI.
#[derive(clap::Parser)]
#[command(version = GIT_VERSION, name = "Neon CLI")]
#[command(version = GIT_VERSION, about, name = "Neon CLI")]
struct Cli {
#[command(subcommand)]
command: NeonLocalCmd,
@@ -108,31 +107,30 @@ enum NeonLocalCmd {
Stop(StopCmdArgs),
}
/// Initialize a new Neon repository, preparing configs for services to start with.
#[derive(clap::Args)]
#[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")]
struct InitCmdArgs {
/// How many pageservers to create (default 1).
#[clap(long)]
#[clap(long, help("How many pageservers to create (default 1)"))]
num_pageservers: Option<u16>,
#[clap(long)]
config: Option<PathBuf>,
/// Force initialization even if the repository is not empty.
#[clap(long, default_value = "must-not-exist")]
#[clap(long, help("Force initialization even if the repository is not empty"))]
#[arg(value_parser)]
#[clap(default_value = "must-not-exist")]
force: InitForceMode,
}
/// Start pageserver and safekeepers.
#[derive(clap::Args)]
#[clap(about = "Start pageserver and safekeepers")]
struct StartCmdArgs {
#[clap(long = "start-timeout", default_value = "10s")]
timeout: humantime::Duration,
}
/// Stop pageserver and safekeepers.
#[derive(clap::Args)]
#[clap(about = "Stop pageserver and safekeepers")]
struct StopCmdArgs {
#[arg(value_enum)]
#[clap(long, default_value_t = StopMode::Fast)]
@@ -145,8 +143,8 @@ enum StopMode {
Immediate,
}
/// Manage tenants.
#[derive(clap::Subcommand)]
#[clap(about = "Manage tenants")]
enum TenantCmd {
List,
Create(TenantCreateCmdArgs),
@@ -157,36 +155,38 @@ enum TenantCmd {
#[derive(clap::Args)]
struct TenantCreateCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: Option<TenantId>,
/// Use a specific timeline id when creating a tenant and its initial timeline.
#[clap(long)]
#[clap(
long,
help = "Use a specific timeline id when creating a tenant and its initial timeline"
)]
timeline_id: Option<TimelineId>,
#[clap(short = 'c')]
config: Vec<String>,
/// Postgres version to use for the initial timeline.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long)]
#[clap(long, help = "Postgres version to use for the initial timeline")]
pg_version: PgMajorVersion,
/// Use this tenant in future CLI commands where tenant_id is needed, but not specified.
#[clap(long)]
#[clap(
long,
help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified"
)]
set_default: bool,
/// Number of shards in the new tenant.
#[clap(long)]
#[clap(long, help = "Number of shards in the new tenant")]
#[arg(default_value_t = 0)]
shard_count: u8,
/// Sharding stripe size in pages.
#[clap(long)]
#[clap(long, help = "Sharding stripe size in pages")]
shard_stripe_size: Option<u32>,
/// Placement policy shards in this tenant.
#[clap(long)]
#[clap(long, help = "Placement policy shards in this tenant")]
#[arg(value_parser = parse_placement_policy)]
placement_policy: Option<PlacementPolicy>,
}
@@ -195,35 +195,44 @@ fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
Ok(serde_json::from_str::<PlacementPolicy>(s)?)
}
/// Set a particular tenant as default in future CLI commands where tenant_id is needed, but not
/// specified.
#[derive(clap::Args)]
#[clap(
about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"
)]
struct TenantSetDefaultCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: TenantId,
}
#[derive(clap::Args)]
struct TenantConfigCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: Option<TenantId>,
#[clap(short = 'c')]
config: Vec<String>,
}
/// Import a tenant that is present in remote storage, and create branches for its timelines.
#[derive(clap::Args)]
#[clap(
about = "Import a tenant that is present in remote storage, and create branches for its timelines"
)]
struct TenantImportCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: TenantId,
}
/// Manage timelines.
#[derive(clap::Subcommand)]
#[clap(about = "Manage timelines")]
enum TimelineCmd {
List(TimelineListCmdArgs),
Branch(TimelineBranchCmdArgs),
@@ -231,87 +240,98 @@ enum TimelineCmd {
Import(TimelineImportCmdArgs),
}
/// List all timelines available to this pageserver.
#[derive(clap::Args)]
#[clap(about = "List all timelines available to this pageserver")]
struct TimelineListCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_shard_id: Option<TenantShardId>,
}
/// Create a new timeline, branching off from another timeline.
#[derive(clap::Args)]
#[clap(about = "Create a new timeline, branching off from another timeline")]
struct TimelineBranchCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: Option<TenantId>,
/// New timeline's ID, as a 32-byte hexadecimal string.
#[clap(long)]
#[clap(long, help = "New timeline's ID")]
timeline_id: Option<TimelineId>,
/// Human-readable alias for the new timeline.
#[clap(long)]
#[clap(long, help = "Human-readable alias for the new timeline")]
branch_name: String,
/// Use last Lsn of another timeline (and its data) as base when creating the new timeline. The
/// timeline gets resolved by its branch name.
#[clap(long)]
#[clap(
long,
help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name."
)]
ancestor_branch_name: Option<String>,
/// When using another timeline as base, use a specific Lsn in it instead of the latest one.
#[clap(long)]
#[clap(
long,
help = "When using another timeline as base, use a specific Lsn in it instead of the latest one"
)]
ancestor_start_lsn: Option<Lsn>,
}
/// Create a new blank timeline.
#[derive(clap::Args)]
#[clap(about = "Create a new blank timeline")]
struct TimelineCreateCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: Option<TenantId>,
/// New timeline's ID, as a 32-byte hexadecimal string.
#[clap(long)]
#[clap(long, help = "New timeline's ID")]
timeline_id: Option<TimelineId>,
/// Human-readable alias for the new timeline.
#[clap(long)]
#[clap(long, help = "Human-readable alias for the new timeline")]
branch_name: String,
/// Postgres version.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long)]
#[clap(long, help = "Postgres version")]
pg_version: PgMajorVersion,
}
/// Import a timeline from a basebackup directory.
#[derive(clap::Args)]
#[clap(about = "Import timeline from a basebackup directory")]
struct TimelineImportCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: Option<TenantId>,
/// New timeline's ID, as a 32-byte hexadecimal string.
#[clap(long)]
#[clap(long, help = "New timeline's ID")]
timeline_id: TimelineId,
/// Human-readable alias for the new timeline.
#[clap(long)]
#[clap(long, help = "Human-readable alias for the new timeline")]
branch_name: String,
/// Basebackup tarfile to import.
#[clap(long)]
#[clap(long, help = "Basebackup tarfile to import")]
base_tarfile: PathBuf,
/// LSN the basebackup starts at.
#[clap(long)]
#[clap(long, help = "Lsn the basebackup starts at")]
base_lsn: Lsn,
/// WAL to add after base.
#[clap(long)]
#[clap(long, help = "Wal to add after base")]
wal_tarfile: Option<PathBuf>,
/// LSN the basebackup ends at.
#[clap(long)]
#[clap(long, help = "Lsn the basebackup ends at")]
end_lsn: Option<Lsn>,
/// Postgres version of the basebackup being imported.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long)]
#[clap(long, help = "Postgres version of the backup being imported")]
pg_version: PgMajorVersion,
}
/// Manage pageservers.
#[derive(clap::Subcommand)]
#[clap(about = "Manage pageservers")]
enum PageserverCmd {
Status(PageserverStatusCmdArgs),
Start(PageserverStartCmdArgs),
@@ -319,202 +339,223 @@ enum PageserverCmd {
Restart(PageserverRestartCmdArgs),
}
/// Show status of a local pageserver.
#[derive(clap::Args)]
#[clap(about = "Show status of a local pageserver")]
struct PageserverStatusCmdArgs {
/// Pageserver ID.
#[clap(long = "id")]
#[clap(long = "id", help = "pageserver id")]
pageserver_id: Option<NodeId>,
}
/// Start local pageserver.
#[derive(clap::Args)]
#[clap(about = "Start local pageserver")]
struct PageserverStartCmdArgs {
/// Pageserver ID.
#[clap(long = "id")]
#[clap(long = "id", help = "pageserver id")]
pageserver_id: Option<NodeId>,
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop local pageserver.
#[derive(clap::Args)]
#[clap(about = "Stop local pageserver")]
struct PageserverStopCmdArgs {
/// Pageserver ID.
#[clap(long = "id")]
#[clap(long = "id", help = "pageserver id")]
pageserver_id: Option<NodeId>,
/// If 'immediate', don't flush repository data at shutdown
#[clap(short = 'm')]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
#[arg(value_enum, default_value = "fast")]
stop_mode: StopMode,
}
/// Restart local pageserver.
#[derive(clap::Args)]
#[clap(about = "Restart local pageserver")]
struct PageserverRestartCmdArgs {
/// Pageserver ID.
#[clap(long = "id")]
#[clap(long = "id", help = "pageserver id")]
pageserver_id: Option<NodeId>,
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Manage storage controller.
#[derive(clap::Subcommand)]
#[clap(about = "Manage storage controller")]
enum StorageControllerCmd {
Start(StorageControllerStartCmdArgs),
Stop(StorageControllerStopCmdArgs),
}
/// Start storage controller.
#[derive(clap::Args)]
#[clap(about = "Start storage controller")]
struct StorageControllerStartCmdArgs {
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
/// Identifier used to distinguish storage controller instances.
#[clap(long)]
#[clap(
long,
help = "Identifier used to distinguish storage controller instances"
)]
#[arg(default_value_t = 1)]
instance_id: u8,
/// Base port for the storage controller instance identified by instance-id (defaults to
/// pageserver cplane api).
#[clap(long)]
#[clap(
long,
help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
)]
base_port: Option<u16>,
/// Whether the storage controller should handle pageserver-reported local disk loss events.
#[clap(long)]
#[clap(
long,
help = "Whether the storage controller should handle pageserver-reported local disk loss events."
)]
handle_ps_local_disk_loss: Option<bool>,
}
/// Stop storage controller.
#[derive(clap::Args)]
#[clap(about = "Stop storage controller")]
struct StorageControllerStopCmdArgs {
/// If 'immediate', don't flush repository data at shutdown
#[clap(short = 'm')]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
#[arg(value_enum, default_value = "fast")]
stop_mode: StopMode,
/// Identifier used to distinguish storage controller instances.
#[clap(long)]
#[clap(
long,
help = "Identifier used to distinguish storage controller instances"
)]
#[arg(default_value_t = 1)]
instance_id: u8,
}
/// Manage storage broker.
#[derive(clap::Subcommand)]
#[clap(about = "Manage storage broker")]
enum StorageBrokerCmd {
Start(StorageBrokerStartCmdArgs),
Stop(StorageBrokerStopCmdArgs),
}
/// Start broker.
#[derive(clap::Args)]
#[clap(about = "Start broker")]
struct StorageBrokerStartCmdArgs {
/// Timeout until we fail the command.
#[clap(short = 't', long, default_value = "10s")]
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop broker.
#[derive(clap::Args)]
#[clap(about = "stop broker")]
struct StorageBrokerStopCmdArgs {
/// If 'immediate', don't flush repository data on shutdown.
#[clap(short = 'm')]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
#[arg(value_enum, default_value = "fast")]
stop_mode: StopMode,
}
/// Manage safekeepers.
#[derive(clap::Subcommand)]
#[clap(about = "Manage safekeepers")]
enum SafekeeperCmd {
Start(SafekeeperStartCmdArgs),
Stop(SafekeeperStopCmdArgs),
Restart(SafekeeperRestartCmdArgs),
}
/// Manage object storage.
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum EndpointStorageCmd {
Start(EndpointStorageStartCmd),
Stop(EndpointStorageStopCmd),
}
/// Start object storage.
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct EndpointStorageStartCmd {
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop object storage.
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct EndpointStorageStopCmd {
/// If 'immediate', don't flush repository data on shutdown.
#[clap(short = 'm')]
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
stop_mode: StopMode,
}
/// Start local safekeeper.
#[derive(clap::Args)]
#[clap(about = "Start local safekeeper")]
struct SafekeeperStartCmdArgs {
/// Safekeeper ID.
#[clap(help = "safekeeper id")]
#[arg(default_value_t = NodeId(1))]
id: NodeId,
/// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
#[clap(short = 'e', long = "safekeeper-extra-opt")]
#[clap(
short = 'e',
long = "safekeeper-extra-opt",
help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
)]
extra_opt: Vec<String>,
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop local safekeeper.
#[derive(clap::Args)]
#[clap(about = "Stop local safekeeper")]
struct SafekeeperStopCmdArgs {
/// Safekeeper ID.
#[clap(help = "safekeeper id")]
#[arg(default_value_t = NodeId(1))]
id: NodeId,
/// If 'immediate', don't flush repository data on shutdown.
#[arg(value_enum, default_value = "fast")]
#[clap(short = 'm')]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
stop_mode: StopMode,
}
/// Restart local safekeeper.
#[derive(clap::Args)]
#[clap(about = "Restart local safekeeper")]
struct SafekeeperRestartCmdArgs {
/// Safekeeper ID.
#[clap(help = "safekeeper id")]
#[arg(default_value_t = NodeId(1))]
id: NodeId,
/// If 'immediate', don't flush repository data on shutdown.
#[arg(value_enum, default_value = "fast")]
#[clap(short = 'm')]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
stop_mode: StopMode,
/// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
#[clap(short = 'e', long = "safekeeper-extra-opt")]
#[clap(
short = 'e',
long = "safekeeper-extra-opt",
help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
)]
extra_opt: Vec<String>,
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Manage Postgres instances.
#[derive(clap::Subcommand)]
#[clap(about = "Manage Postgres instances")]
enum EndpointCmd {
List(EndpointListCmdArgs),
Create(EndpointCreateCmdArgs),
@@ -526,27 +567,33 @@ enum EndpointCmd {
GenerateJwt(EndpointGenerateJwtCmdArgs),
}
/// List endpoints.
#[derive(clap::Args)]
#[clap(about = "List endpoints")]
struct EndpointListCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_shard_id: Option<TenantShardId>,
}
/// Create a compute endpoint.
#[derive(clap::Args)]
#[clap(about = "Create a compute endpoint")]
struct EndpointCreateCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: Option<TenantId>,
/// Postgres endpoint ID.
#[clap(help = "Postgres endpoint id")]
endpoint_id: Option<String>,
/// Name of the branch the endpoint will run on.
#[clap(long)]
#[clap(long, help = "Name of the branch the endpoint will run on")]
branch_name: Option<String>,
/// Specify LSN on the timeline to start from. By default, end of the timeline would be used.
#[clap(long)]
#[clap(
long,
help = "Specify Lsn on the timeline to start from. By default, end of the timeline would be used"
)]
lsn: Option<Lsn>,
#[clap(long)]
pg_port: Option<u16>,
@@ -557,13 +604,16 @@ struct EndpointCreateCmdArgs {
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
/// Don't do basebackup, create endpoint directory with only config files.
#[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
#[clap(
long,
help = "Don't do basebackup, create endpoint directory with only config files",
action = clap::ArgAction::Set,
default_value_t = false
)]
config_only: bool,
/// Postgres version.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long)]
#[clap(long, help = "Postgres version")]
pg_version: PgMajorVersion,
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
@@ -574,140 +624,170 @@ struct EndpointCreateCmdArgs {
#[clap(long)]
grpc: bool,
/// If set, the node will be a hot replica on the specified timeline.
#[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
#[clap(
long,
help = "If set, the node will be a hot replica on the specified timeline",
action = clap::ArgAction::Set,
default_value_t = false
)]
hot_standby: bool,
/// If set, will set up the catalog for neon_superuser.
#[clap(long)]
#[clap(long, help = "If set, will set up the catalog for neon_superuser")]
update_catalog: bool,
/// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
/// useful for tests.
#[clap(long)]
#[clap(
long,
help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
)]
allow_multiple: bool,
/// Name of the privileged role for the endpoint.
// Only allow changing it on creation.
#[clap(long)]
/// Only allow changing it on creation
#[clap(long, help = "Name of the privileged role for the endpoint")]
privileged_role_name: Option<String>,
}
/// Start Postgres. If the endpoint doesn't exist yet, it is created.
#[derive(clap::Args)]
#[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")]
struct EndpointStartCmdArgs {
/// Postgres endpoint ID.
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
/// Pageserver ID.
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
/// Safekeepers membership generation to prefix neon.safekeepers with.
#[clap(long)]
#[clap(
long,
help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
)]
safekeepers_generation: Option<u32>,
/// List of safekeepers endpoint will talk to.
#[clap(long)]
#[clap(
long,
help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override."
)]
safekeepers: Option<String>,
/// Configure the remote extensions storage proxy gateway URL to request for extensions.
#[clap(long, alias = "remote-ext-config")]
#[clap(
long,
help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
alias = "remote-ext-config"
)]
remote_ext_base_url: Option<String>,
/// If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`
#[clap(long)]
#[clap(
long,
help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`"
)]
create_test_user: bool,
/// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
/// useful for tests.
#[clap(long)]
#[clap(
long,
help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
)]
allow_multiple: bool,
/// Timeout until we fail the command.
#[clap(short = 't', long, value_parser= humantime::parse_duration)]
#[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
#[arg(default_value = "90s")]
start_timeout: Duration,
/// Download LFC cache from endpoint storage on endpoint startup
#[clap(long, default_value = "false")]
#[clap(
long,
help = "Download LFC cache from endpoint storage on endpoint startup",
default_value = "false"
)]
autoprewarm: bool,
/// Upload LFC cache to endpoint storage periodically
#[clap(long)]
#[clap(long, help = "Upload LFC cache to endpoint storage periodically")]
offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
/// Run in development mode, skipping VM-specific operations like process termination
#[clap(long, action = clap::ArgAction::SetTrue)]
#[clap(
long,
help = "Run in development mode, skipping VM-specific operations like process termination",
action = clap::ArgAction::SetTrue
)]
dev: bool,
}
/// Reconfigure an endpoint.
#[derive(clap::Args)]
#[clap(about = "Reconfigure an endpoint")]
struct EndpointReconfigureCmdArgs {
/// Tenant id. Represented as a hexadecimal string 32 symbols length
#[clap(long = "tenant-id")]
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: Option<TenantId>,
/// Postgres endpoint ID.
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
/// Pageserver ID.
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
#[clap(long)]
safekeepers: Option<String>,
}
/// Refresh the endpoint's configuration by forcing it reload it's spec
#[derive(clap::Args)]
#[clap(about = "Refresh the endpoint's configuration by forcing it reload it's spec")]
struct EndpointRefreshConfigurationArgs {
/// Postgres endpoint id
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
}
/// Stop an endpoint.
#[derive(clap::Args)]
#[clap(about = "Stop an endpoint")]
struct EndpointStopCmdArgs {
/// Postgres endpoint ID.
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
/// Also delete data directory (now optional, should be default in future).
#[clap(long)]
#[clap(
long,
help = "Also delete data directory (now optional, should be default in future)"
)]
destroy: bool,
/// Postgres shutdown mode, passed to `pg_ctl -m <mode>`.
#[clap(long)]
#[clap(long, help = "Postgres shutdown mode")]
#[clap(default_value = "fast")]
mode: EndpointTerminateMode,
}
/// Update the pageservers in the spec file of the compute endpoint
#[derive(clap::Args)]
#[clap(about = "Update the pageservers in the spec file of the compute endpoint")]
struct EndpointUpdatePageserversCmdArgs {
/// Postgres endpoint id
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
/// Specified pageserver id
#[clap(short = 'p', long)]
#[clap(short = 'p', long, help = "Specified pageserver id")]
pageserver_id: Option<NodeId>,
}
/// Generate a JWT for an endpoint.
#[derive(clap::Args)]
#[clap(about = "Generate a JWT for an endpoint")]
struct EndpointGenerateJwtCmdArgs {
/// Postgres endpoint ID.
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
/// Scope to generate the JWT with.
#[clap(short = 's', long, value_parser = ComputeClaimsScope::from_str)]
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
scope: Option<ComputeClaimsScope>,
}
/// Manage neon_local branch name mappings.
#[derive(clap::Subcommand)]
#[clap(about = "Manage neon_local branch name mappings")]
enum MappingsCmd {
Map(MappingsMapCmdArgs),
}
/// Create new mapping which cannot exist already.
#[derive(clap::Args)]
#[clap(about = "Create new mapping which cannot exist already")]
struct MappingsMapCmdArgs {
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long)]
#[clap(
long,
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
tenant_id: TenantId,
/// Timeline ID, as a 32-byte hexadecimal string.
#[clap(long)]
#[clap(
long,
help = "Timeline id. Represented as a hexadecimal string 32 symbols length"
)]
timeline_id: TimelineId,
/// Branch name to give to the timeline.
#[clap(long)]
#[clap(long, help = "Branch name to give to the timeline")]
branch_name: String,
}

View File

@@ -303,13 +303,6 @@ enum Command {
#[arg(long, required = true, value_delimiter = ',')]
new_sk_set: Vec<NodeId>,
},
/// Abort ongoing safekeeper migration.
TimelineSafekeeperMigrateAbort {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
},
}
#[derive(Parser)]
@@ -1403,17 +1396,6 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::TimelineSafekeeperMigrateAbort {
tenant_id,
timeline_id,
} => {
let path =
format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort");
storcon_client
.dispatch::<(), ()>(Method::POST, path, None)
.await?;
}
}
Ok(())

View File

@@ -1,9 +1,10 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use std::fmt::Display;
use chrono::{DateTime, Utc};
use jsonwebtoken::jwk::JwkSet;
use serde::{Deserialize, Serialize, Serializer};
use std::fmt::Display;
use crate::privilege::Privilege;
use crate::spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role};
@@ -48,7 +49,7 @@ pub struct ExtensionInstallResponse {
/// Status of the LFC prewarm process. The same state machine is reused for
/// both autoprewarm (prewarm after compute/Postgres start using the previously
/// stored LFC state) and explicit prewarming via API.
#[derive(Serialize, Default, Debug, Clone)]
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcPrewarmState {
/// Default value when compute boots up.
@@ -58,14 +59,7 @@ pub enum LfcPrewarmState {
Prewarming,
/// We found requested LFC state in the endpoint storage and
/// completed prewarming successfully.
Completed {
total: i32,
prewarmed: i32,
skipped: i32,
state_download_time_ms: u32,
uncompress_time_ms: u32,
prewarm_time_ms: u32,
},
Completed,
/// Unexpected error happened during prewarming. Note, `Not Found 404`
/// response from the endpoint storage is explicitly excluded here
/// because it can normally happen on the first compute start,
@@ -90,7 +84,7 @@ impl Display for LfcPrewarmState {
match self {
LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
LfcPrewarmState::Completed { .. } => f.write_str("Completed"),
LfcPrewarmState::Completed => f.write_str("Completed"),
LfcPrewarmState::Skipped => f.write_str("Skipped"),
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
@@ -98,36 +92,26 @@ impl Display for LfcPrewarmState {
}
}
#[derive(Serialize, Default, Debug, Clone)]
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcOffloadState {
#[default]
NotOffloaded,
Offloading,
Completed {
state_query_time_ms: u32,
compress_time_ms: u32,
state_upload_time_ms: u32,
},
Completed,
Failed {
error: String,
},
/// LFC state was empty so it wasn't offloaded
Skipped,
}
#[derive(Serialize, Debug, Clone)]
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "status", rename_all = "snake_case")]
/// Response of /promote
pub enum PromoteState {
NotPromoted,
Completed {
lsn_wait_time_ms: u32,
pg_promote_time_ms: u32,
reconfigure_time_ms: u32,
},
Failed {
error: String,
},
Completed,
Failed { error: String },
}
#[derive(Deserialize, Default, Debug)]

View File

@@ -5,17 +5,12 @@ use std::sync::Arc;
use bytes::Bytes;
use http::Method;
use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_MAX_AGE, ACCESS_CONTROL_REQUEST_HEADERS, ALLOW,
AUTHORIZATION, CONTENT_TYPE, HOST, ORIGIN,
};
use http::header::{AUTHORIZATION, CONTENT_TYPE, HOST};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty, Full};
use http_body_util::{BodyExt, Full};
use http_utils::error::ApiError;
use hyper::body::Incoming;
use hyper::http::response::Builder;
use hyper::http::{HeaderMap, HeaderName, HeaderValue};
use hyper::http::{HeaderName, HeaderValue};
use hyper::{Request, Response, StatusCode};
use indexmap::IndexMap;
use moka::sync::Cache;
@@ -72,15 +67,6 @@ use crate::util::deserialize_json_string;
static EMPTY_JSON_SCHEMA: &str = r#"{"schemas":[]}"#;
const INTROSPECTION_SQL: &str = POSTGRESQL_INTROSPECTION_SQL;
const HEADER_VALUE_ALLOW_ALL_ORIGINS: HeaderValue = HeaderValue::from_static("*");
// CORS headers values
const ACCESS_CONTROL_ALLOW_METHODS_VALUE: HeaderValue =
HeaderValue::from_static("GET, POST, PATCH, PUT, DELETE, OPTIONS");
const ACCESS_CONTROL_MAX_AGE_VALUE: HeaderValue = HeaderValue::from_static("86400");
const ACCESS_CONTROL_EXPOSE_HEADERS_VALUE: HeaderValue = HeaderValue::from_static(
"Content-Encoding, Content-Location, Content-Range, Content-Type, Date, Location, Server, Transfer-Encoding, Range-Unit",
);
const ACCESS_CONTROL_ALLOW_HEADERS_VALUE: HeaderValue = HeaderValue::from_static("Authorization");
// A wrapper around the DbSchema that allows for self-referencing
#[self_referencing]
@@ -151,8 +137,6 @@ pub struct ApiConfig {
pub role_claim_key: String,
#[serde(default, deserialize_with = "deserialize_comma_separated_option")]
pub db_extra_search_path: Option<Vec<String>>,
#[serde(default, deserialize_with = "deserialize_comma_separated_option")]
pub server_cors_allowed_origins: Option<Vec<String>>,
}
// The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
@@ -181,13 +165,7 @@ impl DbSchemaCache {
}
}
pub fn get_cached(
&self,
endpoint_id: &EndpointCacheKey,
) -> Option<Arc<(ApiConfig, DbSchemaOwned)>> {
count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id))
}
pub async fn get_remote(
pub async fn get_cached_or_remote(
&self,
endpoint_id: &EndpointCacheKey,
auth_header: &HeaderValue,
@@ -196,42 +174,47 @@ impl DbSchemaCache {
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
let remote_value = self
.internal_get_remote(auth_header, connection_string, client, ctx, config)
.await;
let (api_config, schema_owned) = match remote_value {
Ok((api_config, schema_owned)) => (api_config, schema_owned),
Err(e @ RestError::SchemaTooLarge) => {
// for the case where the schema is too large, we cache an empty dummy value
// all the other requests will fail without triggering the introspection query
let schema_owned = serde_json::from_str::<DbSchemaOwned>(EMPTY_JSON_SCHEMA)
.map_err(|e| JsonDeserialize { source: e })?;
let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
match cache_result {
Some(v) => Ok(v),
None => {
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
let remote_value = self
.get_remote(auth_header, connection_string, client, ctx, config)
.await;
let (api_config, schema_owned) = match remote_value {
Ok((api_config, schema_owned)) => (api_config, schema_owned),
Err(e @ RestError::SchemaTooLarge) => {
// for the case where the schema is too large, we cache an empty dummy value
// all the other requests will fail without triggering the introspection query
let schema_owned = serde_json::from_str::<DbSchemaOwned>(EMPTY_JSON_SCHEMA)
.map_err(|e| JsonDeserialize { source: e })?;
let api_config = ApiConfig {
db_schemas: vec![],
db_anon_role: None,
db_max_rows: None,
db_allowed_select_functions: vec![],
role_claim_key: String::new(),
db_extra_search_path: None,
server_cors_allowed_origins: None,
let api_config = ApiConfig {
db_schemas: vec![],
db_anon_role: None,
db_max_rows: None,
db_allowed_select_functions: vec![],
role_claim_key: String::new(),
db_extra_search_path: None,
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value);
return Err(e);
}
Err(e) => {
return Err(e);
}
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value);
return Err(e);
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
}
Err(e) => {
return Err(e);
}
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
}
}
async fn internal_get_remote(
pub async fn get_remote(
&self,
auth_header: &HeaderValue,
connection_string: &str,
@@ -548,7 +531,7 @@ pub(crate) async fn handle(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let result = handle_inner(cancel, config, &ctx, request, backend).await;
let response = match result {
let mut response = match result {
Ok(r) => {
ctx.set_success();
@@ -657,6 +640,9 @@ pub(crate) async fn handle(
}
};
response
.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
Ok(response)
}
@@ -736,37 +722,6 @@ async fn handle_inner(
}
}
fn apply_common_cors_headers(
response: &mut Builder,
request_headers: &HeaderMap,
allowed_origins: Option<&Vec<String>>,
) {
let request_origin = request_headers
.get(ORIGIN)
.map(|v| v.to_str().unwrap_or(""));
let response_allow_origin = match (request_origin, allowed_origins) {
(Some(or), Some(allowed_origins)) => {
if allowed_origins.iter().any(|o| o == or) {
Some(HeaderValue::from_str(or).unwrap_or(HEADER_VALUE_ALLOW_ALL_ORIGINS))
} else {
None
}
}
(Some(_), None) => Some(HEADER_VALUE_ALLOW_ALL_ORIGINS),
_ => None,
};
if let Some(h) = response.headers_mut() {
h.insert(
ACCESS_CONTROL_EXPOSE_HEADERS,
ACCESS_CONTROL_EXPOSE_HEADERS_VALUE,
);
if let Some(origin) = response_allow_origin {
h.insert(ACCESS_CONTROL_ALLOW_ORIGIN, origin);
}
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_rest_inner(
config: &'static ProxyConfig,
@@ -778,6 +733,12 @@ async fn handle_rest_inner(
jwt: String,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, RestError> {
// validate the jwt token
let jwt_parsed = backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.await
.map_err(HttpConnError::from)?;
let db_schema_cache =
config
.rest_config
@@ -793,83 +754,28 @@ async fn handle_rest_inner(
message: "Failed to get endpoint cache key".to_string(),
}))?;
let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?;
let (parts, originial_body) = request.into_parts();
// try and get the cached entry for this endpoint
// it contains the api config and the introspected db schema
let cached_entry = db_schema_cache.get_cached(&endpoint_cache_key);
let allowed_origins = cached_entry
.as_ref()
.and_then(|arc| arc.0.server_cors_allowed_origins.as_ref());
let mut response = Response::builder();
apply_common_cors_headers(&mut response, &parts.headers, allowed_origins);
// handle the OPTIONS request
if parts.method == Method::OPTIONS {
let allowed_headers = parts
.headers
.get(ACCESS_CONTROL_REQUEST_HEADERS)
.and_then(|a| a.to_str().ok())
.filter(|v| !v.is_empty())
.map_or_else(
|| "Authorization".to_string(),
|v| format!("{v}, Authorization"),
);
return response
.status(StatusCode::OK)
.header(
ACCESS_CONTROL_ALLOW_METHODS,
ACCESS_CONTROL_ALLOW_METHODS_VALUE,
)
.header(ACCESS_CONTROL_MAX_AGE, ACCESS_CONTROL_MAX_AGE_VALUE)
.header(
ACCESS_CONTROL_ALLOW_HEADERS,
HeaderValue::from_str(&allowed_headers)
.unwrap_or(ACCESS_CONTROL_ALLOW_HEADERS_VALUE),
)
.header(ALLOW, ACCESS_CONTROL_ALLOW_METHODS_VALUE)
.body(Empty::new().map_err(|x| match x {}).boxed())
.map_err(|e| {
RestError::SubzeroCore(InternalError {
message: e.to_string(),
})
});
}
// validate the jwt token
let jwt_parsed = backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.await
.map_err(HttpConnError::from)?;
let auth_header = parts
.headers
.get(AUTHORIZATION)
.ok_or(RestError::SubzeroCore(InternalError {
message: "Authorization header is required".to_string(),
}))?;
let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?;
let entry = match cached_entry {
Some(e) => e,
None => {
// if not cached, get the remote entry (will run the introspection query)
db_schema_cache
.get_remote(
&endpoint_cache_key,
auth_header,
connection_string,
&mut client,
ctx,
config,
)
.await?
}
};
let entry = db_schema_cache
.get_cached_or_remote(
&endpoint_cache_key,
auth_header,
connection_string,
&mut client,
ctx,
config,
)
.await?;
let (api_config, db_schema_owned) = entry.as_ref();
let db_schema = db_schema_owned.borrow_schema();
let db_schemas = &api_config.db_schemas; // list of schemas available for the api
@@ -1093,8 +999,8 @@ async fn handle_rest_inner(
let _metrics = client.metrics(ctx); // FIXME: is everything in the context set correctly?
// send the request to the local proxy
let proxy_response = make_raw_local_proxy_request(&mut client, headers, req_body).await?;
let (response_parts, body) = proxy_response.into_parts();
let response = make_raw_local_proxy_request(&mut client, headers, req_body).await?;
let (parts, body) = response.into_parts();
let max_response = config.http_config.max_response_size_bytes;
let bytes = read_body_with_limit(body, max_response)
@@ -1103,7 +1009,7 @@ async fn handle_rest_inner(
// if the response status is greater than 399, then it is an error
// FIXME: check if there are other error codes or shapes of the response
if response_parts.status.as_u16() > 399 {
if parts.status.as_u16() > 399 {
// turn this postgres error from the json into PostgresError
let postgres_error = serde_json::from_slice(&bytes)
.map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?;
@@ -1269,7 +1175,7 @@ async fn handle_rest_inner(
.boxed();
// build the response
response = response
let mut response = Response::builder()
.status(StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR))
.header(CONTENT_TYPE, http_content_type);

View File

@@ -1 +0,0 @@
DROP TABLE tenant_shards;

View File

@@ -1,13 +0,0 @@
CREATE TABLE tenant_shards (
tenant_id VARCHAR NOT NULL,
shard_number INTEGER NOT NULL,
shard_count INTEGER NOT NULL,
PRIMARY KEY(tenant_id, shard_number, shard_count),
shard_stripe_size INTEGER NOT NULL,
generation INTEGER NOT NULL,
generation_pageserver BIGINT NOT NULL,
placement_policy VARCHAR NOT NULL,
splitting SMALLINT NOT NULL,
-- config is JSON encoded, opaque to the database.
config TEXT NOT NULL
);

View File

@@ -1 +0,0 @@
DROP TABLE nodes;

View File

@@ -1,10 +0,0 @@
CREATE TABLE nodes (
node_id BIGINT PRIMARY KEY NOT NULL,
scheduling_policy VARCHAR NOT NULL,
listen_http_addr VARCHAR NOT NULL,
listen_http_port INTEGER NOT NULL,
listen_pg_addr VARCHAR NOT NULL,
listen_pg_port INTEGER NOT NULL
);

View File

@@ -1,2 +0,0 @@
ALTER TABLE tenant_shards ALTER generation SET NOT NULL;
ALTER TABLE tenant_shards ALTER generation_pageserver SET NOT NULL;

View File

@@ -1,4 +0,0 @@
ALTER TABLE tenant_shards ALTER generation DROP NOT NULL;
ALTER TABLE tenant_shards ALTER generation_pageserver DROP NOT NULL;

View File

@@ -1,3 +0,0 @@
UPDATE tenant_shards set placement_policy='{"Double": 1}' where placement_policy='{"Attached": 1}';
UPDATE tenant_shards set placement_policy='"Single"' where placement_policy='{"Attached": 0}';

View File

@@ -1,3 +0,0 @@
UPDATE tenant_shards set placement_policy='{"Attached": 1}' where placement_policy='{"Double": 1}';
UPDATE tenant_shards set placement_policy='{"Attached": 0}' where placement_policy='"Single"';

View File

@@ -1,3 +0,0 @@
-- This file should undo anything in `up.sql`
ALTER TABLE tenant_shards drop scheduling_policy;

View File

@@ -1,2 +0,0 @@
ALTER TABLE tenant_shards add scheduling_policy VARCHAR NOT NULL DEFAULT '"Active"';

View File

@@ -1 +0,0 @@
DROP TABLE metadata_health;

View File

@@ -1,14 +0,0 @@
CREATE TABLE metadata_health (
tenant_id VARCHAR NOT NULL,
shard_number INTEGER NOT NULL,
shard_count INTEGER NOT NULL,
PRIMARY KEY(tenant_id, shard_number, shard_count),
-- Rely on cascade behavior for delete
FOREIGN KEY(tenant_id, shard_number, shard_count) REFERENCES tenant_shards ON DELETE CASCADE,
healthy BOOLEAN NOT NULL DEFAULT TRUE,
last_scrubbed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO metadata_health(tenant_id, shard_number, shard_count)
SELECT tenant_id, shard_number, shard_count FROM tenant_shards;

View File

@@ -1 +0,0 @@
DROP TABLE controllers;

View File

@@ -1,5 +0,0 @@
CREATE TABLE controllers (
address VARCHAR NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY(address, started_at)
);

View File

@@ -1,2 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TABLE safekeepers;

View File

@@ -1,15 +0,0 @@
-- started out as a copy of cplane schema, removed the unnecessary columns.
CREATE TABLE safekeepers (
-- the surrogate identifier defined by control plane database sequence
id BIGINT PRIMARY KEY,
region_id TEXT NOT NULL,
version BIGINT NOT NULL,
-- the natural id on whatever cloud platform, not needed in storage controller
-- instance_id TEXT UNIQUE NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL,
active BOOLEAN NOT NULL DEFAULT false,
-- projects_count INTEGER NOT NULL DEFAULT 0,
http_port INTEGER NOT NULL,
availability_zone_id TEXT NOT NULL
);

View File

@@ -1,2 +0,0 @@
-- This file should undo anything in `up.sql`
DROP INDEX tenant_shards_tenant_id;

View File

@@ -1,2 +0,0 @@
-- Your SQL goes here
CREATE INDEX tenant_shards_tenant_id ON tenant_shards (tenant_id);

View File

@@ -1 +0,0 @@
ALTER TABLE nodes DROP availability_zone_id;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ADD availability_zone_id VARCHAR;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ALTER availability_zone_id DROP NOT NULL;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ALTER availability_zone_id SET NOT NULL;

View File

@@ -1 +0,0 @@
ALTER TABLE tenant_shards DROP preferred_az_id;

View File

@@ -1 +0,0 @@
ALTER TABLE tenant_shards ADD preferred_az_id VARCHAR;

View File

@@ -1 +0,0 @@
ALTER TABLE safekeepers DROP scheduling_policy;

View File

@@ -1 +0,0 @@
ALTER TABLE safekeepers ADD scheduling_policy VARCHAR NOT NULL DEFAULT 'disabled';

View File

@@ -1,4 +0,0 @@
-- this sadly isn't a "true" revert of the migration, as the column is now at the end of the table.
-- But preserving order is not a trivial operation.
-- https://wiki.postgresql.org/wiki/Alter_column_position
ALTER TABLE safekeepers ADD active BOOLEAN NOT NULL DEFAULT false;

View File

@@ -1 +0,0 @@
ALTER TABLE safekeepers DROP active;

View File

@@ -1,2 +0,0 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'disabled';
UPDATE safekeepers SET scheduling_policy = 'disabled' WHERE scheduling_policy = 'pause';

View File

@@ -1,2 +0,0 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'pause';
UPDATE safekeepers SET scheduling_policy = 'pause' WHERE scheduling_policy = 'disabled';

View File

@@ -1 +0,0 @@
ALTER TABLE nodes DROP listen_https_port;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ADD listen_https_port INTEGER;

View File

@@ -1,2 +0,0 @@
DROP TABLE timelines;
DROP TABLE safekeeper_timeline_pending_ops;

View File

@@ -1,19 +0,0 @@
CREATE TABLE timelines (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
start_lsn pg_lsn NOT NULL,
generation INTEGER NOT NULL,
sk_set BIGINT[] NOT NULL,
new_sk_set BIGINT[],
cplane_notified_generation INTEGER NOT NULL,
deleted_at timestamptz,
PRIMARY KEY(tenant_id, timeline_id)
);
CREATE TABLE safekeeper_timeline_pending_ops (
sk_id BIGINT NOT NULL,
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
generation INTEGER NOT NULL,
op_kind VARCHAR NOT NULL,
PRIMARY KEY(tenant_id, timeline_id, sk_id)
);

View File

@@ -1 +0,0 @@
ALTER TABLE safekeepers DROP https_port;

View File

@@ -1 +0,0 @@
ALTER TABLE safekeepers ADD https_port INTEGER;

View File

@@ -1 +0,0 @@
DROP TABLE timeline_imports;

View File

@@ -1,6 +0,0 @@
CREATE TABLE timeline_imports (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
shard_statuses JSONB NOT NULL,
PRIMARY KEY(tenant_id, timeline_id)
);

View File

@@ -1 +0,0 @@
ALTER TABLE nodes DROP COLUMN lifecycle;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ADD COLUMN lifecycle VARCHAR NOT NULL DEFAULT 'active';

View File

@@ -1 +0,0 @@
ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL;

View File

@@ -1 +0,0 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'pause';

View File

@@ -1 +0,0 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'activating';

View File

@@ -1 +0,0 @@
ALTER TABLE timelines DROP sk_set_notified_generation;

View File

@@ -1 +0,0 @@
ALTER TABLE timelines ADD sk_set_notified_generation INTEGER NOT NULL DEFAULT 1;

View File

@@ -1,2 +0,0 @@
DROP TABLE hadron_safekeepers;
DROP TABLE hadron_timeline_safekeepers;

View File

@@ -1,17 +0,0 @@
-- hadron_safekeepers keep track of all Safe Keeper nodes that exist in the system.
-- Upon startup, each Safe Keeper reaches out to the hadron cluster coordinator to register its node ID and listen addresses.
CREATE TABLE hadron_safekeepers (
sk_node_id BIGINT PRIMARY KEY NOT NULL,
listen_http_addr VARCHAR NOT NULL,
listen_http_port INTEGER NOT NULL,
listen_pg_addr VARCHAR NOT NULL,
listen_pg_port INTEGER NOT NULL
);
CREATE TABLE hadron_timeline_safekeepers (
timeline_id VARCHAR NOT NULL,
sk_node_id BIGINT NOT NULL,
legacy_endpoint_id UUID DEFAULT NULL,
PRIMARY KEY(timeline_id, sk_node_id)
);

View File

@@ -0,0 +1,10 @@
DROP TABLE IF EXISTS timelines CASCADE;
DROP TABLE IF EXISTS timeline_imports CASCADE;
DROP TABLE IF EXISTS tenant_shards CASCADE;
DROP TABLE IF EXISTS safekeepers CASCADE;
DROP TABLE IF EXISTS safekeeper_timeline_pending_ops CASCADE;
DROP TABLE IF EXISTS nodes CASCADE;
DROP TABLE IF EXISTS metadata_health CASCADE;
DROP TABLE IF EXISTS hadron_timeline_safekeepers CASCADE;
DROP TABLE IF EXISTS hadron_safekeepers CASCADE;
DROP TABLE IF EXISTS controllers CASCADE;

View File

@@ -0,0 +1,112 @@
/*
* This migration squashes all previous migrations into a single one.
* It will be applied in two cases:
* 1. Spinning up a new region. There's no previous migrations apart from the diesel setup in this case
* 2. For existing regions.
*
* Hence, this migration does nothing if the schema is already up to date.
*/
CREATE TABLE IF NOT EXISTS controllers (
address character varying NOT NULL,
started_at timestamp with time zone NOT NULL,
PRIMARY KEY(address, started_at)
);
CREATE TABLE IF NOT EXISTS hadron_safekeepers (
sk_node_id bigint PRIMARY KEY NOT NULL,
listen_http_addr character varying NOT NULL,
listen_http_port integer NOT NULL,
listen_pg_addr character varying NOT NULL,
listen_pg_port integer NOT NULL
);
CREATE TABLE IF NOT EXISTS hadron_timeline_safekeepers (
timeline_id character varying NOT NULL,
sk_node_id bigint NOT NULL,
legacy_endpoint_id uuid,
PRIMARY KEY(timeline_id, sk_node_id)
);
CREATE TABLE IF NOT EXISTS tenant_shards (
tenant_id character varying NOT NULL,
shard_number integer NOT NULL,
shard_count integer NOT NULL,
PRIMARY KEY(tenant_id, shard_number, shard_count),
shard_stripe_size integer NOT NULL,
generation integer,
generation_pageserver bigint,
placement_policy character varying NOT NULL,
splitting smallint NOT NULL,
config text NOT NULL,
scheduling_policy character varying DEFAULT '"Active"'::character varying NOT NULL,
preferred_az_id character varying
);
CREATE INDEX IF NOT EXISTS tenant_shards_tenant_id ON tenant_shards USING btree (tenant_id);
CREATE TABLE IF NOT EXISTS metadata_health (
tenant_id character varying NOT NULL,
shard_number integer NOT NULL,
shard_count integer NOT NULL,
PRIMARY KEY(tenant_id, shard_number, shard_count),
-- Rely on cascade behavior for delete
FOREIGN KEY(tenant_id, shard_number, shard_count) REFERENCES tenant_shards ON DELETE CASCADE,
healthy boolean DEFAULT true NOT NULL,
last_scrubbed_at timestamp with time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS nodes (
node_id bigint PRIMARY KEY NOT NULL,
scheduling_policy character varying NOT NULL,
listen_http_addr character varying NOT NULL,
listen_http_port integer NOT NULL,
listen_pg_addr character varying NOT NULL,
listen_pg_port integer NOT NULL,
availability_zone_id character varying NOT NULL,
listen_https_port integer,
lifecycle character varying DEFAULT 'active'::character varying NOT NULL,
listen_grpc_addr character varying,
listen_grpc_port integer
);
CREATE TABLE IF NOT EXISTS safekeeper_timeline_pending_ops (
sk_id bigint NOT NULL,
tenant_id character varying NOT NULL,
timeline_id character varying NOT NULL,
generation integer NOT NULL,
op_kind character varying NOT NULL,
PRIMARY KEY(tenant_id, timeline_id, sk_id)
);
CREATE TABLE IF NOT EXISTS safekeepers (
id bigint PRIMARY KEY NOT NULL,
region_id text NOT NULL,
version bigint NOT NULL,
host text NOT NULL,
port integer NOT NULL,
http_port integer NOT NULL,
availability_zone_id text NOT NULL,
scheduling_policy character varying DEFAULT 'activating'::character varying NOT NULL,
https_port integer
);
CREATE TABLE IF NOT EXISTS timeline_imports (
tenant_id character varying NOT NULL,
timeline_id character varying NOT NULL,
shard_statuses jsonb NOT NULL,
PRIMARY KEY(tenant_id, timeline_id)
);
CREATE TABLE IF NOT EXISTS timelines (
tenant_id character varying NOT NULL,
timeline_id character varying NOT NULL,
start_lsn pg_lsn NOT NULL,
generation integer NOT NULL,
sk_set bigint[] NOT NULL,
new_sk_set bigint[],
cplane_notified_generation integer NOT NULL,
deleted_at timestamp with time zone,
sk_set_notified_generation integer DEFAULT 1 NOT NULL,
PRIMARY KEY(tenant_id, timeline_id)
);

View File

@@ -644,7 +644,6 @@ async fn handle_tenant_timeline_safekeeper_migrate(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
// TODO(diko): it's not PS operation, there should be a different permission scope.
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
@@ -666,23 +665,6 @@ async fn handle_tenant_timeline_safekeeper_migrate(
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_safekeeper_migrate_abort(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
// TODO(diko): it's not PS operation, there should be a different permission scope.
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
service
.tenant_timeline_safekeeper_migrate_abort(tenant_id, timeline_id)
.await?;
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_lsn_lease(
service: Arc<Service>,
req: Request<Body>,
@@ -2629,16 +2611,6 @@ pub fn make_router(
)
},
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate_abort",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_safekeeper_migrate_abort,
RequestName("v1_tenant_timeline_safekeeper_migrate_abort"),
)
},
)
// LSN lease passthrough to all shards
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",

View File

@@ -1230,7 +1230,10 @@ impl Service {
}
// It it is the same new_sk_set, we can continue the migration (retry).
} else {
if !is_migration_finished(&timeline) {
let prev_finished = timeline.cplane_notified_generation == timeline.generation
&& timeline.sk_set_notified_generation == timeline.generation;
if !prev_finished {
// The previous migration is committed, but the finish step failed.
// Safekeepers/cplane might not know about the last membership configuration.
// Retry the finish step to ensure smooth migration.
@@ -1542,8 +1545,6 @@ impl Service {
timeline_id: TimelineId,
timeline: &TimelinePersistence,
) -> Result<(), ApiError> {
tracing::info!(generation=?timeline.generation, sk_set=?timeline.sk_set, new_sk_set=?timeline.new_sk_set, "retrying finish safekeeper migration");
if timeline.new_sk_set.is_some() {
// Logical error, should never happen.
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -1623,120 +1624,4 @@ impl Service {
Ok(wal_positions[quorum_size - 1])
}
/// Abort ongoing safekeeper migration.
pub(crate) async fn tenant_timeline_safekeeper_migrate_abort(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
// TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineSafekeeperMigrate,
)
.await;
// Fetch current timeline configuration from the configuration storage.
let timeline = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(timeline) = timeline else {
return Err(ApiError::NotFound(
anyhow::anyhow!(
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
)
.into(),
));
};
let mut generation = SafekeeperGeneration::new(timeline.generation as u32);
let Some(new_sk_set) = &timeline.new_sk_set else {
// No new_sk_set -> no active migration that we can abort.
tracing::info!("timeline has no active migration");
if !is_migration_finished(&timeline) {
// The last migration is committed, but the finish step failed.
// Safekeepers/cplane might not know about the last membership configuration.
// Retry the finish step to make the timeline state clean.
self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
.await?;
}
return Ok(());
};
tracing::info!(sk_set=?timeline.sk_set, ?new_sk_set, ?generation, "aborting timeline migration");
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
let new_safekeepers = self.get_safekeepers(new_sk_set)?;
let cur_sk_member_set =
Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
// Increment current generation and remove new_sk_set from the timeline to abort the migration.
generation = generation.next();
let mconf = membership::Configuration {
generation,
members: cur_sk_member_set,
new_members: None,
};
// Exclude safekeepers which were added during the current migration.
let cur_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
let exclude_safekeepers = new_safekeepers
.into_iter()
.filter(|sk| !cur_ids.contains(&sk.get_id()))
.collect::<Vec<_>>();
let exclude_requests = exclude_safekeepers
.iter()
.map(|sk| TimelinePendingOpPersistence {
sk_id: sk.skp.id,
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: generation.into_inner() as i32,
op_kind: SafekeeperTimelineOpKind::Exclude,
})
.collect::<Vec<_>>();
let cur_sk_set = cur_safekeepers
.iter()
.map(|sk| sk.get_id())
.collect::<Vec<_>>();
// Persist new mconf and exclude requests.
self.persistence
.update_timeline_membership(
tenant_id,
timeline_id,
generation,
&cur_sk_set,
None,
&exclude_requests,
)
.await?;
// At this point we have already commited the abort, but still need to notify
// cplane/safekeepers with the new mconf. That's what finish_safekeeper_migration does.
self.finish_safekeeper_migration(
tenant_id,
timeline_id,
&cur_safekeepers,
&mconf,
&exclude_safekeepers,
)
.await?;
Ok(())
}
}
fn is_migration_finished(timeline: &TimelinePersistence) -> bool {
timeline.cplane_notified_generation == timeline.generation
&& timeline.sk_set_notified_generation == timeline.generation
}

View File

@@ -2323,19 +2323,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
log.info(f"migrate_safekeepers success: {response.json()}")
def abort_safekeeper_migration(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
):
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort",
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
response.raise_for_status()
log.info(f"abort_safekeeper_migration success: {response.json()}")
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}

View File

@@ -145,7 +145,6 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
stop_and_check_lsn(secondary, None)
if method == PromoteMethod.COMPUTE_CTL:
log.info("Restarting primary to check new config")
secondary.stop()
# In production, compute ultimately receives new compute spec from cplane.
secondary.respec(mode="Primary")

View File

@@ -460,91 +460,3 @@ def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder):
ep.start(safekeeper_generation=5, safekeepers=new_sk_set2)
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
def test_abort_safekeeper_migration(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper migration can be aborted.
1. Insert failpoints and ensure the abort successfully reverts the timeline state.
2. Check that endpoint is operational after the abort.
"""
neon_env_builder.num_safekeepers = 2
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert len(mconf["sk_set"]) == 1
cur_sk = mconf["sk_set"][0]
cur_gen = 1
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (1)")
another_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk][0]
failpoints = [
"sk-migration-after-step-3",
"sk-migration-after-step-4",
"sk-migration-after-step-5",
"sk-migration-after-step-7",
]
for fp in failpoints:
env.storage_controller.configure_failpoints((fp, "return(1)"))
with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [another_sk]
)
cur_gen += 1
env.storage_controller.configure_failpoints((fp, "off"))
# We should have a joint mconf after the failure.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == cur_gen
assert mconf["sk_set"] == [cur_sk]
assert mconf["new_sk_set"] == [another_sk]
env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline)
cur_gen += 1
# Abort should revert the timeline to the previous sk_set and increment the generation.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == cur_gen
assert mconf["sk_set"] == [cur_sk]
assert mconf["new_sk_set"] is None
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith(f"g#{cur_gen}:")
ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})")
# After step-8 the final mconf is committed and the migration is not abortable anymore.
# So the abort should not abort anything.
env.storage_controller.configure_failpoints(("sk-migration-after-step-8", "return(1)"))
with pytest.raises(StorageControllerApiException, match="failpoint sk-migration-after-step-8"):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [another_sk]
)
cur_gen += 2
env.storage_controller.configure_failpoints((fp, "off"))
env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline)
# The migration is fully committed, no abort should have been performed.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == cur_gen
assert mconf["sk_set"] == [another_sk]
assert mconf["new_sk_set"] is None
ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})")
ep.clear_buffers()
assert ep.safe_psql("SELECT * FROM t") == [(i + 1,) for i in range(cur_gen) if i % 2 == 0]