mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 20:10:38 +00:00
Compare commits
136 Commits
release-80
...
release-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85072b715f | ||
|
|
6c86fe7143 | ||
|
|
66d5fe7f5b | ||
|
|
a1b9528757 | ||
|
|
1423bb8aa2 | ||
|
|
332f064a42 | ||
|
|
c962f2b447 | ||
|
|
446b3f9d28 | ||
|
|
23352dc2e9 | ||
|
|
c65fc5a955 | ||
|
|
3e624581cd | ||
|
|
fedf4f169c | ||
|
|
86d5798108 | ||
|
|
8b4088dd8a | ||
|
|
c91905e643 | ||
|
|
44b4e355a2 | ||
|
|
03666a1f37 | ||
|
|
9c92242ca0 | ||
|
|
a354071dd0 | ||
|
|
758680d4f8 | ||
|
|
1738fd0a96 | ||
|
|
87b7edfc72 | ||
|
|
def05700d5 | ||
|
|
b547681e08 | ||
|
|
0fd211537b | ||
|
|
a83bd4e81c | ||
|
|
ecdad5e6d5 | ||
|
|
d028929945 | ||
|
|
7b0e3db868 | ||
|
|
088eb72dd7 | ||
|
|
d550e3f626 | ||
|
|
8c6b41daf5 | ||
|
|
bbb050459b | ||
|
|
cab498c787 | ||
|
|
6359342ffb | ||
|
|
13285c2a5e | ||
|
|
33790d14a3 | ||
|
|
709b8cd371 | ||
|
|
1c9bbf1a92 | ||
|
|
16163fb850 | ||
|
|
73ccc2b08c | ||
|
|
c719be6474 | ||
|
|
718645e56c | ||
|
|
fbc8c36983 | ||
|
|
5519e42612 | ||
|
|
4157eaf4c5 | ||
|
|
60241127e2 | ||
|
|
f7d5322e8b | ||
|
|
41bb9c5280 | ||
|
|
69c0d61c5c | ||
|
|
63cb8ce975 | ||
|
|
907e4aa3c4 | ||
|
|
0a2a84b766 | ||
|
|
85b12ddd52 | ||
|
|
dd76f1eeee | ||
|
|
8963ac85f9 | ||
|
|
4a488b3e24 | ||
|
|
c4987b0b13 | ||
|
|
84b4821118 | ||
|
|
32ba9811f9 | ||
|
|
a0cd64c4d3 | ||
|
|
84687b743d | ||
|
|
b6f93dcec9 | ||
|
|
4f6c594973 | ||
|
|
a750c14735 | ||
|
|
9ce0dd4e55 | ||
|
|
0e1a336607 | ||
|
|
7fc2912d06 | ||
|
|
fdf231c237 | ||
|
|
1e08b5dccc | ||
|
|
030810ed3e | ||
|
|
62b74bdc2c | ||
|
|
8b7e9ed820 | ||
|
|
5dad89acd4 | ||
|
|
547b2d2827 | ||
|
|
93f29a0065 | ||
|
|
4f36494615 | ||
|
|
0a550f3e7d | ||
|
|
4bb9554e4a | ||
|
|
008616cfe6 | ||
|
|
e61ec94fbc | ||
|
|
e5152551ad | ||
|
|
b0822a5499 | ||
|
|
1fb6ab59e8 | ||
|
|
e16439400d | ||
|
|
e401f66698 | ||
|
|
2fa461b668 | ||
|
|
03d90bc0b3 | ||
|
|
268bc890ea | ||
|
|
8a6ee79f6f | ||
|
|
9052c32b46 | ||
|
|
995e729ebe | ||
|
|
76077e1ddf | ||
|
|
0467d88f06 | ||
|
|
f5eec194e7 | ||
|
|
7e00be391d | ||
|
|
d56599df2a | ||
|
|
9d9aab3680 | ||
|
|
a202b1b5cc | ||
|
|
90f731f3b1 | ||
|
|
7736b748d3 | ||
|
|
9c23333cb3 | ||
|
|
66a99009ba | ||
|
|
5d4c57491f | ||
|
|
73935ea3a2 | ||
|
|
32e595d4dd | ||
|
|
b0d69acb07 | ||
|
|
98355a419a | ||
|
|
cfb03d6cf0 | ||
|
|
d81ef3f962 | ||
|
|
5d62c67e75 | ||
|
|
53d53d5b1e | ||
|
|
29fe6ea47a | ||
|
|
640327ccb3 | ||
|
|
7cf0f6b37e | ||
|
|
03c2c569be | ||
|
|
eff6d4538a | ||
|
|
5ef7782e9c | ||
|
|
73101db8c4 | ||
|
|
bccdfc6d39 | ||
|
|
99595813bb | ||
|
|
fe07b54758 | ||
|
|
a42d173e7b | ||
|
|
e07f689238 | ||
|
|
7831eddc88 | ||
|
|
943b1bc80c | ||
|
|
95a184e9b7 | ||
|
|
3fa17e9d17 | ||
|
|
55e0fd9789 | ||
|
|
2a88889f44 | ||
|
|
5bad8126dc | ||
|
|
27bc242085 | ||
|
|
192b49cc6d | ||
|
|
e1b60f3693 | ||
|
|
2804f5323b | ||
|
|
676adc6b32 |
12
.github/workflows/periodic_pagebench.yml
vendored
12
.github/workflows/periodic_pagebench.yml
vendored
@@ -3,12 +3,12 @@ name: Periodic pagebench performance test on dedicated EC2 machine in eu-central
|
||||
on:
|
||||
schedule:
|
||||
# * is a special character in YAML so you have to quote this string
|
||||
# ┌───────────── minute (0 - 59)
|
||||
# │ ┌───────────── hour (0 - 23)
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
- cron: '0 */3 * * *' # Runs every 3 hours
|
||||
# ┌───────────── minute (0 - 59)
|
||||
# │ ┌───────────── hour (0 - 23)
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
- cron: '0 18 * * *' # Runs at 6 PM UTC every day
|
||||
workflow_dispatch: # Allows manual triggering of the workflow
|
||||
inputs:
|
||||
commit_hash:
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
# Autoscaling
|
||||
/libs/vm_monitor/ @neondatabase/autoscaling
|
||||
|
||||
# DevProd & PerfCorr
|
||||
/.github/ @neondatabase/developer-productivity @neondatabase/performance-correctness
|
||||
/test_runner/ @neondatabase/performance-correctness
|
||||
# DevProd
|
||||
/.github/ @neondatabase/developer-productivity
|
||||
|
||||
# Compute
|
||||
/pgxn/ @neondatabase/compute
|
||||
|
||||
13
Cargo.lock
generated
13
Cargo.lock
generated
@@ -1127,9 +1127,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.16"
|
||||
version = "1.1.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c"
|
||||
checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
"libc",
|
||||
@@ -4303,7 +4303,6 @@ dependencies = [
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
"uuid",
|
||||
"wal_decoder",
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
@@ -5627,16 +5626,16 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.13"
|
||||
version = "0.17.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee"
|
||||
checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"getrandom 0.2.11",
|
||||
"libc",
|
||||
"spin",
|
||||
"untrusted",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -1980,10 +1980,12 @@ COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/neo
|
||||
RUN echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
|
||||
|
||||
# rsyslog config permissions
|
||||
# directory for rsyslogd pid file
|
||||
RUN mkdir /var/run/rsyslogd && \
|
||||
chown -R postgres:postgres /var/run/rsyslogd && \
|
||||
chown -R postgres:postgres /etc/rsyslog.d/
|
||||
RUN chown postgres:postgres /etc/rsyslog.conf && \
|
||||
touch /etc/compute_rsyslog.conf && \
|
||||
chown -R postgres:postgres /etc/compute_rsyslog.conf && \
|
||||
# directory for rsyslogd pid file
|
||||
mkdir /var/run/rsyslogd && \
|
||||
chown -R postgres:postgres /var/run/rsyslogd
|
||||
|
||||
|
||||
ENV LANG=en_US.utf8
|
||||
|
||||
@@ -1,5 +1 @@
|
||||
SELECT sum(pg_database_size(datname)) AS total
|
||||
FROM pg_database
|
||||
-- Ignore invalid databases, as we will likely have problems with
|
||||
-- getting their size from the Pageserver.
|
||||
WHERE datconnlimit != -2;
|
||||
SELECT sum(pg_database_size(datname)) AS total FROM pg_database;
|
||||
|
||||
@@ -1,20 +1,10 @@
|
||||
-- We export stats for 10 non-system databases. Without this limit it is too
|
||||
-- easy to abuse the system by creating lots of databases.
|
||||
|
||||
SELECT pg_database_size(datname) AS db_size,
|
||||
deadlocks,
|
||||
tup_inserted AS inserted,
|
||||
tup_updated AS updated,
|
||||
tup_deleted AS deleted,
|
||||
datname
|
||||
SELECT pg_database_size(datname) AS db_size, deadlocks, tup_inserted AS inserted,
|
||||
tup_updated AS updated, tup_deleted AS deleted, datname
|
||||
FROM pg_stat_database
|
||||
WHERE datname IN (
|
||||
SELECT datname FROM pg_database
|
||||
-- Ignore invalid databases, as we will likely have problems with
|
||||
-- getting their size from the Pageserver.
|
||||
WHERE datconnlimit != -2
|
||||
AND datname <> 'postgres'
|
||||
AND NOT datistemplate
|
||||
ORDER BY oid
|
||||
LIMIT 10
|
||||
WHERE datname <> 'postgres' AND NOT datistemplate ORDER BY oid LIMIT 10
|
||||
);
|
||||
|
||||
@@ -39,10 +39,6 @@ commands:
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
|
||||
- name: rsyslogd
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
shell: '/usr/sbin/rsyslogd -n -i /var/run/rsyslogd/rsyslogd.pid -f /etc/compute_rsyslog.conf'
|
||||
shutdownHook: |
|
||||
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
|
||||
files:
|
||||
@@ -73,12 +69,6 @@ files:
|
||||
}
|
||||
memory {}
|
||||
}
|
||||
# Create dummy rsyslog config, because it refuses to start without at least one action configured.
|
||||
# compute_ctl will rewrite this file with the actual configuration, if needed.
|
||||
- filename: compute_rsyslog.conf
|
||||
content: |
|
||||
*.* /dev/null
|
||||
$IncludeConfig /etc/rsyslog.d/*.conf
|
||||
build: |
|
||||
# Build cgroup-tools
|
||||
#
|
||||
@@ -142,12 +132,6 @@ merge: |
|
||||
RUN set -e \
|
||||
&& chmod 0644 /etc/cgconfig.conf
|
||||
|
||||
|
||||
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
|
||||
RUN chmod 0666 /etc/compute_rsyslog.conf
|
||||
RUN chmod 0666 /var/log/
|
||||
|
||||
|
||||
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
|
||||
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
|
||||
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
|
||||
|
||||
@@ -39,10 +39,6 @@ commands:
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
|
||||
- name: rsyslogd
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
shell: '/usr/sbin/rsyslogd -n -i /var/run/rsyslogd/rsyslogd.pid -f /etc/compute_rsyslog.conf'
|
||||
shutdownHook: |
|
||||
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
|
||||
files:
|
||||
@@ -73,12 +69,6 @@ files:
|
||||
}
|
||||
memory {}
|
||||
}
|
||||
# Create dummy rsyslog config, because it refuses to start without at least one action configured.
|
||||
# compute_ctl will rewrite this file with the actual configuration, if needed.
|
||||
- filename: compute_rsyslog.conf
|
||||
content: |
|
||||
*.* /dev/null
|
||||
$IncludeConfig /etc/rsyslog.d/*.conf
|
||||
build: |
|
||||
# Build cgroup-tools
|
||||
#
|
||||
@@ -138,11 +128,6 @@ merge: |
|
||||
RUN set -e \
|
||||
&& chmod 0644 /etc/cgconfig.conf
|
||||
|
||||
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
|
||||
RUN chmod 0666 /etc/compute_rsyslog.conf
|
||||
RUN chmod 0666 /var/log/
|
||||
|
||||
|
||||
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
|
||||
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
|
||||
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
|
||||
|
||||
@@ -37,7 +37,7 @@ use crate::logger::startup_context_from_env;
|
||||
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use crate::monitor::launch_monitor;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::rsyslog::configure_audit_rsyslog;
|
||||
use crate::rsyslog::configure_and_start_rsyslog;
|
||||
use crate::spec::*;
|
||||
use crate::swap::resize_swap;
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
@@ -297,6 +297,79 @@ struct StartVmMonitorResult {
|
||||
vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String {
|
||||
let roles = spec
|
||||
.cluster
|
||||
.roles
|
||||
.iter()
|
||||
.map(|r| escape_literal(&r.name))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let dbs = spec
|
||||
.cluster
|
||||
.databases
|
||||
.iter()
|
||||
.map(|db| escape_literal(&db.name))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let roles_decl = if roles.is_empty() {
|
||||
String::from("roles text[] := NULL;")
|
||||
} else {
|
||||
format!(
|
||||
r#"
|
||||
roles text[] := ARRAY(SELECT rolname
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE rolname IN ({}));"#,
|
||||
roles.join(", ")
|
||||
)
|
||||
};
|
||||
|
||||
let database_decl = if dbs.is_empty() {
|
||||
String::from("dbs text[] := NULL;")
|
||||
} else {
|
||||
format!(
|
||||
r#"
|
||||
dbs text[] := ARRAY(SELECT datname
|
||||
FROM pg_catalog.pg_database
|
||||
WHERE datname IN ({}));"#,
|
||||
dbs.join(", ")
|
||||
)
|
||||
};
|
||||
|
||||
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
|
||||
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
|
||||
let query = format!(
|
||||
r#"
|
||||
DO $$
|
||||
DECLARE
|
||||
r text;
|
||||
{}
|
||||
{}
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
|
||||
THEN
|
||||
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
|
||||
IF array_length(roles, 1) IS NOT NULL THEN
|
||||
EXECUTE format('GRANT neon_superuser TO %s',
|
||||
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
|
||||
FOREACH r IN ARRAY roles LOOP
|
||||
EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
|
||||
END LOOP;
|
||||
END IF;
|
||||
IF array_length(dbs, 1) IS NOT NULL THEN
|
||||
EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
|
||||
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
|
||||
END IF;
|
||||
END IF;
|
||||
END
|
||||
$$;"#,
|
||||
roles_decl, database_decl,
|
||||
);
|
||||
|
||||
query
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn new(
|
||||
params: ComputeNodeParams,
|
||||
@@ -616,7 +689,7 @@ impl ComputeNode {
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
// TODO: make this more robust
|
||||
// now rsyslog starts once and there is no monitoring or restart if it fails
|
||||
configure_audit_rsyslog(
|
||||
configure_and_start_rsyslog(
|
||||
log_directory_path.to_str().unwrap(),
|
||||
"hipaa",
|
||||
&remote_endpoint,
|
||||
|
||||
@@ -186,40 +186,15 @@ impl DatabaseExt for Database {
|
||||
/// Postgres SQL queries and DATABASE_URL.
|
||||
pub trait Escaping {
|
||||
fn pg_quote(&self) -> String;
|
||||
fn pg_quote_dollar(&self) -> (String, String);
|
||||
}
|
||||
|
||||
impl Escaping for PgIdent {
|
||||
/// This is intended to mimic Postgres quote_ident(), but for simplicity it
|
||||
/// always quotes provided string with `""` and escapes every `"`.
|
||||
/// **Not idempotent**, i.e. if string is already escaped it will be escaped again.
|
||||
/// N.B. it's not useful for escaping identifiers that are used inside WHERE
|
||||
/// clause, use `escape_literal()` instead.
|
||||
fn pg_quote(&self) -> String {
|
||||
format!("\"{}\"", self.replace('"', "\"\""))
|
||||
}
|
||||
|
||||
/// This helper is intended to be used for dollar-escaping strings for usage
|
||||
/// inside PL/pgSQL procedures. In addition to dollar-escaping the string,
|
||||
/// it also returns a tag that is intended to be used inside the outer
|
||||
/// PL/pgSQL procedure. If you do not need an outer tag, just discard it.
|
||||
/// Here we somewhat mimic the logic of Postgres' `pg_get_functiondef()`,
|
||||
/// <https://github.com/postgres/postgres/blob/8b49392b270b4ac0b9f5c210e2a503546841e832/src/backend/utils/adt/ruleutils.c#L2924>
|
||||
fn pg_quote_dollar(&self) -> (String, String) {
|
||||
let mut tag: String = "".to_string();
|
||||
let mut outer_tag = "x".to_string();
|
||||
|
||||
// Find the first suitable tag that is not present in the string.
|
||||
// Postgres' max role/DB name length is 63 bytes, so even in the
|
||||
// worst case it won't take long.
|
||||
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
|
||||
tag += "x";
|
||||
outer_tag = tag.clone() + "x";
|
||||
}
|
||||
|
||||
let escaped = format!("${tag}${self}${tag}$");
|
||||
|
||||
(escaped, outer_tag)
|
||||
let result = format!("\"{}\"", self.replace('"', "\"\""));
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,13 +226,10 @@ pub async fn get_existing_dbs_async(
|
||||
// invalid state. See:
|
||||
// https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
|
||||
let rowstream = client
|
||||
// We use a subquery instead of a fancy `datdba::regrole::text AS owner`,
|
||||
// because the latter automatically wraps the result in double quotes,
|
||||
// if the role name contains special characters.
|
||||
.query_raw::<str, &String, &[String; 0]>(
|
||||
"SELECT
|
||||
datname AS name,
|
||||
(SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
|
||||
datdba::regrole::text AS owner,
|
||||
NOT datallowconn AS restrict_conn,
|
||||
datconnlimit = - 2 AS invalid
|
||||
FROM
|
||||
|
||||
@@ -21,34 +21,40 @@ fn get_rsyslog_pid() -> Option<String> {
|
||||
}
|
||||
}
|
||||
|
||||
// Restart rsyslogd to apply the new configuration.
|
||||
// This is necessary, because there is no other way to reload the rsyslog configuration.
|
||||
//
|
||||
// Rsyslogd shouldn't lose any messages, because of the restart,
|
||||
// because it tracks the last read position in the log files
|
||||
// and will continue reading from that position.
|
||||
// TODO: test it properly
|
||||
//
|
||||
fn restart_rsyslog() -> Result<()> {
|
||||
let old_pid = get_rsyslog_pid().context("rsyslogd is not running")?;
|
||||
info!("rsyslogd is running with pid: {}, restart it", old_pid);
|
||||
// Start rsyslogd with the specified configuration file
|
||||
// If it is already running, do nothing.
|
||||
fn start_rsyslog(rsyslog_conf_path: &str) -> Result<()> {
|
||||
let pid = get_rsyslog_pid();
|
||||
if let Some(pid) = pid {
|
||||
info!("rsyslogd is already running with pid: {}", pid);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// kill it to restart
|
||||
let _ = Command::new("pkill")
|
||||
.arg("rsyslogd")
|
||||
let _ = Command::new("/usr/sbin/rsyslogd")
|
||||
.arg("-f")
|
||||
.arg(rsyslog_conf_path)
|
||||
.arg("-i")
|
||||
.arg("/var/run/rsyslogd/rsyslogd.pid")
|
||||
.output()
|
||||
.context("Failed to stop rsyslogd")?;
|
||||
.context("Failed to start rsyslogd")?;
|
||||
|
||||
// Check that rsyslogd is running
|
||||
if let Some(pid) = get_rsyslog_pid() {
|
||||
info!("rsyslogd started successfully with pid: {}", pid);
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("Failed to start rsyslogd"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn configure_audit_rsyslog(
|
||||
pub fn configure_and_start_rsyslog(
|
||||
log_directory: &str,
|
||||
tag: &str,
|
||||
remote_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
include_str!("config_template/compute_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
tag = tag,
|
||||
remote_endpoint = remote_endpoint
|
||||
@@ -56,7 +62,7 @@ pub fn configure_audit_rsyslog(
|
||||
|
||||
info!("rsyslog config_content: {}", config_content);
|
||||
|
||||
let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
|
||||
let rsyslog_conf_path = "/etc/compute_rsyslog.conf";
|
||||
let mut file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
@@ -65,13 +71,10 @@ pub fn configure_audit_rsyslog(
|
||||
|
||||
file.write_all(config_content.as_bytes())?;
|
||||
|
||||
info!(
|
||||
"rsyslog configuration file {} added successfully. Starting rsyslogd",
|
||||
rsyslog_conf_path
|
||||
);
|
||||
info!("rsyslog configuration added successfully. Starting rsyslogd");
|
||||
|
||||
// start the service, using the configuration
|
||||
restart_rsyslog()?;
|
||||
start_rsyslog(rsyslog_conf_path)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -13,17 +13,16 @@ use tokio_postgres::Client;
|
||||
use tokio_postgres::error::SqlState;
|
||||
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
|
||||
|
||||
use crate::compute::{ComputeNode, ComputeState};
|
||||
use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
|
||||
use crate::pg_helpers::{
|
||||
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
|
||||
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
|
||||
get_existing_roles_async,
|
||||
};
|
||||
use crate::spec_apply::ApplySpecPhase::{
|
||||
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateNeonSuperuser,
|
||||
CreatePgauditExtension, CreatePgauditlogtofileExtension, CreateSchemaNeon,
|
||||
DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
|
||||
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
|
||||
RunInEachDatabase,
|
||||
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
|
||||
CreatePgauditlogtofileExtension, CreateSchemaNeon, CreateSuperUser, DisablePostgresDBPgAudit,
|
||||
DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, HandleNeonExtension,
|
||||
HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
|
||||
};
|
||||
use crate::spec_apply::PerDatabasePhase::{
|
||||
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
|
||||
@@ -188,7 +187,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
for phase in [
|
||||
CreateNeonSuperuser,
|
||||
CreateSuperUser,
|
||||
DropInvalidDatabases,
|
||||
RenameRoles,
|
||||
CreateAndAlterRoles,
|
||||
@@ -469,7 +468,7 @@ pub enum PerDatabasePhase {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ApplySpecPhase {
|
||||
CreateNeonSuperuser,
|
||||
CreateSuperUser,
|
||||
DropInvalidDatabases,
|
||||
RenameRoles,
|
||||
CreateAndAlterRoles,
|
||||
@@ -596,10 +595,14 @@ async fn get_operations<'a>(
|
||||
apply_spec_phase: &'a ApplySpecPhase,
|
||||
) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
|
||||
match apply_spec_phase {
|
||||
ApplySpecPhase::CreateNeonSuperuser => Ok(Box::new(once(Operation {
|
||||
query: include_str!("sql/create_neon_superuser.sql").to_string(),
|
||||
comment: None,
|
||||
}))),
|
||||
ApplySpecPhase::CreateSuperUser => {
|
||||
let query = construct_superuser_query(spec);
|
||||
|
||||
Ok(Box::new(once(Operation {
|
||||
query,
|
||||
comment: None,
|
||||
})))
|
||||
}
|
||||
ApplySpecPhase::DropInvalidDatabases => {
|
||||
let mut ctx = ctx.write().await;
|
||||
let databases = &mut ctx.dbs;
|
||||
@@ -733,15 +736,14 @@ async fn get_operations<'a>(
|
||||
// We do not check whether the DB exists or not,
|
||||
// Postgres will take care of it for us
|
||||
"delete_db" => {
|
||||
let (db_name, outer_tag) = op.name.pg_quote_dollar();
|
||||
// In Postgres we can't drop a database if it is a template.
|
||||
// So we need to unset the template flag first, but it could
|
||||
// be a retry, so we could've already dropped the database.
|
||||
// Check that database exists first to make it idempotent.
|
||||
let unset_template_query: String = format!(
|
||||
include_str!("sql/unset_template_for_drop_dbs.sql"),
|
||||
datname = db_name,
|
||||
outer_tag = outer_tag,
|
||||
datname_str = escape_literal(&op.name),
|
||||
datname = &op.name.pg_quote()
|
||||
);
|
||||
|
||||
// Use FORCE to drop database even if there are active connections.
|
||||
@@ -848,8 +850,6 @@ async fn get_operations<'a>(
|
||||
comment: None,
|
||||
},
|
||||
Operation {
|
||||
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on the database
|
||||
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
|
||||
query: format!(
|
||||
"GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
|
||||
db.name.pg_quote()
|
||||
@@ -909,11 +909,9 @@ async fn get_operations<'a>(
|
||||
PerDatabasePhase::DropLogicalSubscriptions => {
|
||||
match &db {
|
||||
DB::UserDB(db) => {
|
||||
let (db_name, outer_tag) = db.name.pg_quote_dollar();
|
||||
let drop_subscription_query: String = format!(
|
||||
include_str!("sql/drop_subscriptions.sql"),
|
||||
datname_str = db_name,
|
||||
outer_tag = outer_tag,
|
||||
datname_str = escape_literal(&db.name),
|
||||
);
|
||||
|
||||
let operations = vec![Operation {
|
||||
@@ -952,7 +950,6 @@ async fn get_operations<'a>(
|
||||
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
|
||||
DB::UserDB(db) => db.owner.pg_quote(),
|
||||
};
|
||||
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
|
||||
|
||||
Some(vec![
|
||||
// This will reassign all dependent objects to the db owner
|
||||
@@ -967,9 +964,7 @@ async fn get_operations<'a>(
|
||||
Operation {
|
||||
query: format!(
|
||||
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
|
||||
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
role_name = escaped_role,
|
||||
outer_tag = outer_tag,
|
||||
role_name = quoted,
|
||||
),
|
||||
comment: None,
|
||||
},
|
||||
@@ -994,14 +989,12 @@ async fn get_operations<'a>(
|
||||
DB::SystemDB => return Ok(Box::new(empty())),
|
||||
DB::UserDB(db) => db,
|
||||
};
|
||||
let (db_owner, outer_tag) = db.owner.pg_quote_dollar();
|
||||
|
||||
let operations = vec![
|
||||
Operation {
|
||||
query: format!(
|
||||
include_str!("sql/set_public_schema_owner.sql"),
|
||||
db_owner = db_owner,
|
||||
outer_tag = outer_tag,
|
||||
db_owner = db.owner.pg_quote()
|
||||
),
|
||||
comment: None,
|
||||
},
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
|
||||
THEN
|
||||
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
@@ -1,4 +1,4 @@
|
||||
DO ${outer_tag}$
|
||||
DO $$
|
||||
DECLARE
|
||||
subname TEXT;
|
||||
BEGIN
|
||||
@@ -9,4 +9,4 @@ BEGIN
|
||||
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
|
||||
END LOOP;
|
||||
END;
|
||||
${outer_tag}$;
|
||||
$$;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
SET SESSION ROLE neon_superuser;
|
||||
|
||||
DO ${outer_tag}$
|
||||
DO $$
|
||||
DECLARE
|
||||
schema TEXT;
|
||||
revoke_query TEXT;
|
||||
@@ -16,15 +16,13 @@ BEGIN
|
||||
WHERE schema_name IN ('public')
|
||||
LOOP
|
||||
revoke_query := format(
|
||||
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM %I GRANTED BY neon_superuser;',
|
||||
schema,
|
||||
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
{role_name}
|
||||
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM {role_name} GRANTED BY neon_superuser;',
|
||||
schema
|
||||
);
|
||||
|
||||
EXECUTE revoke_query;
|
||||
END LOOP;
|
||||
END;
|
||||
${outer_tag}$;
|
||||
$$;
|
||||
|
||||
RESET ROLE;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
DO ${outer_tag}$
|
||||
DO
|
||||
$$
|
||||
DECLARE
|
||||
schema_owner TEXT;
|
||||
BEGIN
|
||||
@@ -15,8 +16,8 @@ DO ${outer_tag}$
|
||||
|
||||
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'
|
||||
THEN
|
||||
EXECUTE format('ALTER SCHEMA public OWNER TO %I', {db_owner});
|
||||
ALTER SCHEMA public OWNER TO {db_owner};
|
||||
END IF;
|
||||
END IF;
|
||||
END
|
||||
${outer_tag}$;
|
||||
$$;
|
||||
@@ -1,12 +1,12 @@
|
||||
DO ${outer_tag}$
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS(
|
||||
SELECT 1
|
||||
FROM pg_catalog.pg_database
|
||||
WHERE datname = {datname}
|
||||
WHERE datname = {datname_str}
|
||||
)
|
||||
THEN
|
||||
EXECUTE format('ALTER DATABASE %I is_template false', {datname});
|
||||
ALTER DATABASE {datname} is_template false;
|
||||
END IF;
|
||||
END
|
||||
${outer_tag}$;
|
||||
$$;
|
||||
@@ -61,23 +61,6 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
|
||||
assert_eq!(ident.pg_quote(), "\"\"\"name\"\";\\n select 1;\"");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ident_pg_quote_dollar() {
|
||||
let test_cases = vec![
|
||||
("name", ("$$name$$", "x")),
|
||||
("name$$", ("$x$name$$$x$", "xx")),
|
||||
("name$$$", ("$x$name$$$$x$", "xx")),
|
||||
("name$$$$", ("$x$name$$$$$x$", "xx")),
|
||||
("name$x$", ("$xx$name$x$$xx$", "xxx")),
|
||||
];
|
||||
|
||||
for (input, expected) in test_cases {
|
||||
let (escaped, tag) = PgIdent::from(input).pg_quote_dollar();
|
||||
assert_eq!(escaped, expected.0);
|
||||
assert_eq!(tag, expected.1);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generic_options_search() {
|
||||
let generic_options: GenericOptions = Some(vec![
|
||||
|
||||
@@ -1146,15 +1146,6 @@ pub struct TimelineArchivalConfigRequest {
|
||||
pub state: TimelineArchivalState,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct TimelinePatchIndexPartRequest {
|
||||
pub rel_size_migration: Option<RelSizeMigration>,
|
||||
pub gc_compaction_last_completed_lsn: Option<Lsn>,
|
||||
pub applied_gc_cutoff_lsn: Option<Lsn>,
|
||||
#[serde(default)]
|
||||
pub force_index_update: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TimelinesInfoAndOffloaded {
|
||||
pub timelines: Vec<TimelineInfo>,
|
||||
|
||||
@@ -98,7 +98,6 @@ criterion.workspace = true
|
||||
hex-literal.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
|
||||
indoc.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "bench_layer_map"
|
||||
|
||||
@@ -456,8 +456,8 @@ impl PageServerConf {
|
||||
no_sync: no_sync.unwrap_or(false),
|
||||
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
|
||||
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
|
||||
load_previous_heatmap: load_previous_heatmap.unwrap_or(true),
|
||||
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(true),
|
||||
load_previous_heatmap: load_previous_heatmap.unwrap_or(false),
|
||||
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(false),
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
@@ -491,9 +491,7 @@ impl PageServerConf {
|
||||
#[cfg(test)]
|
||||
pub fn test_repo_dir(test_name: &str) -> Utf8PathBuf {
|
||||
let test_output_dir = std::env::var("TEST_OUTPUT").unwrap_or("../tmp_check".into());
|
||||
|
||||
let test_id = uuid::Uuid::new_v4();
|
||||
Utf8PathBuf::from(format!("{test_output_dir}/test_{test_name}_{test_id}"))
|
||||
Utf8PathBuf::from(format!("{test_output_dir}/test_{test_name}"))
|
||||
}
|
||||
|
||||
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
|
||||
|
||||
@@ -37,8 +37,7 @@ use pageserver_api::models::{
|
||||
TenantShardSplitResponse, TenantSorting, TenantState, TenantWaitLsnRequest,
|
||||
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateRequestMode,
|
||||
TimelineCreateRequestModeImportPgdata, TimelineGcRequest, TimelineInfo,
|
||||
TimelinePatchIndexPartRequest, TimelinesInfoAndOffloaded, TopTenantShardItem,
|
||||
TopTenantShardsRequest, TopTenantShardsResponse,
|
||||
TimelinesInfoAndOffloaded, TopTenantShardItem, TopTenantShardsRequest, TopTenantShardsResponse,
|
||||
};
|
||||
use pageserver_api::shard::{ShardCount, TenantShardId};
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
|
||||
@@ -64,7 +63,6 @@ use crate::tenant::mgr::{
|
||||
GetActiveTenantError, GetTenantError, TenantManager, TenantMapError, TenantMapInsertError,
|
||||
TenantSlot, TenantSlotError, TenantSlotUpsertError, TenantStateError, UpsertLocationError,
|
||||
};
|
||||
use crate::tenant::remote_timeline_client::index::GcCompactionState;
|
||||
use crate::tenant::remote_timeline_client::{
|
||||
download_index_part, list_remote_tenant_shards, list_remote_timelines,
|
||||
};
|
||||
@@ -860,75 +858,6 @@ async fn timeline_archival_config_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// This API is used to patch the index part of a timeline. You must ensure such patches are safe to apply. Use this API as an emergency
|
||||
/// measure only.
|
||||
///
|
||||
/// Some examples of safe patches:
|
||||
/// - Increase the gc_cutoff and gc_compaction_cutoff to a larger value in case of a bug that didn't bump the cutoff and cause read errors.
|
||||
/// - Force set the index part to use reldir v2 (migrating/migrated).
|
||||
///
|
||||
/// Some examples of unsafe patches:
|
||||
/// - Force set the index part from v2 to v1 (legacy). This will cause the code path to ignore anything written to the new keyspace and cause
|
||||
/// errors.
|
||||
/// - Decrease the gc_cutoff without validating the data really exists. It will cause read errors in the background.
|
||||
async fn timeline_patch_index_part_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
|
||||
let request_data: TimelinePatchIndexPartRequest = json_request(&mut request).await?;
|
||||
check_permission(&request, None)?; // require global permission for this request
|
||||
let state = get_state(&request);
|
||||
|
||||
async {
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
if let Some(rel_size_migration) = request_data.rel_size_migration {
|
||||
timeline
|
||||
.update_rel_size_v2_status(rel_size_migration)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
|
||||
if let Some(gc_compaction_last_completed_lsn) =
|
||||
request_data.gc_compaction_last_completed_lsn
|
||||
{
|
||||
timeline
|
||||
.update_gc_compaction_state(GcCompactionState {
|
||||
last_completed_lsn: gc_compaction_last_completed_lsn,
|
||||
})
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
|
||||
if let Some(applied_gc_cutoff_lsn) = request_data.applied_gc_cutoff_lsn {
|
||||
{
|
||||
let guard = timeline.applied_gc_cutoff_lsn.lock_for_write();
|
||||
guard.store_and_unlock(applied_gc_cutoff_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
if request_data.force_index_update {
|
||||
timeline
|
||||
.remote_client
|
||||
.force_schedule_index_upload()
|
||||
.context("force schedule index upload")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
|
||||
Ok::<_, ApiError>(())
|
||||
}
|
||||
.instrument(info_span!("timeline_patch_index_part",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
%timeline_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn timeline_detail_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -3700,10 +3629,6 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
|
||||
|r| api_handler(r, get_timestamp_of_lsn_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/patch_index_part",
|
||||
|r| api_handler(r, timeline_patch_index_part_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/lsn_lease",
|
||||
|r| api_handler(r, lsn_lease_handler),
|
||||
|
||||
@@ -143,29 +143,6 @@ pub(crate) static LAYERS_PER_READ_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static LAYERS_PER_READ_BATCH_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_layers_per_read_batch_global",
|
||||
"Layers visited to serve a single read batch (read amplification), regardless of number of reads.",
|
||||
vec![
|
||||
1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0
|
||||
],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static LAYERS_PER_READ_AMORTIZED_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_layers_per_read_amortized_global",
|
||||
"Layers visited to serve a single read (read amplification). Amortized across a batch: \
|
||||
all visited layers are divided by number of reads.",
|
||||
vec![
|
||||
1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0
|
||||
],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static DELTAS_PER_READ_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
|
||||
// We expect this to be low because of Postgres checkpoints. Let's see if that holds.
|
||||
register_histogram!(
|
||||
@@ -4097,8 +4074,6 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
|
||||
// histograms
|
||||
[
|
||||
&LAYERS_PER_READ_GLOBAL,
|
||||
&LAYERS_PER_READ_BATCH_GLOBAL,
|
||||
&LAYERS_PER_READ_AMORTIZED_GLOBAL,
|
||||
&DELTAS_PER_READ_GLOBAL,
|
||||
&WAIT_LSN_TIME,
|
||||
&WAL_REDO_TIME,
|
||||
|
||||
@@ -954,14 +954,6 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Only used in the `patch_index_part` HTTP API to force trigger an index upload.
|
||||
pub fn force_schedule_index_upload(self: &Arc<Self>) -> Result<(), NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an index-file upload operation in the background (internal function)
|
||||
fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
|
||||
let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
|
||||
|
||||
@@ -869,7 +869,8 @@ impl<'a> TenantDownloader<'a> {
|
||||
let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
|
||||
|
||||
let layers_in_heatmap = heatmap_timeline
|
||||
.hot_layers()
|
||||
.layers
|
||||
.iter()
|
||||
.map(|l| (&l.name, l.metadata.generation))
|
||||
.collect::<HashSet<_>>();
|
||||
let layers_on_disk = timeline_state
|
||||
@@ -1014,8 +1015,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
|
||||
let timeline_id = timeline.timeline_id;
|
||||
for layer in timeline.into_hot_layers() {
|
||||
for layer in timeline.layers {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!("Cancelled -- dropping out of layer loop");
|
||||
return (Err(UpdateError::Cancelled), touched);
|
||||
@@ -1040,7 +1040,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline_id, layer, ctx)
|
||||
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
@@ -1148,7 +1148,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
|
||||
let (result, touched) = self
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
|
||||
@@ -1316,11 +1316,11 @@ async fn init_timeline_state(
|
||||
// As we iterate through layers found on disk, we will look up their metadata from this map.
|
||||
// Layers not present in metadata will be discarded.
|
||||
let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
|
||||
heatmap.hot_layers().map(|l| (&l.name, l)).collect();
|
||||
heatmap.layers.iter().map(|l| (&l.name, l)).collect();
|
||||
|
||||
let last_heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
|
||||
if let Some(last_heatmap) = last_heatmap {
|
||||
last_heatmap.hot_layers().map(|l| (&l.name, l)).collect()
|
||||
last_heatmap.layers.iter().map(|l| (&l.name, l)).collect()
|
||||
} else {
|
||||
HashMap::new()
|
||||
};
|
||||
|
||||
@@ -42,7 +42,7 @@ pub(crate) struct HeatMapTimeline {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
|
||||
layers: Vec<HeatMapLayer>,
|
||||
pub(crate) layers: Vec<HeatMapLayer>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -53,10 +53,8 @@ pub(crate) struct HeatMapLayer {
|
||||
|
||||
#[serde_as(as = "TimestampSeconds<i64>")]
|
||||
pub(crate) access_time: SystemTime,
|
||||
|
||||
#[serde(default)]
|
||||
pub(crate) cold: bool, // TODO: an actual 'heat' score that would let secondary locations prioritize downloading
|
||||
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
|
||||
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
|
||||
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
|
||||
}
|
||||
|
||||
impl HeatMapLayer {
|
||||
@@ -64,13 +62,11 @@ impl HeatMapLayer {
|
||||
name: LayerName,
|
||||
metadata: LayerFileMetadata,
|
||||
access_time: SystemTime,
|
||||
cold: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
metadata,
|
||||
access_time,
|
||||
cold,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,18 +78,6 @@ impl HeatMapTimeline {
|
||||
layers,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_hot_layers(self) -> impl Iterator<Item = HeatMapLayer> {
|
||||
self.layers.into_iter().filter(|l| !l.cold)
|
||||
}
|
||||
|
||||
pub(crate) fn hot_layers(&self) -> impl Iterator<Item = &HeatMapLayer> {
|
||||
self.layers.iter().filter(|l| !l.cold)
|
||||
}
|
||||
|
||||
pub(crate) fn all_layers(&self) -> impl Iterator<Item = &HeatMapLayer> {
|
||||
self.layers.iter()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct HeatMapStats {
|
||||
@@ -108,7 +92,7 @@ impl HeatMapTenant {
|
||||
layers: 0,
|
||||
};
|
||||
for timeline in &self.timelines {
|
||||
for layer in timeline.hot_layers() {
|
||||
for layer in &timeline.layers {
|
||||
stats.layers += 1;
|
||||
stats.bytes += layer.metadata.file_size;
|
||||
}
|
||||
|
||||
@@ -1563,10 +1563,10 @@ impl LayerInner {
|
||||
|
||||
self.access_stats.record_residence_event();
|
||||
|
||||
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
|
||||
|
||||
self.status.as_ref().unwrap().send_replace(Status::Evicted);
|
||||
|
||||
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -99,8 +99,7 @@ use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate,
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::metrics::{
|
||||
DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_AMORTIZED_GLOBAL, LAYERS_PER_READ_BATCH_GLOBAL,
|
||||
LAYERS_PER_READ_GLOBAL, ScanLatencyOngoingRecording, TimelineMetrics,
|
||||
DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_GLOBAL, ScanLatencyOngoingRecording, TimelineMetrics,
|
||||
};
|
||||
use crate::page_service::TenantManagerTypes;
|
||||
use crate::pgdatadir_mapping::{
|
||||
@@ -1331,6 +1330,10 @@ impl Timeline {
|
||||
// (this is a requirement, not a bug). Skip updating the metric in these cases
|
||||
// to avoid infinite results.
|
||||
if !results.is_empty() {
|
||||
// Record the total number of layers visited towards each key in the batch. While some
|
||||
// layers may not intersect with a given read, and the cost of layer visits are
|
||||
// amortized across the batch, each visited layer contributes directly to the observed
|
||||
// latency for every read in the batch, which is what we care about.
|
||||
if layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD {
|
||||
static LOG_PACER: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
|
||||
@@ -1345,23 +1348,9 @@ impl Timeline {
|
||||
});
|
||||
}
|
||||
|
||||
// Records the number of layers visited in a few different ways:
|
||||
//
|
||||
// * LAYERS_PER_READ: all layers count towards every read in the batch, because each
|
||||
// layer directly affects its observed latency.
|
||||
//
|
||||
// * LAYERS_PER_READ_BATCH: all layers count towards each batch, to get the per-batch
|
||||
// layer visits and access cost.
|
||||
//
|
||||
// * LAYERS_PER_READ_AMORTIZED: the average layer count per read, to get the amortized
|
||||
// read amplification after batching.
|
||||
let layers_visited = layers_visited as f64;
|
||||
let avg_layers_visited = layers_visited / results.len() as f64;
|
||||
LAYERS_PER_READ_BATCH_GLOBAL.observe(layers_visited);
|
||||
for _ in &results {
|
||||
self.metrics.layers_per_read.observe(layers_visited);
|
||||
LAYERS_PER_READ_GLOBAL.observe(layers_visited);
|
||||
LAYERS_PER_READ_AMORTIZED_GLOBAL.observe(avg_layers_visited);
|
||||
self.metrics.layers_per_read.observe(layers_visited as f64);
|
||||
LAYERS_PER_READ_GLOBAL.observe(layers_visited as f64);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3648,7 +3637,7 @@ impl Timeline {
|
||||
let visible_non_resident = match previous_heatmap.as_deref() {
|
||||
Some(PreviousHeatmap::Active {
|
||||
heatmap, read_at, ..
|
||||
}) => Some(heatmap.all_layers().filter_map(|hl| {
|
||||
}) => Some(heatmap.layers.iter().filter_map(|hl| {
|
||||
let desc: PersistentLayerDesc = hl.name.clone().into();
|
||||
let layer = guard.try_get_from_key(&desc.key())?;
|
||||
|
||||
@@ -3664,7 +3653,7 @@ impl Timeline {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((desc, hl.metadata.clone(), hl.access_time, hl.cold))
|
||||
Some((desc, hl.metadata.clone(), hl.access_time))
|
||||
})),
|
||||
Some(PreviousHeatmap::Obsolete) => None,
|
||||
None => None,
|
||||
@@ -3680,7 +3669,6 @@ impl Timeline {
|
||||
layer.layer_desc().clone(),
|
||||
layer.metadata(),
|
||||
last_activity_ts,
|
||||
false, // these layers are not cold
|
||||
))
|
||||
}
|
||||
LayerVisibilityHint::Covered => {
|
||||
@@ -3707,14 +3695,12 @@ impl Timeline {
|
||||
// Sort layers in order of which to download first. For a large set of layers to download, we
|
||||
// want to prioritize those layers which are most likely to still be in the resident many minutes
|
||||
// or hours later:
|
||||
// - Cold layers go last for convenience when a human inspects the heatmap.
|
||||
// - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might
|
||||
// only exist for a few minutes before being compacted into L1s.
|
||||
// - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner
|
||||
// the layer is likely to be covered by an image layer during compaction.
|
||||
layers.sort_by_key(|(desc, _meta, _atime, cold)| {
|
||||
layers.sort_by_key(|(desc, _meta, _atime)| {
|
||||
std::cmp::Reverse((
|
||||
*cold,
|
||||
!LayerMap::is_l0(&desc.key_range, desc.is_delta),
|
||||
desc.lsn_range.end,
|
||||
))
|
||||
@@ -3722,9 +3708,7 @@ impl Timeline {
|
||||
|
||||
let layers = layers
|
||||
.into_iter()
|
||||
.map(|(desc, meta, atime, cold)| {
|
||||
HeatMapLayer::new(desc.layer_name(), meta, atime, cold)
|
||||
})
|
||||
.map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime))
|
||||
.collect();
|
||||
|
||||
Some(HeatMapTimeline::new(self.timeline_id, layers))
|
||||
@@ -3744,7 +3728,6 @@ impl Timeline {
|
||||
name: vl.layer_desc().layer_name(),
|
||||
metadata: vl.metadata(),
|
||||
access_time: now,
|
||||
cold: true,
|
||||
};
|
||||
heatmap_layers.push(hl);
|
||||
}
|
||||
@@ -7046,7 +7029,6 @@ mod tests {
|
||||
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::value::Value;
|
||||
use std::iter::Iterator;
|
||||
use tracing::Instrument;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -7060,8 +7042,8 @@ mod tests {
|
||||
use crate::tenant::{PreviousHeatmap, Timeline};
|
||||
|
||||
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
|
||||
assert_eq!(lhs.all_layers().count(), rhs.all_layers().count());
|
||||
let lhs_rhs = lhs.all_layers().zip(rhs.all_layers());
|
||||
assert_eq!(lhs.layers.len(), rhs.layers.len());
|
||||
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
|
||||
for (l, r) in lhs_rhs {
|
||||
assert_eq!(l.name, r.name);
|
||||
assert_eq!(l.metadata, r.metadata);
|
||||
@@ -7139,11 +7121,10 @@ mod tests {
|
||||
assert_eq!(heatmap.timeline_id, timeline.timeline_id);
|
||||
|
||||
// L0 should come last
|
||||
let heatmap_layers = heatmap.all_layers().collect::<Vec<_>>();
|
||||
assert_eq!(heatmap_layers.last().unwrap().name, l0_delta.layer_name());
|
||||
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
|
||||
|
||||
let mut last_lsn = Lsn::MAX;
|
||||
for layer in heatmap_layers {
|
||||
for layer in &heatmap.layers {
|
||||
// Covered layer should be omitted
|
||||
assert!(layer.name != covered_delta.layer_name());
|
||||
|
||||
@@ -7272,7 +7253,7 @@ mod tests {
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
// Both layers should be in the heatmap
|
||||
assert!(heatmap.all_layers().count() > 0);
|
||||
assert!(!heatmap.layers.is_empty());
|
||||
|
||||
// Now simulate a migration.
|
||||
timeline
|
||||
@@ -7298,7 +7279,7 @@ mod tests {
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
assert_eq!(post_eviction_heatmap.all_layers().count(), 0);
|
||||
assert!(post_eviction_heatmap.layers.is_empty());
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use super::layer_manager::LayerManager;
|
||||
use super::{
|
||||
@@ -20,7 +19,6 @@ use anyhow::{Context, anyhow};
|
||||
use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::FutureExt;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
|
||||
@@ -444,7 +442,6 @@ impl GcCompactionQueue {
|
||||
));
|
||||
};
|
||||
let has_pending_tasks;
|
||||
let mut yield_for_l0 = false;
|
||||
let Some((id, item)) = ({
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some((id, item)) = guard.queued.pop_front() {
|
||||
@@ -494,23 +491,13 @@ impl GcCompactionQueue {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
}
|
||||
let compaction_result =
|
||||
timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
self.notify_and_unblock(id);
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
yield_for_l0 = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
// We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
|
||||
// again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
|
||||
// we need to clean things up before returning from the function.
|
||||
yield_for_l0 = true;
|
||||
}
|
||||
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
}
|
||||
GcCompactionQueueItem::Notify(id, l2_lsn) => {
|
||||
self.notify_and_unblock(id);
|
||||
@@ -539,10 +526,7 @@ impl GcCompactionQueue {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
Ok(if yield_for_l0 {
|
||||
tracing::info!("give up gc-compaction: yield for L0 compaction");
|
||||
CompactionOutcome::YieldForL0
|
||||
} else if has_pending_tasks {
|
||||
Ok(if has_pending_tasks {
|
||||
CompactionOutcome::Pending
|
||||
} else {
|
||||
CompactionOutcome::Done
|
||||
@@ -740,41 +724,17 @@ struct CompactionStatisticsNumSize {
|
||||
|
||||
#[derive(Debug, Serialize, Default)]
|
||||
pub struct CompactionStatistics {
|
||||
/// Delta layer visited (maybe compressed, physical size)
|
||||
delta_layer_visited: CompactionStatisticsNumSize,
|
||||
/// Image layer visited (maybe compressed, physical size)
|
||||
image_layer_visited: CompactionStatisticsNumSize,
|
||||
/// Delta layer produced (maybe compressed, physical size)
|
||||
delta_layer_produced: CompactionStatisticsNumSize,
|
||||
/// Image layer produced (maybe compressed, physical size)
|
||||
image_layer_produced: CompactionStatisticsNumSize,
|
||||
/// Delta layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
|
||||
delta_layer_discarded: CompactionStatisticsNumSize,
|
||||
/// Image layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
|
||||
image_layer_discarded: CompactionStatisticsNumSize,
|
||||
num_delta_layer_discarded: usize,
|
||||
num_image_layer_discarded: usize,
|
||||
num_unique_keys_visited: usize,
|
||||
/// Delta visited (uncompressed, original size)
|
||||
wal_keys_visited: CompactionStatisticsNumSize,
|
||||
/// Image visited (uncompressed, original size)
|
||||
image_keys_visited: CompactionStatisticsNumSize,
|
||||
/// Delta produced (uncompressed, original size)
|
||||
wal_produced: CompactionStatisticsNumSize,
|
||||
/// Image produced (uncompressed, original size)
|
||||
image_produced: CompactionStatisticsNumSize,
|
||||
|
||||
// Time spent in each phase
|
||||
time_acquire_lock_secs: f64,
|
||||
time_analyze_secs: f64,
|
||||
time_download_layer_secs: f64,
|
||||
time_main_loop_secs: f64,
|
||||
time_final_phase_secs: f64,
|
||||
time_total_secs: f64,
|
||||
|
||||
// Summary
|
||||
/// Ratio of the key-value size before/after gc-compaction.
|
||||
uncompressed_size_ratio: f64,
|
||||
/// Ratio of the physical size before/after gc-compaction.
|
||||
physical_size_ratio: f64,
|
||||
}
|
||||
|
||||
impl CompactionStatistics {
|
||||
@@ -824,13 +784,11 @@ impl CompactionStatistics {
|
||||
self.image_produced.num += 1;
|
||||
self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
|
||||
}
|
||||
fn discard_delta_layer(&mut self, original_size: u64) {
|
||||
self.delta_layer_discarded.num += 1;
|
||||
self.delta_layer_discarded.size += original_size;
|
||||
fn discard_delta_layer(&mut self) {
|
||||
self.num_delta_layer_discarded += 1;
|
||||
}
|
||||
fn discard_image_layer(&mut self, original_size: u64) {
|
||||
self.image_layer_discarded.num += 1;
|
||||
self.image_layer_discarded.size += original_size;
|
||||
fn discard_image_layer(&mut self) {
|
||||
self.num_image_layer_discarded += 1;
|
||||
}
|
||||
fn produce_delta_layer(&mut self, size: u64) {
|
||||
self.delta_layer_produced.num += 1;
|
||||
@@ -840,19 +798,6 @@ impl CompactionStatistics {
|
||||
self.image_layer_produced.num += 1;
|
||||
self.image_layer_produced.size += size;
|
||||
}
|
||||
fn finalize(&mut self) {
|
||||
let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
|
||||
let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
|
||||
self.uncompressed_size_ratio =
|
||||
original_key_value_size as f64 / (produced_key_value_size as f64 + 1.0); // avoid div by 0
|
||||
let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
|
||||
let produced_physical_size = self.image_layer_produced.size
|
||||
+ self.delta_layer_produced.size
|
||||
+ self.image_layer_discarded.size
|
||||
+ self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
|
||||
self.physical_size_ratio =
|
||||
original_physical_size as f64 / (produced_physical_size as f64 + 1.0); // avoid div by 0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -2613,7 +2558,7 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
options: CompactOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactionOutcome, CompactionError> {
|
||||
) -> Result<(), CompactionError> {
|
||||
let sub_compaction = options.sub_compaction;
|
||||
let job = GcCompactJob::from_compact_options(options.clone());
|
||||
if sub_compaction {
|
||||
@@ -2635,7 +2580,7 @@ impl Timeline {
|
||||
if jobs_len == 0 {
|
||||
info!("no jobs to run, skipping gc bottom-most compaction");
|
||||
}
|
||||
return Ok(CompactionOutcome::Done);
|
||||
return Ok(());
|
||||
}
|
||||
self.compact_with_gc_inner(cancel, job, ctx).await
|
||||
}
|
||||
@@ -2645,14 +2590,11 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
job: GcCompactJob,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactionOutcome, CompactionError> {
|
||||
) -> Result<(), CompactionError> {
|
||||
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
||||
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
||||
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
|
||||
|
||||
let timer = Instant::now();
|
||||
let begin_timer = timer;
|
||||
|
||||
let gc_lock = async {
|
||||
tokio::select! {
|
||||
guard = self.gc_lock.lock() => Ok(guard),
|
||||
@@ -2660,9 +2602,6 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
let time_acquire_lock = timer.elapsed();
|
||||
let timer = Instant::now();
|
||||
|
||||
let gc_lock = crate::timed(
|
||||
gc_lock,
|
||||
"acquires gc lock",
|
||||
@@ -2714,7 +2653,7 @@ impl Timeline {
|
||||
tracing::warn!(
|
||||
"no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
|
||||
);
|
||||
return Ok(CompactionOutcome::Skipped);
|
||||
return Ok(());
|
||||
}
|
||||
real_gc_cutoff
|
||||
} else {
|
||||
@@ -2752,7 +2691,7 @@ impl Timeline {
|
||||
"no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
|
||||
gc_cutoff
|
||||
);
|
||||
return Ok(CompactionOutcome::Done);
|
||||
return Ok(());
|
||||
};
|
||||
// Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
|
||||
// the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
|
||||
@@ -2773,7 +2712,7 @@ impl Timeline {
|
||||
"no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
|
||||
compact_lsn_range.end
|
||||
);
|
||||
return Ok(CompactionOutcome::Done);
|
||||
return Ok(());
|
||||
};
|
||||
// Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
|
||||
// layers to compact.
|
||||
@@ -2799,7 +2738,7 @@ impl Timeline {
|
||||
"no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
|
||||
gc_cutoff, compact_key_range.start, compact_key_range.end
|
||||
);
|
||||
return Ok(CompactionOutcome::Done);
|
||||
return Ok(());
|
||||
}
|
||||
retain_lsns_below_horizon.sort();
|
||||
GcCompactionJobDescription {
|
||||
@@ -2852,9 +2791,6 @@ impl Timeline {
|
||||
has_data_below,
|
||||
);
|
||||
|
||||
let time_analyze = timer.elapsed();
|
||||
let timer = Instant::now();
|
||||
|
||||
for layer in &job_desc.selected_layers {
|
||||
debug!("read layer: {}", layer.layer_desc().key());
|
||||
}
|
||||
@@ -2914,15 +2850,6 @@ impl Timeline {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
let should_yield = self
|
||||
.l0_compaction_trigger
|
||||
.notified()
|
||||
.now_or_never()
|
||||
.is_some();
|
||||
if should_yield {
|
||||
tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
let resident_layer = layer
|
||||
.download_and_keep_resident(ctx)
|
||||
.await
|
||||
@@ -2966,9 +2893,6 @@ impl Timeline {
|
||||
.context("failed to create filter iterator")
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
let time_download_layer = timer.elapsed();
|
||||
let timer = Instant::now();
|
||||
|
||||
// Step 2: Produce images+deltas.
|
||||
let mut accumulated_values = Vec::new();
|
||||
let mut last_key: Option<Key> = None;
|
||||
@@ -3043,8 +2967,6 @@ impl Timeline {
|
||||
// the key and LSN range are determined. However, to keep things simple here, we still
|
||||
// create this writer, and discard the writer in the end.
|
||||
|
||||
let mut keys_processed = 0;
|
||||
|
||||
while let Some(((key, lsn, val), desc)) = merge_iter
|
||||
.next_with_trace()
|
||||
.await
|
||||
@@ -3054,18 +2976,6 @@ impl Timeline {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
keys_processed += 1;
|
||||
if keys_processed % 1000 == 0 {
|
||||
let should_yield = self
|
||||
.l0_compaction_trigger
|
||||
.notified()
|
||||
.now_or_never()
|
||||
.is_some();
|
||||
if should_yield {
|
||||
tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
}
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
// If this shard does not need to store this key, simply skip it.
|
||||
//
|
||||
@@ -3204,9 +3114,6 @@ impl Timeline {
|
||||
.map_err(CompactionError::Other)?;
|
||||
// end: move the above part to the loop body
|
||||
|
||||
let time_main_loop = timer.elapsed();
|
||||
let timer = Instant::now();
|
||||
|
||||
let mut rewrote_delta_layers = Vec::new();
|
||||
for (key, writers) in delta_layer_rewriters {
|
||||
if let Some(delta_writer_before) = writers.before {
|
||||
@@ -3271,13 +3178,6 @@ impl Timeline {
|
||||
let mut keep_layers = HashSet::new();
|
||||
let produced_delta_layers_len = produced_delta_layers.len();
|
||||
let produced_image_layers_len = produced_image_layers.len();
|
||||
|
||||
let layer_selection_by_key = job_desc
|
||||
.selected_layers
|
||||
.iter()
|
||||
.map(|l| (l.layer_desc().key(), l.layer_desc().clone()))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
for action in produced_delta_layers {
|
||||
match action {
|
||||
BatchWriterResult::Produced(layer) => {
|
||||
@@ -3291,16 +3191,8 @@ impl Timeline {
|
||||
if cfg!(debug_assertions) {
|
||||
info!("discarded delta layer: {}", l);
|
||||
}
|
||||
if let Some(layer_desc) = layer_selection_by_key.get(&l) {
|
||||
stat.discard_delta_layer(layer_desc.file_size());
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"discarded delta layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
|
||||
l
|
||||
);
|
||||
stat.discard_delta_layer(0);
|
||||
}
|
||||
keep_layers.insert(l);
|
||||
stat.discard_delta_layer();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3309,9 +3201,6 @@ impl Timeline {
|
||||
"produced rewritten delta layer: {}",
|
||||
layer.layer_desc().key()
|
||||
);
|
||||
// For now, we include rewritten delta layer size in the "produce_delta_layer". We could
|
||||
// make it a separate statistics in the future.
|
||||
stat.produce_delta_layer(layer.layer_desc().file_size());
|
||||
}
|
||||
compact_to.extend(rewrote_delta_layers);
|
||||
for action in produced_image_layers {
|
||||
@@ -3323,16 +3212,8 @@ impl Timeline {
|
||||
}
|
||||
BatchWriterResult::Discarded(l) => {
|
||||
debug!("discarded image layer: {}", l);
|
||||
if let Some(layer_desc) = layer_selection_by_key.get(&l) {
|
||||
stat.discard_image_layer(layer_desc.file_size());
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"discarded image layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
|
||||
l
|
||||
);
|
||||
stat.discard_image_layer(0);
|
||||
}
|
||||
keep_layers.insert(l);
|
||||
stat.discard_image_layer();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3380,16 +3261,6 @@ impl Timeline {
|
||||
|
||||
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
|
||||
|
||||
let time_final_phase = timer.elapsed();
|
||||
|
||||
stat.time_final_phase_secs = time_final_phase.as_secs_f64();
|
||||
stat.time_main_loop_secs = time_main_loop.as_secs_f64();
|
||||
stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
|
||||
stat.time_download_layer_secs = time_download_layer.as_secs_f64();
|
||||
stat.time_analyze_secs = time_analyze.as_secs_f64();
|
||||
stat.time_total_secs = begin_timer.elapsed().as_secs_f64();
|
||||
stat.finalize();
|
||||
|
||||
info!(
|
||||
"gc-compaction statistics: {}",
|
||||
serde_json::to_string(&stat)
|
||||
@@ -3398,7 +3269,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
if dry_run {
|
||||
return Ok(CompactionOutcome::Done);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -3517,7 +3388,7 @@ impl Timeline {
|
||||
|
||||
drop(gc_lock);
|
||||
|
||||
Ok(CompactionOutcome::Done)
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -61,11 +61,11 @@ impl HeatmapLayersDownloader {
|
||||
|
||||
tracing::info!(
|
||||
resident_size=%timeline.resident_physical_size(),
|
||||
heatmap_layers=%heatmap.all_layers().count(),
|
||||
heatmap_layers=%heatmap.layers.len(),
|
||||
"Starting heatmap layers download"
|
||||
);
|
||||
|
||||
let stream = futures::stream::iter(heatmap.all_layers().cloned().filter_map(
|
||||
let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
|
||||
|layer| {
|
||||
let ctx = ctx.attached_child();
|
||||
let tl = timeline.clone();
|
||||
|
||||
8
poetry.lock
generated
8
poetry.lock
generated
@@ -1414,14 +1414,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "jinja2"
|
||||
version = "3.1.6"
|
||||
version = "3.1.5"
|
||||
description = "A very fast and expressive template engine."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67"},
|
||||
{file = "jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d"},
|
||||
{file = "jinja2-3.1.5-py3-none-any.whl", hash = "sha256:aba0f4dc9ed8013c424088f68a5c226f7d6097ed89b246d7749c2ec4175c6adb"},
|
||||
{file = "jinja2-3.1.5.tar.gz", hash = "sha256:8fefff8dc3034e27bb80d67c671eb8a9bc424c0ef4c0826edbff304cceff43bb"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -3820,4 +3820,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "010ffce959bb256880ab5a267048c182e4612b3151f9a94e3bf5d3a7807962fe"
|
||||
content-hash = "9711c5479c867fa614ce3d352f1bbc63dba1cb2376d347f96fbeda6f512ee308"
|
||||
|
||||
@@ -286,16 +286,17 @@ where
|
||||
|
||||
/// Registers a SpanFields instance as span extension.
|
||||
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let csid = self.callsite_id(attrs.metadata().callsite());
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let fields = SpanFields::default();
|
||||
fields.record_fields(attrs);
|
||||
|
||||
// This could deadlock when there's a panic somewhere in the tracing
|
||||
// event handling and a read or write guard is still held. This includes
|
||||
// the OTel subscriber.
|
||||
let mut exts = span.extensions_mut();
|
||||
|
||||
exts.insert(fields);
|
||||
exts.insert(csid);
|
||||
}
|
||||
|
||||
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
|
||||
@@ -564,13 +565,6 @@ impl EventFormatter {
|
||||
)?;
|
||||
}
|
||||
|
||||
let spans = SerializableSpans {
|
||||
ctx,
|
||||
callsite_ids,
|
||||
extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
|
||||
};
|
||||
serializer.serialize_entry("spans", &spans)?;
|
||||
|
||||
// TODO: thread-local cache?
|
||||
let pid = std::process::id();
|
||||
// Skip adding pid 1 to reduce noise for services running in containers.
|
||||
@@ -620,9 +614,15 @@ impl EventFormatter {
|
||||
}
|
||||
}
|
||||
|
||||
if spans.extract.has_values() {
|
||||
// TODO: add fields from event, too?
|
||||
serializer.serialize_entry("extract", &spans.extract)?;
|
||||
let stack = SerializableSpans {
|
||||
ctx,
|
||||
callsite_ids,
|
||||
fields: ExtractedSpanFields::<'_, F>::new(extract_fields),
|
||||
};
|
||||
serializer.serialize_entry("spans", &stack)?;
|
||||
|
||||
if stack.fields.has_values() {
|
||||
serializer.serialize_entry("extract", &stack.fields)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -911,7 +911,7 @@ where
|
||||
{
|
||||
ctx: &'a Context<'ctx, Span>,
|
||||
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
extract: ExtractedSpanFields<'a, F>,
|
||||
fields: ExtractedSpanFields<'a, F>,
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
|
||||
@@ -940,7 +940,7 @@ where
|
||||
|
||||
serializer.serialize_value(&SerializableSpanFields {
|
||||
span: &span,
|
||||
extract: &self.extract,
|
||||
fields: &self.fields,
|
||||
})?;
|
||||
}
|
||||
}
|
||||
@@ -955,7 +955,7 @@ where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
span: &'a SpanRef<'span, Span>,
|
||||
extract: &'a ExtractedSpanFields<'a, F>,
|
||||
fields: &'a ExtractedSpanFields<'a, F>,
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
|
||||
@@ -973,7 +973,7 @@ where
|
||||
for (name, value) in &data.fields.pin() {
|
||||
serializer.serialize_entry(name, value)?;
|
||||
// TODO: replace clone with reference, if possible.
|
||||
self.extract.set(name, value.clone());
|
||||
self.fields.set(name, value.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ requests = "^2.32.3"
|
||||
pytest-xdist = "^3.3.1"
|
||||
asyncpg = "^0.30.0"
|
||||
aiopg = "^1.4.0"
|
||||
Jinja2 = "^3.1.6"
|
||||
Jinja2 = "^3.1.5"
|
||||
types-requests = "^2.31.0.0"
|
||||
types-psycopg2 = "^2.9.21.20241019"
|
||||
boto3 = "^1.34.11"
|
||||
|
||||
@@ -430,10 +430,7 @@ impl InterpretedWalReader {
|
||||
.with_context(|| "Failed to interpret WAL")?;
|
||||
|
||||
for (shard, record) in interpreted {
|
||||
// Shard zero needs to track the start LSN of the latest record
|
||||
// in adition to the LSN of the next record to ingest. The former
|
||||
// is included in basebackup persisted by the compute in WAL.
|
||||
if !shard.is_shard_zero() && record.is_empty() {
|
||||
if record.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -743,7 +740,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
@@ -886,16 +883,10 @@ mod tests {
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let mut next_record_lsns = Vec::default();
|
||||
let end_watch = Env::write_wal(
|
||||
tli,
|
||||
start_lsn,
|
||||
SIZE,
|
||||
MSG_COUNT,
|
||||
c"neon-file:",
|
||||
Some(&mut next_record_lsns),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_watch =
|
||||
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
let streaming_wal_reader = StreamingWalReader::new(
|
||||
@@ -1036,16 +1027,10 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(
|
||||
tli,
|
||||
start_lsn,
|
||||
SIZE,
|
||||
MSG_COUNT,
|
||||
c"neon-file:",
|
||||
Some(&mut next_record_lsns),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_watch =
|
||||
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(next_record_lsns.len() > 3);
|
||||
let shard_0_start_lsn = next_record_lsns[3];
|
||||
@@ -1139,88 +1124,4 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_shard_zero_does_not_skip_empty_records() {
|
||||
let _ = env_logger::builder().is_test(true).try_init();
|
||||
|
||||
const SIZE: usize = 8 * 1024;
|
||||
const MSG_COUNT: usize = 10;
|
||||
const PG_VERSION: u32 = 17;
|
||||
|
||||
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
|
||||
let env = Env::new(true).unwrap();
|
||||
let tli = env
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let mut next_record_lsns = Vec::new();
|
||||
let end_watch = Env::write_wal(
|
||||
tli,
|
||||
start_lsn,
|
||||
SIZE,
|
||||
MSG_COUNT,
|
||||
// This is a logical message prefix that is not persisted to key value storage.
|
||||
// We use it in order to validate that shard zero receives emtpy interpreted records.
|
||||
c"test:",
|
||||
Some(&mut next_record_lsns),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
let streaming_wal_reader = StreamingWalReader::new(
|
||||
resident_tli,
|
||||
None,
|
||||
start_lsn,
|
||||
end_pos,
|
||||
end_watch,
|
||||
MAX_SEND_SIZE,
|
||||
);
|
||||
|
||||
let shard = ShardIdentity::unsharded();
|
||||
let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
|
||||
let handle = InterpretedWalReader::spawn(
|
||||
streaming_wal_reader,
|
||||
start_lsn,
|
||||
records_tx,
|
||||
shard,
|
||||
PG_VERSION,
|
||||
&Some("pageserver".to_string()),
|
||||
);
|
||||
|
||||
let mut interpreted_records = Vec::new();
|
||||
while let Some(batch) = records_rx.recv().await {
|
||||
interpreted_records.push(batch.records);
|
||||
if batch.wal_end_lsn == batch.available_wal_end_lsn {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let received_next_record_lsns = interpreted_records
|
||||
.into_iter()
|
||||
.flat_map(|b| b.records)
|
||||
.map(|rec| rec.next_record_lsn)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// By default this also includes the start LSN. Trim it since it shouldn't be received.
|
||||
let next_record_lsns = next_record_lsns.into_iter().skip(1).collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(received_next_record_lsns, next_record_lsns);
|
||||
|
||||
handle.abort();
|
||||
let mut done = false;
|
||||
for _ in 0..5 {
|
||||
if handle.current_position().is_none() {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
}
|
||||
|
||||
assert!(done);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use std::ffi::CStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
@@ -125,7 +124,6 @@ impl Env {
|
||||
start_lsn: Lsn,
|
||||
msg_size: usize,
|
||||
msg_count: usize,
|
||||
prefix: &CStr,
|
||||
mut next_record_lsns: Option<&mut Vec<Lsn>>,
|
||||
) -> anyhow::Result<EndWatch> {
|
||||
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
|
||||
@@ -135,6 +133,7 @@ impl Env {
|
||||
|
||||
WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
|
||||
|
||||
let prefix = c"neon-file:";
|
||||
let prefixlen = prefix.to_bytes_with_nul().len();
|
||||
assert!(msg_size >= prefixlen);
|
||||
let message = vec![0; msg_size - prefixlen];
|
||||
|
||||
@@ -246,7 +246,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
@@ -1174,6 +1174,15 @@ class NeonEnv:
|
||||
"max_batch_size": 32,
|
||||
}
|
||||
|
||||
if config.test_may_use_compatibility_snapshot_binaries:
|
||||
log.info(
|
||||
"Skipping prev heatmap settings to avoid forward-compatibility related test failures"
|
||||
)
|
||||
else:
|
||||
# Look for gaps in WAL received from safekeepeers
|
||||
ps_cfg["load_previous_heatmap"] = True
|
||||
ps_cfg["generate_unarchival_heatmap"] = True
|
||||
|
||||
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
|
||||
if get_vectored_concurrent_io is not None:
|
||||
ps_cfg["get_vectored_concurrent_io"] = {
|
||||
|
||||
@@ -375,19 +375,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_patch_index_part(
|
||||
self,
|
||||
tenant_id: TenantId | TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
data: dict[str, Any],
|
||||
):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/patch_index_part",
|
||||
json=data,
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
def tenant_location_conf(
|
||||
self,
|
||||
tenant_id: TenantId | TenantShardId,
|
||||
|
||||
@@ -5,59 +5,34 @@ import logging
|
||||
import requests
|
||||
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
|
||||
|
||||
TEST_ROLE_NAMES = [
|
||||
{"name": "neondb_owner"},
|
||||
{"name": "role with spaces"},
|
||||
{"name": "role with%20spaces "},
|
||||
{"name": "role with whitespaces "},
|
||||
{"name": "injective role with spaces'; SELECT pg_sleep(1000);"},
|
||||
{"name": "role with #pound-sign and &ersands=true"},
|
||||
{"name": "role with emoji 🌍"},
|
||||
{"name": "role \";with ';injections $$ $x$ $ %I !/\\&#@"},
|
||||
{"name": '"role in double quotes"'},
|
||||
{"name": "'role in single quotes'"},
|
||||
]
|
||||
|
||||
TEST_DB_NAMES = [
|
||||
{
|
||||
"name": "neondb",
|
||||
"owner": "neondb_owner",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "db with spaces",
|
||||
"owner": "role with spaces",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "db with%20spaces ",
|
||||
"owner": "role with%20spaces ",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "db with whitespaces ",
|
||||
"owner": "role with whitespaces ",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "injective db with spaces'; SELECT pg_sleep(1000);",
|
||||
"owner": "injective role with spaces'; SELECT pg_sleep(1000);",
|
||||
"name": "injective db with spaces'; SELECT pg_sleep(10);",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "db with #pound-sign and &ersands=true",
|
||||
"owner": "role with #pound-sign and &ersands=true",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "db with emoji 🌍",
|
||||
"owner": "role with emoji 🌍",
|
||||
},
|
||||
{
|
||||
"name": "db \";with ';injections $$ $x$ $ %I !/\\&#@",
|
||||
"owner": "role \";with ';injections $$ $x$ $ %I !/\\&#@",
|
||||
},
|
||||
{
|
||||
"name": '"db in double quotes"',
|
||||
"owner": '"role in double quotes"',
|
||||
},
|
||||
{
|
||||
"name": "'db in single quotes'",
|
||||
"owner": "'role in single quotes'",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
]
|
||||
|
||||
@@ -77,7 +52,6 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
}
|
||||
@@ -125,10 +99,10 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
|
||||
), f"Expected 404 status code, but got {e.response.status_code}"
|
||||
|
||||
|
||||
def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
def test_compute_create_databases(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can create and work with databases and roles
|
||||
with special characters (whitespaces, %, tabs, etc.) in the name.
|
||||
Test that compute_ctl can create and work with databases with special
|
||||
characters (whitespaces, %, tabs, etc.) in the name.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
@@ -142,7 +116,6 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
}
|
||||
@@ -166,43 +139,6 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
assert len(curr_db) == 1
|
||||
assert curr_db[0] == db["name"]
|
||||
|
||||
for role in TEST_ROLE_NAMES:
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = %s", (role["name"],))
|
||||
catalog_role = cursor.fetchone()
|
||||
assert catalog_role is not None
|
||||
assert catalog_role[0] == role["name"]
|
||||
|
||||
delta_operations = []
|
||||
for db in TEST_DB_NAMES:
|
||||
delta_operations.append({"action": "delete_db", "name": db["name"]})
|
||||
for role in TEST_ROLE_NAMES:
|
||||
delta_operations.append({"action": "delete_role", "name": role["name"]})
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": [],
|
||||
"databases": [],
|
||||
},
|
||||
"delta_operations": delta_operations,
|
||||
}
|
||||
)
|
||||
endpoint.reconfigure()
|
||||
|
||||
for db in TEST_DB_NAMES:
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", (db["name"],))
|
||||
catalog_db = cursor.fetchone()
|
||||
assert catalog_db is None
|
||||
|
||||
for role in TEST_ROLE_NAMES:
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = %s", (role["name"],))
|
||||
catalog_role = cursor.fetchone()
|
||||
assert catalog_role is None
|
||||
|
||||
|
||||
def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
@@ -214,19 +150,17 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
# stuff into the spec.json file.
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
SUB_DB_NAME = "';subscriber_db $$ $x$ $;"
|
||||
PUB_DB_NAME = "publisher_db"
|
||||
TEST_DB_NAMES = [
|
||||
{
|
||||
"name": "neondb",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": SUB_DB_NAME,
|
||||
"name": "subscriber_db",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": PUB_DB_NAME,
|
||||
"name": "publisher_db",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
]
|
||||
@@ -243,47 +177,47 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
)
|
||||
endpoint.reconfigure()
|
||||
|
||||
# Connect to the PUB_DB_NAME and create a publication
|
||||
with endpoint.cursor(dbname=PUB_DB_NAME) as cursor:
|
||||
# connect to the publisher_db and create a publication
|
||||
with endpoint.cursor(dbname="publisher_db") as cursor:
|
||||
cursor.execute("CREATE PUBLICATION mypub FOR ALL TABLES")
|
||||
cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');")
|
||||
cursor.execute("CREATE TABLE t(a int)")
|
||||
cursor.execute("INSERT INTO t VALUES (1)")
|
||||
cursor.execute("CHECKPOINT")
|
||||
|
||||
# Connect to the SUB_DB_NAME and create a subscription
|
||||
# Note that we need to create subscription with the following connstr:
|
||||
connstr = endpoint.connstr(dbname=PUB_DB_NAME).replace("'", "''")
|
||||
with endpoint.cursor(dbname=SUB_DB_NAME) as cursor:
|
||||
# connect to the subscriber_db and create a subscription
|
||||
# Note that we need to create subscription with
|
||||
connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")
|
||||
with endpoint.cursor(dbname="subscriber_db") as cursor:
|
||||
cursor.execute("CREATE TABLE t(a int)")
|
||||
cursor.execute(
|
||||
f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false) "
|
||||
f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false) "
|
||||
)
|
||||
|
||||
# Wait for the subscription to be active
|
||||
# wait for the subscription to be active
|
||||
logical_replication_sync(
|
||||
endpoint,
|
||||
endpoint,
|
||||
"mysub",
|
||||
sub_dbname=SUB_DB_NAME,
|
||||
pub_dbname=PUB_DB_NAME,
|
||||
sub_dbname="subscriber_db",
|
||||
pub_dbname="publisher_db",
|
||||
)
|
||||
|
||||
# Check that replication is working
|
||||
with endpoint.cursor(dbname=SUB_DB_NAME) as cursor:
|
||||
with endpoint.cursor(dbname="subscriber_db") as cursor:
|
||||
cursor.execute("SELECT * FROM t")
|
||||
rows = cursor.fetchall()
|
||||
assert len(rows) == 1
|
||||
assert rows[0][0] == 1
|
||||
|
||||
# Drop the SUB_DB_NAME from the list
|
||||
# drop the subscriber_db from the list
|
||||
TEST_DB_NAMES_NEW = [
|
||||
{
|
||||
"name": "neondb",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": PUB_DB_NAME,
|
||||
"name": "publisher_db",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
]
|
||||
@@ -296,7 +230,7 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
"databases": TEST_DB_NAMES_NEW,
|
||||
},
|
||||
"delta_operations": [
|
||||
{"action": "delete_db", "name": SUB_DB_NAME},
|
||||
{"action": "delete_db", "name": "subscriber_db"},
|
||||
# also test the case when we try to delete a non-existent database
|
||||
# shouldn't happen in normal operation,
|
||||
# but can occur when failed operations are retried
|
||||
@@ -305,22 +239,22 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
}
|
||||
)
|
||||
|
||||
logging.info(f"Reconfiguring the endpoint to drop the {SUB_DB_NAME} database")
|
||||
logging.info("Reconfiguring the endpoint to drop the subscriber_db")
|
||||
endpoint.reconfigure()
|
||||
|
||||
# Check that the SUB_DB_NAME is dropped
|
||||
# Check that the subscriber_db is dropped
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", (SUB_DB_NAME,))
|
||||
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", ("subscriber_db",))
|
||||
catalog_db = cursor.fetchone()
|
||||
assert catalog_db is None
|
||||
|
||||
# Check that we can still connect to the PUB_DB_NAME
|
||||
with endpoint.cursor(dbname=PUB_DB_NAME) as cursor:
|
||||
# Check that we can still connect to the publisher_db
|
||||
with endpoint.cursor(dbname="publisher_db") as cursor:
|
||||
cursor.execute("SELECT * FROM current_database()")
|
||||
curr_db = cursor.fetchone()
|
||||
assert curr_db is not None
|
||||
assert len(curr_db) == 1
|
||||
assert curr_db[0] == PUB_DB_NAME
|
||||
assert curr_db[0] == "publisher_db"
|
||||
|
||||
|
||||
def test_compute_drop_role(neon_simple_env: NeonEnv):
|
||||
@@ -331,7 +265,6 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
env = neon_simple_env
|
||||
TEST_DB_NAME = "db_with_permissions"
|
||||
TEST_GRANTEE = "'); MALFORMED SQL $$ $x$ $/;5%$ %I"
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -368,18 +301,16 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
|
||||
cursor.execute("create view test_view as select * from test_table")
|
||||
|
||||
with endpoint.cursor(dbname=TEST_DB_NAME, user="neon") as cursor:
|
||||
cursor.execute(f'create role "{TEST_GRANTEE}"')
|
||||
cursor.execute("create role readonly")
|
||||
# We (`compute_ctl`) make 'neon' the owner of schema 'public' in the owned database.
|
||||
# Postgres has all sorts of permissions and grants that we may not handle well,
|
||||
# but this is the shortest repro grant for the issue
|
||||
# https://github.com/neondatabase/cloud/issues/13582
|
||||
cursor.execute(f'grant select on all tables in schema public to "{TEST_GRANTEE}"')
|
||||
cursor.execute("grant select on all tables in schema public to readonly")
|
||||
|
||||
# Check that role was created
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"SELECT rolname FROM pg_roles WHERE rolname = %(role)s", {"role": TEST_GRANTEE}
|
||||
)
|
||||
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = 'readonly'")
|
||||
role = cursor.fetchone()
|
||||
assert role is not None
|
||||
|
||||
@@ -387,8 +318,7 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
|
||||
# that may block our ability to drop the role.
|
||||
with endpoint.cursor(dbname=TEST_DB_NAME) as cursor:
|
||||
cursor.execute(
|
||||
"select grantor from information_schema.role_table_grants where grantee = %(grantee)s",
|
||||
{"grantee": TEST_GRANTEE},
|
||||
"select grantor from information_schema.role_table_grants where grantee = 'readonly'"
|
||||
)
|
||||
res = cursor.fetchall()
|
||||
assert len(res) == 2, f"Expected 2 table grants, got {len(res)}"
|
||||
@@ -402,7 +332,7 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": TEST_GRANTEE,
|
||||
"name": "readonly",
|
||||
},
|
||||
],
|
||||
}
|
||||
@@ -411,9 +341,7 @@ def test_compute_drop_role(neon_simple_env: NeonEnv):
|
||||
|
||||
# Check that role is dropped
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"SELECT rolname FROM pg_roles WHERE rolname = %(role)s", {"role": TEST_GRANTEE}
|
||||
)
|
||||
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = 'readonly'")
|
||||
role = cursor.fetchone()
|
||||
assert role is None
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.utils import run_only_on_default_postgres, wait_until
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def check_client(env: NeonEnv, client: PageserverHttpClient):
|
||||
@@ -138,25 +138,3 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilde
|
||||
|
||||
with env.pageserver.http_client(auth_token=pageserver_token) as client:
|
||||
check_client(env, client)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("it does not use any postgres functionality")
|
||||
def test_pageserver_http_index_part_force_patch(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
with env.pageserver.http_client() as client:
|
||||
client.timeline_patch_index_part(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
{"rel_size_migration": "migrating"},
|
||||
)
|
||||
assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "migrating"
|
||||
# This is invalid in practice: we should never rollback the migrating state to legacy.
|
||||
# But we do it here to test the API.
|
||||
client.timeline_patch_index_part(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
{"rel_size_migration": "legacy"},
|
||||
)
|
||||
assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "legacy"
|
||||
|
||||
@@ -955,17 +955,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
raise RuntimeError(f"No heatmap for timeline: {tlid}")
|
||||
|
||||
def count_timeline_heatmap_layers(tlid) -> tuple[int, int]:
|
||||
cold, hot = 0, 0
|
||||
layers = timeline_heatmap(tlid)["layers"]
|
||||
for layer in layers:
|
||||
if layer["cold"]:
|
||||
cold += 1
|
||||
else:
|
||||
hot += 1
|
||||
|
||||
return cold, hot
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Timed out.*downloading layers.*",
|
||||
@@ -999,19 +988,13 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id
|
||||
)
|
||||
|
||||
def all_layers_downloaded(node, expected_layer_count: int):
|
||||
local_layers_count = len(node.list_layers(tenant_id, timeline_id))
|
||||
def all_layers_downloaded(expected_layer_count: int):
|
||||
local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id))
|
||||
|
||||
log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
|
||||
assert local_layers_count >= expected_layer_count
|
||||
|
||||
def no_layers_downloaded(node):
|
||||
local_layers_count = len(node.list_layers(tenant_id, timeline_id))
|
||||
|
||||
log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}")
|
||||
assert local_layers_count == 0
|
||||
|
||||
wait_until(lambda: all_layers_downloaded(ps_secondary, after_migration_heatmap_layers_count))
|
||||
wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count))
|
||||
|
||||
# Read everything and make sure that we're not downloading anything extra.
|
||||
# All hot layers should be available locally now.
|
||||
@@ -1064,35 +1047,13 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(lambda: check_archival_state(TimelineArchivalState.UNARCHIVED, child_timeline_id))
|
||||
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
log.info(f"Parent timeline heatmap size: {len(timeline_heatmap(timeline_id)['layers'])}")
|
||||
log.info(f"Child timeline heatmap size: {len(timeline_heatmap(child_timeline_id)['layers'])}")
|
||||
|
||||
parent_cold, parent_hot = count_timeline_heatmap_layers(timeline_id)
|
||||
child_cold, child_hot = count_timeline_heatmap_layers(child_timeline_id)
|
||||
|
||||
log.info(f"Parent timeline heatmap size: cold={parent_cold}, hot={parent_hot}")
|
||||
log.info(f"Child timeline heatmap size: cold={child_cold}, hot={child_hot}")
|
||||
|
||||
# All layers in the heatmap should come from the generation on unarchival.
|
||||
# Hence, they should be cold.
|
||||
assert parent_cold > 0
|
||||
assert parent_hot == 0
|
||||
|
||||
expected_locally = parent_cold
|
||||
expected_locally = len(timeline_heatmap(timeline_id)["layers"])
|
||||
assert expected_locally > 0
|
||||
|
||||
env.storage_controller.download_heatmap_layers(
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), child_timeline_id, recurse=True
|
||||
)
|
||||
wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))
|
||||
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "off")])
|
||||
|
||||
# The uploaded heatmap is still empty. Clean up all layers on the secondary.
|
||||
ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100)
|
||||
wait_until(lambda: no_layers_downloaded(ps_attached))
|
||||
|
||||
# Upload a new heatmap. The previously cold layers become hot since they're now resident.
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
# Warm up the current secondary.
|
||||
ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100)
|
||||
wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))
|
||||
wait_until(lambda: all_layers_downloaded(expected_locally))
|
||||
|
||||
Reference in New Issue
Block a user