Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-07 04:30:36 +00:00

Compare commits: conrad/pro...local-prox (4 commits)

| SHA1 |
|---|
| 87c793f58c |
| 52a7d780ad |
| 2255a8ebac |
| e109d5aac0 |
.github/workflows/build_and_test.yml (vendored, 4 lines changed)

@@ -33,7 +33,7 @@ jobs:
       github-event-name: ${{ github.event_name }}

   cancel-previous-e2e-tests:
-    needs: [ check-permissions ]
+    needs: [ check-permissions, promote-images, tag ]
     if: github.event_name == 'pull_request'
     runs-on: ubuntu-22.04

@@ -518,7 +518,7 @@ jobs:

   trigger-e2e-tests:
     if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' }}
-    needs: [ check-permissions, promote-images, tag ]
+    needs: [ check-permissions, promote-images, tag, cancel-previous-e2e-tests ]
     uses: ./.github/workflows/trigger-e2e-tests.yml
     secrets: inherit

@@ -11,6 +11,7 @@ testing = []

 [dependencies]
 anyhow.workspace = true
+# camino.workspace = true
 chrono.workspace = true
 cfg-if.workspace = true
 clap.workspace = true

@@ -264,68 +264,72 @@ async fn handle_configure_request(

     let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
     let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
-    if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
-        let spec = request.spec;
+    match serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
+        Ok(request) => {
+            let spec = request.spec;

             let parsed_spec = match ParsedSpec::try_from(spec) {
                 Ok(ps) => ps,
                 Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
             };

             // XXX: wrap the state update under the lock in a code block. Otherwise,
             // we will try to `Send` `mut state` into the thread spawned below,
             // which causes this error:
             // ```
             // error: future cannot be sent between threads safely
             // ```
             {
                 let mut state = compute.state.lock().unwrap();
                 if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
                     let msg = format!(
                         "invalid compute status for configuration request: {:?}",
                         state.status.clone()
                     );
                     return Err((msg, StatusCode::PRECONDITION_FAILED));
                 }
                 state.pspec = Some(parsed_spec);
                 state.status = ComputeStatus::ConfigurationPending;
                 compute.state_changed.notify_all();
                 drop(state);
                 info!("set new spec and notified waiters");
             }

             // Spawn a blocking thread to wait for the compute to become Running.
             // This is needed so that we do not block the main pool of workers and
             // can serve other requests while this particular request waits for the
             // compute to finish configuration.
             let c = compute.clone();
             task::spawn_blocking(move || {
                 let mut state = c.state.lock().unwrap();
                 while state.status != ComputeStatus::Running {
                     state = c.state_changed.wait(state).unwrap();
                     info!(
                         "waiting for compute to become Running, current status: {:?}",
                         state.status
                     );

                     if state.status == ComputeStatus::Failed {
                         let err = state.error.as_ref().map_or("unknown error", |x| x);
                         let msg = format!("compute configuration failed: {:?}", err);
                         return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
                     }
                 }

                 Ok(())
             })
             .await
             .unwrap()?;

             // Return the current compute state if everything went well.
             let state = compute.state.lock().unwrap().clone();
             let status_response = status_response_from_state(&state);
             Ok(serde_json::to_string(&status_response).unwrap())
-    } else {
-        Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
+        }
+        Err(err) => {
+            error!("could not parse spec: {spec_raw}");
+            Err((format!("invalid spec: {err:?}"), StatusCode::BAD_REQUEST))
+        }
     }
 }

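The handler above updates shared state under a `Mutex`, signals a `Condvar`, and then waits for the target status on `task::spawn_blocking` so the async workers stay free. Below is a minimal, self-contained sketch of that pattern under assumed dependencies (tokio with the "full" feature set); it is not the compute_ctl code itself, and `Status`, `Shared`, and the timing are illustrative.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

#[derive(Clone, Copy, PartialEq, Debug)]
enum Status {
    Pending,
    Running,
    Failed,
}

struct Shared {
    state: Mutex<Status>,
    changed: Condvar,
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Shared {
        state: Mutex::new(Status::Pending),
        changed: Condvar::new(),
    });

    // Waiter: park on the condvar off the async runtime's worker threads,
    // mirroring the `task::spawn_blocking` call in the handler.
    let c = shared.clone();
    let waiter = tokio::task::spawn_blocking(move || {
        let mut st = c.state.lock().unwrap();
        while *st != Status::Running {
            st = c.changed.wait(st).unwrap();
            if *st == Status::Failed {
                return Err("configuration failed");
            }
        }
        Ok(())
    });

    // Writer: publish the new status inside a scope so the MutexGuard is
    // dropped before the next `.await` (the same reason the handler wraps
    // its state update in a block).
    tokio::time::sleep(Duration::from_millis(100)).await;
    {
        let mut st = shared.state.lock().unwrap();
        *st = Status::Running;
        shared.changed.notify_all();
    }

    println!("waiter finished: {:?}", waiter.await.unwrap());
}
```

The `while` loop re-checks the status after every wakeup, which is what makes the pattern safe against spurious condvar wakeups and against waking up on an intermediate status such as Failed.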

@@ -15,6 +15,7 @@ pub mod catalog;
 pub mod compute;
 pub mod disk_quota;
 pub mod extension_server;
+// pub mod local_proxy;
 pub mod lsn_lease;
 mod migration;
 pub mod monitor;

compute_tools/src/local_proxy.rs (new file, 56 lines)

@@ -0,0 +1,56 @@
+//! Local Proxy is a feature of our BaaS Neon Authorize project.
+//!
+//! Local Proxy validates JWTs and manages the pg_session_jwt extension.
+//! It also maintains a connection pool to postgres.
+
+use anyhow::{Context, Result};
+use camino::Utf8Path;
+use compute_api::spec::LocalProxySpec;
+use nix::sys::signal::Signal;
+use utils::pid_file::{self, PidFileRead};
+
+pub fn configure(local_proxy: &LocalProxySpec) -> Result<()> {
+    write_local_proxy_conf("/etc/local_proxy/config.json".as_ref(), local_proxy)?;
+    notify_local_proxy("/etc/local_proxy/pid".as_ref())?;
+
+    Ok(())
+}
+
+/// Create or completely rewrite the configuration file specified by `path`.
+fn write_local_proxy_conf(path: &Utf8Path, local_proxy: &LocalProxySpec) -> Result<()> {
+    let config =
+        serde_json::to_string_pretty(local_proxy).context("serializing LocalProxySpec to json")?;
+    std::fs::write(path, config).with_context(|| format!("writing {path}"))?;
+
+    Ok(())
+}
+
+/// Notify local_proxy about a new config file.
+fn notify_local_proxy(path: &Utf8Path) -> Result<()> {
+    match pid_file::read(path)? {
+        // If the file doesn't exist, or isn't locked, local_proxy isn't running
+        // and will naturally pick up our config later.
+        PidFileRead::NotExist | PidFileRead::NotHeldByAnyProcess(_) => {}
+        PidFileRead::LockedByOtherProcess(pid) => {
+            // From the pid_file docs:
+            //
+            // > 1. The other process might exit at any time, turning the given PID stale.
+            // > 2. There is a small window in which `claim_for_current_process` has already
+            // >    locked the file but not yet updated its contents. [`read`] will return
+            // >    this variant here, but with the old file contents, i.e., a stale PID.
+            // >
+            // > The kernel is free to recycle PID once it has been `wait(2)`ed upon by
+            // > its creator. Thus, acting upon a stale PID, e.g., by issuing a `kill`
+            // > system call on it, bears the risk of killing an unrelated process.
+            // > This is an inherent limitation of using pidfiles.
+            // > The only race-free solution is to have a supervisor-process with a lifetime
+            // > that exceeds that of all of its child-processes (e.g., `runit`, `supervisord`).
+            //
+            // This is an acceptable risk, as we only send a SIGHUP, which likely won't
+            // actually kill the process, only make it reload its config.
+            nix::sys::signal::kill(pid, Signal::SIGHUP).context("sending signal to local_proxy")?;
+        }
+    }
+
+    Ok(())
+}
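`notify_local_proxy` relies on the repo's `utils::pid_file` helper, which also handles file locking. As a rough, self-contained illustration of the same reload protocol (write the config, then SIGHUP whatever PID the pidfile names), here is a simplified sketch that skips the locking and stale-PID handling discussed in the comments above; `sighup_from_pidfile` is a made-up name, not part of the codebase, and the only assumed dependencies are anyhow and nix.

```rust
use std::{fs, io, path::Path};

use anyhow::{Context, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;

fn sighup_from_pidfile(pid_path: &Path) -> Result<()> {
    match fs::read_to_string(pid_path) {
        // No pidfile: the daemon is not running and will read the config file
        // on its next startup, so there is nothing to signal.
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e).with_context(|| format!("reading {}", pid_path.display())),
        Ok(contents) => {
            let pid: i32 = contents.trim().parse().context("parsing pid")?;
            // SIGHUP conventionally means "reload your configuration",
            // not "terminate".
            kill(Pid::from_raw(pid), Signal::SIGHUP).context("sending SIGHUP")?;
            Ok(())
        }
    }
}

fn main() -> Result<()> {
    sighup_from_pidfile(Path::new("/etc/local_proxy/pid"))
}
```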

@@ -599,6 +599,7 @@ impl Endpoint {
             remote_extensions,
             pgbouncer_settings: None,
             shard_stripe_size: Some(shard_stripe_size),
+            local_proxy_config: None,
         };
         let spec_path = self.endpoint_path().join("spec.json");
         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

@@ -106,6 +106,11 @@ pub struct ComputeSpec {
     // Stripe size for pageserver sharding, in pages
     #[serde(default)]
     pub shard_stripe_size: Option<usize>,
+
+    /// Local Proxy configuration used for JWT authentication
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub local_proxy_config: Option<LocalProxySpec>,
 }

 /// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

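The two serde attributes on the new field are what keep the spec format compatible in both directions: `#[serde(default)]` lets older JSON that lacks the key still deserialize, and `skip_serializing_if = "Option::is_none"` keeps the key out of serialized specs when it is unset. A small stand-alone sketch of that behavior, where the `Demo` and `Nested` types and the address value are made up:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Nested {
    addr: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct Demo {
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    nested: Option<Nested>,
}

fn main() -> serde_json::Result<()> {
    // Old-style JSON without the key still parses; the field becomes None.
    let old: Demo = serde_json::from_str("{}")?;
    assert!(old.nested.is_none());

    // A None field is omitted on serialization, so old readers never see it.
    assert_eq!(serde_json::to_string(&old)?, "{}");

    // When the key is present, it round-trips normally.
    let new: Demo = serde_json::from_str(r#"{"nested":{"addr":"127.0.0.1:7432"}}"#)?;
    println!("{:?}", new.nested);
    Ok(())
}
```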

@@ -109,7 +109,7 @@ struct SqlOverHttpArgs {

 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let _logging_guard = proxy::logging::init().await?;
+    let _logging_guard = proxy::logging::init_local_proxy()?;
     let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
     let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);

@@ -138,7 +138,7 @@ async fn main() -> anyhow::Result<()> {
     // in order to trigger the appropriate SIGHUP on config change.
     //
     // This also claims a "lock" that makes sure only one instance
-    // of local-proxy runs at a time.
+    // of local_proxy runs at a time.
     let _process_guard = loop {
         match pid_file::claim_for_current_process(&args.pid_path) {
             Ok(guard) => break guard,

@@ -164,12 +164,6 @@ async fn main() -> anyhow::Result<()> {
         16,
     ));

-    // write the process ID to a file so that compute-ctl can find our process later
-    // in order to trigger the appropriate SIGHUP on config change.
-    let pid = std::process::id();
-    info!("process running in PID {pid}");
-    std::fs::write(args.pid_path, format!("{pid}\n")).context("writing PID to file")?;
-
     let mut maintenance_tasks = JoinSet::new();

     let refresh_config_notify = Arc::new(Notify::new());

@@ -182,9 +176,9 @@ async fn main() -> anyhow::Result<()> {

     // trigger the first config load **after** setting up the signal hook
     // to avoid the race condition where:
-    // 1. No config file registered when local-proxy starts up
+    // 1. No config file registered when local_proxy starts up
     // 2. The config file is written but the signal hook is not yet received
-    // 3. local-proxy completes startup but has no config loaded, despite there being a registerd config.
+    // 3. local_proxy completes startup but has no config loaded, despite there being a registered config.
     refresh_config_notify.notify_one();
     tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify));

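The comment in the hunk above describes a startup ordering: install the SIGHUP hook first, then fire one notification manually so a config file written before startup still gets loaded. A minimal sketch of that ordering with `tokio::sync::Notify` follows; the real local_proxy wires this to `refresh_config_loop(args.config_path, ...)`, while the signal plumbing, sleep, and println below are illustrative and assume tokio with the "full" feature set.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::Notify;

async fn refresh_config_loop(notify: Arc<Notify>) {
    loop {
        notify.notified().await;
        // In local_proxy this is where /etc/local_proxy/config.json is re-read.
        println!("reloading config");
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let refresh_config_notify = Arc::new(Notify::new());

    // 1. Register the SIGHUP hook first, so no later config write can be missed.
    let n = refresh_config_notify.clone();
    let mut sighup = signal(SignalKind::hangup())?;
    tokio::spawn(async move {
        while sighup.recv().await.is_some() {
            n.notify_one();
        }
    });

    // 2. Then trigger the first load manually, covering a config file that was
    //    already written before the hook existed. Notify stores the permit, so
    //    the loop picks it up even if it has not started waiting yet.
    refresh_config_notify.notify_one();
    tokio::spawn(refresh_config_loop(refresh_config_notify));

    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
```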
@@ -1,6 +1,13 @@
+use tracing::Subscriber;
 use tracing_subscriber::{
     filter::{EnvFilter, LevelFilter},
+    fmt::{
+        format::{Format, Full},
+        time::SystemTime,
+        FormatEvent, FormatFields,
+    },
     prelude::*,
+    registry::LookupSpan,
 };

 /// Initialize logging and OpenTelemetry tracing and exporter.

@@ -33,6 +40,45 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
     Ok(LoggingGuard)
 }

+/// Initialize logging for local_proxy with log prefix and no opentelemetry.
+///
+/// Logging can be configured using `RUST_LOG` environment variable.
+pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
+    let env_filter = EnvFilter::builder()
+        .with_default_directive(LevelFilter::INFO.into())
+        .from_env_lossy();
+
+    let fmt_layer = tracing_subscriber::fmt::layer()
+        .with_ansi(false)
+        .with_writer(std::io::stderr)
+        .event_format(LocalProxyFormatter(Format::default().with_target(false)));
+
+    tracing_subscriber::registry()
+        .with(env_filter)
+        .with(fmt_layer)
+        .try_init()?;
+
+    Ok(LoggingGuard)
+}
+
+pub struct LocalProxyFormatter(Format<Full, SystemTime>);
+
+impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
+where
+    S: Subscriber + for<'a> LookupSpan<'a>,
+    N: for<'a> FormatFields<'a> + 'static,
+{
+    fn format_event(
+        &self,
+        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
+        mut writer: tracing_subscriber::fmt::format::Writer<'_>,
+        event: &tracing::Event<'_>,
+    ) -> std::fmt::Result {
+        writer.write_str("[local_proxy] ")?;
+        self.0.format_event(ctx, writer, event)
+    }
+}
+
 pub struct LoggingGuard;

 impl Drop for LoggingGuard {
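To see what the new formatter produces, here is a stand-alone sketch that wires an equivalent prefixing `FormatEvent` wrapper into a subscriber the same way `init_local_proxy` does, and emits one line to stderr. The wrapper is named `PrefixFormatter` to keep the sketch independent of the proxy crate; the only assumed dependencies are tracing and tracing-subscriber, and the sample output line is approximate.

```rust
use std::fmt;

use tracing::{info, Subscriber};
use tracing_subscriber::{
    fmt::{
        format::{Format, Full, Writer},
        time::SystemTime,
        FmtContext, FormatEvent, FormatFields,
    },
    prelude::*,
    registry::LookupSpan,
};

/// Same idea as `LocalProxyFormatter`: delegate to the default formatter,
/// but prefix every event with a fixed tag.
struct PrefixFormatter(Format<Full, SystemTime>);

impl<S, N> FormatEvent<S, N> for PrefixFormatter
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    N: for<'a> FormatFields<'a> + 'static,
{
    fn format_event(
        &self,
        ctx: &FmtContext<'_, S, N>,
        mut writer: Writer<'_>,
        event: &tracing::Event<'_>,
    ) -> fmt::Result {
        writer.write_str("[local_proxy] ")?;
        self.0.format_event(ctx, writer, event)
    }
}

fn main() {
    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_ansi(false)
        .with_writer(std::io::stderr)
        .event_format(PrefixFormatter(Format::default().with_target(false)));

    tracing_subscriber::registry().with(fmt_layer).init();

    // Prints something like (timestamp will differ):
    //   [local_proxy] 2024-07-23T12:00:00.000000Z  INFO starting up
    info!("starting up");
}
```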