Compare commits

...

11 Commits

Author SHA1 Message Date
Konstantin Knizhnik
5843b50183 Fix mistyping in test comments 2025-03-27 21:06:57 +01:00
Konstantin Knizhnik
24233d9976 Undo changes in pagestore_smgr.c 2025-03-27 21:06:20 +01:00
Konstantin Knizhnik
735ccee5b2 Do not overwrite buffer filled by prefetch_lookup in lfc_readv_select 2025-03-27 14:12:54 +01:00
Konstantin Knizhnik
a729bc98a9 Do not switch connection to non-blocking mode 2025-03-24 21:06:21 +01:00
Konstantin Knizhnik
de0a8d78c2 Fix switch to non-blocking mode 2025-03-24 21:06:21 +01:00
Konstantin Knizhnik
d7f7d33b0e Use non -blocking mode for compute<->PS protocol 2025-03-24 21:06:20 +01:00
Konstantin Knizhnik
cfbe7a0b3f Remove loop from pageserver_try_receive 2025-03-24 21:06:20 +01:00
Konstantin Knizhnik
049e1c508d Rerstrict interval of polling socket state 2025-03-24 21:06:20 +01:00
Arpad Müller
5f3551e405 Add "still waiting for task" for slow shutdowns (#11351)
To help with narrowing down
https://github.com/neondatabase/cloud/issues/26362, we make the case
more noisy where we are wait for the shutdown of a specific task (in the
case of that issue, the `gc_loop`).
2025-03-24 17:29:44 +00:00
Anastasia Lubennikova
3e5884ff01 Revert "feat(compute_ctl): allow to change audit_log_level for existi… (#11343)
…ng (#11308)"

This reverts commit e5aef3747c.

The logic of this commit was incorrect:
enabling audit requires a restart of the compute,
because audit extensions use shared_preload_libraries.
So it cannot be done in the configuration phase,
require endpoint restart instead.
2025-03-21 18:09:34 +00:00
Vlad Lazar
9fc7c22cc9 storcon: add use_local_compute_notifications flag (#11333)
## Problem

While working on bulk import, I want to use the `control-plane-url` flag
for a different request.
Currently, the local compute hook is used whenever no control plane is
specified in the config.
My test requires local compute notifications and a configured
`control-plane-url` which isn't supported.

## Summary of changes

Add a `use-local-compute-notifications` flag. When this is set, we use
the local flow regardless of other config values.
It's enabled by default in neon_local and disabled by default in all
other envs. I had to turn the flag off in tests
that wish to bypass the local flow, but that's expected.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-03-21 15:31:06 +00:00
20 changed files with 177 additions and 171 deletions

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
@@ -153,11 +153,6 @@ pub struct ComputeState {
pub startup_span: Option<tracing::span::Span>,
pub metrics: ComputeMetrics,
/// current audit log level
/// to know if it is already configured, or we need to set up audit
/// when compute receives a new spec
pub audit_log_level: ComputeAudit,
}
impl ComputeState {
@@ -170,7 +165,6 @@ impl ComputeState {
pspec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
audit_log_level: ComputeAudit::default(),
}
}
@@ -626,10 +620,16 @@ impl ComputeNode {
});
}
// If extended compute audit is enabled configure and start rsyslog
if pspec.spec.audit_log_level == ComputeAudit::Hipaa {
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
configure_audit_rsyslog(&log_directory_path, pspec.spec.audit_log_level.as_str())?;
// Configure and start rsyslog for HIPAA if necessary
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
if remote_endpoint.is_empty() {
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
}
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);
@@ -684,11 +684,6 @@ impl ComputeNode {
});
}
// after all the configuration is done
// preserve the information about the current audit log level
// so that we don't relaunch rsyslog on every spec change
self.set_audit_log_level(pspec.spec.audit_log_level);
// All done!
let startup_end_time = Utc::now();
let metrics = {
@@ -843,19 +838,6 @@ impl ComputeNode {
self.state.lock().unwrap().status
}
pub fn set_audit_log_level(&self, audit_log_level: ComputeAudit) {
let mut state = self.state.lock().unwrap();
state.audit_log_level = audit_log_level;
}
pub fn get_audit_log_level(&self) -> ComputeAudit {
self.state.lock().unwrap().audit_log_level
}
pub fn get_audit_log_dir(&self) -> PathBuf {
Path::new(&self.params.pgdata).join("log")
}
pub fn get_timeline_id(&self) -> Option<TimelineId> {
self.state
.lock()
@@ -1566,29 +1548,6 @@ impl ComputeNode {
});
}
// If extended compute audit is enabled configure and start rsyslog
// We check that the audit_log_level changed compared to the previous spec and skip this step if not.
let audit_log_level = self.get_audit_log_level();
if spec.audit_log_level == ComputeAudit::Hipaa && audit_log_level != spec.audit_log_level {
info!(
"Configuring audit logging because audit_log_level changed from {:?} to {:?}",
audit_log_level, spec.audit_log_level
);
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
configure_audit_rsyslog(&log_directory_path, spec.audit_log_level.as_str())?;
// Launch a background task to clean up the audit logs
// If rsyslog was already configured, we don't need to start this process again.
match audit_log_level {
ComputeAudit::Disabled | ComputeAudit::Log => {
launch_pgaudit_gc(log_directory_path);
}
_ => {}
}
}
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(
@@ -1598,14 +1557,7 @@ impl ComputeNode {
&self.compute_ctl_config.tls,
)?;
// Override the skip_catalog_updates flag
// if we need to install new extensions
//
// Check that audit_log_level changed compared to the previous spec and skip this step if not.
// All operations are idempotent, so this is just a performance optimization.
let force_catalog_updates = audit_log_level != spec.audit_log_level;
if !spec.skip_pg_catalog_updates || force_catalog_updates {
if !spec.skip_pg_catalog_updates {
let max_concurrent_connections = spec.reconfigure_concurrency;
// Temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:
@@ -1630,11 +1582,6 @@ impl ComputeNode {
self.pg_reload_conf()?;
// after all the configuration is done
// preserve the information about the current audit log level
// so that we don't relaunch rsyslog on every spec change
self.set_audit_log_level(spec.audit_log_level);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(

View File

@@ -200,9 +200,8 @@ pub fn write_postgres_conf(
)?;
// This log level is very verbose
// but this is necessary for HIPAA compliance.
// Exclude 'misc' category, because it doesn't contain anything relevant.
// Exclude 'misc' category, because it doesn't contain anythig relevant.
writeln!(file, "pgaudit.log='all, -misc'")?;
// Log parameters for all queries
writeln!(file, "pgaudit.log_parameter=on")?;
// Disable logging of catalog queries
// The catalog doesn't contain sensitive data, so we don't need to audit it.

View File

@@ -9,7 +9,6 @@ use anyhow::{Context, Result, anyhow};
use tracing::{error, info, instrument, warn};
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
const AUDIT_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
fn get_rsyslog_pid() -> Option<String> {
let output = Command::new("pgrep")
@@ -49,43 +48,32 @@ fn restart_rsyslog() -> Result<()> {
Ok(())
}
pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> {
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?;
if remote_endpoint.is_empty() {
return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set"));
}
let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) {
Ok(c) => c,
Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
Err(err) => return Err(err.into()),
};
pub fn configure_audit_rsyslog(
log_directory: String,
tag: &str,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
tag = audit_log_level,
tag = tag,
remote_endpoint = remote_endpoint
);
if old_config_content == config_content {
info!("rsyslog configuration is up-to-date");
return Ok(());
}
info!("rsyslog config_content: {}", config_content);
let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
let mut file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(AUDIT_LOGS_CONF_PATH)?;
.open(rsyslog_conf_path)?;
file.write_all(config_content.as_bytes())?;
info!(
"rsyslog configuration file {} added successfully. Starting rsyslogd",
AUDIT_LOGS_CONF_PATH
rsyslog_conf_path
);
// start the service, using the configuration

View File

@@ -184,6 +184,8 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {
@@ -213,6 +215,7 @@ impl Default for NeonStorageControllerConf {
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}

View File

@@ -555,6 +555,10 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}

View File

@@ -160,6 +160,12 @@ pub struct ComputeSpec {
pub drop_subscriptions_before_start: bool,
/// Log level for audit logging:
///
/// Disabled - no audit logging. This is the default.
/// log - log masked statements to the postgres log using pgaudit extension
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
///
/// Extensions should be present in shared_preload_libraries
#[serde(default)]
pub audit_log_level: ComputeAudit,
}
@@ -282,27 +288,16 @@ impl ComputeMode {
}
/// Log level for audit logging
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
/// Disabled, log, hipaa
/// Default is Disabled
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {
#[default]
/// no audit logging. This is the default.
Disabled,
/// write masked audit log statements to the postgres log using pgaudit extension
Log,
/// log unmasked statements to the file using pgaudit and pgauditlogtofile extensions
Hipaa,
}
impl ComputeAudit {
pub fn as_str(&self) -> &str {
match self {
ComputeAudit::Disabled => "disabled",
ComputeAudit::Log => "log",
ComputeAudit::Hipaa => "hipaa",
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
pub cluster_id: Option<String>,

View File

@@ -38,6 +38,7 @@ use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::FutureExt;
use once_cell::sync::Lazy;
@@ -584,18 +585,25 @@ pub async fn shutdown_tasks(
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
loop {
tokio::select! {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
_ = &mut join_handle => break,
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
}
}
info!("task {} completed", task.name);
}
} else {

View File

@@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -708,10 +708,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
BITMAP_CLR(mask, buf_offset + i);
}
if (n_blocks_to_read == 0)
{
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
blkno += blocks_in_chunk;
@@ -744,6 +747,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry == NULL)
{
/* Pages are not cached */
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
lfc_ctl->misses += blocks_in_chunk;
pgBufferUsage.file_cache.misses += blocks_in_chunk;
LWLockRelease(lfc_lock);
@@ -766,6 +773,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
if (!BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -789,11 +800,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
}
else
{
BITMAP_CLR(mask, buf_offset + i);
iteration_misses++;
}
}
LWLockRelease(lfc_lock);
@@ -801,15 +814,36 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
if (blocks_in_chunk == n_blocks_to_read)
{
lfc_disable("read");
return -1;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
{
lfc_disable("read");
return -1;
}
}
else
{
/* Some blocks are already prefetched in provided buffers, we should not rewrite them, so we can not use vector read */
for (int i = 0; i < blocks_in_chunk; i++)
{
if (BITMAP_ISSET(mask, buf_offset + i))
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ);
pgstat_report_wait_end();
if (rc != BLCKSZ)
{
lfc_disable("read");
return -1;
}
}
}
}
}
@@ -1000,12 +1034,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
return false;

View File

@@ -1142,37 +1142,23 @@ pageserver_try_receive(shardno_t shard_no)
NeonResponse *resp;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
/* read response */
int rc;
int rc;
if (shard->state != PS_Connected)
return NULL;
Assert(pageserver_conn);
while (true)
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (PQisBusy(shard->conn))
if (!PQconsumeInput(shard->conn))
{
WaitEvent event;
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
WAIT_EVENT_NEON_PS_READ) != 1
|| (event.events & WL_SOCKET_READABLE) == 0)
{
return NULL;
}
return NULL;
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
}
else
break;
}
if (rc == 0)
return NULL;
else if (rc > 0)

View File

@@ -624,16 +624,19 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {

View File

@@ -203,6 +203,11 @@ struct Cli {
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).
#[arg(long, default_value = "false")]
use_local_compute_notifications: bool,
}
enum StrictMode {
@@ -368,6 +373,9 @@ async fn async_main() -> anyhow::Result<()> {
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
}
StrictMode::Strict => {
tracing::info!("Starting in strict mode: configuration is OK.")
}
@@ -427,6 +435,7 @@ async fn async_main() -> anyhow::Result<()> {
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_certs,
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
};
// Validate that we can connect to the database

View File

@@ -448,6 +448,8 @@ pub struct Config {
pub ssl_ca_certs: Vec<Certificate>,
pub timelines_onto_safekeepers: bool,
pub use_local_compute_notifications: bool,
}
impl From<DatabaseError> for ApiError {

View File

@@ -1169,6 +1169,12 @@ class NeonEnv:
if storage_controller_config is not None:
cfg["storage_controller"] = storage_controller_config
if config.test_may_use_compatibility_snapshot_binaries:
if "storage_controller" in cfg:
cfg["storage_controller"]["use_local_compute_notifications"] = False
else:
cfg["storage_controller"] = {"use_local_compute_notifications": False}
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"

View File

@@ -82,6 +82,7 @@ def test_storage_controller_many_tenants(
# guard against regressions in restart time.
"max_offline": "30s",
"max_warming_up": "300s",
"use_local_compute_notifications": False,
}
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

View File

@@ -5,11 +5,9 @@ import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
"""
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
is all done via the storage controller, but this test will disable the storage controller's compute
@@ -23,19 +21,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
)
env = neon_env_builder.init_start()
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
def ignore_notify(request: Request):
# This test does direct updates to compute configuration: disable the storage controller's notification
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)
env.create_branch("test_change_pageserver")
endpoint = env.endpoints.create_start("test_change_pageserver")

View File

@@ -12,6 +12,7 @@ import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -592,17 +593,22 @@ def test_historic_storage_formats(
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),
)
def test_versions_mismatch(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Checks compatibility of different combinations of versions of the components
"""
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",

View File

@@ -97,5 +97,5 @@ def test_lfc_prefetch(neon_simple_env: NeonEnv):
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
# No redundant prefethc requrests if prefetch results are stored in LFC
# No redundant prefetch requests if prefetch results are stored in LFC
assert prefetch_expired == 0

View File

@@ -91,6 +91,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.

View File

@@ -808,6 +808,8 @@ def test_sharding_split_stripe_size(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
)
@@ -1316,6 +1318,11 @@ def test_sharding_split_failures(
initial_shard_count = 2
split_shard_count = 4
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()

View File

@@ -73,7 +73,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
def test_storage_controller_smoke(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
):
"""
Test the basic lifecycle of a storage controller:
- Restarting
@@ -83,6 +85,7 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
# Start services by hand so that we can skip a pageserver (this will start + register later)
@@ -620,6 +623,8 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -738,6 +743,8 @@ def test_storage_controller_stuck_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -885,6 +892,8 @@ def test_storage_controller_compute_hook_retry(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_configs()
env.start()
@@ -1008,6 +1017,8 @@ def test_storage_controller_compute_hook_revert(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
tenant_id = env.initial_tenant
@@ -1398,6 +1409,11 @@ def test_storage_controller_tenant_deletion(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()
@@ -2176,7 +2192,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize("num_azs", [1, 2])
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
def test_graceful_cluster_restart(
neon_env_builder: NeonEnvBuilder,
num_azs: int,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Graceful reststart of storage controller clusters use the drain and
fill hooks in order to migrate attachments away from pageservers before
@@ -2188,6 +2209,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int
"""
neon_env_builder.num_azs = num_azs
neon_env_builder.num_pageservers = 2
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
env.start()
@@ -2443,7 +2465,6 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("while_offline", [True, False])
def test_storage_controller_node_deletion(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
while_offline: bool,
):
"""