Compare commits

..

5 Commits

Author SHA1 Message Date
Arpad Müller
896420acce wip 2025-07-01 01:15:19 +02:00
Arpad Müller
9d858b8cbe pass the right cancellation token 2025-06-30 16:08:18 +02:00
Arpad Müller
5415b6cb0d pass through cancellations 2025-06-30 16:08:18 +02:00
Arpad Müller
216ec91ef3 Respect cancellation for child generation 2025-06-30 16:08:18 +02:00
Arpad Müller
29d4f0638e Add cancellation token to RequestContext 2025-06-30 16:08:18 +02:00
46 changed files with 408 additions and 1186 deletions

1
.gitignore vendored
View File

@@ -6,7 +6,6 @@
/tmp_check_cli
__pycache__/
test_output/
neon_previous/
.vscode
.idea
*.swp

7
Cargo.lock generated
View File

@@ -1305,7 +1305,6 @@ dependencies = [
"fail",
"flate2",
"futures",
"hostname-validator",
"http 1.1.0",
"indexmap 2.9.0",
"itertools 0.10.5",
@@ -2772,12 +2771,6 @@ dependencies = [
"windows",
]
[[package]]
name = "hostname-validator"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f558a64ac9af88b5ba400d99b579451af0d39c6d360980045b91aac966d705e2"
[[package]]
name = "http"
version = "0.2.9"

View File

@@ -1983,7 +1983,7 @@ RUN apt update && \
locales \
lsof \
procps \
rsyslog-gnutls \
rsyslog \
screen \
tcpdump \
$VERSION_INSTALLS && \

View File

@@ -8,8 +8,6 @@
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
import 'sql_exporter/compute_max_connections.libsonnet',
import 'sql_exporter/compute_pg_oldest_frozen_xid_age.libsonnet',
import 'sql_exporter/compute_pg_oldest_mxid_age.libsonnet',
import 'sql_exporter/compute_receive_lsn.libsonnet',
import 'sql_exporter/compute_subscriptions_count.libsonnet',
import 'sql_exporter/connection_counts.libsonnet',

View File

@@ -1,13 +0,0 @@
{
metric_name: 'compute_pg_oldest_frozen_xid_age',
type: 'gauge',
help: 'Age of oldest XIDs that have not been frozen by VACUUM. An indicator of how long it has been since VACUUM last ran.',
key_labels: [
'database_name',
],
value_label: 'metric',
values: [
'frozen_xid_age',
],
query: importstr 'sql_exporter/compute_pg_oldest_frozen_xid_age.sql',
}

View File

@@ -1,4 +0,0 @@
SELECT datname database_name,
age(datfrozenxid) frozen_xid_age
FROM pg_database
ORDER BY frozen_xid_age DESC LIMIT 10;

View File

@@ -1,13 +0,0 @@
{
metric_name: 'compute_pg_oldest_mxid_age',
type: 'gauge',
help: 'Age of oldest MXIDs that have not been replaced by VACUUM. An indicator of how long it has been since VACUUM last ran.',
key_labels: [
'database_name',
],
value_label: 'metric',
values: [
'min_mxid_age',
],
query: importstr 'sql_exporter/compute_pg_oldest_mxid_age.sql',
}

View File

@@ -1,4 +0,0 @@
SELECT datname database_name,
mxid_age(datminmxid) min_mxid_age
FROM pg_database
ORDER BY min_mxid_age DESC LIMIT 10;

View File

@@ -1,8 +1,8 @@
diff --git a/sql/anon.sql b/sql/anon.sql
index 0cdc769..b450327 100644
index 0cdc769..f6cc950 100644
--- a/sql/anon.sql
+++ b/sql/anon.sql
@@ -1141,3 +1141,15 @@ $$
@@ -1141,3 +1141,8 @@ $$
-- TODO : https://en.wikipedia.org/wiki/L-diversity
-- TODO : https://en.wikipedia.org/wiki/T-closeness
@@ -11,13 +11,6 @@ index 0cdc769..b450327 100644
+
+GRANT ALL ON SCHEMA anon to neon_superuser;
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
+
+DO $$
+BEGIN
+ IF current_setting('server_version_num')::int >= 150000 THEN
+ GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO neon_superuser;
+ END IF;
+END $$;
diff --git a/sql/init.sql b/sql/init.sql
index 7da6553..9b6164b 100644
--- a/sql/init.sql

View File

@@ -27,7 +27,6 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
hostname-validator = "1.1"
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true

View File

@@ -36,8 +36,6 @@
use std::ffi::OsString;
use std::fs::File;
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
@@ -192,9 +190,7 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
cli.installed_extensions_collection_interval,
)),
installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
},
config,
)?;

View File

@@ -25,7 +25,7 @@ use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
@@ -70,7 +70,6 @@ pub static BUILD_TAG: Lazy<String> = Lazy::new(|| {
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string()
});
const DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL: u64 = 3600;
/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
@@ -104,7 +103,7 @@ pub struct ComputeNodeParams {
pub remote_ext_base_url: Option<Url>,
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: Arc<AtomicU64>,
pub installed_extensions_collection_interval: u64,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -127,9 +126,6 @@ pub struct ComputeNode {
// key: ext_archive_name, value: started download time, download_completed?
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub compute_ctl_config: ComputeCtlConfig,
/// Handle to the extension stats collection task
extension_stats_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
// store some metrics about download size that might impact startup time
@@ -432,7 +428,6 @@ impl ComputeNode {
state_changed: Condvar::new(),
ext_download_progress: RwLock::new(HashMap::new()),
compute_ctl_config: config.compute_ctl_config,
extension_stats_task: Mutex::new(None),
})
}
@@ -520,9 +515,6 @@ impl ComputeNode {
None
};
// Terminate the extension stats collection task
this.terminate_extension_stats_task();
// Terminate the vm_monitor so it releases the file watcher on
// /sys/fs/cgroup/neon-postgres.
// Note: the vm-monitor only runs on linux because it requires cgroups.
@@ -759,15 +751,10 @@ impl ComputeNode {
// Configure and start rsyslog for compliance audit logging
match pspec.spec.audit_log_level {
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
let remote_tls_endpoint =
std::env::var("AUDIT_LOGGING_TLS_ENDPOINT").unwrap_or("".to_string());
let remote_plain_endpoint =
let remote_endpoint =
std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
if remote_plain_endpoint.is_empty() && remote_tls_endpoint.is_empty() {
anyhow::bail!(
"AUDIT_LOGGING_ENDPOINT and AUDIT_LOGGING_TLS_ENDPOINT are both empty"
);
if remote_endpoint.is_empty() {
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
}
let log_directory_path = Path::new(&self.params.pgdata).join("log");
@@ -783,8 +770,7 @@ impl ComputeNode {
log_directory_path.clone(),
endpoint_id,
project_id,
&remote_plain_endpoint,
&remote_tls_endpoint,
&remote_endpoint,
)?;
// Launch a background task to clean up the audit logs
@@ -1685,8 +1671,6 @@ impl ComputeNode {
tls_config = self.compute_ctl_config.tls.clone();
}
self.update_installed_extensions_collection_interval(&spec);
let max_concurrent_connections = self.max_service_connections(compute_state, &spec);
// Merge-apply spec & changes to PostgreSQL state.
@@ -1751,8 +1735,6 @@ impl ComputeNode {
let tls_config = self.tls_config(&spec);
self.update_installed_extensions_collection_interval(&spec);
if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
info!("tuning pgbouncer");
@@ -2357,20 +2339,10 @@ LIMIT 100",
}
pub fn spawn_extension_stats_task(&self) {
// Cancel any existing task
if let Some(handle) = self.extension_stats_task.lock().unwrap().take() {
handle.abort();
}
let conf = self.tokio_conn_conf.clone();
let atomic_interval = self.params.installed_extensions_collection_interval.clone();
let mut installed_extensions_collection_interval =
2 * atomic_interval.load(std::sync::atomic::Ordering::SeqCst);
info!(
"[NEON_EXT_SPAWN] Spawning background installed extensions worker with Timeout: {}",
installed_extensions_collection_interval
);
let handle = tokio::spawn(async move {
let installed_extensions_collection_interval =
self.params.installed_extensions_collection_interval;
tokio::spawn(async move {
// An initial sleep is added to ensure that two collections don't happen at the same time.
// The first collection happens during compute startup.
tokio::time::sleep(tokio::time::Duration::from_secs(
@@ -2383,48 +2355,8 @@ LIMIT 100",
loop {
interval.tick().await;
let _ = installed_extensions(conf.clone()).await;
// Acquire a read lock on the compute spec and then update the interval if necessary
interval = tokio::time::interval(tokio::time::Duration::from_secs(std::cmp::max(
installed_extensions_collection_interval,
2 * atomic_interval.load(std::sync::atomic::Ordering::SeqCst),
)));
installed_extensions_collection_interval = interval.period().as_secs();
}
});
// Store the new task handle
*self.extension_stats_task.lock().unwrap() = Some(handle);
}
fn terminate_extension_stats_task(&self) {
if let Some(handle) = self.extension_stats_task.lock().unwrap().take() {
handle.abort();
}
}
fn update_installed_extensions_collection_interval(&self, spec: &ComputeSpec) {
// Update the interval for collecting installed extensions statistics
// If the value is -1, we never suspend so set the value to default collection.
// If the value is 0, it means default, we will just continue to use the default.
if spec.suspend_timeout_seconds == -1 || spec.suspend_timeout_seconds == 0 {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}, New Timeout: {}",
spec.suspend_timeout_seconds, DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL
);
self.params.installed_extensions_collection_interval.store(
DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL,
std::sync::atomic::Ordering::SeqCst,
);
} else {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}",
spec.suspend_timeout_seconds
);
self.params.installed_extensions_collection_interval.store(
spec.suspend_timeout_seconds as u64,
std::sync::atomic::Ordering::SeqCst,
);
}
}
}

View File

@@ -10,13 +10,7 @@ input(type="imfile" File="{log_directory}/*.log"
startmsg.regex="^[[:digit:]]{{4}}-[[:digit:]]{{2}}-[[:digit:]]{{2}} [[:digit:]]{{2}}:[[:digit:]]{{2}}:[[:digit:]]{{2}}.[[:digit:]]{{3}} GMT,")
# the directory to store rsyslog state files
global(
workDirectory="/var/log/rsyslog"
DefaultNetstreamDriverCAFile="/etc/ssl/certs/ca-certificates.crt"
)
# Whether the remote syslog receiver uses tls
set $.remote_syslog_tls = "{remote_syslog_tls}";
global(workDirectory="/var/log/rsyslog")
# Construct json, endpoint_id and project_id as additional metadata
set $.json_log!endpoint_id = "{endpoint_id}";
@@ -27,29 +21,5 @@ set $.json_log!msg = $msg;
template(name="PgAuditLog" type="string"
string="<%PRI%>1 %TIMESTAMP:::date-rfc3339% %HOSTNAME% - - - - %$.json_log%")
# Forward to remote syslog receiver (over TLS)
if ( $syslogtag == 'pgaudit_log' ) then {{
if ( $.remote_syslog_tls == 'true' ) then {{
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
template="PgAuditLog"
queue.type="linkedList"
queue.size="1000"
action.ResumeRetryCount="10"
StreamDriver="gtls"
StreamDriverMode="1"
StreamDriverAuthMode="x509/name"
StreamDriverPermittedPeers="{remote_syslog_host}"
StreamDriver.CheckExtendedKeyPurpose="on"
StreamDriver.PermitExpiredCerts="off"
)
stop
}} else {{
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
template="PgAuditLog"
queue.type="linkedList"
queue.size="1000"
action.ResumeRetryCount="10"
)
stop
}}
}}
# Forward to remote syslog receiver (@@<hostname>:<port>;format
local5.info @@{remote_endpoint};PgAuditLog

View File

@@ -4,10 +4,8 @@ use std::path::Path;
use std::process::Command;
use std::time::Duration;
use std::{fs::OpenOptions, io::Write};
use url::{Host, Url};
use anyhow::{Context, Result, anyhow};
use hostname_validator;
use tracing::{error, info, instrument, warn};
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
@@ -84,84 +82,18 @@ fn restart_rsyslog() -> Result<()> {
Ok(())
}
fn parse_audit_syslog_address(
remote_plain_endpoint: &str,
remote_tls_endpoint: &str,
) -> Result<(String, u16, String)> {
let tls;
let remote_endpoint = if !remote_tls_endpoint.is_empty() {
tls = "true".to_string();
remote_tls_endpoint
} else {
tls = "false".to_string();
remote_plain_endpoint
};
// Urlify the remote_endpoint, so parsing can be done with url::Url.
let url_str = format!("http://{remote_endpoint}");
let url = Url::parse(&url_str).map_err(|err| {
anyhow!("Error parsing {remote_endpoint}, expected host:port, got {err:?}")
})?;
let is_valid = url.scheme() == "http"
&& url.path() == "/"
&& url.query().is_none()
&& url.fragment().is_none()
&& url.username() == ""
&& url.password().is_none();
if !is_valid {
return Err(anyhow!(
"Invalid address format {remote_endpoint}, expected host:port"
));
}
let host = match url.host() {
Some(Host::Domain(h)) if hostname_validator::is_valid(h) => h.to_string(),
Some(Host::Ipv4(ip4)) => ip4.to_string(),
Some(Host::Ipv6(ip6)) => ip6.to_string(),
_ => return Err(anyhow!("Invalid host")),
};
let port = url
.port()
.ok_or_else(|| anyhow!("Invalid port in {remote_endpoint}"))?;
Ok((host, port, tls))
}
fn generate_audit_rsyslog_config(
log_directory: String,
endpoint_id: &str,
project_id: &str,
remote_syslog_host: &str,
remote_syslog_port: u16,
remote_syslog_tls: &str,
) -> String {
format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
endpoint_id = endpoint_id,
project_id = project_id,
remote_syslog_host = remote_syslog_host,
remote_syslog_port = remote_syslog_port,
remote_syslog_tls = remote_syslog_tls
)
}
pub fn configure_audit_rsyslog(
log_directory: String,
endpoint_id: &str,
project_id: &str,
remote_endpoint: &str,
remote_tls_endpoint: &str,
) -> Result<()> {
let (remote_syslog_host, remote_syslog_port, remote_syslog_tls) =
parse_audit_syslog_address(remote_endpoint, remote_tls_endpoint).unwrap();
let config_content = generate_audit_rsyslog_config(
log_directory,
endpoint_id,
project_id,
&remote_syslog_host,
remote_syslog_port,
&remote_syslog_tls,
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
endpoint_id = endpoint_id,
project_id = project_id,
remote_endpoint = remote_endpoint
);
info!("rsyslog config_content: {}", config_content);
@@ -326,8 +258,6 @@ pub fn launch_pgaudit_gc(log_directory: String) {
mod tests {
use crate::rsyslog::PostgresLogsRsyslogConfig;
use super::{generate_audit_rsyslog_config, parse_audit_syslog_address};
#[test]
fn test_postgres_logs_config() {
{
@@ -357,146 +287,4 @@ mod tests {
assert!(res.is_err());
}
}
#[test]
fn test_parse_audit_syslog_address() {
{
// host:port format (plaintext)
let parsed = parse_audit_syslog_address("collector.host.tld:5555", "");
assert!(parsed.is_ok());
assert_eq!(
parsed.unwrap(),
(
String::from("collector.host.tld"),
5555,
String::from("false")
)
);
}
{
// host:port format with ipv4 ip address (plaintext)
let parsed = parse_audit_syslog_address("10.0.0.1:5555", "");
assert!(parsed.is_ok());
assert_eq!(
parsed.unwrap(),
(String::from("10.0.0.1"), 5555, String::from("false"))
);
}
{
// host:port format with ipv6 ip address (plaintext)
let parsed =
parse_audit_syslog_address("[7e60:82ed:cb2e:d617:f904:f395:aaca:e252]:5555", "");
assert_eq!(
parsed.unwrap(),
(
String::from("7e60:82ed:cb2e:d617:f904:f395:aaca:e252"),
5555,
String::from("false")
)
);
}
{
// Only TLS host:port defined
let parsed = parse_audit_syslog_address("", "tls.host.tld:5556");
assert_eq!(
parsed.unwrap(),
(String::from("tls.host.tld"), 5556, String::from("true"))
);
}
{
// tls host should take precedence, when both defined
let parsed = parse_audit_syslog_address("plaintext.host.tld:5555", "tls.host.tld:5556");
assert_eq!(
parsed.unwrap(),
(String::from("tls.host.tld"), 5556, String::from("true"))
);
}
{
// host without port (plaintext)
let parsed = parse_audit_syslog_address("collector.host.tld", "");
assert!(parsed.is_err());
}
{
// port without host
let parsed = parse_audit_syslog_address(":5555", "");
assert!(parsed.is_err());
}
{
// valid host with invalid port
let parsed = parse_audit_syslog_address("collector.host.tld:90001", "");
assert!(parsed.is_err());
}
{
// invalid hostname with valid port
let parsed = parse_audit_syslog_address("-collector.host.tld:5555", "");
assert!(parsed.is_err());
}
{
// parse error
let parsed = parse_audit_syslog_address("collector.host.tld:::5555", "");
assert!(parsed.is_err());
}
}
#[test]
fn test_generate_audit_rsyslog_config() {
{
// plaintext version
let log_directory = "/tmp/log".to_string();
let endpoint_id = "ep-test-endpoint-id";
let project_id = "test-project-id";
let remote_syslog_host = "collector.host.tld";
let remote_syslog_port = 5555;
let remote_syslog_tls = "false";
let conf_str = generate_audit_rsyslog_config(
log_directory,
endpoint_id,
project_id,
remote_syslog_host,
remote_syslog_port,
remote_syslog_tls,
);
assert!(conf_str.contains(r#"set $.remote_syslog_tls = "false";"#));
assert!(conf_str.contains(r#"type="omfwd""#));
assert!(conf_str.contains(r#"target="collector.host.tld""#));
assert!(conf_str.contains(r#"port="5555""#));
assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
}
{
// TLS version
let log_directory = "/tmp/log".to_string();
let endpoint_id = "ep-test-endpoint-id";
let project_id = "test-project-id";
let remote_syslog_host = "collector.host.tld";
let remote_syslog_port = 5556;
let remote_syslog_tls = "true";
let conf_str = generate_audit_rsyslog_config(
log_directory,
endpoint_id,
project_id,
remote_syslog_host,
remote_syslog_port,
remote_syslog_tls,
);
assert!(conf_str.contains(r#"set $.remote_syslog_tls = "true";"#));
assert!(conf_str.contains(r#"type="omfwd""#));
assert!(conf_str.contains(r#"target="collector.host.tld""#));
assert!(conf_str.contains(r#"port="5556""#));
assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
}
}
}

View File

@@ -3,8 +3,7 @@
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"suspend_timeout_seconds": 3600,
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Zenith Test",

View File

@@ -1649,9 +1649,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = parse_safekeepers(&args.safekeepers)?;
endpoint
.reconfigure(Some(pageservers), None, safekeepers, None)
.await?;
endpoint.reconfigure(pageservers, None, safekeepers).await?;
}
EndpointCmd::Stop(args) => {
let endpoint_id = &args.endpoint_id;

View File

@@ -780,7 +780,6 @@ impl Endpoint {
endpoint_storage_addr: Some(endpoint_storage_addr),
endpoint_storage_token: Some(endpoint_storage_token),
autoprewarm: false,
suspend_timeout_seconds: -1, // Only used in neon_local.
};
// this strange code is needed to support respec() in tests
@@ -975,11 +974,12 @@ impl Endpoint {
pub async fn reconfigure(
&self,
pageservers: Option<Vec<(PageserverProtocol, Host, u16)>>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
safekeeper_generation: Option<SafekeeperGeneration>,
) -> Result<()> {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
@@ -991,24 +991,16 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
// If pageservers are not specified, don't change them.
if let Some(pageservers) = pageservers {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
// If safekeepers are not specified, don't change them.
if let Some(safekeepers) = safekeepers {
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
spec.safekeeper_connstrings = safekeeper_connstrings;
if let Some(g) = safekeeper_generation {
spec.safekeepers_generation = Some(g.into_inner());
}
}
let client = reqwest::Client::builder()
@@ -1046,24 +1038,6 @@ impl Endpoint {
}
}
pub async fn reconfigure_pageservers(
&self,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
stripe_size: Option<ShardStripeSize>,
) -> Result<()> {
self.reconfigure(Some(pageservers), stripe_size, None, None)
.await
}
pub async fn reconfigure_safekeepers(
&self,
safekeepers: Vec<NodeId>,
generation: SafekeeperGeneration,
) -> Result<()> {
self.reconfigure(None, None, Some(safekeepers), Some(generation))
.await
}
pub async fn stop(
&self,
mode: EndpointTerminateMode,

View File

@@ -4,7 +4,6 @@
"timestamp": "2022-10-12T18:00:00.000Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
"suspend_timeout_seconds": -1,
"cluster": {
"cluster_id": "docker_compose",

View File

@@ -184,11 +184,6 @@ pub struct ComputeSpec {
/// Download LFC state from endpoint_storage and pass it to Postgres on startup
#[serde(default)]
pub autoprewarm: bool,
/// Suspend timeout in seconds.
///
/// We use this value to derive other values, such as the installed extensions metric.
pub suspend_timeout_seconds: i64,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

View File

@@ -3,7 +3,6 @@
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"suspend_timeout_seconds": 3600,
"cluster": {
"cluster_id": "test-cluster-42",

View File

@@ -92,6 +92,7 @@
use std::{sync::Arc, time::Duration};
use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::{id::TimelineId, shard::TenantShardId};
@@ -117,6 +118,7 @@ pub struct RequestContext {
scope: Scope,
perf_span: Option<PerfSpan>,
perf_span_dispatch: Option<Dispatch>,
cancel: CancellationToken,
}
#[derive(Clone)]
@@ -263,6 +265,10 @@ pub struct RequestContextBuilder {
impl RequestContextBuilder {
/// A new builder with default settings
pub fn new(task_kind: TaskKind) -> Self {
Self::new_with_cancel(task_kind, CancellationToken::new())
}
/// A new builder with default settings, with ability to specify the cancellation token
pub(crate) fn new_with_cancel(task_kind: TaskKind, cancel: CancellationToken) -> Self {
Self {
inner: RequestContext {
task_kind,
@@ -273,6 +279,7 @@ impl RequestContextBuilder {
scope: Scope::new_global(),
perf_span: None,
perf_span_dispatch: None,
cancel,
},
}
}
@@ -358,11 +365,18 @@ impl RequestContextBuilder {
self.inner
}
pub fn attached_child(self) -> RequestContext {
pub fn attached_child(mut self) -> RequestContext {
self.inner.cancel = self.inner.cancel.child_token();
self.inner
}
pub fn detached_child(self) -> RequestContext {
pub fn detached_child(mut self) -> RequestContext {
self.inner.cancel = CancellationToken::new();
self.inner
}
pub fn detached_child_with_cancel(mut self, cancel: CancellationToken) -> RequestContext {
self.inner.cancel = cancel;
self.inner
}
}
@@ -382,6 +396,7 @@ impl RequestContext {
scope: self.scope.clone(),
perf_span: self.perf_span.clone(),
perf_span_dispatch: self.perf_span_dispatch.clone(),
cancel: self.cancel.clone(),
}
}
@@ -427,6 +442,19 @@ impl RequestContext {
.detached_child()
}
/// Like [`Self::detached_child`], but with the ability to specify the cancellation token
pub fn detached_child_with_cancel(
&self,
task_kind: TaskKind,
download_behavior: DownloadBehavior,
cancel: CancellationToken,
) -> Self {
RequestContextBuilder::from(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.detached_child_with_cancel(cancel)
}
/// Create a child of context `self` for a task that shall not outlive `self`.
///
/// Use this when fanning-out work to other async tasks.
@@ -603,6 +631,10 @@ impl RequestContext {
pub(crate) fn has_perf_span(&self) -> bool {
self.perf_span.is_some()
}
pub(crate) fn cancellation_token(&self) -> &CancellationToken {
&self.cancel
}
}
/// [`Future`] extension trait that allow for creating performance

View File

@@ -6,13 +6,12 @@ use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
PostHogFlagFilterPropertyValue,
};
use rand::Rng;
use remote_storage::RemoteStorageKind;
use serde_json::json;
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
@@ -139,7 +138,6 @@ impl FeatureResolver {
}
Arc::new(properties)
};
let fake_tenants = {
let mut tenants = Vec::new();
for i in 0..10 {
@@ -149,16 +147,9 @@ impl FeatureResolver {
conf.id,
i
);
let tenant_properties = PerTenantProperties {
remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
}
.into_posthog_properties();
let properties = Self::collect_properties_inner(
distinct_id.clone(),
Some(&internal_properties),
&tenant_properties,
);
tenants.push(CaptureEvent {
event: "initial_tenant_report".to_string(),
@@ -192,7 +183,6 @@ impl FeatureResolver {
fn collect_properties_inner(
tenant_id: String,
internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
let mut properties = HashMap::new();
if let Some(internal_properties) = internal_properties {
@@ -204,9 +194,6 @@ impl FeatureResolver {
"tenant_id".to_string(),
PostHogFlagFilterPropertyValue::String(tenant_id),
);
for (key, value) in tenant_properties.iter() {
properties.insert(key.clone(), value.clone());
}
properties
}
@@ -214,13 +201,8 @@ impl FeatureResolver {
pub(crate) fn collect_properties(
&self,
tenant_id: TenantId,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
Self::collect_properties_inner(
tenant_id.to_string(),
self.internal_properties.as_deref(),
tenant_properties,
)
Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref())
}
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
@@ -232,7 +214,6 @@ impl FeatureResolver {
&self,
flag_key: &str,
tenant_id: TenantId,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<String, PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
@@ -243,7 +224,7 @@ impl FeatureResolver {
let res = inner.feature_store().evaluate_multivariate(
flag_key,
&tenant_id.to_string(),
&self.collect_properties(tenant_id, tenant_properties),
&self.collect_properties(tenant_id),
);
match &res {
Ok(value) => {
@@ -276,7 +257,6 @@ impl FeatureResolver {
&self,
flag_key: &str,
tenant_id: TenantId,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<(), PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
@@ -291,7 +271,7 @@ impl FeatureResolver {
let res = inner.feature_store().evaluate_boolean(
flag_key,
&tenant_id.to_string(),
&self.collect_properties(tenant_id, tenant_properties),
&self.collect_properties(tenant_id),
);
match &res {
Ok(()) => {
@@ -337,78 +317,3 @@ impl FeatureResolver {
.store(Arc::new(force_overrides));
}
}
struct PerTenantProperties {
pub remote_size_mb: Option<f64>,
}
impl PerTenantProperties {
pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
let mut properties = HashMap::new();
if let Some(remote_size_mb) = self.remote_size_mb {
properties.insert(
"tenant_remote_size_mb".to_string(),
PostHogFlagFilterPropertyValue::Number(remote_size_mb),
);
}
properties
}
}
#[derive(Clone)]
pub struct TenantFeatureResolver {
inner: FeatureResolver,
tenant_id: TenantId,
cached_tenant_properties: Arc<ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>>,
}
impl TenantFeatureResolver {
pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
Self {
inner,
tenant_id,
cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
}
}
pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
self.inner.evaluate_multivariate(
flag_key,
self.tenant_id,
&self.cached_tenant_properties.load(),
)
}
pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
self.inner.evaluate_boolean(
flag_key,
self.tenant_id,
&self.cached_tenant_properties.load(),
)
}
pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
self.inner
.collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
}
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
self.inner.is_feature_flag_boolean(flag_key)
}
pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) {
let mut remote_size_mb = None;
for timeline in tenant_shard.list_timelines() {
let size = timeline.metrics.resident_physical_size_get();
if size == 0 {
remote_size_mb = None;
}
if let Some(ref mut remote_size_mb) = remote_size_mb {
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
}
}
self.cached_tenant_properties.store(Arc::new(
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
));
}
}

View File

@@ -3697,25 +3697,23 @@ async fn tenant_evaluate_feature_flag(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
// and we don't need to worry about it for now.
let properties = tenant.feature_resolver.collect_properties();
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
if as_type.as_deref() == Some("boolean") {
let result = tenant.feature_resolver.evaluate_boolean(&flag);
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else if as_type.as_deref() == Some("multivariate") {
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
// Auto infer the type of the feature flag.
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
if is_boolean {
let result = tenant.feature_resolver.evaluate_boolean(&flag);
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
}
}

View File

@@ -86,7 +86,7 @@ use crate::context;
use crate::context::RequestContextBuilder;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::feature_resolver::{FeatureResolver, TenantFeatureResolver};
use crate::feature_resolver::FeatureResolver;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
@@ -386,7 +386,7 @@ pub struct TenantShard {
l0_flush_global_state: L0FlushGlobalState,
pub(crate) feature_resolver: TenantFeatureResolver,
pub(crate) feature_resolver: FeatureResolver,
}
impl std::fmt::Debug for TenantShard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -3263,7 +3263,7 @@ impl TenantShard {
};
let gc_compaction_strategy = self
.feature_resolver
.evaluate_multivariate("gc-comapction-strategy")
.evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id)
.ok();
let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy {
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy)
@@ -3408,9 +3408,6 @@ impl TenantShard {
if let Some(ref walredo_mgr) = self.walredo_mgr {
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
}
// Update the feature resolver with the latest tenant-spcific data.
self.feature_resolver.update_cached_tenant_properties(self);
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
@@ -4493,10 +4490,7 @@ impl TenantShard {
gc_block: Default::default(),
l0_flush_global_state,
basebackup_cache,
feature_resolver: TenantFeatureResolver::new(
feature_resolver,
tenant_shard_id.tenant_id,
),
feature_resolver,
}
}

View File

@@ -727,7 +727,7 @@ impl RemoteTimelineClient {
reason: "no need for a downloads gauge",
},
);
download::download_layer_file(
let fut = download::download_layer_file(
self.conf,
&self.storage_impl,
self.tenant_shard_id,
@@ -744,8 +744,11 @@ impl RemoteTimelineClient {
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
)
.await?
);
/*tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}*/fut.await?
};
REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();

View File

@@ -190,7 +190,13 @@ async fn download_object(
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
tracing::info!("Starting layer download");
pausable_failpoint!(
"before-downloading-layer-stream-pausable",
ctx.cancellation_token()
)
.map_err(|_| DownloadError::Cancelled)?;
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(

View File

@@ -182,7 +182,7 @@ impl BatchLayerWriter {
/// An image writer that takes images and produces multiple image layers.
#[must_use]
pub struct SplitImageLayerWriter<'a> {
inner: Option<ImageLayerWriter>,
inner: ImageLayerWriter,
target_layer_size: u64,
lsn: Lsn,
conf: &'static PageServerConf,
@@ -196,7 +196,7 @@ pub struct SplitImageLayerWriter<'a> {
impl<'a> SplitImageLayerWriter<'a> {
#[allow(clippy::too_many_arguments)]
pub fn new(
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -205,10 +205,22 @@ impl<'a> SplitImageLayerWriter<'a> {
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Self {
Self {
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: None,
// XXX make this lazy like in SplitDeltaLayerWriter?
inner: ImageLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
gate,
cancel.clone(),
ctx,
)
.await?,
conf,
timeline_id,
tenant_shard_id,
@@ -217,7 +229,7 @@ impl<'a> SplitImageLayerWriter<'a> {
start_key,
gate,
cancel,
}
})
}
pub async fn put_image(
@@ -226,31 +238,12 @@ impl<'a> SplitImageLayerWriter<'a> {
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
if self.inner.is_none() {
self.inner = Some(
ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(self.start_key..Key::MAX),
self.lsn,
self.gate,
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?,
);
}
let inner = self.inner.as_mut().unwrap();
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
if inner.num_keys() >= 1
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
let next_image_writer = ImageLayerWriter::new(
self.conf,
@@ -264,7 +257,7 @@ impl<'a> SplitImageLayerWriter<'a> {
)
.await
.map_err(PutError::Other)?;
let prev_image_writer = std::mem::replace(inner, next_image_writer);
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
self.start_key..key,
@@ -272,7 +265,7 @@ impl<'a> SplitImageLayerWriter<'a> {
);
self.start_key = key;
}
inner.put_image(key, img, ctx).await
self.inner.put_image(key, img, ctx).await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -289,10 +282,8 @@ impl<'a> SplitImageLayerWriter<'a> {
let Self {
mut batches, inner, ..
} = self;
if let Some(inner) = inner {
if inner.num_keys() != 0 {
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
}
if inner.num_keys() != 0 {
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
}
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
@@ -507,7 +498,10 @@ mod tests {
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
);
&ctx,
)
.await
.unwrap();
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,
@@ -583,7 +577,10 @@ mod tests {
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
);
&ctx,
)
.await
.unwrap();
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,
tline.timeline_id,
@@ -679,7 +676,10 @@ mod tests {
4 * 1024,
&tline.gate,
tline.cancel.clone(),
);
&ctx,
)
.await
.unwrap();
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,

View File

@@ -337,10 +337,14 @@ impl Layer {
})
.attached_child();
self.0
// good
let fut = self.0
.get_or_maybe_download(true, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
.await
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone());
/*tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
}*/fut.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
@@ -1041,10 +1045,11 @@ impl LayerInner {
//
// if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
// pending eviction will try to evict even upon finding an uninitialized `self.inner`.
let needs_download = self
.needs_download()
.await
.map_err(DownloadError::PreStatFailed);
let needs_download = tokio::select! {
dl = self.needs_download() => dl,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}
.map_err(DownloadError::PreStatFailed);
scopeguard::ScopeGuard::into_inner(init_cancelled);
@@ -1054,8 +1059,12 @@ impl LayerInner {
// the file is present locally because eviction has not had a chance to run yet
#[cfg(test)]
self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
.await?;
//self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
//.await?;
tokio::select! {
dl = self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload) => dl?,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}
LAYER_IMPL_METRICS.inc_init_needed_no_download();
@@ -1092,10 +1101,16 @@ impl LayerInner {
tracing::info!(%reason, "downloading on-demand");
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
let fut = self
.download_init_and_wait(timeline, permit, ctx.attached_child())
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
.await?;
;
let res = tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}?;
// bad
//fut.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
Ok(res)

View File

@@ -106,7 +106,7 @@ use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
use crate::feature_resolver::TenantFeatureResolver;
use crate::feature_resolver::FeatureResolver;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::metrics::{
@@ -202,7 +202,7 @@ pub struct TimelineResources {
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_cache: Arc<BasebackupCache>,
pub feature_resolver: TenantFeatureResolver,
pub feature_resolver: FeatureResolver,
}
pub struct Timeline {
@@ -450,7 +450,7 @@ pub struct Timeline {
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_cache: Arc<BasebackupCache>,
feature_resolver: TenantFeatureResolver,
feature_resolver: FeatureResolver,
}
pub(crate) enum PreviousHeatmap {
@@ -1345,9 +1345,12 @@ impl Timeline {
})
.attached_child();
self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
let fut = self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone());
/*tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
}*/fut.await
};
if let Err(err) = traversal_res {
@@ -4441,7 +4444,7 @@ impl Timeline {
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
while let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
if cancel.is_cancelled() {
if cancel.is_cancelled() || ctx.cancellation_token().is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
@@ -5308,7 +5311,6 @@ impl Timeline {
ctx: &RequestContext,
img_range: Range<Key>,
io_concurrency: IoConcurrency,
progress: Option<(usize, usize)>,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
let mut wrote_keys = false;
@@ -5385,15 +5387,11 @@ impl Timeline {
}
}
let progress_report = progress
.map(|(idx, total)| format!("({idx}/{total}) "))
.unwrap_or_default();
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
info!(
"{} produced image layer for rel {}",
progress_report,
"produced image layer for rel {}",
ImageLayerName {
key_range: img_range.clone(),
lsn
@@ -5403,12 +5401,7 @@ impl Timeline {
unfinished_image_layer: image_layer_writer,
})
} else {
tracing::debug!(
"{} no data in range {}-{}",
progress_report,
img_range.start,
img_range.end
);
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
Ok(ImageLayerCreationOutcome::Empty)
}
}
@@ -5643,8 +5636,7 @@ impl Timeline {
}
}
let total = partition_parts.len();
for (idx, partition) in partition_parts.iter().enumerate() {
for partition in partition_parts.iter() {
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
@@ -5729,7 +5721,6 @@ impl Timeline {
ctx,
img_range.clone(),
io_concurrency,
Some((idx, total)),
)
.await?
} else {

View File

@@ -9,7 +9,7 @@ use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::layer_manager::LayerManagerLockHolder;
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
use super::{
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
@@ -101,11 +101,7 @@ pub enum GcCompactionQueueItem {
/// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
auto: bool,
},
SubCompactionJob {
i: usize,
total: usize,
options: CompactOptions,
},
SubCompactionJob(CompactOptions),
Notify(GcCompactionJobId, Option<Lsn>),
}
@@ -167,7 +163,7 @@ impl GcCompactionQueueItem {
running,
job_id: id.0,
}),
GcCompactionQueueItem::SubCompactionJob { options, .. } => Some(CompactInfoResponse {
GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
compact_key_range: options.compact_key_range,
compact_lsn_range: options.compact_lsn_range,
sub_compaction: options.sub_compaction,
@@ -493,7 +489,7 @@ impl GcCompactionQueue {
.map(|job| job.compact_lsn_range.end)
.max()
.unwrap();
for (i, job) in jobs.into_iter().enumerate() {
for job in jobs {
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
// until we do further refactors to allow directly call `compact_with_gc`.
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
@@ -511,11 +507,7 @@ impl GcCompactionQueue {
compact_lsn_range: Some(job.compact_lsn_range.into()),
sub_compaction_max_job_size_mb: None,
};
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob {
options,
i,
total: jobs_len,
});
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
}
if !auto {
@@ -659,7 +651,7 @@ impl GcCompactionQueue {
}
}
}
GcCompactionQueueItem::SubCompactionJob { options, i, total } => {
GcCompactionQueueItem::SubCompactionJob(options) => {
// TODO: error handling, clear the queue if any task fails?
let _gc_guard = match gc_block.start().await {
Ok(guard) => guard,
@@ -671,7 +663,6 @@ impl GcCompactionQueue {
)));
}
};
info!("running gc-compaction subcompaction job {}/{}", i, total);
let res = timeline.compact_with_options(cancel, options, ctx).await;
let compaction_result = match res {
Ok(res) => res,
@@ -1319,7 +1310,7 @@ impl Timeline {
|| cfg!(feature = "testing")
|| self
.feature_resolver
.evaluate_boolean("image-compaction-boundary")
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
.is_ok()
{
let last_repartition_lsn = self.partitioning.read().1;
@@ -1600,15 +1591,13 @@ impl Timeline {
let started = Instant::now();
let mut replace_image_layers = Vec::new();
let total = layers_to_rewrite.len();
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
for layer in layers_to_rewrite {
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
info!(layer=%layer, "rewriting layer after shard split");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -1790,14 +1779,20 @@ impl Timeline {
} = {
let phase1_span = info_span!("compact_level0_phase1");
let ctx = ctx.attached_child();
let stats = CompactLevel0Phase1StatsBuilder {
let mut stats = CompactLevel0Phase1StatsBuilder {
version: Some(2),
tenant_id: Some(self.tenant_shard_id),
timeline_id: Some(self.timeline_id),
..Default::default()
};
let begin = tokio::time::Instant::now();
let phase1_layers_locked = self.layers.read(LayerManagerLockHolder::Compaction).await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
self.compact_level0_phase1(
phase1_layers_locked,
stats,
target_file_size,
force_compaction_ignore_threshold,
@@ -1818,19 +1813,16 @@ impl Timeline {
}
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
async fn compact_level0_phase1(
self: &Arc<Self>,
async fn compact_level0_phase1<'a>(
self: &'a Arc<Self>,
guard: LayerManagerReadGuard<'a>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let begin = tokio::time::Instant::now();
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller
let layers = guard.layer_map()?;
let level0_deltas = layers.level0_deltas();
stats.level0_deltas_count = Some(level0_deltas.len());
@@ -1865,12 +1857,6 @@ impl Timeline {
.map(|x| guard.get_from_desc(x))
.collect::<Vec<_>>();
drop_layer_manager_rlock(guard);
// The is the last LSN that we have seen for L0 compaction in the timeline. This LSN might be updated
// by the time we finish the compaction. So we need to get it here.
let l0_last_record_lsn = self.get_last_record_lsn();
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
@@ -1958,7 +1944,9 @@ impl Timeline {
// we don't accidentally use it later in the function.
drop(level0_deltas);
stats.compaction_prerequisites_micros = stats.read_lock_acquisition_micros.till_now();
stats.read_lock_held_prerequisites_micros = stats
.read_lock_held_spawn_blocking_startup_micros
.till_now();
// TODO: replace with streaming k-merge
let all_keys = {
@@ -1980,7 +1968,7 @@ impl Timeline {
all_keys
};
stats.read_lock_held_key_sort_micros = stats.compaction_prerequisites_micros.till_now();
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
//
@@ -2014,6 +2002,7 @@ impl Timeline {
}
}
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
@@ -2032,12 +2021,8 @@ impl Timeline {
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = {
// TODO: optimize this with copy-on-write layer map.
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers = guard.layer_map()?;
layers.image_coverage(&key_range, l0_last_record_lsn).len()
};
let coverage_size =
layers.image_coverage(&key_range, last_record_lsn).len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
@@ -2056,6 +2041,7 @@ impl Timeline {
holes
};
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_layer_manager_rlock(guard);
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
@@ -2396,8 +2382,9 @@ struct CompactLevel0Phase1StatsBuilder {
tenant_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
read_lock_acquisition_micros: DurationRecorder,
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
read_lock_held_key_sort_micros: DurationRecorder,
compaction_prerequisites_micros: DurationRecorder,
read_lock_held_prerequisites_micros: DurationRecorder,
read_lock_held_compute_holes_micros: DurationRecorder,
read_lock_drop_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
@@ -2412,8 +2399,9 @@ struct CompactLevel0Phase1Stats {
tenant_id: TenantShardId,
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
read_lock_held_key_sort_micros: RecordedDuration,
compaction_prerequisites_micros: RecordedDuration,
read_lock_held_prerequisites_micros: RecordedDuration,
read_lock_held_compute_holes_micros: RecordedDuration,
read_lock_drop_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
@@ -2438,12 +2426,16 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
.read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
read_lock_held_spawn_blocking_startup_micros: value
.read_lock_held_spawn_blocking_startup_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
read_lock_held_key_sort_micros: value
.read_lock_held_key_sort_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
compaction_prerequisites_micros: value
.compaction_prerequisites_micros
read_lock_held_prerequisites_micros: value
.read_lock_held_prerequisites_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
read_lock_held_compute_holes_micros: value
@@ -3511,16 +3503,22 @@ impl Timeline {
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
// when some condition meet.
let mut image_layer_writer = if !has_data_below {
Some(SplitImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
))
Some(
SplitImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
.context("failed to create image layer writer")
.map_err(CompactionError::Other)?,
)
} else {
None
};
@@ -4354,7 +4352,6 @@ impl TimelineAdaptor {
ctx,
key_range.clone(),
IoConcurrency::sequential(),
None,
)
.await?;

View File

@@ -885,7 +885,7 @@ async fn remote_copy(
}
tracing::info!("Deleting orphan layer file to make way for hard linking");
// Delete orphan layer file and try again, to ensure this layer has a well understood source
std::fs::remove_file(&adoptee_path)
std::fs::remove_file(adopted_path)
.map_err(|e| Error::launder(e.into(), Error::Prepare))?;
std::fs::hard_link(adopted_path, &adoptee_path)
.map_err(|e| Error::launder(e.into(), Error::Prepare))?;

View File

@@ -36,7 +36,7 @@ use utils::postgres_client::{ConnectionConfigArgs, wal_stream_connection_config}
use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
use crate::context::{DownloadBehavior, RequestContext};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::metrics::{
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
@@ -536,16 +536,18 @@ impl ConnectionManagerState {
let protocol = self.conf.protocol;
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
DownloadBehavior::Download,
);
let ctx_builder = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::WalReceiverConnectionHandler)
.download_behavior(DownloadBehavior::Download);
let span = info_span!("connection", %node_id);
let connection_handle = self.spawn(move |events_sender, cancellation| {
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
let ctx = ctx_builder.detached_child_with_cancel(cancellation.clone());
let res = super::walreceiver_connection::handle_walreceiver_connection(
timeline,
protocol,

View File

@@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection(
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
let walingest_res = select! {
walingest_res = walingest_future => walingest_res,
_ = cancellation.cancelled() => {
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
debug!("Connection cancelled");
return Err(WalReceiverError::Cancelled);
},
};
let mut walingest = walingest_res.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let (format, compression) = match protocol {
PostgresClientProtocol::Interpreted {

View File

@@ -299,7 +299,7 @@ where
//
let mut request_storage = Some(request);
for attempt in 1.. {
if self.cancel.is_cancelled() {
if self.cancel.is_cancelled() || self.ctx.cancellation_token().is_cancelled() {
return Err(FlushTaskError::Cancelled);
}
let result = async {

View File

@@ -78,6 +78,16 @@ pub(crate) trait ReportableError: fmt::Display + Send + 'static {
fn get_error_kind(&self) -> ErrorKind;
}
impl ReportableError for postgres_client::error::Error {
fn get_error_kind(&self) -> ErrorKind {
if self.as_db_error().is_some() {
ErrorKind::Postgres
} else {
ErrorKind::Compute
}
}
}
/// Flattens `Result<Result<T>>` into `Result<T>`.
pub fn flatten_err<T>(r: Result<anyhow::Result<T>, JoinError>) -> anyhow::Result<T> {
r.context("join error").and_then(|x| x)

View File

@@ -404,15 +404,7 @@ impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => {
if p.as_db_error().is_some() {
// postgres rejected the connection
ErrorKind::Postgres
} else {
// couldn't even reach postgres
ErrorKind::Compute
}
}
HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
HttpConnError::ComputeCtl(_) => ErrorKind::Service,
HttpConnError::JwtPayloadError(_) => ErrorKind::User,

View File

@@ -22,7 +22,7 @@ use serde_json::Value;
use serde_json::value::RawValue;
use tokio::time::{self, Instant};
use tokio_util::sync::CancellationToken;
use tracing::{Level, debug, error, info};
use tracing::{debug, error, info};
use typed_json::json;
use url::Url;
use uuid::Uuid;
@@ -390,35 +390,12 @@ pub(crate) async fn handle(
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
let routine = get(db_error, |db| db.routine());
match &e {
SqlOverHttpError::Postgres(e)
if e.as_db_error().is_some() && error_kind == ErrorKind::User =>
{
// this error contains too much info, and it's not an error we care about.
if tracing::enabled!(Level::DEBUG) {
tracing::debug!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
} else {
tracing::info!(
kind = error_kind.to_metric_label(),
error = "bad query",
"forwarding error to user"
);
}
}
_ => {
tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
}
}
tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
json_response(
e.get_http_status_code(),
@@ -483,15 +460,7 @@ impl ReportableError for SqlOverHttpError {
SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User,
SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
// customer initiated SQL errors.
SqlOverHttpError::Postgres(p) => {
if p.as_db_error().is_some() {
ErrorKind::User
} else {
ErrorKind::Compute
}
}
// proxy initiated SQL errors.
SqlOverHttpError::Postgres(p) => p.get_error_kind(),
SqlOverHttpError::InternalPostgres(p) => {
if p.as_db_error().is_some() {
ErrorKind::Service
@@ -499,7 +468,6 @@ impl ReportableError for SqlOverHttpError {
ErrorKind::Compute
}
}
// postgres returned a bad row format that we couldn't parse.
SqlOverHttpError::JsonConversion(_) => ErrorKind::Postgres,
SqlOverHttpError::Cancelled(c) => c.get_error_kind(),
}

View File

@@ -52,7 +52,7 @@ pub trait ResponseErrorMessageExt: Sized {
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(self) -> Result<Self> {
let status = self.status();
if status.is_success() {
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}

View File

@@ -14,12 +14,11 @@ use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span};
use utils::backoff::{self};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::id::{NodeId, TenantId};
use crate::service::Config;
@@ -37,7 +36,7 @@ struct UnshardedComputeHookTenant {
preferred_az: Option<AvailabilityZone>,
// Must hold this lock to send a notification.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
}
struct ShardedComputeHookTenant {
stripe_size: ShardStripeSize,
@@ -50,7 +49,7 @@ struct ShardedComputeHookTenant {
// Must hold this lock to send a notification. The contents represent
// the last successfully sent notification, and are used to coalesce multiple
// updates by only sending when there is a chance since our last successful send.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
}
/// Represents our knowledge of the compute's state: we can update this when we get a
@@ -58,9 +57,9 @@ struct ShardedComputeHookTenant {
///
/// Should be wrapped in an Option<>, as we cannot always know the remote state.
#[derive(PartialEq, Eq, Debug)]
struct ComputeRemoteState<R> {
struct ComputeRemoteState {
// The request body which was acked by the compute
request: R,
request: ComputeHookNotifyRequest,
// Whether the cplane indicated that the state was applied to running computes, or just
// persisted. In the Neon control plane, this is the difference between a 423 response (meaning
@@ -68,36 +67,6 @@ struct ComputeRemoteState<R> {
applied: bool,
}
type ComputeRemoteTenantState = ComputeRemoteState<NotifyAttachRequest>;
type ComputeRemoteTimelineState = ComputeRemoteState<NotifySafekeepersRequest>;
/// The trait which define the handler-specific types and methods.
/// We have two implementations of this trait so far:
/// - [`ComputeHookTenant`] for tenant attach notifications ("/notify-attach")
/// - [`ComputeHookTimeline`] for safekeeper change notifications ("/notify-safekeepers")
trait ApiMethod {
/// Type of the key which identifies the resource.
/// It's either TenantId for tenant attach notifications,
/// or TenantTimelineId for safekeeper change notifications.
type Key: std::cmp::Eq + std::hash::Hash + Clone;
type Request: serde::Serialize + std::fmt::Debug;
const API_PATH: &'static str;
fn maybe_send(
&self,
key: Self::Key,
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<Self::Request>>>>,
) -> MaybeSendResult<Self::Request, Self::Key>;
async fn notify_local(
env: &LocalEnv,
cplane: &ComputeControlPlane,
req: &Self::Request,
) -> Result<(), NotifyError>;
}
enum ComputeHookTenant {
Unsharded(UnshardedComputeHookTenant),
Sharded(ShardedComputeHookTenant),
@@ -128,7 +97,7 @@ impl ComputeHookTenant {
}
}
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>> {
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
match self {
Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
@@ -222,136 +191,19 @@ impl ComputeHookTenant {
}
}
/// The state of a timeline we need to notify the compute about.
struct ComputeHookTimeline {
generation: SafekeeperGeneration,
safekeepers: Vec<SafekeeperInfo>,
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTimelineState>>>,
}
impl ComputeHookTimeline {
/// Construct a new ComputeHookTimeline with the given safekeepers and generation.
fn new(generation: SafekeeperGeneration, safekeepers: Vec<SafekeeperInfo>) -> Self {
Self {
generation,
safekeepers,
send_lock: Arc::default(),
}
}
/// Update the state with a new SafekeepersUpdate.
/// Noop if the update generation is not greater than the current generation.
fn update(&mut self, sk_update: SafekeepersUpdate) {
if sk_update.generation > self.generation {
self.generation = sk_update.generation;
self.safekeepers = sk_update.safekeepers;
}
}
}
impl ApiMethod for ComputeHookTimeline {
type Key = TenantTimelineId;
type Request = NotifySafekeepersRequest;
const API_PATH: &'static str = "notify-safekeepers";
fn maybe_send(
&self,
ttid: TenantTimelineId,
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTimelineState>>>,
) -> MaybeSendNotifySafekeepersResult {
let locked = match lock {
Some(already_locked) => already_locked,
None => {
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::timelines`] lock.
let Ok(locked) = self.send_lock.clone().try_lock_owned() else {
return MaybeSendResult::AwaitLock((ttid, self.send_lock.clone()));
};
locked
}
};
if locked
.as_ref()
.is_some_and(|s| s.request.generation >= self.generation)
{
return MaybeSendResult::Noop;
}
MaybeSendResult::Transmit((
NotifySafekeepersRequest {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
generation: self.generation,
safekeepers: self.safekeepers.clone(),
},
locked,
))
}
async fn notify_local(
_env: &LocalEnv,
cplane: &ComputeControlPlane,
req: &NotifySafekeepersRequest,
) -> Result<(), NotifyError> {
let NotifySafekeepersRequest {
tenant_id,
timeline_id,
generation,
safekeepers,
} = req;
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == *tenant_id
&& endpoint.timeline_id == *timeline_id
&& endpoint.status() == EndpointStatus::Running
{
tracing::info!("Reconfiguring safekeepers for endpoint {endpoint_name}");
let safekeepers = safekeepers.iter().map(|sk| sk.id).collect::<Vec<_>>();
endpoint
.reconfigure_safekeepers(safekeepers, *generation)
.await
.map_err(NotifyError::NeonLocal)?;
}
}
Ok(())
}
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifyAttachRequestShard {
struct ComputeHookNotifyRequestShard {
node_id: NodeId,
shard_number: ShardNumber,
}
/// Request body that we send to the control plane to notify it of where a tenant is attached
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifyAttachRequest {
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
preferred_az: Option<String>,
stripe_size: Option<ShardStripeSize>,
shards: Vec<NotifyAttachRequestShard>,
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
pub(crate) struct SafekeeperInfo {
pub id: NodeId,
/// Hostname of the safekeeper.
/// It exists for better debuggability. Might be missing.
/// Should not be used for anything else.
pub hostname: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifySafekeepersRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
generation: SafekeeperGeneration,
safekeepers: Vec<SafekeeperInfo>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
/// Error type for attempts to call into the control plane compute notification hook
@@ -383,50 +235,42 @@ pub(crate) enum NotifyError {
NeonLocal(anyhow::Error),
}
enum MaybeSendResult<R, K> {
enum MaybeSendResult {
// Please send this request while holding the lock, and if you succeed then write
// the request into the lock.
Transmit(
(
R,
tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<R>>>,
ComputeHookNotifyRequest,
tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
),
),
// Something requires sending, but you must wait for a current sender then call again
AwaitLock((K, Arc<tokio::sync::Mutex<Option<ComputeRemoteState<R>>>>)),
AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
// Nothing requires sending
Noop,
}
type MaybeSendNotifyAttachResult = MaybeSendResult<NotifyAttachRequest, TenantId>;
type MaybeSendNotifySafekeepersResult = MaybeSendResult<NotifySafekeepersRequest, TenantTimelineId>;
impl ApiMethod for ComputeHookTenant {
type Key = TenantId;
type Request = NotifyAttachRequest;
const API_PATH: &'static str = "notify-attach";
impl ComputeHookTenant {
fn maybe_send(
&self,
tenant_id: TenantId,
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTenantState>>>,
) -> MaybeSendNotifyAttachResult {
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
) -> MaybeSendResult {
let locked = match lock {
Some(already_locked) => already_locked,
None => {
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::tenants`] lock.
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
return MaybeSendResult::AwaitLock((tenant_id, self.get_send_lock().clone()));
return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
};
locked
}
};
let request = match self {
Self::Unsharded(unsharded_tenant) => Some(NotifyAttachRequest {
Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
tenant_id,
shards: vec![NotifyAttachRequestShard {
shards: vec![ComputeHookNotifyRequestShard {
shard_number: ShardNumber(0),
node_id: unsharded_tenant.node_id,
}],
@@ -439,12 +283,12 @@ impl ApiMethod for ComputeHookTenant {
Self::Sharded(sharded_tenant)
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
{
Some(NotifyAttachRequest {
Some(ComputeHookNotifyRequest {
tenant_id,
shards: sharded_tenant
.shards
.iter()
.map(|(shard_number, node_id)| NotifyAttachRequestShard {
.map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
shard_number: *shard_number,
node_id: *node_id,
})
@@ -489,22 +333,98 @@ impl ApiMethod for ComputeHookTenant {
}
}
}
}
async fn notify_local(
env: &LocalEnv,
cplane: &ComputeControlPlane,
req: &NotifyAttachRequest,
/// The compute hook is a destination for notifications about changes to tenant:pageserver
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
authorization_header: Option<String>,
// Concurrency limiter, so that we do not overload the cloud control plane when updating
// large numbers of tenants (e.g. when failing over after a node failure)
api_concurrency: tokio::sync::Semaphore,
// This lock is only used in testing enviroments, to serialize calls into neon_lock
neon_local_lock: tokio::sync::Mutex<()>,
// We share a client across all notifications to enable connection re-use etc when
// sending large numbers of notifications
client: reqwest::Client,
}
/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background. This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) node_id: NodeId,
pub(crate) stripe_size: ShardStripeSize,
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}
impl ComputeHook {
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {jwt}"));
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
for cert in &config.ssl_ca_certs {
client = client.add_root_certificate(cert.clone());
}
let client = client
.build()
.context("Failed to build http client for compute hook")?;
Ok(Self {
state: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
client,
})
}
/// For test environments: use neon_local's LocalEnv to update compute
async fn do_notify_local(
&self,
reconfigure_request: &ComputeHookNotifyRequest,
) -> Result<(), NotifyError> {
let NotifyAttachRequest {
// neon_local updates are not safe to call concurrently, use a lock to serialize
// all calls to this function
let _locked = self.neon_local_lock.lock().await;
let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
tracing::warn!(
"neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
);
return Ok(());
};
let env = match LocalEnv::load_config(repo_dir) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
return Ok(());
}
};
let cplane =
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
let ComputeHookNotifyRequest {
tenant_id,
shards,
stripe_size,
preferred_az: _preferred_az,
} = req;
} = reconfigure_request;
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
tracing::info!("Reconfiguring endpoint {endpoint_name}");
let pageservers = shards
.iter()
@@ -526,7 +446,7 @@ impl ApiMethod for ComputeHookTenant {
.collect::<Vec<_>>();
endpoint
.reconfigure_pageservers(pageservers, *stripe_size)
.reconfigure(pageservers, *stripe_size, None)
.await
.map_err(NotifyError::NeonLocal)?;
}
@@ -534,102 +454,11 @@ impl ApiMethod for ComputeHookTenant {
Ok(())
}
}
/// The compute hook is a destination for notifications about changes to tenant:pageserver
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
tenants: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
timelines: std::sync::Mutex<HashMap<TenantTimelineId, ComputeHookTimeline>>,
authorization_header: Option<String>,
// Concurrency limiter, so that we do not overload the cloud control plane when updating
// large numbers of tenants (e.g. when failing over after a node failure)
api_concurrency: tokio::sync::Semaphore,
// This lock is only used in testing enviroments, to serialize calls into neon_local
neon_local_lock: tokio::sync::Mutex<()>,
// We share a client across all notifications to enable connection re-use etc when
// sending large numbers of notifications
client: reqwest::Client,
}
/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background. This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) node_id: NodeId,
pub(crate) stripe_size: ShardStripeSize,
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}
pub(crate) struct SafekeepersUpdate {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
pub(crate) generation: SafekeeperGeneration,
pub(crate) safekeepers: Vec<SafekeeperInfo>,
}
impl ComputeHook {
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {jwt}"));
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
for cert in &config.ssl_ca_certs {
client = client.add_root_certificate(cert.clone());
}
let client = client
.build()
.context("Failed to build http client for compute hook")?;
Ok(Self {
tenants: Default::default(),
timelines: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
client,
})
}
/// For test environments: use neon_local's LocalEnv to update compute
async fn do_notify_local<M: ApiMethod>(&self, req: &M::Request) -> Result<(), NotifyError> {
// neon_local updates are not safe to call concurrently, use a lock to serialize
// all calls to this function
let _locked = self.neon_local_lock.lock().await;
let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
tracing::warn!(
"neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
);
return Ok(());
};
let env = match LocalEnv::load_config(repo_dir) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
return Ok(());
}
};
let cplane =
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
M::notify_local(&env, &cplane, req).await
}
async fn do_notify_iteration<Req: serde::Serialize + std::fmt::Debug>(
async fn do_notify_iteration(
&self,
url: &String,
reconfigure_request: &Req,
reconfigure_request: &ComputeHookNotifyRequest,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let req = self.client.request(reqwest::Method::PUT, url);
@@ -651,7 +480,9 @@ impl ComputeHook {
};
// Treat all 2xx responses as success
if response.status().is_success() {
if response.status() >= reqwest::StatusCode::OK
&& response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
{
if response.status() != reqwest::StatusCode::OK {
// Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
// log a warning.
@@ -702,10 +533,10 @@ impl ComputeHook {
}
}
async fn do_notify<R: serde::Serialize + std::fmt::Debug>(
async fn do_notify(
&self,
url: &String,
reconfigure_request: &R,
reconfigure_request: &ComputeHookNotifyRequest,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
// We hold these semaphore units across all retries, rather than only across each
@@ -737,13 +568,13 @@ impl ComputeHook {
}
/// Synchronous phase: update the per-tenant state for the next intended notification
fn notify_attach_prepare(&self, shard_update: ShardUpdate) -> MaybeSendNotifyAttachResult {
let mut tenants_locked = self.tenants.lock().unwrap();
fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
let mut state_locked = self.state.lock().unwrap();
use std::collections::hash_map::Entry;
let tenant_shard_id = shard_update.tenant_shard_id;
let tenant = match tenants_locked.entry(tenant_shard_id.tenant_id) {
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => {
let ShardUpdate {
tenant_shard_id,
@@ -767,37 +598,10 @@ impl ComputeHook {
tenant.maybe_send(tenant_shard_id.tenant_id, None)
}
fn notify_safekeepers_prepare(
async fn notify_execute(
&self,
safekeepers_update: SafekeepersUpdate,
) -> MaybeSendNotifySafekeepersResult {
let mut timelines_locked = self.timelines.lock().unwrap();
let ttid = TenantTimelineId {
tenant_id: safekeepers_update.tenant_id,
timeline_id: safekeepers_update.timeline_id,
};
use std::collections::hash_map::Entry;
let timeline = match timelines_locked.entry(ttid) {
Entry::Vacant(e) => e.insert(ComputeHookTimeline::new(
safekeepers_update.generation,
safekeepers_update.safekeepers,
)),
Entry::Occupied(e) => {
let timeline = e.into_mut();
timeline.update(safekeepers_update);
timeline
}
};
timeline.maybe_send(ttid, None)
}
async fn notify_execute<M: ApiMethod>(
&self,
state: &std::sync::Mutex<HashMap<M::Key, M>>,
maybe_send_result: MaybeSendResult<M::Request, M::Key>,
maybe_send_result: MaybeSendResult,
tenant_shard_id: TenantShardId,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
// Process result: we may get an update to send, or we may have to wait for a lock
@@ -806,7 +610,7 @@ impl ComputeHook {
MaybeSendResult::Noop => {
return Ok(());
}
MaybeSendResult::AwaitLock((key, send_lock)) => {
MaybeSendResult::AwaitLock(send_lock) => {
let send_locked = tokio::select! {
guard = send_lock.lock_owned() => {guard},
_ = cancel.cancelled() => {
@@ -817,11 +621,11 @@ impl ComputeHook {
// Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
// we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
// try_lock.
let state_locked = state.lock().unwrap();
let Some(resource_state) = state_locked.get(&key) else {
let state_locked = self.state.lock().unwrap();
let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
return Ok(());
};
match resource_state.maybe_send(key, Some(send_locked)) {
match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
MaybeSendResult::AwaitLock(_) => {
unreachable!("We supplied lock guard")
}
@@ -840,18 +644,14 @@ impl ComputeHook {
.control_plane_url
.as_ref()
.map(|control_plane_url| {
format!(
"{}/{}",
control_plane_url.trim_end_matches('/'),
M::API_PATH
)
format!("{}/notify-attach", control_plane_url.trim_end_matches('/'))
});
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local::<M>(&request).await.map_err(|e| {
self.do_notify_local(&request).await.map_err(|e| {
// This path is for testing only, so munge the error into our prod-style error type.
tracing::error!("neon_local notification hook failed: {e}");
NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
@@ -887,7 +687,7 @@ impl ComputeHook {
/// Infallible synchronous fire-and-forget version of notify(), that sends its results to
/// a channel. Something should consume the channel and arrange to try notifying again
/// if something failed.
pub(super) fn notify_attach_background(
pub(super) fn notify_background(
self: &Arc<Self>,
notifications: Vec<ShardUpdate>,
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
@@ -896,7 +696,7 @@ impl ComputeHook {
let mut maybe_sends = Vec::new();
for shard_update in notifications {
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_attach_prepare(shard_update);
let maybe_send_result = self.notify_prepare(shard_update);
maybe_sends.push((tenant_shard_id, maybe_send_result))
}
@@ -915,10 +715,10 @@ impl ComputeHook {
async move {
this
.notify_execute(&this.tenants, maybe_send_result, &cancel)
.notify_execute(maybe_send_result, tenant_shard_id, &cancel)
.await.map_err(|e| (tenant_shard_id, e))
}.instrument(info_span!(
"notify_attach_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
"notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
))
})
.buffered(API_CONCURRENCY);
@@ -961,23 +761,14 @@ impl ComputeHook {
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify_attach<'a>(
pub(super) async fn notify<'a>(
&self,
shard_update: ShardUpdate<'a>,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let maybe_send_result = self.notify_attach_prepare(shard_update);
self.notify_execute(&self.tenants, maybe_send_result, cancel)
.await
}
pub(super) async fn notify_safekeepers(
&self,
safekeepers_update: SafekeepersUpdate,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let maybe_send_result = self.notify_safekeepers_prepare(safekeepers_update);
self.notify_execute(&self.timelines, maybe_send_result, cancel)
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
.await
}
@@ -993,8 +784,8 @@ impl ComputeHook {
) {
use std::collections::hash_map::Entry;
let mut tenants_locked = self.tenants.lock().unwrap();
match tenants_locked.entry(tenant_shard_id.tenant_id) {
let mut state_locked = self.state.lock().unwrap();
match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(_) => {
// This is a valid but niche case, where the tenant was previously attached
// as a Secondary location and then detached, so has no previously notified

View File

@@ -65,7 +65,7 @@ pub(super) struct Reconciler {
pub(crate) compute_hook: Arc<ComputeHook>,
/// To avoid stalling if the cloud control plane is unavailable, we may proceed
/// past failures in [`ComputeHook::notify_attach`], but we _must_ remember that we failed
/// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
/// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
pub(crate) compute_notify_failure: bool,
@@ -1023,7 +1023,7 @@ impl Reconciler {
if let Some(node) = &self.intent.attached {
let result = self
.compute_hook
.notify_attach(
.notify(
compute_hook::ShardUpdate {
tenant_shard_id: self.tenant_shard_id,
node_id: node.get_id(),

View File

@@ -878,18 +878,18 @@ impl Service {
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
// will emit compute hook notifications when they reconcile.
//
// Ordering: our calls to notify_attach_background synchronously establish a relative order for these notifications vs. any later
// Ordering: our calls to notify_background synchronously establish a relative order for these notifications vs. any later
// calls into the ComputeHook for the same tenant: we can leave these to run to completion in the background and any later
// calls will be correctly ordered wrt these.
//
// Concurrency: we call notify_attach_background for all tenants, which will create O(N) tokio tasks, but almost all of them
// Concurrency: we call notify_background for all tenants, which will create O(N) tokio tasks, but almost all of them
// will just wait on the ComputeHook::API_CONCURRENCY semaphore immediately, so very cheap until they get that semaphore
// unit and start doing I/O.
tracing::info!(
"Sending {} compute notifications",
compute_notifications.len()
);
self.compute_hook.notify_attach_background(
self.compute_hook.notify_background(
compute_notifications,
bg_compute_notify_result_tx.clone(),
&self.cancel,
@@ -6281,7 +6281,7 @@ impl Service {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = self
.compute_hook
.notify_attach(
.notify(
compute_hook::ShardUpdate {
tenant_shard_id: child_id,
node_id: child_ps,

View File

@@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;
use super::safekeeper_reconciler::ScheduleRequest;
use crate::compute_hook;
use crate::heartbeater::SafekeeperState;
use crate::id_lock_map::trace_shared_lock;
use crate::metrics;
@@ -1199,11 +1198,7 @@ impl Service {
// 4. Call PUT configuration on safekeepers from the current set,
// delivering them joint_conf.
// Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
// This way the compute will know about new safekeepers from joint_config before we require to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
.await?;
// TODO(diko): need to notify cplane with an updated set of safekeepers.
let results = self
.tenant_timeline_set_membership_quorum(
@@ -1310,55 +1305,8 @@ impl Service {
)
.await?;
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
.await?;
// TODO(diko): need to notify cplane with an updated set of safekeepers.
Ok(())
}
/// Notify cplane about safekeeper membership change.
/// The cplane will receive a joint set of safekeepers as a safekeeper list.
async fn cplane_notify_safekeepers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
mconf: &membership::Configuration,
) -> Result<(), ApiError> {
let mut safekeepers = Vec::new();
let mut ids: HashSet<_> = HashSet::new();
for member in mconf
.members
.m
.iter()
.chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
{
if ids.insert(member.id) {
safekeepers.push(compute_hook::SafekeeperInfo {
id: member.id,
hostname: Some(member.host.clone()),
});
}
}
self.compute_hook
.notify_safekeepers(
compute_hook::SafekeepersUpdate {
tenant_id,
timeline_id,
generation: mconf.generation,
safekeepers,
},
&self.cancel,
)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"failed to notify cplane about safekeeper membership change: {err}"
))
})
}
}

View File

@@ -724,21 +724,15 @@ class NeonEnvBuilder:
shutil.copytree(storcon_db_from_dir, storcon_db_to_dir, ignore=ignore_postgres_log)
assert not (storcon_db_to_dir / "postgres.log").exists()
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
# However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
# controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# However, in this new NeonEnv, the pageservers listen on different ports, and the storage controller
# will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# So, from_repo_dir patches up the the storcon database.
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
assert not patch_script_path.exists()
patch_script = ""
for ps in self.env.pageservers:
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';\n"
for sk in self.env.safekeepers:
patch_script += f"UPDATE safekeepers SET http_port={sk.port.http}, port={sk.port.pg} WHERE id = '{sk.id}';\n"
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
patch_script_path.write_text(patch_script)
# Update the config with info about tenants and timelines

View File

@@ -76,7 +76,6 @@ if TYPE_CHECKING:
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
#
# # Build previous version of binaries and store them somewhere:
# rm -rf pg_install target
@@ -103,7 +102,6 @@ if TYPE_CHECKING:
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
# export NEON_BIN=target/${BUILD_TYPE}
# export POSTGRES_DISTRIB_DIR=pg_install
#

View File

@@ -399,7 +399,7 @@ def test_tx_abort_with_many_relations(
# How many relations: this number is tuned to be long enough to take tens of seconds
# if the rollback code path is buggy, tripping the test's timeout.
n = 5000
step = 500
step = 2500
def create():
# Create many relations

View File

@@ -32,13 +32,10 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
)
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] is None
assert len(mconf["sk_set"]) == 1
assert mconf["generation"] == 1
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
# We specify all safekeepers, so compute will connect to all of them.
# Only those from the current membership configuration will be used.
# TODO(diko): set only current safekeepers when cplane notify is implemented.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE t(a int)")
@@ -61,16 +58,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
# 1 initial generation + 2 migrations on each loop iteration.
expected_gen = 1 + 2 * 3
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == expected_gen
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith(f"g#{expected_gen}:")
# Restart and check again to make sure data is persistent.
ep.stop()
ep.start(safekeeper_generation=1, safekeepers=[3])
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]