mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-27 23:30:38 +00:00
Compare commits
2 Commits
pump_prefe
...
fix_audit_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15b06a6a1e | ||
|
|
ff87b51300 |
@@ -1497,6 +1497,27 @@ impl ComputeNode {
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
|
||||
/// Apply config operations that are not covered by `skip_pg_catalog_updates`
|
||||
#[instrument(skip_all)]
|
||||
pub fn apply_config_non_skippable(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let spec = Arc::new(
|
||||
compute_state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.expect("spec must be set")
|
||||
.spec
|
||||
.clone(),
|
||||
);
|
||||
|
||||
// Merge-apply spec & changes to PostgreSQL state.
|
||||
self.apply_spec_sql_non_skippable(spec.clone(), conf.clone())?;
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
|
||||
// Wrapped this around `pg_ctl reload`, but right now we don't use
|
||||
// `pg_ctl` for start / stop.
|
||||
#[instrument(skip_all)]
|
||||
@@ -1619,8 +1640,24 @@ impl ComputeNode {
|
||||
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
|
||||
);
|
||||
}
|
||||
self.pg_reload_conf()?;
|
||||
} else {
|
||||
// We need to run some operations even if skip_pg_catalog_updates is set
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
// creating new extensions, roles, etc...
|
||||
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
self.apply_config_non_skippable(compute_state)?;
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
|
||||
self.post_apply_config()?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -308,6 +308,75 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Similar to apply_spec_sql, but for the simplified set of operations
|
||||
// that we perform even when `pg_skip_catalog_updates` is set.
|
||||
//
|
||||
// Keep the list of operations as small as possible,
|
||||
// as it will be run on every spec change and affect compute start time.
|
||||
pub fn apply_spec_sql_non_skippable(
|
||||
&self,
|
||||
spec: Arc<ComputeSpec>,
|
||||
conf: Arc<tokio_postgres::Config>,
|
||||
) -> Result<()> {
|
||||
info!("Applying non_skippable config",);
|
||||
debug!("Config: {:?}", spec);
|
||||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
rt.block_on(async {
|
||||
let client = Self::get_maintenance_client(&conf).await?;
|
||||
let spec = spec.clone();
|
||||
|
||||
let jwks_roles = Arc::new(
|
||||
spec.as_ref()
|
||||
.local_proxy_config
|
||||
.iter()
|
||||
.flat_map(|it| &it.jwks)
|
||||
.flatten()
|
||||
.flat_map(|setting| &setting.role_names)
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>(),
|
||||
);
|
||||
|
||||
// NOTE: Here we assume that operations below don't use ctx
|
||||
// TODO: refactor apply_operations() to accept ctx as option.
|
||||
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
|
||||
roles: HashMap::new(),
|
||||
dbs: HashMap::new(),
|
||||
}));
|
||||
|
||||
let mut phases = vec![];
|
||||
|
||||
match spec.audit_log_level {
|
||||
ComputeAudit::Hipaa => {
|
||||
phases.push(CreatePgauditExtension);
|
||||
phases.push(CreatePgauditlogtofileExtension);
|
||||
phases.push(DisablePostgresDBPgAudit);
|
||||
}
|
||||
ComputeAudit::Log => {
|
||||
phases.push(CreatePgauditExtension);
|
||||
phases.push(DisablePostgresDBPgAudit);
|
||||
}
|
||||
ComputeAudit::Disabled => {}
|
||||
}
|
||||
|
||||
for phase in phases {
|
||||
debug!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply SQL migrations of the RunInEachDatabase phase.
|
||||
///
|
||||
/// May opt to not connect to databases that don't have any scheduled
|
||||
|
||||
@@ -184,8 +184,6 @@ pub struct NeonStorageControllerConf {
|
||||
pub timelines_onto_safekeepers: bool,
|
||||
|
||||
pub use_https_safekeeper_api: bool,
|
||||
|
||||
pub use_local_compute_notifications: bool,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -215,7 +213,6 @@ impl Default for NeonStorageControllerConf {
|
||||
use_https_pageserver_api: false,
|
||||
timelines_onto_safekeepers: false,
|
||||
use_https_safekeeper_api: false,
|
||||
use_local_compute_notifications: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -555,10 +555,6 @@ impl StorageController {
|
||||
args.push("--use-https-safekeeper-api".to_string());
|
||||
}
|
||||
|
||||
if self.config.use_local_compute_notifications {
|
||||
args.push("--use-local-compute-notifications".to_string());
|
||||
}
|
||||
|
||||
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
|
||||
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
|
||||
}
|
||||
|
||||
@@ -38,7 +38,6 @@ use std::panic::AssertUnwindSafe;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::FutureExt;
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -585,25 +584,18 @@ pub async fn shutdown_tasks(
|
||||
// warn to catch these in tests; there shouldn't be any
|
||||
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
|
||||
}
|
||||
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
|
||||
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
// allow some time to elapse before logging to cut down the number of log
|
||||
// lines.
|
||||
info!("waiting for task {} to shut down", task.name);
|
||||
loop {
|
||||
tokio::select! {
|
||||
// we never handled this return value, but:
|
||||
// - we don't deschedule which would lead to is_cancelled
|
||||
// - panics are already logged (is_panicked)
|
||||
// - task errors are already logged in the wrapper
|
||||
_ = &mut join_handle => break,
|
||||
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
|
||||
}
|
||||
}
|
||||
// we never handled this return value, but:
|
||||
// - we don't deschedule which would lead to is_cancelled
|
||||
// - panics are already logged (is_panicked)
|
||||
// - task errors are already logged in the wrapper
|
||||
let _ = join_handle.await;
|
||||
info!("task {} completed", task.name);
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
struct iovec iov[PG_IOV_MAX];
|
||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
|
||||
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
|
||||
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
|
||||
int iteration_hits = 0;
|
||||
int iteration_misses = 0;
|
||||
uint64 io_time_us = 0;
|
||||
@@ -708,13 +708,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
|
||||
iov[i].iov_base = buffers[buf_offset + i];
|
||||
iov[i].iov_len = BLCKSZ;
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
}
|
||||
if (n_blocks_to_read == 0)
|
||||
{
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
}
|
||||
buf_offset += blocks_in_chunk;
|
||||
nblocks -= blocks_in_chunk;
|
||||
blkno += blocks_in_chunk;
|
||||
@@ -747,10 +744,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (entry == NULL)
|
||||
{
|
||||
/* Pages are not cached */
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
}
|
||||
lfc_ctl->misses += blocks_in_chunk;
|
||||
pgBufferUsage.file_cache.misses += blocks_in_chunk;
|
||||
LWLockRelease(lfc_lock);
|
||||
@@ -773,10 +766,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
FileCacheBlockState state = UNAVAILABLE;
|
||||
bool sleeping = false;
|
||||
|
||||
if (!BITMAP_ISSET(mask, buf_offset + i))
|
||||
continue;
|
||||
|
||||
while (lfc_ctl->generation == generation)
|
||||
{
|
||||
state = GET_STATE(entry, chunk_offs + i);
|
||||
@@ -800,13 +789,11 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
if (state == AVAILABLE)
|
||||
{
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
iteration_hits++;
|
||||
}
|
||||
else
|
||||
{
|
||||
BITMAP_CLR(mask, buf_offset + i);
|
||||
iteration_misses++;
|
||||
}
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
@@ -814,36 +801,15 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
if (iteration_hits != 0)
|
||||
{
|
||||
if (blocks_in_chunk == n_blocks_to_read)
|
||||
{
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = preadv(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = preadv(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * blocks_in_chunk))
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
if (rc != (BLCKSZ * blocks_in_chunk))
|
||||
{
|
||||
/* Some blocks are already prefetched in provided buffers, we should not rewrite them, so we can not use vector read */
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
if (BITMAP_ISSET(mask, buf_offset + i))
|
||||
{
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
if (rc != BLCKSZ)
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1034,12 +1000,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
LWLockRelease(lfc_lock);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
|
||||
|
||||
if (lwlsn > lsn)
|
||||
{
|
||||
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X",
|
||||
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
|
||||
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
|
||||
LWLockRelease(lfc_lock);
|
||||
return false;
|
||||
|
||||
@@ -1142,23 +1142,37 @@ pageserver_try_receive(shardno_t shard_no)
|
||||
NeonResponse *resp;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn = shard->conn;
|
||||
int rc;
|
||||
/* read response */
|
||||
int rc;
|
||||
|
||||
if (shard->state != PS_Connected)
|
||||
return NULL;
|
||||
|
||||
Assert(pageserver_conn);
|
||||
|
||||
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
|
||||
if (rc == 0)
|
||||
while (true)
|
||||
{
|
||||
if (!PQconsumeInput(shard->conn))
|
||||
if (PQisBusy(shard->conn))
|
||||
{
|
||||
return NULL;
|
||||
WaitEvent event;
|
||||
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
|
||||
WAIT_EVENT_NEON_PS_READ) != 1
|
||||
|| (event.events & WL_SOCKET_READABLE) == 0)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
|
||||
if (rc == 0)
|
||||
{
|
||||
if (!PQconsumeInput(shard->conn))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
|
||||
if (rc == 0)
|
||||
return NULL;
|
||||
else if (rc > 0)
|
||||
|
||||
@@ -624,19 +624,16 @@ impl ComputeHook {
|
||||
MaybeSendResult::Transmit((request, lock)) => (request, lock),
|
||||
};
|
||||
|
||||
let result = if !self.config.use_local_compute_notifications {
|
||||
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
|
||||
Some(if control_plane_url.ends_with('/') {
|
||||
format!("{control_plane_url}notify-attach")
|
||||
} else {
|
||||
format!("{control_plane_url}/notify-attach")
|
||||
})
|
||||
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
|
||||
Some(if control_plane_url.ends_with('/') {
|
||||
format!("{control_plane_url}notify-attach")
|
||||
} else {
|
||||
self.config.compute_hook_url.clone()
|
||||
};
|
||||
|
||||
// We validate this at startup
|
||||
let notify_url = compute_hook_url.as_ref().unwrap();
|
||||
format!("{control_plane_url}/notify-attach")
|
||||
})
|
||||
} else {
|
||||
self.config.compute_hook_url.clone()
|
||||
};
|
||||
let result = if let Some(notify_url) = &compute_hook_url {
|
||||
self.do_notify(notify_url, &request, cancel).await
|
||||
} else {
|
||||
self.do_notify_local(&request).await.map_err(|e| {
|
||||
|
||||
@@ -203,11 +203,6 @@ struct Cli {
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<PathBuf>,
|
||||
|
||||
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
|
||||
/// the compute notification directly (instead of via control plane).
|
||||
#[arg(long, default_value = "false")]
|
||||
use_local_compute_notifications: bool,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -373,9 +368,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict if args.use_local_compute_notifications => {
|
||||
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
|
||||
}
|
||||
StrictMode::Strict => {
|
||||
tracing::info!("Starting in strict mode: configuration is OK.")
|
||||
}
|
||||
@@ -435,7 +427,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
ssl_ca_certs,
|
||||
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
|
||||
use_local_compute_notifications: args.use_local_compute_notifications,
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
|
||||
@@ -448,8 +448,6 @@ pub struct Config {
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
|
||||
pub timelines_onto_safekeepers: bool,
|
||||
|
||||
pub use_local_compute_notifications: bool,
|
||||
}
|
||||
|
||||
impl From<DatabaseError> for ApiError {
|
||||
|
||||
@@ -1169,12 +1169,6 @@ class NeonEnv:
|
||||
if storage_controller_config is not None:
|
||||
cfg["storage_controller"] = storage_controller_config
|
||||
|
||||
if config.test_may_use_compatibility_snapshot_binaries:
|
||||
if "storage_controller" in cfg:
|
||||
cfg["storage_controller"]["use_local_compute_notifications"] = False
|
||||
else:
|
||||
cfg["storage_controller"] = {"use_local_compute_notifications": False}
|
||||
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
|
||||
@@ -82,7 +82,6 @@ def test_storage_controller_many_tenants(
|
||||
# guard against regressions in restart time.
|
||||
"max_offline": "30s",
|
||||
"max_warming_up": "300s",
|
||||
"use_local_compute_notifications": False,
|
||||
}
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
|
||||
|
||||
@@ -5,9 +5,11 @@ import asyncio
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
|
||||
"""
|
||||
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
|
||||
is all done via the storage controller, but this test will disable the storage controller's compute
|
||||
@@ -21,6 +23,19 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{make_httpserver.host}:{make_httpserver.port}/"
|
||||
)
|
||||
|
||||
def ignore_notify(request: Request):
|
||||
# This test does direct updates to compute configuration: disable the storage controller's notification
|
||||
log.info(f"Ignoring storage controller compute notification: {request.json}")
|
||||
return Response(status=200)
|
||||
|
||||
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
|
||||
ignore_notify
|
||||
)
|
||||
|
||||
env.create_branch("test_change_pageserver")
|
||||
endpoint = env.endpoints.create_start("test_change_pageserver")
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ import fixtures.utils
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.common_types import TenantId, TimelineId
|
||||
from fixtures.compute_reconfigure import ComputeReconfigure
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
@@ -593,22 +592,17 @@ def test_historic_storage_formats(
|
||||
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.parametrize(
|
||||
**fixtures.utils.allpairs_versions(),
|
||||
)
|
||||
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
|
||||
def test_versions_mismatch(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
compatibility_snapshot_dir,
|
||||
compute_reconfigure_listener: ComputeReconfigure,
|
||||
combination,
|
||||
):
|
||||
"""
|
||||
Checks compatibility of different combinations of versions of the components
|
||||
"""
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.from_repo_dir(
|
||||
compatibility_snapshot_dir / "repo",
|
||||
|
||||
@@ -97,5 +97,5 @@ def test_lfc_prefetch(neon_simple_env: NeonEnv):
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
# No redundant prefetch requests if prefetch results are stored in LFC
|
||||
# No redundant prefethc requrests if prefetch results are stored in LFC
|
||||
assert prefetch_expired == 0
|
||||
|
||||
@@ -91,8 +91,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
f"http://{make_httpserver.host}:{make_httpserver.port}/"
|
||||
)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
def ignore_notify(request: Request):
|
||||
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
|
||||
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.
|
||||
|
||||
@@ -808,8 +808,6 @@ def test_sharding_split_stripe_size(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
|
||||
)
|
||||
@@ -1318,11 +1316,6 @@ def test_sharding_split_failures(
|
||||
initial_shard_count = 2
|
||||
split_shard_count = 4
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Route to `compute_reconfigure_listener` instead
|
||||
"use_local_compute_notifications": False,
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
|
||||
@@ -73,9 +73,7 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
|
||||
|
||||
|
||||
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
|
||||
def test_storage_controller_smoke(
|
||||
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
|
||||
):
|
||||
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
|
||||
"""
|
||||
Test the basic lifecycle of a storage controller:
|
||||
- Restarting
|
||||
@@ -85,7 +83,6 @@ def test_storage_controller_smoke(
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 3
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
env = neon_env_builder.init_configs()
|
||||
|
||||
# Start services by hand so that we can skip a pageserver (this will start + register later)
|
||||
@@ -623,8 +620,6 @@ def test_storage_controller_compute_hook(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
|
||||
|
||||
@@ -743,8 +738,6 @@ def test_storage_controller_stuck_compute_hook(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
|
||||
|
||||
@@ -892,8 +885,6 @@ def test_storage_controller_compute_hook_retry(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
@@ -1017,8 +1008,6 @@ def test_storage_controller_compute_hook_revert(
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
|
||||
tenant_id = env.initial_tenant
|
||||
@@ -1409,11 +1398,6 @@ def test_storage_controller_tenant_deletion(
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Route to `compute_reconfigure_listener` instead
|
||||
"use_local_compute_notifications": False,
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
@@ -2192,12 +2176,7 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
|
||||
|
||||
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
|
||||
@pytest.mark.parametrize("num_azs", [1, 2])
|
||||
def test_graceful_cluster_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
num_azs: int,
|
||||
compute_reconfigure_listener: ComputeReconfigure,
|
||||
combination,
|
||||
):
|
||||
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
|
||||
"""
|
||||
Graceful reststart of storage controller clusters use the drain and
|
||||
fill hooks in order to migrate attachments away from pageservers before
|
||||
@@ -2209,7 +2188,6 @@ def test_graceful_cluster_restart(
|
||||
"""
|
||||
neon_env_builder.num_azs = num_azs
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
@@ -2465,6 +2443,7 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("while_offline", [True, False])
|
||||
def test_storage_controller_node_deletion(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
compute_reconfigure_listener: ComputeReconfigure,
|
||||
while_offline: bool,
|
||||
):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user