mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
11 Commits
fix_audit_
...
pump_prefe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5843b50183 | ||
|
|
24233d9976 | ||
|
|
735ccee5b2 | ||
|
|
a729bc98a9 | ||
|
|
de0a8d78c2 | ||
|
|
d7f7d33b0e | ||
|
|
cfbe7a0b3f | ||
|
|
049e1c508d | ||
|
|
5f3551e405 | ||
|
|
3e5884ff01 | ||
|
|
9fc7c22cc9 |
@@ -1,6 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
@@ -153,11 +153,6 @@ pub struct ComputeState {
|
||||
pub startup_span: Option<tracing::span::Span>,
|
||||
|
||||
pub metrics: ComputeMetrics,
|
||||
|
||||
/// current audit log level
|
||||
/// to know if it is already configured, or we need to set up audit
|
||||
/// when compute receives a new spec
|
||||
pub audit_log_level: ComputeAudit,
|
||||
}
|
||||
|
||||
impl ComputeState {
|
||||
@@ -170,7 +165,6 @@ impl ComputeState {
|
||||
pspec: None,
|
||||
startup_span: None,
|
||||
metrics: ComputeMetrics::default(),
|
||||
audit_log_level: ComputeAudit::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -626,10 +620,16 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// If extended compute audit is enabled configure and start rsyslog
|
||||
if pspec.spec.audit_log_level == ComputeAudit::Hipaa {
|
||||
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
|
||||
configure_audit_rsyslog(&log_directory_path, pspec.spec.audit_log_level.as_str())?;
|
||||
// Configure and start rsyslog for HIPAA if necessary
|
||||
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
|
||||
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
|
||||
if remote_endpoint.is_empty() {
|
||||
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
|
||||
}
|
||||
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
let log_directory_path = log_directory_path.to_string_lossy().to_string();
|
||||
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
@@ -684,11 +684,6 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// after all the configuration is done
|
||||
// preserve the information about the current audit log level
|
||||
// so that we don't relaunch rsyslog on every spec change
|
||||
self.set_audit_log_level(pspec.spec.audit_log_level);
|
||||
|
||||
// All done!
|
||||
let startup_end_time = Utc::now();
|
||||
let metrics = {
|
||||
@@ -843,19 +838,6 @@ impl ComputeNode {
|
||||
self.state.lock().unwrap().status
|
||||
}
|
||||
|
||||
pub fn set_audit_log_level(&self, audit_log_level: ComputeAudit) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.audit_log_level = audit_log_level;
|
||||
}
|
||||
|
||||
pub fn get_audit_log_level(&self) -> ComputeAudit {
|
||||
self.state.lock().unwrap().audit_log_level
|
||||
}
|
||||
|
||||
pub fn get_audit_log_dir(&self) -> PathBuf {
|
||||
Path::new(&self.params.pgdata).join("log")
|
||||
}
|
||||
|
||||
pub fn get_timeline_id(&self) -> Option<TimelineId> {
|
||||
self.state
|
||||
.lock()
|
||||
@@ -1566,29 +1548,6 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// If extended compute audit is enabled configure and start rsyslog
|
||||
// We check that the audit_log_level changed compared to the previous spec and skip this step if not.
|
||||
let audit_log_level = self.get_audit_log_level();
|
||||
|
||||
if spec.audit_log_level == ComputeAudit::Hipaa && audit_log_level != spec.audit_log_level {
|
||||
info!(
|
||||
"Configuring audit logging because audit_log_level changed from {:?} to {:?}",
|
||||
audit_log_level, spec.audit_log_level
|
||||
);
|
||||
|
||||
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
|
||||
configure_audit_rsyslog(&log_directory_path, spec.audit_log_level.as_str())?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
// If rsyslog was already configured, we don't need to start this process again.
|
||||
match audit_log_level {
|
||||
ComputeAudit::Disabled | ComputeAudit::Log => {
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
config::write_postgres_conf(
|
||||
@@ -1598,14 +1557,7 @@ impl ComputeNode {
|
||||
&self.compute_ctl_config.tls,
|
||||
)?;
|
||||
|
||||
// Override the skip_catalog_updates flag
|
||||
// if we need to install new extensions
|
||||
//
|
||||
// Check that audit_log_level changed compared to the previous spec and skip this step if not.
|
||||
// All operations are idempotent, so this is just a performance optimization.
|
||||
let force_catalog_updates = audit_log_level != spec.audit_log_level;
|
||||
|
||||
if !spec.skip_pg_catalog_updates || force_catalog_updates {
|
||||
if !spec.skip_pg_catalog_updates {
|
||||
let max_concurrent_connections = spec.reconfigure_concurrency;
|
||||
// Temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are reconfiguring:
|
||||
@@ -1630,11 +1582,6 @@ impl ComputeNode {
|
||||
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
// after all the configuration is done
|
||||
// preserve the information about the current audit log level
|
||||
// so that we don't relaunch rsyslog on every spec change
|
||||
self.set_audit_log_level(spec.audit_log_level);
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
|
||||
info!(
|
||||
|
||||
@@ -200,9 +200,8 @@ pub fn write_postgres_conf(
|
||||
)?;
|
||||
// This log level is very verbose
|
||||
// but this is necessary for HIPAA compliance.
|
||||
// Exclude 'misc' category, because it doesn't contain anything relevant.
|
||||
// Exclude 'misc' category, because it doesn't contain anythig relevant.
|
||||
writeln!(file, "pgaudit.log='all, -misc'")?;
|
||||
// Log parameters for all queries
|
||||
writeln!(file, "pgaudit.log_parameter=on")?;
|
||||
// Disable logging of catalog queries
|
||||
// The catalog doesn't contain sensitive data, so we don't need to audit it.
|
||||
|
||||
@@ -9,7 +9,6 @@ use anyhow::{Context, Result, anyhow};
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
|
||||
const AUDIT_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
|
||||
|
||||
fn get_rsyslog_pid() -> Option<String> {
|
||||
let output = Command::new("pgrep")
|
||||
@@ -49,43 +48,32 @@ fn restart_rsyslog() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> {
|
||||
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?;
|
||||
if remote_endpoint.is_empty() {
|
||||
return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set"));
|
||||
}
|
||||
|
||||
let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) {
|
||||
Ok(c) => c,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
pub fn configure_audit_rsyslog(
|
||||
log_directory: String,
|
||||
tag: &str,
|
||||
remote_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
tag = audit_log_level,
|
||||
tag = tag,
|
||||
remote_endpoint = remote_endpoint
|
||||
);
|
||||
|
||||
if old_config_content == config_content {
|
||||
info!("rsyslog configuration is up-to-date");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("rsyslog config_content: {}", config_content);
|
||||
|
||||
let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
|
||||
let mut file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(AUDIT_LOGS_CONF_PATH)?;
|
||||
.open(rsyslog_conf_path)?;
|
||||
|
||||
file.write_all(config_content.as_bytes())?;
|
||||
|
||||
info!(
|
||||
"rsyslog configuration file {} added successfully. Starting rsyslogd",
|
||||
AUDIT_LOGS_CONF_PATH
|
||||
rsyslog_conf_path
|
||||
);
|
||||
|
||||
// start the service, using the configuration
|
||||
|
||||
@@ -184,6 +184,8 @@ pub struct NeonStorageControllerConf {
|
||||
pub timelines_onto_safekeepers: bool,
|
||||
|
||||
pub use_https_safekeeper_api: bool,
|
||||
|
||||
pub use_local_compute_notifications: bool,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -213,6 +215,7 @@ impl Default for NeonStorageControllerConf {
|
||||
use_https_pageserver_api: false,
|
||||
timelines_onto_safekeepers: false,
|
||||
use_https_safekeeper_api: false,
|
||||
use_local_compute_notifications: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -555,6 +555,10 @@ impl StorageController {
|
||||
args.push("--use-https-safekeeper-api".to_string());
|
||||
}
|
||||
|
||||
if self.config.use_local_compute_notifications {
|
||||
args.push("--use-local-compute-notifications".to_string());
|
||||
}
|
||||
|
||||
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
|
||||
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
|
||||
}
|
||||
|
||||
@@ -160,6 +160,12 @@ pub struct ComputeSpec {
|
||||
pub drop_subscriptions_before_start: bool,
|
||||
|
||||
/// Log level for audit logging:
|
||||
///
|
||||
/// Disabled - no audit logging. This is the default.
|
||||
/// log - log masked statements to the postgres log using pgaudit extension
|
||||
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
|
||||
///
|
||||
/// Extensions should be present in shared_preload_libraries
|
||||
#[serde(default)]
|
||||
pub audit_log_level: ComputeAudit,
|
||||
}
|
||||
@@ -282,27 +288,16 @@ impl ComputeMode {
|
||||
}
|
||||
|
||||
/// Log level for audit logging
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
|
||||
/// Disabled, log, hipaa
|
||||
/// Default is Disabled
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub enum ComputeAudit {
|
||||
#[default]
|
||||
/// no audit logging. This is the default.
|
||||
Disabled,
|
||||
/// write masked audit log statements to the postgres log using pgaudit extension
|
||||
Log,
|
||||
/// log unmasked statements to the file using pgaudit and pgauditlogtofile extensions
|
||||
Hipaa,
|
||||
}
|
||||
|
||||
impl ComputeAudit {
|
||||
pub fn as_str(&self) -> &str {
|
||||
match self {
|
||||
ComputeAudit::Disabled => "disabled",
|
||||
ComputeAudit::Log => "log",
|
||||
ComputeAudit::Hipaa => "hipaa",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct Cluster {
|
||||
pub cluster_id: Option<String>,
|
||||
|
||||
@@ -38,6 +38,7 @@ use std::panic::AssertUnwindSafe;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::FutureExt;
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -584,18 +585,25 @@ pub async fn shutdown_tasks(
|
||||
// warn to catch these in tests; there shouldn't be any
|
||||
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
|
||||
}
|
||||
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
|
||||
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
// allow some time to elapse before logging to cut down the number of log
|
||||
// lines.
|
||||
info!("waiting for task {} to shut down", task.name);
|
||||
// we never handled this return value, but:
|
||||
// - we don't deschedule which would lead to is_cancelled
|
||||
// - panics are already logged (is_panicked)
|
||||
// - task errors are already logged in the wrapper
|
||||
let _ = join_handle.await;
|
||||
loop {
|
||||
tokio::select! {
|
||||
// we never handled this return value, but:
|
||||
// - we don't deschedule which would lead to is_cancelled
|
||||
// - panics are already logged (is_panicked)
|
||||
// - task errors are already logged in the wrapper
|
||||
_ = &mut join_handle => break,
|
||||
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
|
||||
}
|
||||
}
|
||||
info!("task {} completed", task.name);
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
struct iovec iov[PG_IOV_MAX];
|
||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
|
||||
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
|
||||
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
|
||||
int iteration_hits = 0;
|
||||
int iteration_misses = 0;
|
||||
uint64 io_time_us = 0;
|
||||
@@ -708,10 +708,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
|
||||
iov[i].iov_base = buffers[buf_offset + i];
|
||||
iov[i].iov_len = BLCKSZ;
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
}
|
||||
if (n_blocks_to_read == 0)
|
||||
{
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
}
|
||||
buf_offset += blocks_in_chunk;
|
||||
nblocks -= blocks_in_chunk;
|
||||
blkno += blocks_in_chunk;
|
||||
@@ -744,6 +747,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (entry == NULL)
|
||||
{
|
||||
/* Pages are not cached */
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
}
|
||||
lfc_ctl->misses += blocks_in_chunk;
|
||||
pgBufferUsage.file_cache.misses += blocks_in_chunk;
|
||||
LWLockRelease(lfc_lock);
|
||||
@@ -766,6 +773,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
FileCacheBlockState state = UNAVAILABLE;
|
||||
bool sleeping = false;
|
||||
|
||||
if (!BITMAP_ISSET(mask, buf_offset + i))
|
||||
continue;
|
||||
|
||||
while (lfc_ctl->generation == generation)
|
||||
{
|
||||
state = GET_STATE(entry, chunk_offs + i);
|
||||
@@ -789,11 +800,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
if (state == AVAILABLE)
|
||||
{
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
iteration_hits++;
|
||||
}
|
||||
else
|
||||
{
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
iteration_misses++;
|
||||
}
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
@@ -801,15 +814,36 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
if (iteration_hits != 0)
|
||||
{
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = preadv(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * blocks_in_chunk))
|
||||
if (blocks_in_chunk == n_blocks_to_read)
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = preadv(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * blocks_in_chunk))
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Some blocks are already prefetched in provided buffers, we should not rewrite them, so we can not use vector read */
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
if (BITMAP_ISSET(mask, buf_offset + i))
|
||||
{
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
if (rc != BLCKSZ)
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1000,12 +1034,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
LWLockRelease(lfc_lock);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
|
||||
|
||||
if (lwlsn > lsn)
|
||||
{
|
||||
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
|
||||
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X",
|
||||
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
|
||||
LWLockRelease(lfc_lock);
|
||||
return false;
|
||||
|
||||
@@ -1142,37 +1142,23 @@ pageserver_try_receive(shardno_t shard_no)
|
||||
NeonResponse *resp;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn = shard->conn;
|
||||
/* read response */
|
||||
int rc;
|
||||
int rc;
|
||||
|
||||
if (shard->state != PS_Connected)
|
||||
return NULL;
|
||||
|
||||
Assert(pageserver_conn);
|
||||
|
||||
while (true)
|
||||
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
|
||||
if (rc == 0)
|
||||
{
|
||||
if (PQisBusy(shard->conn))
|
||||
if (!PQconsumeInput(shard->conn))
|
||||
{
|
||||
WaitEvent event;
|
||||
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
|
||||
WAIT_EVENT_NEON_PS_READ) != 1
|
||||
|| (event.events & WL_SOCKET_READABLE) == 0)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
|
||||
if (rc == 0)
|
||||
{
|
||||
if (!PQconsumeInput(shard->conn))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
|
||||
if (rc == 0)
|
||||
return NULL;
|
||||
else if (rc > 0)
|
||||
|
||||
@@ -624,16 +624,19 @@ impl ComputeHook {
|
||||
MaybeSendResult::Transmit((request, lock)) => (request, lock),
|
||||
};
|
||||
|
||||
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
|
||||
Some(if control_plane_url.ends_with('/') {
|
||||
format!("{control_plane_url}notify-attach")
|
||||
let result = if !self.config.use_local_compute_notifications {
|
||||
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
|
||||
Some(if control_plane_url.ends_with('/') {
|
||||
format!("{control_plane_url}notify-attach")
|
||||
} else {
|
||||
format!("{control_plane_url}/notify-attach")
|
||||
})
|
||||
} else {
|
||||
format!("{control_plane_url}/notify-attach")
|
||||
})
|
||||
} else {
|
||||
self.config.compute_hook_url.clone()
|
||||
};
|
||||
let result = if let Some(notify_url) = &compute_hook_url {
|
||||
self.config.compute_hook_url.clone()
|
||||
};
|
||||
|
||||
// We validate this at startup
|
||||
let notify_url = compute_hook_url.as_ref().unwrap();
|
||||
self.do_notify(notify_url, &request, cancel).await
|
||||
} else {
|
||||
self.do_notify_local(&request).await.map_err(|e| {
|
||||
|
||||
@@ -203,6 +203,11 @@ struct Cli {
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<PathBuf>,
|
||||
|
||||
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
|
||||
/// the compute notification directly (instead of via control plane).
|
||||
#[arg(long, default_value = "false")]
|
||||
use_local_compute_notifications: bool,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -368,6 +373,9 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict if args.use_local_compute_notifications => {
|
||||
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
|
||||
}
|
||||
StrictMode::Strict => {
|
||||
tracing::info!("Starting in strict mode: configuration is OK.")
|
||||
}
|
||||
@@ -427,6 +435,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
ssl_ca_certs,
|
||||
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
|
||||
use_local_compute_notifications: args.use_local_compute_notifications,
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
|
||||
@@ -448,6 +448,8 @@ pub struct Config {
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
|
||||
pub timelines_onto_safekeepers: bool,
|
||||
|
||||
pub use_local_compute_notifications: bool,
|
||||
}
|
||||
|
||||
impl From<DatabaseError> for ApiError {
|
||||
|
||||
@@ -1169,6 +1169,12 @@ class NeonEnv:
|
||||
if storage_controller_config is not None:
|
||||
cfg["storage_controller"] = storage_controller_config
|
||||
|
||||
if config.test_may_use_compatibility_snapshot_binaries:
|
||||
if "storage_controller" in cfg:
|
||||
cfg["storage_controller"]["use_local_compute_notifications"] = False
|
||||
else:
|
||||
cfg["storage_controller"] = {"use_local_compute_notifications": False}
|
||||
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
|
||||
@@ -82,6 +82,7 @@ def test_storage_controller_many_tenants(
|
||||
# guard against regressions in restart time.
|
||||
"max_offline": "30s",
|
||||
"max_warming_up": "300s",
|
||||
"use_local_compute_notifications": False,
|
||||
}
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
|
||||
|
||||
@@ -5,11 +5,9 @@ import asyncio
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
|
||||
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
|
||||
is all done via the storage controller, but this test will disable the storage controller's compute
|
||||
@@ -23,19 +21,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{make_httpserver.host}:{make_httpserver.port}/"
|
||||
)
|
||||
|
||||
def ignore_notify(request: Request):
|
||||
# This test does direct updates to compute configuration: disable the storage controller's notification
|
||||
log.info(f"Ignoring storage controller compute notification: {request.json}")
|
||||
return Response(status=200)
|
||||
|
||||
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
|
||||
ignore_notify
|
||||
)
|
||||
|
||||
env.create_branch("test_change_pageserver")
|
||||
endpoint = env.endpoints.create_start("test_change_pageserver")
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import fixtures.utils
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.common_types import TenantId, TimelineId
|
||||
from fixtures.compute_reconfigure import ComputeReconfigure
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
@@ -592,17 +593,22 @@ def test_historic_storage_formats(
|
||||
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
|
||||
@pytest.mark.parametrize(
|
||||
**fixtures.utils.allpairs_versions(),
|
||||
)
|
||||
def test_versions_mismatch(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
compatibility_snapshot_dir,
|
||||
compute_reconfigure_listener: ComputeReconfigure,
|
||||
combination,
|
||||
):
|
||||
"""
|
||||
Checks compatibility of different combinations of versions of the components
|
||||
"""
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.from_repo_dir(
|
||||
compatibility_snapshot_dir / "repo",
|
||||
|
||||
@@ -97,5 +97,5 @@ def test_lfc_prefetch(neon_simple_env: NeonEnv):
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
# No redundant prefethc requrests if prefetch results are stored in LFC
|
||||
# No redundant prefetch requests if prefetch results are stored in LFC
|
||||
assert prefetch_expired == 0
|
||||
|
||||
@@ -91,6 +91,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
f"http://{make_httpserver.host}:{make_httpserver.port}/"
|
||||
)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
def ignore_notify(request: Request):
|
||||
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
|
||||
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.
|
||||
|
||||
@@ -808,6 +808,8 @@ def test_sharding_split_stripe_size(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
|
||||
)
|
||||
@@ -1316,6 +1318,11 @@ def test_sharding_split_failures(
|
||||
initial_shard_count = 2
|
||||
split_shard_count = 4
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Route to `compute_reconfigure_listener` instead
|
||||
"use_local_compute_notifications": False,
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
|
||||
@@ -73,7 +73,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
|
||||
|
||||
|
||||
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
|
||||
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
|
||||
def test_storage_controller_smoke(
|
||||
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
|
||||
):
|
||||
"""
|
||||
Test the basic lifecycle of a storage controller:
|
||||
- Restarting
|
||||
@@ -83,6 +85,7 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 3
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
env = neon_env_builder.init_configs()
|
||||
|
||||
# Start services by hand so that we can skip a pageserver (this will start + register later)
|
||||
@@ -620,6 +623,8 @@ def test_storage_controller_compute_hook(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
|
||||
|
||||
@@ -738,6 +743,8 @@ def test_storage_controller_stuck_compute_hook(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
|
||||
|
||||
@@ -885,6 +892,8 @@ def test_storage_controller_compute_hook_retry(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
@@ -1008,6 +1017,8 @@ def test_storage_controller_compute_hook_revert(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
|
||||
tenant_id = env.initial_tenant
|
||||
@@ -1398,6 +1409,11 @@ def test_storage_controller_tenant_deletion(
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Route to `compute_reconfigure_listener` instead
|
||||
"use_local_compute_notifications": False,
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
@@ -2176,7 +2192,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
|
||||
|
||||
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
|
||||
@pytest.mark.parametrize("num_azs", [1, 2])
|
||||
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
|
||||
def test_graceful_cluster_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
num_azs: int,
|
||||
compute_reconfigure_listener: ComputeReconfigure,
|
||||
combination,
|
||||
):
|
||||
"""
|
||||
Graceful reststart of storage controller clusters use the drain and
|
||||
fill hooks in order to migrate attachments away from pageservers before
|
||||
@@ -2188,6 +2209,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int
|
||||
"""
|
||||
neon_env_builder.num_azs = num_azs
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
@@ -2443,7 +2465,6 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("while_offline", [True, False])
|
||||
def test_storage_controller_node_deletion(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
compute_reconfigure_listener: ComputeReconfigure,
|
||||
while_offline: bool,
|
||||
):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user