mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 23:50:39 +00:00
Compare commits
10 Commits
release-pr
...
skyzh/tx-a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a48aafccb | ||
|
|
5ec8881c0b | ||
|
|
b254dce8a1 | ||
|
|
3815e3b2b5 | ||
|
|
bbcd70eab3 | ||
|
|
0934ce9bce | ||
|
|
4932963bac | ||
|
|
6d73cfa608 | ||
|
|
d2d9946bab | ||
|
|
daa402f35a |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -6,6 +6,7 @@
|
||||
/tmp_check_cli
|
||||
__pycache__/
|
||||
test_output/
|
||||
neon_previous/
|
||||
.vscode
|
||||
.idea
|
||||
*.swp
|
||||
|
||||
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -1305,6 +1305,7 @@ dependencies = [
|
||||
"fail",
|
||||
"flate2",
|
||||
"futures",
|
||||
"hostname-validator",
|
||||
"http 1.1.0",
|
||||
"indexmap 2.9.0",
|
||||
"itertools 0.10.5",
|
||||
@@ -2771,6 +2772,12 @@ dependencies = [
|
||||
"windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hostname-validator"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f558a64ac9af88b5ba400d99b579451af0d39c6d360980045b91aac966d705e2"
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "0.2.9"
|
||||
|
||||
@@ -1983,7 +1983,7 @@ RUN apt update && \
|
||||
locales \
|
||||
lsof \
|
||||
procps \
|
||||
rsyslog \
|
||||
rsyslog-gnutls \
|
||||
screen \
|
||||
tcpdump \
|
||||
$VERSION_INSTALLS && \
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
|
||||
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
|
||||
import 'sql_exporter/compute_max_connections.libsonnet',
|
||||
import 'sql_exporter/compute_pg_oldest_frozen_xid_age.libsonnet',
|
||||
import 'sql_exporter/compute_pg_oldest_mxid_age.libsonnet',
|
||||
import 'sql_exporter/compute_receive_lsn.libsonnet',
|
||||
import 'sql_exporter/compute_subscriptions_count.libsonnet',
|
||||
import 'sql_exporter/connection_counts.libsonnet',
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
{
|
||||
metric_name: 'compute_pg_oldest_frozen_xid_age',
|
||||
type: 'gauge',
|
||||
help: 'Age of oldest XIDs that have not been frozen by VACUUM. An indicator of how long it has been since VACUUM last ran.',
|
||||
key_labels: [
|
||||
'database_name',
|
||||
],
|
||||
value_label: 'metric',
|
||||
values: [
|
||||
'frozen_xid_age',
|
||||
],
|
||||
query: importstr 'sql_exporter/compute_pg_oldest_frozen_xid_age.sql',
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
SELECT datname database_name,
|
||||
age(datfrozenxid) frozen_xid_age
|
||||
FROM pg_database
|
||||
ORDER BY frozen_xid_age DESC LIMIT 10;
|
||||
@@ -0,0 +1,13 @@
|
||||
{
|
||||
metric_name: 'compute_pg_oldest_mxid_age',
|
||||
type: 'gauge',
|
||||
help: 'Age of oldest MXIDs that have not been replaced by VACUUM. An indicator of how long it has been since VACUUM last ran.',
|
||||
key_labels: [
|
||||
'database_name',
|
||||
],
|
||||
value_label: 'metric',
|
||||
values: [
|
||||
'min_mxid_age',
|
||||
],
|
||||
query: importstr 'sql_exporter/compute_pg_oldest_mxid_age.sql',
|
||||
}
|
||||
4
compute/etc/sql_exporter/compute_pg_oldest_mxid_age.sql
Normal file
4
compute/etc/sql_exporter/compute_pg_oldest_mxid_age.sql
Normal file
@@ -0,0 +1,4 @@
|
||||
SELECT datname database_name,
|
||||
mxid_age(datminmxid) min_mxid_age
|
||||
FROM pg_database
|
||||
ORDER BY min_mxid_age DESC LIMIT 10;
|
||||
@@ -1,8 +1,8 @@
|
||||
diff --git a/sql/anon.sql b/sql/anon.sql
|
||||
index 0cdc769..f6cc950 100644
|
||||
index 0cdc769..b450327 100644
|
||||
--- a/sql/anon.sql
|
||||
+++ b/sql/anon.sql
|
||||
@@ -1141,3 +1141,8 @@ $$
|
||||
@@ -1141,3 +1141,15 @@ $$
|
||||
-- TODO : https://en.wikipedia.org/wiki/L-diversity
|
||||
|
||||
-- TODO : https://en.wikipedia.org/wiki/T-closeness
|
||||
@@ -11,6 +11,13 @@ index 0cdc769..f6cc950 100644
|
||||
+
|
||||
+GRANT ALL ON SCHEMA anon to neon_superuser;
|
||||
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
|
||||
+
|
||||
+DO $$
|
||||
+BEGIN
|
||||
+ IF current_setting('server_version_num')::int >= 150000 THEN
|
||||
+ GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO neon_superuser;
|
||||
+ END IF;
|
||||
+END $$;
|
||||
diff --git a/sql/init.sql b/sql/init.sql
|
||||
index 7da6553..9b6164b 100644
|
||||
--- a/sql/init.sql
|
||||
|
||||
@@ -27,6 +27,7 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
hostname-validator = "1.1"
|
||||
indexmap.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
|
||||
@@ -759,10 +759,15 @@ impl ComputeNode {
|
||||
// Configure and start rsyslog for compliance audit logging
|
||||
match pspec.spec.audit_log_level {
|
||||
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
|
||||
let remote_endpoint =
|
||||
let remote_tls_endpoint =
|
||||
std::env::var("AUDIT_LOGGING_TLS_ENDPOINT").unwrap_or("".to_string());
|
||||
let remote_plain_endpoint =
|
||||
std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
|
||||
if remote_endpoint.is_empty() {
|
||||
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
|
||||
|
||||
if remote_plain_endpoint.is_empty() && remote_tls_endpoint.is_empty() {
|
||||
anyhow::bail!(
|
||||
"AUDIT_LOGGING_ENDPOINT and AUDIT_LOGGING_TLS_ENDPOINT are both empty"
|
||||
);
|
||||
}
|
||||
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
@@ -778,7 +783,8 @@ impl ComputeNode {
|
||||
log_directory_path.clone(),
|
||||
endpoint_id,
|
||||
project_id,
|
||||
&remote_endpoint,
|
||||
&remote_plain_endpoint,
|
||||
&remote_tls_endpoint,
|
||||
)?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
|
||||
@@ -10,7 +10,13 @@ input(type="imfile" File="{log_directory}/*.log"
|
||||
startmsg.regex="^[[:digit:]]{{4}}-[[:digit:]]{{2}}-[[:digit:]]{{2}} [[:digit:]]{{2}}:[[:digit:]]{{2}}:[[:digit:]]{{2}}.[[:digit:]]{{3}} GMT,")
|
||||
|
||||
# the directory to store rsyslog state files
|
||||
global(workDirectory="/var/log/rsyslog")
|
||||
global(
|
||||
workDirectory="/var/log/rsyslog"
|
||||
DefaultNetstreamDriverCAFile="/etc/ssl/certs/ca-certificates.crt"
|
||||
)
|
||||
|
||||
# Whether the remote syslog receiver uses tls
|
||||
set $.remote_syslog_tls = "{remote_syslog_tls}";
|
||||
|
||||
# Construct json, endpoint_id and project_id as additional metadata
|
||||
set $.json_log!endpoint_id = "{endpoint_id}";
|
||||
@@ -21,5 +27,29 @@ set $.json_log!msg = $msg;
|
||||
template(name="PgAuditLog" type="string"
|
||||
string="<%PRI%>1 %TIMESTAMP:::date-rfc3339% %HOSTNAME% - - - - %$.json_log%")
|
||||
|
||||
# Forward to remote syslog receiver (@@<hostname>:<port>;format
|
||||
local5.info @@{remote_endpoint};PgAuditLog
|
||||
# Forward to remote syslog receiver (over TLS)
|
||||
if ( $syslogtag == 'pgaudit_log' ) then {{
|
||||
if ( $.remote_syslog_tls == 'true' ) then {{
|
||||
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
|
||||
template="PgAuditLog"
|
||||
queue.type="linkedList"
|
||||
queue.size="1000"
|
||||
action.ResumeRetryCount="10"
|
||||
StreamDriver="gtls"
|
||||
StreamDriverMode="1"
|
||||
StreamDriverAuthMode="x509/name"
|
||||
StreamDriverPermittedPeers="{remote_syslog_host}"
|
||||
StreamDriver.CheckExtendedKeyPurpose="on"
|
||||
StreamDriver.PermitExpiredCerts="off"
|
||||
)
|
||||
stop
|
||||
}} else {{
|
||||
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
|
||||
template="PgAuditLog"
|
||||
queue.type="linkedList"
|
||||
queue.size="1000"
|
||||
action.ResumeRetryCount="10"
|
||||
)
|
||||
stop
|
||||
}}
|
||||
}}
|
||||
|
||||
@@ -4,8 +4,10 @@ use std::path::Path;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use std::{fs::OpenOptions, io::Write};
|
||||
use url::{Host, Url};
|
||||
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use hostname_validator;
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
|
||||
@@ -82,18 +84,84 @@ fn restart_rsyslog() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_audit_syslog_address(
|
||||
remote_plain_endpoint: &str,
|
||||
remote_tls_endpoint: &str,
|
||||
) -> Result<(String, u16, String)> {
|
||||
let tls;
|
||||
let remote_endpoint = if !remote_tls_endpoint.is_empty() {
|
||||
tls = "true".to_string();
|
||||
remote_tls_endpoint
|
||||
} else {
|
||||
tls = "false".to_string();
|
||||
remote_plain_endpoint
|
||||
};
|
||||
// Urlify the remote_endpoint, so parsing can be done with url::Url.
|
||||
let url_str = format!("http://{remote_endpoint}");
|
||||
let url = Url::parse(&url_str).map_err(|err| {
|
||||
anyhow!("Error parsing {remote_endpoint}, expected host:port, got {err:?}")
|
||||
})?;
|
||||
|
||||
let is_valid = url.scheme() == "http"
|
||||
&& url.path() == "/"
|
||||
&& url.query().is_none()
|
||||
&& url.fragment().is_none()
|
||||
&& url.username() == ""
|
||||
&& url.password().is_none();
|
||||
|
||||
if !is_valid {
|
||||
return Err(anyhow!(
|
||||
"Invalid address format {remote_endpoint}, expected host:port"
|
||||
));
|
||||
}
|
||||
let host = match url.host() {
|
||||
Some(Host::Domain(h)) if hostname_validator::is_valid(h) => h.to_string(),
|
||||
Some(Host::Ipv4(ip4)) => ip4.to_string(),
|
||||
Some(Host::Ipv6(ip6)) => ip6.to_string(),
|
||||
_ => return Err(anyhow!("Invalid host")),
|
||||
};
|
||||
let port = url
|
||||
.port()
|
||||
.ok_or_else(|| anyhow!("Invalid port in {remote_endpoint}"))?;
|
||||
|
||||
Ok((host, port, tls))
|
||||
}
|
||||
|
||||
fn generate_audit_rsyslog_config(
|
||||
log_directory: String,
|
||||
endpoint_id: &str,
|
||||
project_id: &str,
|
||||
remote_syslog_host: &str,
|
||||
remote_syslog_port: u16,
|
||||
remote_syslog_tls: &str,
|
||||
) -> String {
|
||||
format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
endpoint_id = endpoint_id,
|
||||
project_id = project_id,
|
||||
remote_syslog_host = remote_syslog_host,
|
||||
remote_syslog_port = remote_syslog_port,
|
||||
remote_syslog_tls = remote_syslog_tls
|
||||
)
|
||||
}
|
||||
|
||||
pub fn configure_audit_rsyslog(
|
||||
log_directory: String,
|
||||
endpoint_id: &str,
|
||||
project_id: &str,
|
||||
remote_endpoint: &str,
|
||||
remote_tls_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
endpoint_id = endpoint_id,
|
||||
project_id = project_id,
|
||||
remote_endpoint = remote_endpoint
|
||||
let (remote_syslog_host, remote_syslog_port, remote_syslog_tls) =
|
||||
parse_audit_syslog_address(remote_endpoint, remote_tls_endpoint).unwrap();
|
||||
let config_content = generate_audit_rsyslog_config(
|
||||
log_directory,
|
||||
endpoint_id,
|
||||
project_id,
|
||||
&remote_syslog_host,
|
||||
remote_syslog_port,
|
||||
&remote_syslog_tls,
|
||||
);
|
||||
|
||||
info!("rsyslog config_content: {}", config_content);
|
||||
@@ -258,6 +326,8 @@ pub fn launch_pgaudit_gc(log_directory: String) {
|
||||
mod tests {
|
||||
use crate::rsyslog::PostgresLogsRsyslogConfig;
|
||||
|
||||
use super::{generate_audit_rsyslog_config, parse_audit_syslog_address};
|
||||
|
||||
#[test]
|
||||
fn test_postgres_logs_config() {
|
||||
{
|
||||
@@ -287,4 +357,146 @@ mod tests {
|
||||
assert!(res.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_audit_syslog_address() {
|
||||
{
|
||||
// host:port format (plaintext)
|
||||
let parsed = parse_audit_syslog_address("collector.host.tld:5555", "");
|
||||
assert!(parsed.is_ok());
|
||||
assert_eq!(
|
||||
parsed.unwrap(),
|
||||
(
|
||||
String::from("collector.host.tld"),
|
||||
5555,
|
||||
String::from("false")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
// host:port format with ipv4 ip address (plaintext)
|
||||
let parsed = parse_audit_syslog_address("10.0.0.1:5555", "");
|
||||
assert!(parsed.is_ok());
|
||||
assert_eq!(
|
||||
parsed.unwrap(),
|
||||
(String::from("10.0.0.1"), 5555, String::from("false"))
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
// host:port format with ipv6 ip address (plaintext)
|
||||
let parsed =
|
||||
parse_audit_syslog_address("[7e60:82ed:cb2e:d617:f904:f395:aaca:e252]:5555", "");
|
||||
assert_eq!(
|
||||
parsed.unwrap(),
|
||||
(
|
||||
String::from("7e60:82ed:cb2e:d617:f904:f395:aaca:e252"),
|
||||
5555,
|
||||
String::from("false")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
// Only TLS host:port defined
|
||||
let parsed = parse_audit_syslog_address("", "tls.host.tld:5556");
|
||||
assert_eq!(
|
||||
parsed.unwrap(),
|
||||
(String::from("tls.host.tld"), 5556, String::from("true"))
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
// tls host should take precedence, when both defined
|
||||
let parsed = parse_audit_syslog_address("plaintext.host.tld:5555", "tls.host.tld:5556");
|
||||
assert_eq!(
|
||||
parsed.unwrap(),
|
||||
(String::from("tls.host.tld"), 5556, String::from("true"))
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
// host without port (plaintext)
|
||||
let parsed = parse_audit_syslog_address("collector.host.tld", "");
|
||||
assert!(parsed.is_err());
|
||||
}
|
||||
|
||||
{
|
||||
// port without host
|
||||
let parsed = parse_audit_syslog_address(":5555", "");
|
||||
assert!(parsed.is_err());
|
||||
}
|
||||
|
||||
{
|
||||
// valid host with invalid port
|
||||
let parsed = parse_audit_syslog_address("collector.host.tld:90001", "");
|
||||
assert!(parsed.is_err());
|
||||
}
|
||||
|
||||
{
|
||||
// invalid hostname with valid port
|
||||
let parsed = parse_audit_syslog_address("-collector.host.tld:5555", "");
|
||||
assert!(parsed.is_err());
|
||||
}
|
||||
|
||||
{
|
||||
// parse error
|
||||
let parsed = parse_audit_syslog_address("collector.host.tld:::5555", "");
|
||||
assert!(parsed.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_generate_audit_rsyslog_config() {
|
||||
{
|
||||
// plaintext version
|
||||
let log_directory = "/tmp/log".to_string();
|
||||
let endpoint_id = "ep-test-endpoint-id";
|
||||
let project_id = "test-project-id";
|
||||
let remote_syslog_host = "collector.host.tld";
|
||||
let remote_syslog_port = 5555;
|
||||
let remote_syslog_tls = "false";
|
||||
|
||||
let conf_str = generate_audit_rsyslog_config(
|
||||
log_directory,
|
||||
endpoint_id,
|
||||
project_id,
|
||||
remote_syslog_host,
|
||||
remote_syslog_port,
|
||||
remote_syslog_tls,
|
||||
);
|
||||
|
||||
assert!(conf_str.contains(r#"set $.remote_syslog_tls = "false";"#));
|
||||
assert!(conf_str.contains(r#"type="omfwd""#));
|
||||
assert!(conf_str.contains(r#"target="collector.host.tld""#));
|
||||
assert!(conf_str.contains(r#"port="5555""#));
|
||||
assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
|
||||
}
|
||||
|
||||
{
|
||||
// TLS version
|
||||
let log_directory = "/tmp/log".to_string();
|
||||
let endpoint_id = "ep-test-endpoint-id";
|
||||
let project_id = "test-project-id";
|
||||
let remote_syslog_host = "collector.host.tld";
|
||||
let remote_syslog_port = 5556;
|
||||
let remote_syslog_tls = "true";
|
||||
|
||||
let conf_str = generate_audit_rsyslog_config(
|
||||
log_directory,
|
||||
endpoint_id,
|
||||
project_id,
|
||||
remote_syslog_host,
|
||||
remote_syslog_port,
|
||||
remote_syslog_tls,
|
||||
);
|
||||
|
||||
assert!(conf_str.contains(r#"set $.remote_syslog_tls = "true";"#));
|
||||
assert!(conf_str.contains(r#"type="omfwd""#));
|
||||
assert!(conf_str.contains(r#"target="collector.host.tld""#));
|
||||
assert!(conf_str.contains(r#"port="5556""#));
|
||||
assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,12 +6,13 @@ use posthog_client_lite::{
|
||||
CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
|
||||
PostHogFlagFilterPropertyValue,
|
||||
};
|
||||
use rand::Rng;
|
||||
use remote_storage::RemoteStorageKind;
|
||||
use serde_json::json;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
|
||||
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
|
||||
|
||||
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
|
||||
|
||||
@@ -138,6 +139,7 @@ impl FeatureResolver {
|
||||
}
|
||||
Arc::new(properties)
|
||||
};
|
||||
|
||||
let fake_tenants = {
|
||||
let mut tenants = Vec::new();
|
||||
for i in 0..10 {
|
||||
@@ -147,9 +149,16 @@ impl FeatureResolver {
|
||||
conf.id,
|
||||
i
|
||||
);
|
||||
|
||||
let tenant_properties = PerTenantProperties {
|
||||
remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
|
||||
}
|
||||
.into_posthog_properties();
|
||||
|
||||
let properties = Self::collect_properties_inner(
|
||||
distinct_id.clone(),
|
||||
Some(&internal_properties),
|
||||
&tenant_properties,
|
||||
);
|
||||
tenants.push(CaptureEvent {
|
||||
event: "initial_tenant_report".to_string(),
|
||||
@@ -183,6 +192,7 @@ impl FeatureResolver {
|
||||
fn collect_properties_inner(
|
||||
tenant_id: String,
|
||||
internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
let mut properties = HashMap::new();
|
||||
if let Some(internal_properties) = internal_properties {
|
||||
@@ -194,6 +204,9 @@ impl FeatureResolver {
|
||||
"tenant_id".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String(tenant_id),
|
||||
);
|
||||
for (key, value) in tenant_properties.iter() {
|
||||
properties.insert(key.clone(), value.clone());
|
||||
}
|
||||
properties
|
||||
}
|
||||
|
||||
@@ -201,8 +214,13 @@ impl FeatureResolver {
|
||||
pub(crate) fn collect_properties(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref())
|
||||
Self::collect_properties_inner(
|
||||
tenant_id.to_string(),
|
||||
self.internal_properties.as_deref(),
|
||||
tenant_properties,
|
||||
)
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
|
||||
@@ -214,6 +232,7 @@ impl FeatureResolver {
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<String, PostHogEvaluationError> {
|
||||
let force_overrides = self.force_overrides_for_testing.load();
|
||||
if let Some(value) = force_overrides.get(flag_key) {
|
||||
@@ -224,7 +243,7 @@ impl FeatureResolver {
|
||||
let res = inner.feature_store().evaluate_multivariate(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&self.collect_properties(tenant_id),
|
||||
&self.collect_properties(tenant_id, tenant_properties),
|
||||
);
|
||||
match &res {
|
||||
Ok(value) => {
|
||||
@@ -257,6 +276,7 @@ impl FeatureResolver {
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
let force_overrides = self.force_overrides_for_testing.load();
|
||||
if let Some(value) = force_overrides.get(flag_key) {
|
||||
@@ -271,7 +291,7 @@ impl FeatureResolver {
|
||||
let res = inner.feature_store().evaluate_boolean(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&self.collect_properties(tenant_id),
|
||||
&self.collect_properties(tenant_id, tenant_properties),
|
||||
);
|
||||
match &res {
|
||||
Ok(()) => {
|
||||
@@ -317,3 +337,78 @@ impl FeatureResolver {
|
||||
.store(Arc::new(force_overrides));
|
||||
}
|
||||
}
|
||||
|
||||
struct PerTenantProperties {
|
||||
pub remote_size_mb: Option<f64>,
|
||||
}
|
||||
|
||||
impl PerTenantProperties {
|
||||
pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
let mut properties = HashMap::new();
|
||||
if let Some(remote_size_mb) = self.remote_size_mb {
|
||||
properties.insert(
|
||||
"tenant_remote_size_mb".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(remote_size_mb),
|
||||
);
|
||||
}
|
||||
properties
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TenantFeatureResolver {
|
||||
inner: FeatureResolver,
|
||||
tenant_id: TenantId,
|
||||
cached_tenant_properties: Arc<ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>>,
|
||||
}
|
||||
|
||||
impl TenantFeatureResolver {
|
||||
pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
tenant_id,
|
||||
cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
|
||||
self.inner.evaluate_multivariate(
|
||||
flag_key,
|
||||
self.tenant_id,
|
||||
&self.cached_tenant_properties.load(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
|
||||
self.inner.evaluate_boolean(
|
||||
flag_key,
|
||||
self.tenant_id,
|
||||
&self.cached_tenant_properties.load(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
self.inner
|
||||
.collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
|
||||
}
|
||||
|
||||
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
|
||||
self.inner.is_feature_flag_boolean(flag_key)
|
||||
}
|
||||
|
||||
pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) {
|
||||
let mut remote_size_mb = None;
|
||||
for timeline in tenant_shard.list_timelines() {
|
||||
let size = timeline.metrics.resident_physical_size_get();
|
||||
if size == 0 {
|
||||
remote_size_mb = None;
|
||||
}
|
||||
if let Some(ref mut remote_size_mb) = remote_size_mb {
|
||||
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
|
||||
}
|
||||
}
|
||||
self.cached_tenant_properties.store(Arc::new(
|
||||
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3697,23 +3697,25 @@ async fn tenant_evaluate_feature_flag(
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
|
||||
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
|
||||
// and we don't need to worry about it for now.
|
||||
let properties = tenant.feature_resolver.collect_properties();
|
||||
if as_type.as_deref() == Some("boolean") {
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag);
|
||||
let result = result.map(|_| true).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else if as_type.as_deref() == Some("multivariate") {
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else {
|
||||
// Auto infer the type of the feature flag.
|
||||
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
|
||||
if is_boolean {
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag);
|
||||
let result = result.map(|_| true).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else {
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ use crate::context;
|
||||
use crate::context::RequestContextBuilder;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::feature_resolver::{FeatureResolver, TenantFeatureResolver};
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::{
|
||||
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
|
||||
@@ -386,7 +386,7 @@ pub struct TenantShard {
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
pub(crate) feature_resolver: FeatureResolver,
|
||||
pub(crate) feature_resolver: TenantFeatureResolver,
|
||||
}
|
||||
impl std::fmt::Debug for TenantShard {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
@@ -3263,7 +3263,7 @@ impl TenantShard {
|
||||
};
|
||||
let gc_compaction_strategy = self
|
||||
.feature_resolver
|
||||
.evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id)
|
||||
.evaluate_multivariate("gc-comapction-strategy")
|
||||
.ok();
|
||||
let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy {
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy)
|
||||
@@ -3408,6 +3408,9 @@ impl TenantShard {
|
||||
if let Some(ref walredo_mgr) = self.walredo_mgr {
|
||||
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
|
||||
}
|
||||
|
||||
// Update the feature resolver with the latest tenant-spcific data.
|
||||
self.feature_resolver.update_cached_tenant_properties(self);
|
||||
}
|
||||
|
||||
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
|
||||
@@ -4490,7 +4493,10 @@ impl TenantShard {
|
||||
gc_block: Default::default(),
|
||||
l0_flush_global_state,
|
||||
basebackup_cache,
|
||||
feature_resolver,
|
||||
feature_resolver: TenantFeatureResolver::new(
|
||||
feature_resolver,
|
||||
tenant_shard_id.tenant_id,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ impl BatchLayerWriter {
|
||||
/// An image writer that takes images and produces multiple image layers.
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter<'a> {
|
||||
inner: ImageLayerWriter,
|
||||
inner: Option<ImageLayerWriter>,
|
||||
target_layer_size: u64,
|
||||
lsn: Lsn,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -196,7 +196,7 @@ pub struct SplitImageLayerWriter<'a> {
|
||||
|
||||
impl<'a> SplitImageLayerWriter<'a> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -205,22 +205,10 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
) -> Self {
|
||||
Self {
|
||||
target_layer_size,
|
||||
// XXX make this lazy like in SplitDeltaLayerWriter?
|
||||
inner: ImageLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
&(start_key..Key::MAX),
|
||||
lsn,
|
||||
gate,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
inner: None,
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
@@ -229,7 +217,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
start_key,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn put_image(
|
||||
@@ -238,12 +226,31 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PutError> {
|
||||
if self.inner.is_none() {
|
||||
self.inner = Some(
|
||||
ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&(self.start_key..Key::MAX),
|
||||
self.lsn,
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(PutError::Other)?,
|
||||
);
|
||||
}
|
||||
|
||||
let inner = self.inner.as_mut().unwrap();
|
||||
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
if inner.num_keys() >= 1
|
||||
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
let next_image_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
@@ -257,7 +264,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
)
|
||||
.await
|
||||
.map_err(PutError::Other)?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
let prev_image_writer = std::mem::replace(inner, next_image_writer);
|
||||
self.batches.add_unfinished_image_writer(
|
||||
prev_image_writer,
|
||||
self.start_key..key,
|
||||
@@ -265,7 +272,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
);
|
||||
self.start_key = key;
|
||||
}
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
inner.put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -282,8 +289,10 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
let Self {
|
||||
mut batches, inner, ..
|
||||
} = self;
|
||||
if inner.num_keys() != 0 {
|
||||
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
|
||||
if let Some(inner) = inner {
|
||||
if inner.num_keys() != 0 {
|
||||
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
|
||||
}
|
||||
}
|
||||
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
|
||||
}
|
||||
@@ -498,10 +507,7 @@ mod tests {
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
@@ -577,10 +583,7 @@ mod tests {
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
@@ -676,10 +679,7 @@ mod tests {
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
|
||||
@@ -106,7 +106,7 @@ use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::feature_resolver::TenantFeatureResolver;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::metrics::{
|
||||
@@ -202,7 +202,7 @@ pub struct TimelineResources {
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
pub basebackup_cache: Arc<BasebackupCache>,
|
||||
pub feature_resolver: FeatureResolver,
|
||||
pub feature_resolver: TenantFeatureResolver,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
@@ -450,7 +450,7 @@ pub struct Timeline {
|
||||
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
|
||||
feature_resolver: FeatureResolver,
|
||||
feature_resolver: TenantFeatureResolver,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
@@ -5308,6 +5308,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
img_range: Range<Key>,
|
||||
io_concurrency: IoConcurrency,
|
||||
progress: Option<(usize, usize)>,
|
||||
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
|
||||
let mut wrote_keys = false;
|
||||
|
||||
@@ -5384,11 +5385,15 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let progress_report = progress
|
||||
.map(|(idx, total)| format!("({idx}/{total}) "))
|
||||
.unwrap_or_default();
|
||||
if wrote_keys {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
info!(
|
||||
"produced image layer for rel {}",
|
||||
"{} produced image layer for rel {}",
|
||||
progress_report,
|
||||
ImageLayerName {
|
||||
key_range: img_range.clone(),
|
||||
lsn
|
||||
@@ -5398,7 +5403,12 @@ impl Timeline {
|
||||
unfinished_image_layer: image_layer_writer,
|
||||
})
|
||||
} else {
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
tracing::debug!(
|
||||
"{} no data in range {}-{}",
|
||||
progress_report,
|
||||
img_range.start,
|
||||
img_range.end
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome::Empty)
|
||||
}
|
||||
}
|
||||
@@ -5633,7 +5643,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
for partition in partition_parts.iter() {
|
||||
let total = partition_parts.len();
|
||||
for (idx, partition) in partition_parts.iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
@@ -5718,6 +5729,7 @@ impl Timeline {
|
||||
ctx,
|
||||
img_range.clone(),
|
||||
io_concurrency,
|
||||
Some((idx, total)),
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
|
||||
use super::layer_manager::LayerManagerLockHolder;
|
||||
use super::{
|
||||
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
|
||||
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
|
||||
@@ -101,7 +101,11 @@ pub enum GcCompactionQueueItem {
|
||||
/// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
|
||||
auto: bool,
|
||||
},
|
||||
SubCompactionJob(CompactOptions),
|
||||
SubCompactionJob {
|
||||
i: usize,
|
||||
total: usize,
|
||||
options: CompactOptions,
|
||||
},
|
||||
Notify(GcCompactionJobId, Option<Lsn>),
|
||||
}
|
||||
|
||||
@@ -163,7 +167,7 @@ impl GcCompactionQueueItem {
|
||||
running,
|
||||
job_id: id.0,
|
||||
}),
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
|
||||
GcCompactionQueueItem::SubCompactionJob { options, .. } => Some(CompactInfoResponse {
|
||||
compact_key_range: options.compact_key_range,
|
||||
compact_lsn_range: options.compact_lsn_range,
|
||||
sub_compaction: options.sub_compaction,
|
||||
@@ -489,7 +493,7 @@ impl GcCompactionQueue {
|
||||
.map(|job| job.compact_lsn_range.end)
|
||||
.max()
|
||||
.unwrap();
|
||||
for job in jobs {
|
||||
for (i, job) in jobs.into_iter().enumerate() {
|
||||
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
|
||||
// until we do further refactors to allow directly call `compact_with_gc`.
|
||||
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
|
||||
@@ -507,7 +511,11 @@ impl GcCompactionQueue {
|
||||
compact_lsn_range: Some(job.compact_lsn_range.into()),
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
};
|
||||
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
|
||||
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob {
|
||||
options,
|
||||
i,
|
||||
total: jobs_len,
|
||||
});
|
||||
}
|
||||
|
||||
if !auto {
|
||||
@@ -651,7 +659,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
GcCompactionQueueItem::SubCompactionJob { options, i, total } => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
@@ -663,6 +671,7 @@ impl GcCompactionQueue {
|
||||
)));
|
||||
}
|
||||
};
|
||||
info!("running gc-compaction subcompaction job {}/{}", i, total);
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
@@ -1310,7 +1319,7 @@ impl Timeline {
|
||||
|| cfg!(feature = "testing")
|
||||
|| self
|
||||
.feature_resolver
|
||||
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
|
||||
.evaluate_boolean("image-compaction-boundary")
|
||||
.is_ok()
|
||||
{
|
||||
let last_repartition_lsn = self.partitioning.read().1;
|
||||
@@ -1591,13 +1600,15 @@ impl Timeline {
|
||||
let started = Instant::now();
|
||||
|
||||
let mut replace_image_layers = Vec::new();
|
||||
let total = layers_to_rewrite.len();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
info!(layer=%layer, "rewriting layer after shard split");
|
||||
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
|
||||
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -1779,20 +1790,14 @@ impl Timeline {
|
||||
} = {
|
||||
let phase1_span = info_span!("compact_level0_phase1");
|
||||
let ctx = ctx.attached_child();
|
||||
let mut stats = CompactLevel0Phase1StatsBuilder {
|
||||
let stats = CompactLevel0Phase1StatsBuilder {
|
||||
version: Some(2),
|
||||
tenant_id: Some(self.tenant_shard_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let begin = tokio::time::Instant::now();
|
||||
let phase1_layers_locked = self.layers.read(LayerManagerLockHolder::Compaction).await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
self.compact_level0_phase1(
|
||||
phase1_layers_locked,
|
||||
stats,
|
||||
target_file_size,
|
||||
force_compaction_ignore_threshold,
|
||||
@@ -1813,16 +1818,19 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
|
||||
async fn compact_level0_phase1<'a>(
|
||||
self: &'a Arc<Self>,
|
||||
guard: LayerManagerReadGuard<'a>,
|
||||
async fn compact_level0_phase1(
|
||||
self: &Arc<Self>,
|
||||
mut stats: CompactLevel0Phase1StatsBuilder,
|
||||
target_file_size: u64,
|
||||
force_compaction_ignore_threshold: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
stats.read_lock_held_spawn_blocking_startup_micros =
|
||||
stats.read_lock_acquisition_micros.till_now(); // set by caller
|
||||
let begin = tokio::time::Instant::now();
|
||||
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
|
||||
let layers = guard.layer_map()?;
|
||||
let level0_deltas = layers.level0_deltas();
|
||||
stats.level0_deltas_count = Some(level0_deltas.len());
|
||||
@@ -1857,6 +1865,12 @@ impl Timeline {
|
||||
.map(|x| guard.get_from_desc(x))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
drop_layer_manager_rlock(guard);
|
||||
|
||||
// The is the last LSN that we have seen for L0 compaction in the timeline. This LSN might be updated
|
||||
// by the time we finish the compaction. So we need to get it here.
|
||||
let l0_last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Gather the files to compact in this iteration.
|
||||
//
|
||||
// Start with the oldest Level 0 delta file, and collect any other
|
||||
@@ -1944,9 +1958,7 @@ impl Timeline {
|
||||
// we don't accidentally use it later in the function.
|
||||
drop(level0_deltas);
|
||||
|
||||
stats.read_lock_held_prerequisites_micros = stats
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.till_now();
|
||||
stats.compaction_prerequisites_micros = stats.read_lock_acquisition_micros.till_now();
|
||||
|
||||
// TODO: replace with streaming k-merge
|
||||
let all_keys = {
|
||||
@@ -1968,7 +1980,7 @@ impl Timeline {
|
||||
all_keys
|
||||
};
|
||||
|
||||
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
|
||||
stats.read_lock_held_key_sort_micros = stats.compaction_prerequisites_micros.till_now();
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
|
||||
//
|
||||
@@ -2002,7 +2014,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
@@ -2021,8 +2032,12 @@ impl Timeline {
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size =
|
||||
layers.image_coverage(&key_range, last_record_lsn).len();
|
||||
let coverage_size = {
|
||||
// TODO: optimize this with copy-on-write layer map.
|
||||
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
|
||||
let layers = guard.layer_map()?;
|
||||
layers.image_coverage(&key_range, l0_last_record_lsn).len()
|
||||
};
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
@@ -2041,7 +2056,6 @@ impl Timeline {
|
||||
holes
|
||||
};
|
||||
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
|
||||
drop_layer_manager_rlock(guard);
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
@@ -2382,9 +2396,8 @@ struct CompactLevel0Phase1StatsBuilder {
|
||||
tenant_id: Option<TenantShardId>,
|
||||
timeline_id: Option<TimelineId>,
|
||||
read_lock_acquisition_micros: DurationRecorder,
|
||||
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
|
||||
read_lock_held_key_sort_micros: DurationRecorder,
|
||||
read_lock_held_prerequisites_micros: DurationRecorder,
|
||||
compaction_prerequisites_micros: DurationRecorder,
|
||||
read_lock_held_compute_holes_micros: DurationRecorder,
|
||||
read_lock_drop_micros: DurationRecorder,
|
||||
write_layer_files_micros: DurationRecorder,
|
||||
@@ -2399,9 +2412,8 @@ struct CompactLevel0Phase1Stats {
|
||||
tenant_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
read_lock_acquisition_micros: RecordedDuration,
|
||||
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
|
||||
read_lock_held_key_sort_micros: RecordedDuration,
|
||||
read_lock_held_prerequisites_micros: RecordedDuration,
|
||||
compaction_prerequisites_micros: RecordedDuration,
|
||||
read_lock_held_compute_holes_micros: RecordedDuration,
|
||||
read_lock_drop_micros: RecordedDuration,
|
||||
write_layer_files_micros: RecordedDuration,
|
||||
@@ -2426,16 +2438,12 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
.read_lock_acquisition_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
|
||||
read_lock_held_spawn_blocking_startup_micros: value
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
|
||||
read_lock_held_key_sort_micros: value
|
||||
.read_lock_held_key_sort_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
|
||||
read_lock_held_prerequisites_micros: value
|
||||
.read_lock_held_prerequisites_micros
|
||||
compaction_prerequisites_micros: value
|
||||
.compaction_prerequisites_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
|
||||
read_lock_held_compute_holes_micros: value
|
||||
@@ -3503,22 +3511,16 @@ impl Timeline {
|
||||
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
|
||||
// when some condition meet.
|
||||
let mut image_layer_writer = if !has_data_below {
|
||||
Some(
|
||||
SplitImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to create image layer writer")
|
||||
.map_err(CompactionError::Other)?,
|
||||
)
|
||||
Some(SplitImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -4352,6 +4354,7 @@ impl TimelineAdaptor {
|
||||
ctx,
|
||||
key_range.clone(),
|
||||
IoConcurrency::sequential(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -64,13 +64,6 @@ impl Pipeline {
|
||||
let responses = self.replies;
|
||||
let batch_size = self.inner.len();
|
||||
|
||||
if !client.credentials_refreshed() {
|
||||
tracing::debug!(
|
||||
"Redis credentials are not refreshed. Sleeping for 5 seconds before retrying..."
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
match client.query(&self.inner).await {
|
||||
// for each reply, we expect that many values.
|
||||
Ok(Value::Array(values)) if values.len() == responses => {
|
||||
@@ -134,14 +127,6 @@ impl QueueProcessing for CancellationProcessor {
|
||||
}
|
||||
|
||||
async fn apply(&mut self, batch: Vec<Self::Req>) -> Vec<Self::Res> {
|
||||
if !self.client.credentials_refreshed() {
|
||||
// this will cause a timeout for cancellation operations
|
||||
tracing::debug!(
|
||||
"Redis credentials are not refreshed. Sleeping for 5 seconds before retrying..."
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
let mut pipeline = Pipeline::with_capacity(batch.len());
|
||||
|
||||
let batch_size = batch.len();
|
||||
|
||||
@@ -78,16 +78,6 @@ pub(crate) trait ReportableError: fmt::Display + Send + 'static {
|
||||
fn get_error_kind(&self) -> ErrorKind;
|
||||
}
|
||||
|
||||
impl ReportableError for postgres_client::error::Error {
|
||||
fn get_error_kind(&self) -> ErrorKind {
|
||||
if self.as_db_error().is_some() {
|
||||
ErrorKind::Postgres
|
||||
} else {
|
||||
ErrorKind::Compute
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flattens `Result<Result<T>>` into `Result<T>`.
|
||||
pub fn flatten_err<T>(r: Result<anyhow::Result<T>, JoinError>) -> anyhow::Result<T> {
|
||||
r.context("join error").and_then(|x| x)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::FutureExt;
|
||||
@@ -33,7 +33,6 @@ pub struct ConnectionWithCredentialsProvider {
|
||||
con: Option<MultiplexedConnection>,
|
||||
refresh_token_task: Option<JoinHandle<()>>,
|
||||
mutex: tokio::sync::Mutex<()>,
|
||||
credentials_refreshed: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Clone for ConnectionWithCredentialsProvider {
|
||||
@@ -43,7 +42,6 @@ impl Clone for ConnectionWithCredentialsProvider {
|
||||
con: None,
|
||||
refresh_token_task: None,
|
||||
mutex: tokio::sync::Mutex::new(()),
|
||||
credentials_refreshed: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,7 +65,6 @@ impl ConnectionWithCredentialsProvider {
|
||||
con: None,
|
||||
refresh_token_task: None,
|
||||
mutex: tokio::sync::Mutex::new(()),
|
||||
credentials_refreshed: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,7 +78,6 @@ impl ConnectionWithCredentialsProvider {
|
||||
con: None,
|
||||
refresh_token_task: None,
|
||||
mutex: tokio::sync::Mutex::new(()),
|
||||
credentials_refreshed: Arc::new(AtomicBool::new(true)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,10 +85,6 @@ impl ConnectionWithCredentialsProvider {
|
||||
redis::cmd("PING").query_async(con).await
|
||||
}
|
||||
|
||||
pub(crate) fn credentials_refreshed(&self) -> bool {
|
||||
self.credentials_refreshed.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub(crate) async fn connect(&mut self) -> anyhow::Result<()> {
|
||||
let _guard = self.mutex.lock().await;
|
||||
if let Some(con) = self.con.as_mut() {
|
||||
@@ -120,15 +112,11 @@ impl ConnectionWithCredentialsProvider {
|
||||
if let Credentials::Dynamic(credentials_provider, _) = &self.credentials {
|
||||
let credentials_provider = credentials_provider.clone();
|
||||
let con2 = con.clone();
|
||||
let credentials_refreshed = self.credentials_refreshed.clone();
|
||||
let f = tokio::spawn(async move {
|
||||
let result = Self::keep_connection(con2, credentials_provider).await;
|
||||
if let Err(e) = result {
|
||||
credentials_refreshed.store(false, Ordering::Release);
|
||||
debug!("keep_connection failed: {e}");
|
||||
} else {
|
||||
credentials_refreshed.store(true, Ordering::Release);
|
||||
}
|
||||
Self::keep_connection(con2, credentials_provider)
|
||||
.await
|
||||
.inspect_err(|e| debug!("keep_connection failed: {e}"))
|
||||
.ok();
|
||||
});
|
||||
self.refresh_token_task = Some(f);
|
||||
}
|
||||
|
||||
@@ -40,10 +40,6 @@ impl RedisKVClient {
|
||||
.inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
|
||||
}
|
||||
|
||||
pub(crate) fn credentials_refreshed(&self) -> bool {
|
||||
self.client.credentials_refreshed()
|
||||
}
|
||||
|
||||
pub(crate) async fn query<T: FromRedisValue>(
|
||||
&mut self,
|
||||
q: &impl Queryable,
|
||||
@@ -53,7 +49,7 @@ impl RedisKVClient {
|
||||
Err(e) => e,
|
||||
};
|
||||
|
||||
tracing::debug!("failed to run query: {e}");
|
||||
tracing::error!("failed to run query: {e}");
|
||||
match e.retry_method() {
|
||||
redis::RetryMethod::Reconnect => {
|
||||
tracing::info!("Redis client is disconnected. Reconnecting...");
|
||||
|
||||
@@ -404,7 +404,15 @@ impl ReportableError for HttpConnError {
|
||||
fn get_error_kind(&self) -> ErrorKind {
|
||||
match self {
|
||||
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
|
||||
HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
|
||||
HttpConnError::PostgresConnectionError(p) => {
|
||||
if p.as_db_error().is_some() {
|
||||
// postgres rejected the connection
|
||||
ErrorKind::Postgres
|
||||
} else {
|
||||
// couldn't even reach postgres
|
||||
ErrorKind::Compute
|
||||
}
|
||||
}
|
||||
HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
|
||||
HttpConnError::ComputeCtl(_) => ErrorKind::Service,
|
||||
HttpConnError::JwtPayloadError(_) => ErrorKind::User,
|
||||
|
||||
@@ -22,7 +22,7 @@ use serde_json::Value;
|
||||
use serde_json::value::RawValue;
|
||||
use tokio::time::{self, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info};
|
||||
use tracing::{Level, debug, error, info};
|
||||
use typed_json::json;
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
@@ -390,12 +390,35 @@ pub(crate) async fn handle(
|
||||
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
|
||||
let routine = get(db_error, |db| db.routine());
|
||||
|
||||
tracing::info!(
|
||||
kind=error_kind.to_metric_label(),
|
||||
error=%e,
|
||||
msg=message,
|
||||
"forwarding error to user"
|
||||
);
|
||||
match &e {
|
||||
SqlOverHttpError::Postgres(e)
|
||||
if e.as_db_error().is_some() && error_kind == ErrorKind::User =>
|
||||
{
|
||||
// this error contains too much info, and it's not an error we care about.
|
||||
if tracing::enabled!(Level::DEBUG) {
|
||||
tracing::debug!(
|
||||
kind=error_kind.to_metric_label(),
|
||||
error=%e,
|
||||
msg=message,
|
||||
"forwarding error to user"
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
kind = error_kind.to_metric_label(),
|
||||
error = "bad query",
|
||||
"forwarding error to user"
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
tracing::info!(
|
||||
kind=error_kind.to_metric_label(),
|
||||
error=%e,
|
||||
msg=message,
|
||||
"forwarding error to user"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
json_response(
|
||||
e.get_http_status_code(),
|
||||
@@ -460,7 +483,15 @@ impl ReportableError for SqlOverHttpError {
|
||||
SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
|
||||
SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User,
|
||||
SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
|
||||
SqlOverHttpError::Postgres(p) => p.get_error_kind(),
|
||||
// customer initiated SQL errors.
|
||||
SqlOverHttpError::Postgres(p) => {
|
||||
if p.as_db_error().is_some() {
|
||||
ErrorKind::User
|
||||
} else {
|
||||
ErrorKind::Compute
|
||||
}
|
||||
}
|
||||
// proxy initiated SQL errors.
|
||||
SqlOverHttpError::InternalPostgres(p) => {
|
||||
if p.as_db_error().is_some() {
|
||||
ErrorKind::Service
|
||||
@@ -468,6 +499,7 @@ impl ReportableError for SqlOverHttpError {
|
||||
ErrorKind::Compute
|
||||
}
|
||||
}
|
||||
// postgres returned a bad row format that we couldn't parse.
|
||||
SqlOverHttpError::JsonConversion(_) => ErrorKind::Postgres,
|
||||
SqlOverHttpError::Cancelled(c) => c.get_error_kind(),
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ pub trait ResponseErrorMessageExt: Sized {
|
||||
impl ResponseErrorMessageExt for reqwest::Response {
|
||||
async fn error_from_body(self) -> Result<Self> {
|
||||
let status = self.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
if status.is_success() {
|
||||
return Ok(self);
|
||||
}
|
||||
|
||||
|
||||
@@ -724,15 +724,21 @@ class NeonEnvBuilder:
|
||||
|
||||
shutil.copytree(storcon_db_from_dir, storcon_db_to_dir, ignore=ignore_postgres_log)
|
||||
assert not (storcon_db_to_dir / "postgres.log").exists()
|
||||
|
||||
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
|
||||
# However, in this new NeonEnv, the pageservers listen on different ports, and the storage controller
|
||||
# will currently reject re-attach requests from them because the NodeMetadata isn't identical.
|
||||
# However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
|
||||
# controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
|
||||
# So, from_repo_dir patches up the the storcon database.
|
||||
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
|
||||
assert not patch_script_path.exists()
|
||||
patch_script = ""
|
||||
|
||||
for ps in self.env.pageservers:
|
||||
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
|
||||
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';\n"
|
||||
|
||||
for sk in self.env.safekeepers:
|
||||
patch_script += f"UPDATE safekeepers SET http_port={sk.port.http}, port={sk.port.pg} WHERE id = '{sk.id}';\n"
|
||||
|
||||
patch_script_path.write_text(patch_script)
|
||||
|
||||
# Update the config with info about tenants and timelines
|
||||
|
||||
@@ -76,6 +76,7 @@ if TYPE_CHECKING:
|
||||
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
|
||||
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
|
||||
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
|
||||
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
|
||||
#
|
||||
# # Build previous version of binaries and store them somewhere:
|
||||
# rm -rf pg_install target
|
||||
@@ -102,6 +103,7 @@ if TYPE_CHECKING:
|
||||
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
|
||||
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
|
||||
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
|
||||
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
|
||||
# export NEON_BIN=target/${BUILD_TYPE}
|
||||
# export POSTGRES_DISTRIB_DIR=pg_install
|
||||
#
|
||||
|
||||
@@ -399,7 +399,7 @@ def test_tx_abort_with_many_relations(
|
||||
# How many relations: this number is tuned to be long enough to take tens of seconds
|
||||
# if the rollback code path is buggy, tripping the test's timeout.
|
||||
n = 5000
|
||||
step = 2500
|
||||
step = 500
|
||||
|
||||
def create():
|
||||
# Create many relations
|
||||
|
||||
Reference in New Issue
Block a user