Compare commits

..

15 Commits

Author SHA1 Message Date
Alex Chi Z
5a48aafccb fix(pageserver): tx abort test flaky
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-01 11:25:38 -07:00
Alex Chi Z.
5ec8881c0b feat(pageserver): resolve feature flag based on remote size (#12400)
## Problem

Part of #11813 

## Summary of changes

* Compute tenant remote size in the housekeeping loop.
* Add a new `TenantFeatureResolver` struct to cache the tenant-specific
properties.
* Evaluate feature flag based on the remote size.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-01 18:11:24 +00:00
Alex Chi Z.
b254dce8a1 feat(pageserver): report compaction progress (#12401)
## Problem

close https://github.com/neondatabase/neon/issues/11528

## Summary of changes

Gives us better observability of compaction progress.

- Image creation: num of partition processed / total partition
- Gc-compaction: index of the in the queue / total items for a full
compaction
- Shard ancestor compaction: layers to rewrite / total layers

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-01 17:00:27 +00:00
Alex Chi Z.
3815e3b2b5 feat(pageserver): reduce lock contention in l0 compaction (#12360)
## Problem

L0 compaction currently holds the read lock for a long region while it
doesn't need to.

## Summary of changes

This patch reduces the one long contention region into 2 short ones:
gather the layers to compact at the beginning, and several short read
locks when querying the image coverage.

Co-Authored-By: Chen Luo

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-01 16:58:41 +00:00
Suhas Thalanki
bbcd70eab3 Dynamic Masking Support for anon v2 (#11733)
## Problem

This PR works on adding dynamic masking support for `anon` v2. It
currently only supports static masking.

## Summary of changes

Added a security definer function that sets the dynamic masking guc to
`true` with superuser permissions.
Added a security definer function that adds `anon` to
`session_preload_libraries` if it's not already present.

Related to: https://github.com/neondatabase/cloud/issues/20456
2025-07-01 16:50:27 +00:00
Suhas Thalanki
0934ce9bce compute: metrics for autovacuum (mxid, postgres) (#12294)
## Problem

Currently we do not have metrics for autovacuum.

## Summary of changes

Added a metric that extracts the top 5 DBs with oldest mxid and frozen
xid. Tables that were vacuumed recently should have younger value (or
younger age).

Related Issue: https://github.com/neondatabase/cloud/issues/27296
2025-07-01 15:33:23 +00:00
Conrad Ludgate
4932963bac [proxy]: dont log user errors from postgres (#12412)
## Problem

#8843 

User initiated sql queries are being classified as "postgres" errors,
whereas they're really user errors.

## Summary of changes

Classify user-initiated postgres errors as user errors if they are
related to a sql query that we ran on their behalf. Do not log those
errors.
2025-07-01 13:03:34 +00:00
Lassi Pölönen
6d73cfa608 Support audit syslog over TLS (#12124)
Add support to transport syslogs over TLS. Since TLS params essentially
require passing host and port separately, add a boolean flag to the
configuration template and also use the same `action` format for
plaintext logs. This allows seamless transition.

The plaintext host:port is picked from `AUDIT_LOGGING_ENDPOINT` (as
earlier) and from `AUDIT_LOGGING_TLS_ENDPOINT`. The TLS host:port is
used when defined and non-empty.

`remote_endpoint` is split separately to hostname and port as required
by `omfwd` module.

Also the address parsing and config content generation are split to more
testable functions with basic tests added.
2025-07-01 12:53:46 +00:00
Dmitrii Kovalkov
d2d9946bab tests: override safekeeper ports in storcon DB (#12410)
## Problem
We persist safekeeper host/port in the storcon DB after
https://github.com/neondatabase/neon/pull/11712, so the storcon fails to
ping safekeepers in the compatibility tests, where we start the cluster
from the snapshot.

PR also adds some small code improvements related to the test failure.

- Closes: https://github.com/neondatabase/neon/issues/12339

## Summary of changes
- Update safekeeper ports in the storcon DB when starting the neon from
the dir (snapshot)
- Fail the response on all not-success codes (e.g. 3xx). Should not
happen, but just to be more safe.
- Add `neon_previous/` to .gitignore to make it easier to run compat
tests.
- Add missing EXPORT to the instruction for running compat tests
2025-07-01 12:47:16 +00:00
Trung Dinh
daa402f35a pageserver: Make ImageLayerWriter sync, infallible and lazy (#12403)
## Problem

## Summary of changes
Make ImageLayerWriter sync, infallible and lazy.


Address https://github.com/neondatabase/neon/issues/12389.

All unit tests passed.
2025-07-01 09:53:11 +00:00
Suhas Thalanki
5f3532970e [compute] fix: background worker that collects installed extension metrics now updates collection interval (#12277)
## Problem

Previously, the background worker that collects the list of installed
extensions across DBs had a timeout set to 1 hour. This cause a problem
with computes that had a `suspend_timeout` > 1 hour as this collection
was treated as activity, preventing compute shutdown.

Issue: https://github.com/neondatabase/cloud/issues/30147

## Summary of changes

Passing the `suspend_timeout` as part of the `ComputeSpec` so that any
updates to this are taken into account by the background worker and
updates its collection interval.
2025-06-30 22:12:37 +00:00
Arpad Müller
2e681e0ef8 detach_ancestor: delete the right layer when hardlink fails (#12397)
If a hardlink operation inside `detach_ancestor` fails due to the layer
already existing, we delete the layer to make sure the source is one we
know about, and then retry.

But we deleted the wrong file, namely, the one we wanted to use as the
source of the hardlink. As a result, the follow up hard link operation
failed. Our PR corrects this mistake.
2025-06-30 21:36:15 +00:00
Dmitrii Kovalkov
8e216a3a59 storcon: notify cplane on safekeeper membership change (#12390)
## Problem
We don't notify cplane about safekeeper membership change yet. Without
the notification the compute needs to know all the safekeepers on the
cluster to be able to speak to them. Change notifications will allow to
avoid it.

- Closes: https://github.com/neondatabase/neon/issues/12188

## Summary of changes
- Implement `notify_safekeepers` method in `ComputeHook`
- Notify cplane about safekeepers in `safekeeper_migrate` handler.
- Update the test to make sure notifications work.

## Out of scope
- There is `cplane_notified_generation` field in `timelines` table in
strocon's database. It's not needed now, so it's not updated in the PR.
Probably we can remove it.
- e2e tests to make sure it works with a production cplane
2025-06-30 14:09:50 +00:00
Erik Grinaker
d0a4ae3e8f pageserver: add gRPC LSN lease support (#12384)
## Problem

The gRPC API does not provide LSN leases.

## Summary of changes

* Add LSN lease support to the gRPC API.
* Use gRPC LSN leases for static computes with `grpc://` connstrings.
* Move `PageserverProtocol` into the `compute_api::spec` module and
reuse it.
2025-06-30 12:44:17 +00:00
Erik Grinaker
a384d7d501 pageserver: assert no changes to shard identity (#12379)
## Problem

Location config changes can currently result in changes to the shard
identity. Such changes will cause data corruption, as seen with #12217.

Resolves #12227.
Requires #12377.

## Summary of changes

Assert that the shard identity does not change on location config
updates and on (re)attach.

This is currently asserted with `critical!`, in case it misfires in
production. Later, we should reject such requests with an error and turn
this into a proper assertion.
2025-06-30 12:36:45 +00:00
58 changed files with 1548 additions and 518 deletions

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@
/tmp_check_cli
__pycache__/
test_output/
neon_previous/
.vscode
.idea
*.swp

21
Cargo.lock generated
View File

@@ -1279,6 +1279,7 @@ dependencies = [
"remote_storage",
"serde",
"serde_json",
"url",
"utils",
]
@@ -1304,6 +1305,7 @@ dependencies = [
"fail",
"flate2",
"futures",
"hostname-validator",
"http 1.1.0",
"indexmap 2.9.0",
"itertools 0.10.5",
@@ -2770,6 +2772,12 @@ dependencies = [
"windows",
]
[[package]]
name = "hostname-validator"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f558a64ac9af88b5ba400d99b579451af0d39c6d360980045b91aac966d705e2"
[[package]]
name = "http"
version = "0.2.9"
@@ -4421,6 +4429,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror 1.0.69",
"tracing",
"tracing-utils",
"utils",
]
@@ -4479,6 +4488,7 @@ dependencies = [
"pageserver_api",
"postgres_ffi_types",
"prost 0.13.5",
"prost-types 0.13.5",
"strum",
"strum_macros",
"thiserror 1.0.69",
@@ -5156,7 +5166,7 @@ dependencies = [
"petgraph",
"prettyplease",
"prost 0.13.5",
"prost-types 0.13.3",
"prost-types 0.13.5",
"regex",
"syn 2.0.100",
"tempfile",
@@ -5199,9 +5209,9 @@ dependencies = [
[[package]]
name = "prost-types"
version = "0.13.3"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [
"prost 0.13.5",
]
@@ -6808,6 +6818,7 @@ dependencies = [
"chrono",
"clap",
"clashmap",
"compute_api",
"control_plane",
"cron",
"diesel",
@@ -7641,7 +7652,7 @@ dependencies = [
"prettyplease",
"proc-macro2",
"prost-build 0.13.3",
"prost-types 0.13.3",
"prost-types 0.13.5",
"quote",
"syn 2.0.100",
]
@@ -7653,7 +7664,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.3",
"prost-types 0.13.5",
"tokio",
"tokio-stream",
"tonic 0.13.1",

View File

@@ -152,6 +152,7 @@ pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointe
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13.5"
prost-types = "0.13.5"
rand = "0.8"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"

View File

@@ -1983,7 +1983,7 @@ RUN apt update && \
locales \
lsof \
procps \
rsyslog \
rsyslog-gnutls \
screen \
tcpdump \
$VERSION_INSTALLS && \

View File

@@ -8,6 +8,8 @@
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
import 'sql_exporter/compute_max_connections.libsonnet',
import 'sql_exporter/compute_pg_oldest_frozen_xid_age.libsonnet',
import 'sql_exporter/compute_pg_oldest_mxid_age.libsonnet',
import 'sql_exporter/compute_receive_lsn.libsonnet',
import 'sql_exporter/compute_subscriptions_count.libsonnet',
import 'sql_exporter/connection_counts.libsonnet',

View File

@@ -0,0 +1,13 @@
{
metric_name: 'compute_pg_oldest_frozen_xid_age',
type: 'gauge',
help: 'Age of oldest XIDs that have not been frozen by VACUUM. An indicator of how long it has been since VACUUM last ran.',
key_labels: [
'database_name',
],
value_label: 'metric',
values: [
'frozen_xid_age',
],
query: importstr 'sql_exporter/compute_pg_oldest_frozen_xid_age.sql',
}

View File

@@ -0,0 +1,4 @@
SELECT datname database_name,
age(datfrozenxid) frozen_xid_age
FROM pg_database
ORDER BY frozen_xid_age DESC LIMIT 10;

View File

@@ -0,0 +1,13 @@
{
metric_name: 'compute_pg_oldest_mxid_age',
type: 'gauge',
help: 'Age of oldest MXIDs that have not been replaced by VACUUM. An indicator of how long it has been since VACUUM last ran.',
key_labels: [
'database_name',
],
value_label: 'metric',
values: [
'min_mxid_age',
],
query: importstr 'sql_exporter/compute_pg_oldest_mxid_age.sql',
}

View File

@@ -0,0 +1,4 @@
SELECT datname database_name,
mxid_age(datminmxid) min_mxid_age
FROM pg_database
ORDER BY min_mxid_age DESC LIMIT 10;

View File

@@ -1,8 +1,8 @@
diff --git a/sql/anon.sql b/sql/anon.sql
index 0cdc769..f6cc950 100644
index 0cdc769..b450327 100644
--- a/sql/anon.sql
+++ b/sql/anon.sql
@@ -1141,3 +1141,8 @@ $$
@@ -1141,3 +1141,15 @@ $$
-- TODO : https://en.wikipedia.org/wiki/L-diversity
-- TODO : https://en.wikipedia.org/wiki/T-closeness
@@ -11,6 +11,13 @@ index 0cdc769..f6cc950 100644
+
+GRANT ALL ON SCHEMA anon to neon_superuser;
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
+
+DO $$
+BEGIN
+ IF current_setting('server_version_num')::int >= 150000 THEN
+ GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO neon_superuser;
+ END IF;
+END $$;
diff --git a/sql/init.sql b/sql/init.sql
index 7da6553..9b6164b 100644
--- a/sql/init.sql

View File

@@ -27,6 +27,7 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
hostname-validator = "1.1"
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true

View File

@@ -36,6 +36,8 @@
use std::ffi::OsString;
use std::fs::File;
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
@@ -190,7 +192,9 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
cli.installed_extensions_collection_interval,
)),
},
config,
)?;

View File

@@ -1,4 +1,4 @@
use anyhow::{Context, Result, anyhow};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
@@ -6,7 +6,7 @@ use compute_api::responses::{
LfcPrewarmState, TlsConfig,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
};
use futures::StreamExt;
use futures::future::join_all;
@@ -25,7 +25,7 @@ use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
@@ -70,6 +70,7 @@ pub static BUILD_TAG: Lazy<String> = Lazy::new(|| {
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string()
});
const DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL: u64 = 3600;
/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
@@ -103,7 +104,7 @@ pub struct ComputeNodeParams {
pub remote_ext_base_url: Option<Url>,
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: u64,
pub installed_extensions_collection_interval: Arc<AtomicU64>,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -126,6 +127,9 @@ pub struct ComputeNode {
// key: ext_archive_name, value: started download time, download_completed?
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub compute_ctl_config: ComputeCtlConfig,
/// Handle to the extension stats collection task
extension_stats_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
// store some metrics about download size that might impact startup time
@@ -428,6 +432,7 @@ impl ComputeNode {
state_changed: Condvar::new(),
ext_download_progress: RwLock::new(HashMap::new()),
compute_ctl_config: config.compute_ctl_config,
extension_stats_task: Mutex::new(None),
})
}
@@ -515,6 +520,9 @@ impl ComputeNode {
None
};
// Terminate the extension stats collection task
this.terminate_extension_stats_task();
// Terminate the vm_monitor so it releases the file watcher on
// /sys/fs/cgroup/neon-postgres.
// Note: the vm-monitor only runs on linux because it requires cgroups.
@@ -751,10 +759,15 @@ impl ComputeNode {
// Configure and start rsyslog for compliance audit logging
match pspec.spec.audit_log_level {
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
let remote_endpoint =
let remote_tls_endpoint =
std::env::var("AUDIT_LOGGING_TLS_ENDPOINT").unwrap_or("".to_string());
let remote_plain_endpoint =
std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
if remote_endpoint.is_empty() {
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
if remote_plain_endpoint.is_empty() && remote_tls_endpoint.is_empty() {
anyhow::bail!(
"AUDIT_LOGGING_ENDPOINT and AUDIT_LOGGING_TLS_ENDPOINT are both empty"
);
}
let log_directory_path = Path::new(&self.params.pgdata).join("log");
@@ -770,7 +783,8 @@ impl ComputeNode {
log_directory_path.clone(),
endpoint_id,
project_id,
&remote_endpoint,
&remote_plain_endpoint,
&remote_tls_endpoint,
)?;
// Launch a background task to clean up the audit logs
@@ -1003,19 +1017,12 @@ impl ComputeNode {
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
// Detect the protocol scheme. If the URL doesn't have a scheme, assume libpq.
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let scheme = match Url::parse(shard0_connstr) {
Ok(url) => url.scheme().to_lowercase().to_string(),
Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
};
let started = Instant::now();
let (connected, size) = match scheme.as_str() {
"postgresql" | "postgres" => self.try_get_basebackup_libpq(spec, lsn)?,
"grpc" => self.try_get_basebackup_grpc(spec, lsn)?,
scheme => return Err(anyhow!("unknown URL scheme {scheme}")),
let (connected, size) = match PageserverProtocol::from_connstring(shard0_connstr)? {
PageserverProtocol::Libpq => self.try_get_basebackup_libpq(spec, lsn)?,
PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?,
};
let mut state = self.state.lock().unwrap();
@@ -1280,7 +1287,7 @@ impl ComputeNode {
let start_time = Utc::now();
let mut sync_handle = maybe_cgexec(&self.params.pgbin)
.args(["--sync-safekeepers"]) // CF walproposer.c:289
.args(["--sync-safekeepers"])
.env("PGDATA", &self.params.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
@@ -1678,6 +1685,8 @@ impl ComputeNode {
tls_config = self.compute_ctl_config.tls.clone();
}
self.update_installed_extensions_collection_interval(&spec);
let max_concurrent_connections = self.max_service_connections(compute_state, &spec);
// Merge-apply spec & changes to PostgreSQL state.
@@ -1742,6 +1751,8 @@ impl ComputeNode {
let tls_config = self.tls_config(&spec);
self.update_installed_extensions_collection_interval(&spec);
if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
info!("tuning pgbouncer");
@@ -2346,10 +2357,20 @@ LIMIT 100",
}
pub fn spawn_extension_stats_task(&self) {
// Cancel any existing task
if let Some(handle) = self.extension_stats_task.lock().unwrap().take() {
handle.abort();
}
let conf = self.tokio_conn_conf.clone();
let installed_extensions_collection_interval =
self.params.installed_extensions_collection_interval;
tokio::spawn(async move {
let atomic_interval = self.params.installed_extensions_collection_interval.clone();
let mut installed_extensions_collection_interval =
2 * atomic_interval.load(std::sync::atomic::Ordering::SeqCst);
info!(
"[NEON_EXT_SPAWN] Spawning background installed extensions worker with Timeout: {}",
installed_extensions_collection_interval
);
let handle = tokio::spawn(async move {
// An initial sleep is added to ensure that two collections don't happen at the same time.
// The first collection happens during compute startup.
tokio::time::sleep(tokio::time::Duration::from_secs(
@@ -2362,8 +2383,48 @@ LIMIT 100",
loop {
interval.tick().await;
let _ = installed_extensions(conf.clone()).await;
// Acquire a read lock on the compute spec and then update the interval if necessary
interval = tokio::time::interval(tokio::time::Duration::from_secs(std::cmp::max(
installed_extensions_collection_interval,
2 * atomic_interval.load(std::sync::atomic::Ordering::SeqCst),
)));
installed_extensions_collection_interval = interval.period().as_secs();
}
});
// Store the new task handle
*self.extension_stats_task.lock().unwrap() = Some(handle);
}
fn terminate_extension_stats_task(&self) {
if let Some(handle) = self.extension_stats_task.lock().unwrap().take() {
handle.abort();
}
}
fn update_installed_extensions_collection_interval(&self, spec: &ComputeSpec) {
// Update the interval for collecting installed extensions statistics
// If the value is -1, we never suspend so set the value to default collection.
// If the value is 0, it means default, we will just continue to use the default.
if spec.suspend_timeout_seconds == -1 || spec.suspend_timeout_seconds == 0 {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}, New Timeout: {}",
spec.suspend_timeout_seconds, DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL
);
self.params.installed_extensions_collection_interval.store(
DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL,
std::sync::atomic::Ordering::SeqCst,
);
} else {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}",
spec.suspend_timeout_seconds
);
self.params.installed_extensions_collection_interval.store(
spec.suspend_timeout_seconds as u64,
std::sync::atomic::Ordering::SeqCst,
);
}
}
}

View File

@@ -10,7 +10,13 @@ input(type="imfile" File="{log_directory}/*.log"
startmsg.regex="^[[:digit:]]{{4}}-[[:digit:]]{{2}}-[[:digit:]]{{2}} [[:digit:]]{{2}}:[[:digit:]]{{2}}:[[:digit:]]{{2}}.[[:digit:]]{{3}} GMT,")
# the directory to store rsyslog state files
global(workDirectory="/var/log/rsyslog")
global(
workDirectory="/var/log/rsyslog"
DefaultNetstreamDriverCAFile="/etc/ssl/certs/ca-certificates.crt"
)
# Whether the remote syslog receiver uses tls
set $.remote_syslog_tls = "{remote_syslog_tls}";
# Construct json, endpoint_id and project_id as additional metadata
set $.json_log!endpoint_id = "{endpoint_id}";
@@ -21,5 +27,29 @@ set $.json_log!msg = $msg;
template(name="PgAuditLog" type="string"
string="<%PRI%>1 %TIMESTAMP:::date-rfc3339% %HOSTNAME% - - - - %$.json_log%")
# Forward to remote syslog receiver (@@<hostname>:<port>;format
local5.info @@{remote_endpoint};PgAuditLog
# Forward to remote syslog receiver (over TLS)
if ( $syslogtag == 'pgaudit_log' ) then {{
if ( $.remote_syslog_tls == 'true' ) then {{
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
template="PgAuditLog"
queue.type="linkedList"
queue.size="1000"
action.ResumeRetryCount="10"
StreamDriver="gtls"
StreamDriverMode="1"
StreamDriverAuthMode="x509/name"
StreamDriverPermittedPeers="{remote_syslog_host}"
StreamDriver.CheckExtendedKeyPurpose="on"
StreamDriver.PermitExpiredCerts="off"
)
stop
}} else {{
action(type="omfwd" target="{remote_syslog_host}" port="{remote_syslog_port}" protocol="tcp"
template="PgAuditLog"
queue.type="linkedList"
queue.size="1000"
action.ResumeRetryCount="10"
)
stop
}}
}}

View File

@@ -4,7 +4,9 @@ use std::thread;
use std::time::{Duration, SystemTime};
use anyhow::{Result, bail};
use compute_api::spec::ComputeMode;
use compute_api::spec::{ComputeMode, PageserverProtocol};
use itertools::Itertools as _;
use pageserver_page_api as page_api;
use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn};
use utils::id::{TenantId, TimelineId};
@@ -76,25 +78,17 @@ fn acquire_lsn_lease_with_retry(
loop {
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
let configs = {
let (connstrings, auth) = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
let conn_strings = spec.pageserver_connstr.split(',');
conn_strings
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
config.password(storage_auth_token.clone());
}
config
})
.collect::<Vec<_>>()
(
spec.pageserver_connstr.clone(),
spec.storage_auth_token.clone(),
)
};
let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs);
let result =
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
match result {
Ok(Some(res)) => {
return Ok(res);
@@ -116,68 +110,104 @@ fn acquire_lsn_lease_with_retry(
}
}
/// Tries to acquire an LSN lease through PS page_service API.
/// Tries to acquire LSN leases on all Pageserver shards.
fn try_acquire_lsn_lease(
connstrings: &str,
auth: Option<&str>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
configs: &[postgres::Config],
) -> Result<Option<SystemTime>> {
fn get_valid_until(
config: &postgres::Config,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
let res = client.simple_query(&cmd)?;
let msg = match res.first() {
Some(msg) => msg,
None => bail!("empty response"),
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => bail!("error parsing lsn lease response"),
let connstrings = connstrings.split(',').collect_vec();
let shard_count = connstrings.len();
let mut leases = Vec::new();
for (shard_number, &connstring) in connstrings.iter().enumerate() {
let tenant_shard_id = match shard_count {
0 | 1 => TenantShardId::unsharded(tenant_id),
shard_count => TenantShardId {
tenant_id,
shard_number: ShardNumber(shard_number as u8),
shard_count: ShardCount::new(shard_count as u8),
},
};
// Note: this will be None if a lease is explicitly not granted.
let valid_until_str = row.get("valid_until");
let valid_until = valid_until_str.map(|s| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
.expect("Time larger than max SystemTime could handle")
});
Ok(valid_until)
let lease = match PageserverProtocol::from_connstring(connstring)? {
PageserverProtocol::Libpq => {
acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
PageserverProtocol::Grpc => {
acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
};
leases.push(lease);
}
let shard_count = configs.len();
Ok(leases.into_iter().min().flatten())
}
let valid_until = if shard_count > 1 {
configs
.iter()
.enumerate()
.map(|(shard_number, config)| {
let tenant_shard_id = TenantShardId {
tenant_id,
shard_count: ShardCount::new(shard_count as u8),
shard_number: ShardNumber(shard_number as u8),
};
get_valid_until(config, tenant_shard_id, timeline_id, lsn)
})
.collect::<Result<Vec<Option<SystemTime>>>>()?
.into_iter()
.min()
.unwrap()
} else {
get_valid_until(
&configs[0],
TenantShardId::unsharded(tenant_id),
timeline_id,
lsn,
)?
/// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
/// postgresql:// scheme.
fn acquire_lsn_lease_libpq(
connstring: &str,
auth: Option<&str>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut config = postgres::Config::from_str(connstring)?;
if let Some(auth) = auth {
config.password(auth);
}
let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
let res = client.simple_query(&cmd)?;
let msg = match res.first() {
Some(msg) => msg,
None => bail!("empty response"),
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => bail!("error parsing lsn lease response"),
};
// Note: this will be None if a lease is explicitly not granted.
let valid_until_str = row.get("valid_until");
let valid_until = valid_until_str.map(|s| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
.expect("Time larger than max SystemTime could handle")
});
Ok(valid_until)
}
/// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
/// grpc:// scheme.
fn acquire_lsn_lease_grpc(
connstring: &str,
auth: Option<&str>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::new(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,
tenant_shard_id.to_index(),
auth.map(String::from),
None,
)
.await?;
let req = page_api::LeaseLsnRequest { lsn };
match client.lease_lsn(req).await {
Ok(expires) => Ok(Some(expires)),
// Lease couldn't be acquired because the LSN has been garbage collected.
Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
Err(err) => Err(err.into()),
}
})
}

View File

@@ -4,8 +4,10 @@ use std::path::Path;
use std::process::Command;
use std::time::Duration;
use std::{fs::OpenOptions, io::Write};
use url::{Host, Url};
use anyhow::{Context, Result, anyhow};
use hostname_validator;
use tracing::{error, info, instrument, warn};
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
@@ -82,18 +84,84 @@ fn restart_rsyslog() -> Result<()> {
Ok(())
}
fn parse_audit_syslog_address(
remote_plain_endpoint: &str,
remote_tls_endpoint: &str,
) -> Result<(String, u16, String)> {
let tls;
let remote_endpoint = if !remote_tls_endpoint.is_empty() {
tls = "true".to_string();
remote_tls_endpoint
} else {
tls = "false".to_string();
remote_plain_endpoint
};
// Urlify the remote_endpoint, so parsing can be done with url::Url.
let url_str = format!("http://{remote_endpoint}");
let url = Url::parse(&url_str).map_err(|err| {
anyhow!("Error parsing {remote_endpoint}, expected host:port, got {err:?}")
})?;
let is_valid = url.scheme() == "http"
&& url.path() == "/"
&& url.query().is_none()
&& url.fragment().is_none()
&& url.username() == ""
&& url.password().is_none();
if !is_valid {
return Err(anyhow!(
"Invalid address format {remote_endpoint}, expected host:port"
));
}
let host = match url.host() {
Some(Host::Domain(h)) if hostname_validator::is_valid(h) => h.to_string(),
Some(Host::Ipv4(ip4)) => ip4.to_string(),
Some(Host::Ipv6(ip6)) => ip6.to_string(),
_ => return Err(anyhow!("Invalid host")),
};
let port = url
.port()
.ok_or_else(|| anyhow!("Invalid port in {remote_endpoint}"))?;
Ok((host, port, tls))
}
fn generate_audit_rsyslog_config(
log_directory: String,
endpoint_id: &str,
project_id: &str,
remote_syslog_host: &str,
remote_syslog_port: u16,
remote_syslog_tls: &str,
) -> String {
format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
endpoint_id = endpoint_id,
project_id = project_id,
remote_syslog_host = remote_syslog_host,
remote_syslog_port = remote_syslog_port,
remote_syslog_tls = remote_syslog_tls
)
}
pub fn configure_audit_rsyslog(
log_directory: String,
endpoint_id: &str,
project_id: &str,
remote_endpoint: &str,
remote_tls_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
endpoint_id = endpoint_id,
project_id = project_id,
remote_endpoint = remote_endpoint
let (remote_syslog_host, remote_syslog_port, remote_syslog_tls) =
parse_audit_syslog_address(remote_endpoint, remote_tls_endpoint).unwrap();
let config_content = generate_audit_rsyslog_config(
log_directory,
endpoint_id,
project_id,
&remote_syslog_host,
remote_syslog_port,
&remote_syslog_tls,
);
info!("rsyslog config_content: {}", config_content);
@@ -258,6 +326,8 @@ pub fn launch_pgaudit_gc(log_directory: String) {
mod tests {
use crate::rsyslog::PostgresLogsRsyslogConfig;
use super::{generate_audit_rsyslog_config, parse_audit_syslog_address};
#[test]
fn test_postgres_logs_config() {
{
@@ -287,4 +357,146 @@ mod tests {
assert!(res.is_err());
}
}
#[test]
fn test_parse_audit_syslog_address() {
{
// host:port format (plaintext)
let parsed = parse_audit_syslog_address("collector.host.tld:5555", "");
assert!(parsed.is_ok());
assert_eq!(
parsed.unwrap(),
(
String::from("collector.host.tld"),
5555,
String::from("false")
)
);
}
{
// host:port format with ipv4 ip address (plaintext)
let parsed = parse_audit_syslog_address("10.0.0.1:5555", "");
assert!(parsed.is_ok());
assert_eq!(
parsed.unwrap(),
(String::from("10.0.0.1"), 5555, String::from("false"))
);
}
{
// host:port format with ipv6 ip address (plaintext)
let parsed =
parse_audit_syslog_address("[7e60:82ed:cb2e:d617:f904:f395:aaca:e252]:5555", "");
assert_eq!(
parsed.unwrap(),
(
String::from("7e60:82ed:cb2e:d617:f904:f395:aaca:e252"),
5555,
String::from("false")
)
);
}
{
// Only TLS host:port defined
let parsed = parse_audit_syslog_address("", "tls.host.tld:5556");
assert_eq!(
parsed.unwrap(),
(String::from("tls.host.tld"), 5556, String::from("true"))
);
}
{
// tls host should take precedence, when both defined
let parsed = parse_audit_syslog_address("plaintext.host.tld:5555", "tls.host.tld:5556");
assert_eq!(
parsed.unwrap(),
(String::from("tls.host.tld"), 5556, String::from("true"))
);
}
{
// host without port (plaintext)
let parsed = parse_audit_syslog_address("collector.host.tld", "");
assert!(parsed.is_err());
}
{
// port without host
let parsed = parse_audit_syslog_address(":5555", "");
assert!(parsed.is_err());
}
{
// valid host with invalid port
let parsed = parse_audit_syslog_address("collector.host.tld:90001", "");
assert!(parsed.is_err());
}
{
// invalid hostname with valid port
let parsed = parse_audit_syslog_address("-collector.host.tld:5555", "");
assert!(parsed.is_err());
}
{
// parse error
let parsed = parse_audit_syslog_address("collector.host.tld:::5555", "");
assert!(parsed.is_err());
}
}
#[test]
fn test_generate_audit_rsyslog_config() {
{
// plaintext version
let log_directory = "/tmp/log".to_string();
let endpoint_id = "ep-test-endpoint-id";
let project_id = "test-project-id";
let remote_syslog_host = "collector.host.tld";
let remote_syslog_port = 5555;
let remote_syslog_tls = "false";
let conf_str = generate_audit_rsyslog_config(
log_directory,
endpoint_id,
project_id,
remote_syslog_host,
remote_syslog_port,
remote_syslog_tls,
);
assert!(conf_str.contains(r#"set $.remote_syslog_tls = "false";"#));
assert!(conf_str.contains(r#"type="omfwd""#));
assert!(conf_str.contains(r#"target="collector.host.tld""#));
assert!(conf_str.contains(r#"port="5555""#));
assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
}
{
// TLS version
let log_directory = "/tmp/log".to_string();
let endpoint_id = "ep-test-endpoint-id";
let project_id = "test-project-id";
let remote_syslog_host = "collector.host.tld";
let remote_syslog_port = 5556;
let remote_syslog_tls = "true";
let conf_str = generate_audit_rsyslog_config(
log_directory,
endpoint_id,
project_id,
remote_syslog_host,
remote_syslog_port,
remote_syslog_tls,
);
assert!(conf_str.contains(r#"set $.remote_syslog_tls = "true";"#));
assert!(conf_str.contains(r#"type="omfwd""#));
assert!(conf_str.contains(r#"target="collector.host.tld""#));
assert!(conf_str.contains(r#"port="5556""#));
assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
}
}
}

View File

@@ -3,7 +3,8 @@
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"suspend_timeout_seconds": 3600,
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Zenith Test",

View File

@@ -16,9 +16,9 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use compute_api::spec::{ComputeMode, PageserverProtocol};
use control_plane::broker::StorageBroker;
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol};
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
@@ -1649,7 +1649,9 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = parse_safekeepers(&args.safekeepers)?;
endpoint.reconfigure(pageservers, None, safekeepers).await?;
endpoint
.reconfigure(Some(pageservers), None, safekeepers, None)
.await?;
}
EndpointCmd::Stop(args) => {
let endpoint_id = &args.endpoint_id;

View File

@@ -56,8 +56,8 @@ use compute_api::responses::{
TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
PgIdent, RemoteExtSpec, Role,
};
use jsonwebtoken::jwk::{
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
@@ -373,29 +373,6 @@ impl std::fmt::Display for EndpointTerminateMode {
}
}
/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug)]
pub enum PageserverProtocol {
Libpq,
Grpc,
}
impl PageserverProtocol {
/// Returns the URL scheme for the protocol, used in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
}
}
impl Endpoint {
fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
@@ -803,6 +780,7 @@ impl Endpoint {
endpoint_storage_addr: Some(endpoint_storage_addr),
endpoint_storage_token: Some(endpoint_storage_token),
autoprewarm: false,
suspend_timeout_seconds: -1, // Only used in neon_local.
};
// this strange code is needed to support respec() in tests
@@ -997,12 +975,11 @@ impl Endpoint {
pub async fn reconfigure(
&self,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
pageservers: Option<Vec<(PageserverProtocol, Host, u16)>>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
safekeeper_generation: Option<SafekeeperGeneration>,
) -> Result<()> {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
@@ -1014,16 +991,24 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
// If pageservers are not specified, don't change them.
if let Some(pageservers) = pageservers {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
}
// If safekeepers are not specified, don't change them.
if let Some(safekeepers) = safekeepers {
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
spec.safekeeper_connstrings = safekeeper_connstrings;
if let Some(g) = safekeeper_generation {
spec.safekeepers_generation = Some(g.into_inner());
}
}
let client = reqwest::Client::builder()
@@ -1061,6 +1046,24 @@ impl Endpoint {
}
}
pub async fn reconfigure_pageservers(
&self,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
stripe_size: Option<ShardStripeSize>,
) -> Result<()> {
self.reconfigure(Some(pageservers), stripe_size, None, None)
.await
}
pub async fn reconfigure_safekeepers(
&self,
safekeepers: Vec<NodeId>,
generation: SafekeeperGeneration,
) -> Result<()> {
self.reconfigure(None, None, Some(safekeepers), Some(generation))
.await
}
pub async fn stop(
&self,
mode: EndpointTerminateMode,

View File

@@ -4,6 +4,7 @@
"timestamp": "2022-10-12T18:00:00.000Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
"suspend_timeout_seconds": -1,
"cluster": {
"cluster_id": "docker_compose",

View File

@@ -12,6 +12,7 @@ jsonwebtoken.workspace = true
serde.workspace = true
serde_json.workspace = true
regex.workspace = true
url.workspace = true
utils = { path = "../utils" }
remote_storage = { version = "0.1", path = "../remote_storage/" }

View File

@@ -4,11 +4,14 @@
//! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or
//! compute_ctl can fetch it by calling the control plane's API.
use std::collections::HashMap;
use std::fmt::Display;
use anyhow::anyhow;
use indexmap::IndexMap;
use regex::Regex;
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use url::Url;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -181,6 +184,11 @@ pub struct ComputeSpec {
/// Download LFC state from endpoint_storage and pass it to Postgres on startup
#[serde(default)]
pub autoprewarm: bool,
/// Suspend timeout in seconds.
///
/// We use this value to derive other values, such as the installed extensions metric.
pub suspend_timeout_seconds: i64,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -429,6 +437,47 @@ pub struct JwksSettings {
pub jwt_audience: Option<String>,
}
/// Protocol used to connect to a Pageserver. Parsed from the connstring scheme.
#[derive(Clone, Copy, Debug, Default)]
pub enum PageserverProtocol {
/// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
#[default]
Libpq,
/// A newer, gRPC-based protocol. Uses grpc:// scheme.
Grpc,
}
impl PageserverProtocol {
/// Parses the protocol from a connstring scheme. Defaults to Libpq if no scheme is given.
/// Errors if the connstring is an invalid URL.
pub fn from_connstring(connstring: &str) -> anyhow::Result<Self> {
let scheme = match Url::parse(connstring) {
Ok(url) => url.scheme().to_lowercase(),
Err(url::ParseError::RelativeUrlWithoutBase) => return Ok(Self::default()),
Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
};
match scheme.as_str() {
"postgresql" | "postgres" => Ok(Self::Libpq),
"grpc" => Ok(Self::Grpc),
scheme => Err(anyhow!("invalid protocol scheme: {scheme}")),
}
}
/// Returns the URL scheme for the protocol, for use in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
}
}
#[cfg(test)]
mod tests {
use std::fs::File;

View File

@@ -3,6 +3,7 @@
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"suspend_timeout_seconds": 3600,
"cluster": {
"cluster_id": "test-cluster-42",

View File

@@ -30,12 +30,13 @@ humantime-serde.workspace = true
chrono = { workspace = true, features = ["serde"] }
itertools.workspace = true
storage_broker.workspace = true
camino = {workspace = true, features = ["serde1"]}
camino = { workspace = true, features = ["serde1"] }
remote_storage.workspace = true
postgres_backend.workspace = true
nix = {workspace = true, optional = true}
nix = { workspace = true, optional = true }
reqwest.workspace = true
rand.workspace = true
tracing.workspace = true
tracing-utils.workspace = true
once_cell.workspace = true

View File

@@ -37,6 +37,7 @@ use std::hash::{Hash, Hasher};
pub use ::utils::shard::*;
use postgres_ffi_types::forknum::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
use utils::critical;
use crate::key::Key;
use crate::models::ShardParameters;
@@ -188,6 +189,17 @@ impl ShardIdentity {
}
}
/// Asserts that the given shard identities are equal. Changes to shard parameters will likely
/// result in data corruption.
pub fn assert_equal(&self, other: ShardIdentity) {
if self != &other {
// TODO: for now, we're conservative and just log errors in production. Turn this into a
// real assertion when we're confident it doesn't misfire, and also reject requests that
// attempt to change it with an error response.
critical!("shard identity mismatch: {self:?} != {other:?}");
}
}
fn is_broken(&self) -> bool {
self.layout == LAYOUT_BROKEN
}

View File

@@ -162,9 +162,9 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
// Fast path for the common case that the whole record fits on the page.
let pageleft = self.lsn.remaining_in_block() as u32;
if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
self.lsn += xl_tot_len as u64; /* we set self.lsn to the exclusive end of the record */
self.lsn += xl_tot_len as u64;
let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
return Ok(Some(self.complete_record(recordbuf)?)); // this returns (start LSN of NEXT!? record, this record's Bytes), wtf
return Ok(Some(self.complete_record(recordbuf)?));
} else {
// Need to assemble the record from pieces. Remember the size of the
// record, and loop back. On next iterations, we will reach the branch

View File

@@ -80,8 +80,8 @@ pub struct InterpretedWalRecord {
/// by the pageserver
pub batch: SerializedValueBatch,
/// Byte offset within WAL for the start of the next PG WAL record.
/// Usually this is the byte following the last byte of this record here,
/// but in case of XLOG SWITCH records it will be within the next segment.
/// Usually this is the end LSN of the current record, but in case of
/// XLOG SWITCH records it will be within the next segment.
pub next_record_lsn: Lsn,
/// Whether to flush all uncommitted modifications to the storage engine
/// before ingesting this record. This is currently only used for legacy PG
@@ -213,7 +213,6 @@ pub struct XactCommon {
pub origin_id: u16,
// Fields below are only used for logging
pub xl_xid: TransactionId,
// The `next_record_lsn` returned by wal decoder when we decoded this record.
pub lsn: Lsn,
}
@@ -256,7 +255,6 @@ pub enum XlogRecord {
#[derive(Clone, Serialize, Deserialize)]
pub struct RawXlogRecord {
pub info: u8,
// The `next_record_lsn` returned by wal decoder when we decoded this record.
pub lsn: Lsn,
pub buf: Bytes,
}

View File

@@ -80,8 +80,6 @@ impl Eq for OrderedValueMeta {}
#[derive(Serialize, Deserialize, Clone)]
pub struct SerializedValueMeta {
pub key: CompactKey,
// The `next_record_lsn` emitted by the wal_decoder for the WAL record
// that corresponds to this value.
pub lsn: Lsn,
/// Starting offset of the value for the (key, LSN) tuple
/// in [`SerializedValueBatch::raw`]
@@ -94,8 +92,6 @@ pub struct SerializedValueMeta {
#[derive(Serialize, Deserialize, Clone)]
pub struct ObservedValueMeta {
pub key: CompactKey,
// The `next_record_lsn` emitted by the wal_decoder for the WAL record
// that corresponds to this value.
pub lsn: Lsn,
}
@@ -113,12 +109,7 @@ pub struct SerializedValueBatch {
/// by LSN. Note that entries for a key do not have to be contiguous.
pub metadata: Vec<ValueMeta>,
/// The highest LSN of any value in the batch.
///
/// The "LSN of a Value" is the `next_record_lsn` that the wal_decoder
/// emitted for that value, i.e., the LSN of a Value is
/// an LSN that is >= the next byte after the last byte of this value's
/// WAL record.
/// The highest LSN of any value in the batch
pub max_lsn: Lsn,
/// Number of values encoded by [`Self::raw`]

View File

@@ -11,6 +11,7 @@ futures.workspace = true
pageserver_api.workspace = true
postgres_ffi_types.workspace = true
prost.workspace = true
prost-types.workspace = true
strum.workspace = true
strum_macros.workspace = true
thiserror.workspace = true

View File

@@ -35,6 +35,8 @@
syntax = "proto3";
package page_api;
import "google/protobuf/timestamp.proto";
service PageService {
// Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
@@ -64,6 +66,10 @@ service PageService {
// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
// collect the LSN until the lease expires. Must be acquired on all relevant shards.
rpc LeaseLsn (LeaseLsnRequest) returns (LeaseLsnResponse);
}
// The LSN a request should read at.
@@ -252,3 +258,17 @@ message GetSlruSegmentRequest {
message GetSlruSegmentResponse {
bytes segment = 1;
}
// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
// collect the LSN until the lease expires. Must be acquired on all relevant shards.
message LeaseLsnRequest {
// The LSN to lease. Can't be 0 or below the current GC cutoff.
uint64 lsn = 1;
}
// Lease acquisition response. If the lease could not be granted because the LSN has already been
// garbage collected, a FailedPrecondition status will be returned instead.
message LeaseLsnResponse {
// The lease expiration time.
google.protobuf.Timestamp expires = 1;
}

View File

@@ -187,4 +187,17 @@ impl Client {
let response = self.client.get_slru_segment(proto_req).await?;
Ok(response.into_inner().try_into()?)
}
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(
&mut self,
req: model::LeaseLsnRequest,
) -> Result<model::LeaseLsnResponse, tonic::Status> {
let req = proto::LeaseLsnRequest::from(req);
Ok(self.client.lease_lsn(req).await?.into_inner().try_into()?)
}
}

View File

@@ -16,6 +16,7 @@
//! stream combinators without dealing with errors, and avoids validating the same message twice.
use std::fmt::Display;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use postgres_ffi_types::Oid;
@@ -703,3 +704,54 @@ impl From<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
// SlruKind is defined in pageserver_api::reltag.
pub type SlruKind = pageserver_api::reltag::SlruKind;
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
/// collect the LSN until the lease expires.
pub struct LeaseLsnRequest {
/// The LSN to lease.
pub lsn: Lsn,
}
impl TryFrom<proto::LeaseLsnRequest> for LeaseLsnRequest {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseLsnRequest) -> Result<Self, Self::Error> {
if pb.lsn == 0 {
return Err(ProtocolError::Missing("lsn"));
}
Ok(Self { lsn: Lsn(pb.lsn) })
}
}
impl From<LeaseLsnRequest> for proto::LeaseLsnRequest {
fn from(request: LeaseLsnRequest) -> Self {
Self { lsn: request.lsn.0 }
}
}
/// Lease expiration time. If the lease could not be granted because the LSN has already been
/// garbage collected, a FailedPrecondition status will be returned instead.
pub type LeaseLsnResponse = SystemTime;
impl TryFrom<proto::LeaseLsnResponse> for LeaseLsnResponse {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseLsnResponse) -> Result<Self, Self::Error> {
let expires = pb.expires.ok_or(ProtocolError::Missing("expires"))?;
UNIX_EPOCH
.checked_add(Duration::new(expires.seconds as u64, expires.nanos as u32))
.ok_or_else(|| ProtocolError::invalid("expires", expires))
}
}
impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
fn from(response: LeaseLsnResponse) -> Self {
let expires = response.duration_since(UNIX_EPOCH).unwrap_or_default();
Self {
expires: Some(prost_types::Timestamp {
seconds: expires.as_secs() as i64,
nanos: expires.subsec_nanos() as i32,
}),
}
}
}

View File

@@ -6,12 +6,13 @@ use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
PostHogFlagFilterPropertyValue,
};
use rand::Rng;
use remote_storage::RemoteStorageKind;
use serde_json::json;
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
@@ -138,6 +139,7 @@ impl FeatureResolver {
}
Arc::new(properties)
};
let fake_tenants = {
let mut tenants = Vec::new();
for i in 0..10 {
@@ -147,9 +149,16 @@ impl FeatureResolver {
conf.id,
i
);
let tenant_properties = PerTenantProperties {
remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
}
.into_posthog_properties();
let properties = Self::collect_properties_inner(
distinct_id.clone(),
Some(&internal_properties),
&tenant_properties,
);
tenants.push(CaptureEvent {
event: "initial_tenant_report".to_string(),
@@ -183,6 +192,7 @@ impl FeatureResolver {
fn collect_properties_inner(
tenant_id: String,
internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
let mut properties = HashMap::new();
if let Some(internal_properties) = internal_properties {
@@ -194,6 +204,9 @@ impl FeatureResolver {
"tenant_id".to_string(),
PostHogFlagFilterPropertyValue::String(tenant_id),
);
for (key, value) in tenant_properties.iter() {
properties.insert(key.clone(), value.clone());
}
properties
}
@@ -201,8 +214,13 @@ impl FeatureResolver {
pub(crate) fn collect_properties(
&self,
tenant_id: TenantId,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref())
Self::collect_properties_inner(
tenant_id.to_string(),
self.internal_properties.as_deref(),
tenant_properties,
)
}
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
@@ -214,6 +232,7 @@ impl FeatureResolver {
&self,
flag_key: &str,
tenant_id: TenantId,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<String, PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
@@ -224,7 +243,7 @@ impl FeatureResolver {
let res = inner.feature_store().evaluate_multivariate(
flag_key,
&tenant_id.to_string(),
&self.collect_properties(tenant_id),
&self.collect_properties(tenant_id, tenant_properties),
);
match &res {
Ok(value) => {
@@ -257,6 +276,7 @@ impl FeatureResolver {
&self,
flag_key: &str,
tenant_id: TenantId,
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<(), PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
@@ -271,7 +291,7 @@ impl FeatureResolver {
let res = inner.feature_store().evaluate_boolean(
flag_key,
&tenant_id.to_string(),
&self.collect_properties(tenant_id),
&self.collect_properties(tenant_id, tenant_properties),
);
match &res {
Ok(()) => {
@@ -317,3 +337,78 @@ impl FeatureResolver {
.store(Arc::new(force_overrides));
}
}
struct PerTenantProperties {
pub remote_size_mb: Option<f64>,
}
impl PerTenantProperties {
pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
let mut properties = HashMap::new();
if let Some(remote_size_mb) = self.remote_size_mb {
properties.insert(
"tenant_remote_size_mb".to_string(),
PostHogFlagFilterPropertyValue::Number(remote_size_mb),
);
}
properties
}
}
#[derive(Clone)]
pub struct TenantFeatureResolver {
inner: FeatureResolver,
tenant_id: TenantId,
cached_tenant_properties: Arc<ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>>,
}
impl TenantFeatureResolver {
pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
Self {
inner,
tenant_id,
cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
}
}
pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
self.inner.evaluate_multivariate(
flag_key,
self.tenant_id,
&self.cached_tenant_properties.load(),
)
}
pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
self.inner.evaluate_boolean(
flag_key,
self.tenant_id,
&self.cached_tenant_properties.load(),
)
}
pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
self.inner
.collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
}
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
self.inner.is_feature_flag_boolean(flag_key)
}
pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) {
let mut remote_size_mb = None;
for timeline in tenant_shard.list_timelines() {
let size = timeline.metrics.resident_physical_size_get();
if size == 0 {
remote_size_mb = None;
}
if let Some(ref mut remote_size_mb) = remote_size_mb {
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
}
}
self.cached_tenant_properties.store(Arc::new(
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
));
}
}

View File

@@ -1896,6 +1896,10 @@ async fn update_tenant_config_handler(
ShardParameters::from(tenant.get_shard_identity()),
);
tenant
.get_shard_identity()
.assert_equal(location_conf.shard); // not strictly necessary since we construct it above
crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
@@ -1940,6 +1944,10 @@ async fn patch_tenant_config_handler(
ShardParameters::from(tenant.get_shard_identity()),
);
tenant
.get_shard_identity()
.assert_equal(location_conf.shard); // not strictly necessary since we construct it above
crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
@@ -3689,23 +3697,25 @@ async fn tenant_evaluate_feature_flag(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
// and we don't need to worry about it for now.
let properties = tenant.feature_resolver.collect_properties();
if as_type.as_deref() == Some("boolean") {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = tenant.feature_resolver.evaluate_boolean(&flag);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else if as_type.as_deref() == Some("multivariate") {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
// Auto infer the type of the feature flag.
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
if is_boolean {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = tenant.feature_resolver.evaluate_boolean(&flag);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
}
}

View File

@@ -14,6 +14,7 @@ use std::{io, str};
use anyhow::{Context as _, bail};
use bytes::{Buf as _, BufMut as _, BytesMut};
use chrono::Utc;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
@@ -3760,6 +3761,36 @@ impl proto::PageService for GrpcPageServiceHandler {
let resp: page_api::GetSlruSegmentResponse = resp.segment;
Ok(tonic::Response::new(resp.into()))
}
#[instrument(skip_all, fields(lsn))]
async fn lease_lsn(
&self,
req: tonic::Request<proto::LeaseLsnRequest>,
) -> Result<tonic::Response<proto::LeaseLsnResponse>, tonic::Status> {
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);
// Validate and convert the request, and decorate the span.
let req: page_api::LeaseLsnRequest = req.into_inner().try_into()?;
span_record!(lsn=%req.lsn);
// Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
let lease_length = timeline.get_lsn_lease_length();
let expires = match timeline.renew_lsn_lease(req.lsn, lease_length, &ctx) {
Ok(lease) => lease.valid_until,
Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
};
// TODO: is this spammy? Move it compute-side?
info!(
"acquired lease for {} until {}",
req.lsn,
chrono::DateTime::<Utc>::from(expires).to_rfc3339()
);
Ok(tonic::Response::new(expires.into()))
}
}
/// gRPC middleware layer that handles observability concerns:

View File

@@ -1494,7 +1494,7 @@ pub struct DatadirModification<'a> {
/// in the state in 'tline' yet.
pub tline: &'a Timeline,
/// Current LSN of the modification.
/// Current LSN of the modification
lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.

View File

@@ -86,7 +86,7 @@ use crate::context;
use crate::context::RequestContextBuilder;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::feature_resolver::FeatureResolver;
use crate::feature_resolver::{FeatureResolver, TenantFeatureResolver};
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
@@ -102,7 +102,6 @@ use crate::tenant::remote_timeline_client::{
INITDB_PATH, MaybeDeletedIndexPart, remote_initdb_archive_path,
};
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer};
use crate::tenant::timeline::CheckOtherForCancel;
use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
@@ -387,7 +386,7 @@ pub struct TenantShard {
l0_flush_global_state: L0FlushGlobalState,
pub(crate) feature_resolver: FeatureResolver,
pub(crate) feature_resolver: TenantFeatureResolver,
}
impl std::fmt::Debug for TenantShard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -3264,7 +3263,7 @@ impl TenantShard {
};
let gc_compaction_strategy = self
.feature_resolver
.evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id)
.evaluate_multivariate("gc-comapction-strategy")
.ok();
let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy {
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy)
@@ -3314,11 +3313,9 @@ impl TenantShard {
/// Trips the compaction circuit breaker if appropriate.
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
if err.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) {
return;
}
match err {
CompactionError::ShuttingDown => unreachable!("is_cancel"),
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}
@@ -3335,7 +3332,7 @@ impl TenantShard {
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::AlreadyRunning(_) => unreachable!("is_cancel, but XXX why?"),
CompactionError::AlreadyRunning(_) => {}
}
}
@@ -3411,6 +3408,9 @@ impl TenantShard {
if let Some(ref walredo_mgr) = self.walredo_mgr {
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
}
// Update the feature resolver with the latest tenant-spcific data.
self.feature_resolver.update_cached_tenant_properties(self);
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
@@ -4493,7 +4493,10 @@ impl TenantShard {
gc_block: Default::default(),
l0_flush_global_state,
basebackup_cache,
feature_resolver,
feature_resolver: TenantFeatureResolver::new(
feature_resolver,
tenant_shard_id.tenant_id,
),
}
}
@@ -4532,6 +4535,10 @@ impl TenantShard {
Ok(toml_edit::de::from_str::<LocationConf>(&config)?)
}
/// Stores a tenant location config to disk.
///
/// NB: make sure to call `ShardIdentity::assert_equal` before persisting a new config, to avoid
/// changes to shard parameters that may result in data corruption.
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
pub(super) async fn persist_tenant_config(
conf: &'static PageServerConf,

View File

@@ -12,6 +12,7 @@
use pageserver_api::models;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::{Deserialize, Serialize};
use utils::critical;
use utils::generation::Generation;
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -171,6 +172,16 @@ impl LocationConf {
}
}
// This should never happen.
// TODO: turn this into a proper assertion.
if stripe_size != self.shard.stripe_size {
critical!(
"stripe size mismatch: {} != {}",
self.shard.stripe_size,
stripe_size,
);
}
self.shard.stripe_size = stripe_size;
}

View File

@@ -880,6 +880,9 @@ impl TenantManager {
// phase of writing config and/or waiting for flush, before returning.
match fast_path_taken {
Some(FastPathModified::Attached(tenant)) => {
tenant
.shard_identity
.assert_equal(new_location_config.shard);
TenantShard::persist_tenant_config(
self.conf,
&tenant_shard_id,
@@ -914,7 +917,10 @@ impl TenantManager {
return Ok(Some(tenant));
}
Some(FastPathModified::Secondary(_secondary_tenant)) => {
Some(FastPathModified::Secondary(secondary_tenant)) => {
secondary_tenant
.shard_identity
.assert_equal(new_location_config.shard);
TenantShard::persist_tenant_config(
self.conf,
&tenant_shard_id,
@@ -948,6 +954,10 @@ impl TenantManager {
match slot_guard.get_old_value() {
Some(TenantSlot::Attached(tenant)) => {
tenant
.shard_identity
.assert_equal(new_location_config.shard);
// The case where we keep a Tenant alive was covered above in the special case
// for Attached->Attached transitions in the same generation. By this point,
// if we see an attached tenant we know it will be discarded and should be
@@ -981,9 +991,13 @@ impl TenantManager {
// rather than assuming it to be empty.
spawn_mode = SpawnMode::Eager;
}
Some(TenantSlot::Secondary(state)) => {
Some(TenantSlot::Secondary(secondary_tenant)) => {
secondary_tenant
.shard_identity
.assert_equal(new_location_config.shard);
info!("Shutting down secondary tenant");
state.shutdown().await;
secondary_tenant.shutdown().await;
}
Some(TenantSlot::InProgress(_)) => {
// This should never happen: acquire_slot should error out

View File

@@ -101,7 +101,7 @@ pub(crate) struct SecondaryTenant {
// Secondary mode does not need the full shard identity or the pageserver_api::models::TenantConfig. However,
// storing these enables us to report our full LocationConf, enabling convenient reconciliation
// by the control plane (see [`Self::get_location_conf`])
shard_identity: ShardIdentity,
pub(crate) shard_identity: ShardIdentity,
tenant_conf: std::sync::Mutex<pageserver_api::models::TenantConfig>,
// Internal state used by the Downloader.

View File

@@ -182,7 +182,7 @@ impl BatchLayerWriter {
/// An image writer that takes images and produces multiple image layers.
#[must_use]
pub struct SplitImageLayerWriter<'a> {
inner: ImageLayerWriter,
inner: Option<ImageLayerWriter>,
target_layer_size: u64,
lsn: Lsn,
conf: &'static PageServerConf,
@@ -196,7 +196,7 @@ pub struct SplitImageLayerWriter<'a> {
impl<'a> SplitImageLayerWriter<'a> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
pub fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -205,22 +205,10 @@ impl<'a> SplitImageLayerWriter<'a> {
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
) -> Self {
Self {
target_layer_size,
// XXX make this lazy like in SplitDeltaLayerWriter?
inner: ImageLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
gate,
cancel.clone(),
ctx,
)
.await?,
inner: None,
conf,
timeline_id,
tenant_shard_id,
@@ -229,7 +217,7 @@ impl<'a> SplitImageLayerWriter<'a> {
start_key,
gate,
cancel,
})
}
}
pub async fn put_image(
@@ -238,12 +226,31 @@ impl<'a> SplitImageLayerWriter<'a> {
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
if self.inner.is_none() {
self.inner = Some(
ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(self.start_key..Key::MAX),
self.lsn,
self.gate,
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?,
);
}
let inner = self.inner.as_mut().unwrap();
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
if inner.num_keys() >= 1
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
let next_image_writer = ImageLayerWriter::new(
self.conf,
@@ -257,7 +264,7 @@ impl<'a> SplitImageLayerWriter<'a> {
)
.await
.map_err(PutError::Other)?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
let prev_image_writer = std::mem::replace(inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
self.start_key..key,
@@ -265,7 +272,7 @@ impl<'a> SplitImageLayerWriter<'a> {
);
self.start_key = key;
}
self.inner.put_image(key, img, ctx).await
inner.put_image(key, img, ctx).await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -282,8 +289,10 @@ impl<'a> SplitImageLayerWriter<'a> {
let Self {
mut batches, inner, ..
} = self;
if inner.num_keys() != 0 {
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
if let Some(inner) = inner {
if inner.num_keys() != 0 {
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
}
}
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
@@ -498,10 +507,7 @@ mod tests {
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
.unwrap();
);
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,
@@ -577,10 +583,7 @@ mod tests {
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
.unwrap();
);
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,
tline.timeline_id,
@@ -676,10 +679,7 @@ mod tests {
4 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
.unwrap();
);
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,

View File

@@ -17,14 +17,17 @@ use tracing::*;
use utils::backoff::exponential_backoff_duration;
use utils::completion::Barrier;
use utils::pausable_failpoint;
use utils::sync::gate::GateError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
use crate::tenant::blob_io::WriteBlobError;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::timeline::{CheckOtherForCancel, CompactionError};
use crate::tenant::{TenantShard, TenantState};
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
/// Semaphore limiting concurrent background tasks (across all tenants).
///
@@ -292,12 +295,48 @@ pub(crate) fn log_compaction_error(
task_cancelled: bool,
degrade_to_warning: bool,
) {
let is_cancel = err.is_cancel(CheckOtherForCancel::Yes);
use CompactionError::*;
let level = if is_cancel || task_cancelled {
Level::INFO
} else {
Level::ERROR
use crate::tenant::PageReconstructError;
use crate::tenant::upload_queue::NotInitialized;
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
let is_stopping = upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed;
if is_stopping {
Level::INFO
} else {
Level::ERROR
}
}
};
if let Some((error_count, sleep_duration)) = retry_info {

View File

@@ -77,7 +77,7 @@ use utils::postgres_client::PostgresClientProtocol;
use utils::rate_limit::RateLimit;
use utils::seqwait::SeqWait;
use utils::simple_rcu::{Rcu, RcuReadGuard};
use utils::sync::gate::{Gate, GateError, GateGuard};
use utils::sync::gate::{Gate, GateGuard};
use utils::{completion, critical, fs_ext, pausable_failpoint};
#[cfg(test)]
use wal_decoder::models::value::Value;
@@ -106,7 +106,7 @@ use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
use crate::feature_resolver::FeatureResolver;
use crate::feature_resolver::TenantFeatureResolver;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::metrics::{
@@ -119,7 +119,6 @@ use crate::pgdatadir_mapping::{
MAX_AUX_FILE_V2_DELTAS, MetricsUpdate,
};
use crate::task_mgr::TaskKind;
use crate::tenant::blob_io::WriteBlobError;
use crate::tenant::config::AttachmentMode;
use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
@@ -134,7 +133,6 @@ use crate::tenant::storage_layer::{
};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use crate::walingest::WalLagCooldown;
use crate::walredo::RedoAttemptType;
@@ -204,7 +202,7 @@ pub struct TimelineResources {
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_cache: Arc<BasebackupCache>,
pub feature_resolver: FeatureResolver,
pub feature_resolver: TenantFeatureResolver,
}
pub struct Timeline {
@@ -452,7 +450,7 @@ pub struct Timeline {
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_cache: Arc<BasebackupCache>,
feature_resolver: FeatureResolver,
feature_resolver: TenantFeatureResolver,
}
pub(crate) enum PreviousHeatmap {
@@ -2064,10 +2062,9 @@ impl Timeline {
};
// Signal compaction failure to avoid L0 flush stalls when it's broken.
// XXX this looks an awful lot like the circuit breaker code? Can we dedupe classification?
match &result {
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
Err(e) if e.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) => {}
Err(e) if e.is_cancel() => {}
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
@@ -5311,6 +5308,7 @@ impl Timeline {
ctx: &RequestContext,
img_range: Range<Key>,
io_concurrency: IoConcurrency,
progress: Option<(usize, usize)>,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
let mut wrote_keys = false;
@@ -5387,11 +5385,15 @@ impl Timeline {
}
}
let progress_report = progress
.map(|(idx, total)| format!("({idx}/{total}) "))
.unwrap_or_default();
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
info!(
"produced image layer for rel {}",
"{} produced image layer for rel {}",
progress_report,
ImageLayerName {
key_range: img_range.clone(),
lsn
@@ -5401,7 +5403,12 @@ impl Timeline {
unfinished_image_layer: image_layer_writer,
})
} else {
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
tracing::debug!(
"{} no data in range {}-{}",
progress_report,
img_range.start,
img_range.end
);
Ok(ImageLayerCreationOutcome::Empty)
}
}
@@ -5636,7 +5643,8 @@ impl Timeline {
}
}
for partition in partition_parts.iter() {
let total = partition_parts.len();
for (idx, partition) in partition_parts.iter().enumerate() {
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
@@ -5721,6 +5729,7 @@ impl Timeline {
ctx,
img_range.clone(),
io_concurrency,
Some((idx, total)),
)
.await?
} else {
@@ -5998,61 +6007,19 @@ pub(crate) enum CompactionError {
AlreadyRunning(&'static str),
}
/// Whether [`CompactionError::is_cancel`] should inspect the
/// [`CompactionError::Other`] anyhow Error's root cause for
/// typical causes of cancellation.
pub(crate) enum CheckOtherForCancel {
No,
Yes,
}
impl CompactionError {
/// Errors that can be ignored, i.e., cancel and shutdown.
pub fn is_cancel(&self, check_other: CheckOtherForCancel) -> bool {
if matches!(
pub fn is_cancel(&self) -> bool {
matches!(
self,
Self::ShuttingDown
| Self::AlreadyRunning(_) // XXX why do we treat AlreadyRunning as cancel?
| Self::AlreadyRunning(_)
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
) {
return true;
}
let root_cause = match &check_other {
CheckOtherForCancel::No => return false,
CheckOtherForCancel::Yes => {
if let Self::Other(other) = self {
other.root_cause()
} else {
return false;
}
}
};
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed
)
}
/// Critical errors that indicate data corruption.

View File

@@ -9,11 +9,11 @@ use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
use super::layer_manager::LayerManagerLockHolder;
use super::{
CheckOtherForCancel, CompactFlags, CompactOptions, CompactionError, CreateImageLayersError,
DurationRecorder, GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus,
RecordedDuration, Timeline,
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
Timeline,
};
use crate::tenant::timeline::DeltaEntry;
@@ -101,7 +101,11 @@ pub enum GcCompactionQueueItem {
/// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
auto: bool,
},
SubCompactionJob(CompactOptions),
SubCompactionJob {
i: usize,
total: usize,
options: CompactOptions,
},
Notify(GcCompactionJobId, Option<Lsn>),
}
@@ -163,7 +167,7 @@ impl GcCompactionQueueItem {
running,
job_id: id.0,
}),
GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
GcCompactionQueueItem::SubCompactionJob { options, .. } => Some(CompactInfoResponse {
compact_key_range: options.compact_key_range,
compact_lsn_range: options.compact_lsn_range,
sub_compaction: options.sub_compaction,
@@ -489,7 +493,7 @@ impl GcCompactionQueue {
.map(|job| job.compact_lsn_range.end)
.max()
.unwrap();
for job in jobs {
for (i, job) in jobs.into_iter().enumerate() {
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
// until we do further refactors to allow directly call `compact_with_gc`.
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
@@ -507,7 +511,11 @@ impl GcCompactionQueue {
compact_lsn_range: Some(job.compact_lsn_range.into()),
sub_compaction_max_job_size_mb: None,
};
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob {
options,
i,
total: jobs_len,
});
}
if !auto {
@@ -651,7 +659,7 @@ impl GcCompactionQueue {
}
}
}
GcCompactionQueueItem::SubCompactionJob(options) => {
GcCompactionQueueItem::SubCompactionJob { options, i, total } => {
// TODO: error handling, clear the queue if any task fails?
let _gc_guard = match gc_block.start().await {
Ok(guard) => guard,
@@ -663,6 +671,7 @@ impl GcCompactionQueue {
)));
}
};
info!("running gc-compaction subcompaction job {}/{}", i, total);
let res = timeline.compact_with_options(cancel, options, ctx).await;
let compaction_result = match res {
Ok(res) => res,
@@ -1310,7 +1319,7 @@ impl Timeline {
|| cfg!(feature = "testing")
|| self
.feature_resolver
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
.evaluate_boolean("image-compaction-boundary")
.is_ok()
{
let last_repartition_lsn = self.partitioning.read().1;
@@ -1405,7 +1414,7 @@ impl Timeline {
// Suppress errors when cancelled.
Err(_) if self.cancel.is_cancelled() => {}
Err(err) if err.is_cancel(CheckOtherForCancel::No) => {}
Err(err) if err.is_cancel() => {}
// Alert on critical errors that indicate data corruption.
Err(err) if err.is_critical() => {
@@ -1591,13 +1600,15 @@ impl Timeline {
let started = Instant::now();
let mut replace_image_layers = Vec::new();
let total = layers_to_rewrite.len();
for layer in layers_to_rewrite {
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
info!(layer=%layer, "rewriting layer after shard split");
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -1779,20 +1790,14 @@ impl Timeline {
} = {
let phase1_span = info_span!("compact_level0_phase1");
let ctx = ctx.attached_child();
let mut stats = CompactLevel0Phase1StatsBuilder {
let stats = CompactLevel0Phase1StatsBuilder {
version: Some(2),
tenant_id: Some(self.tenant_shard_id),
timeline_id: Some(self.timeline_id),
..Default::default()
};
let begin = tokio::time::Instant::now();
let phase1_layers_locked = self.layers.read(LayerManagerLockHolder::Compaction).await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
self.compact_level0_phase1(
phase1_layers_locked,
stats,
target_file_size,
force_compaction_ignore_threshold,
@@ -1813,16 +1818,19 @@ impl Timeline {
}
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
async fn compact_level0_phase1<'a>(
self: &'a Arc<Self>,
guard: LayerManagerReadGuard<'a>,
async fn compact_level0_phase1(
self: &Arc<Self>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller
let begin = tokio::time::Instant::now();
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
let layers = guard.layer_map()?;
let level0_deltas = layers.level0_deltas();
stats.level0_deltas_count = Some(level0_deltas.len());
@@ -1857,6 +1865,12 @@ impl Timeline {
.map(|x| guard.get_from_desc(x))
.collect::<Vec<_>>();
drop_layer_manager_rlock(guard);
// The is the last LSN that we have seen for L0 compaction in the timeline. This LSN might be updated
// by the time we finish the compaction. So we need to get it here.
let l0_last_record_lsn = self.get_last_record_lsn();
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
@@ -1944,9 +1958,7 @@ impl Timeline {
// we don't accidentally use it later in the function.
drop(level0_deltas);
stats.read_lock_held_prerequisites_micros = stats
.read_lock_held_spawn_blocking_startup_micros
.till_now();
stats.compaction_prerequisites_micros = stats.read_lock_acquisition_micros.till_now();
// TODO: replace with streaming k-merge
let all_keys = {
@@ -1968,7 +1980,7 @@ impl Timeline {
all_keys
};
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
stats.read_lock_held_key_sort_micros = stats.compaction_prerequisites_micros.till_now();
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
//
@@ -2002,7 +2014,6 @@ impl Timeline {
}
}
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
@@ -2021,8 +2032,12 @@ impl Timeline {
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size =
layers.image_coverage(&key_range, last_record_lsn).len();
let coverage_size = {
// TODO: optimize this with copy-on-write layer map.
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers = guard.layer_map()?;
layers.image_coverage(&key_range, l0_last_record_lsn).len()
};
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
@@ -2041,7 +2056,6 @@ impl Timeline {
holes
};
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_layer_manager_rlock(guard);
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
@@ -2382,9 +2396,8 @@ struct CompactLevel0Phase1StatsBuilder {
tenant_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
read_lock_acquisition_micros: DurationRecorder,
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
read_lock_held_key_sort_micros: DurationRecorder,
read_lock_held_prerequisites_micros: DurationRecorder,
compaction_prerequisites_micros: DurationRecorder,
read_lock_held_compute_holes_micros: DurationRecorder,
read_lock_drop_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
@@ -2399,9 +2412,8 @@ struct CompactLevel0Phase1Stats {
tenant_id: TenantShardId,
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
read_lock_held_key_sort_micros: RecordedDuration,
read_lock_held_prerequisites_micros: RecordedDuration,
compaction_prerequisites_micros: RecordedDuration,
read_lock_held_compute_holes_micros: RecordedDuration,
read_lock_drop_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
@@ -2426,16 +2438,12 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
.read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
read_lock_held_spawn_blocking_startup_micros: value
.read_lock_held_spawn_blocking_startup_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
read_lock_held_key_sort_micros: value
.read_lock_held_key_sort_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
read_lock_held_prerequisites_micros: value
.read_lock_held_prerequisites_micros
compaction_prerequisites_micros: value
.compaction_prerequisites_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
read_lock_held_compute_holes_micros: value
@@ -3503,22 +3511,16 @@ impl Timeline {
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
// when some condition meet.
let mut image_layer_writer = if !has_data_below {
Some(
SplitImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
.context("failed to create image layer writer")
.map_err(CompactionError::Other)?,
)
Some(SplitImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
))
} else {
None
};
@@ -4352,6 +4354,7 @@ impl TimelineAdaptor {
ctx,
key_range.clone(),
IoConcurrency::sequential(),
None,
)
.await?;

View File

@@ -885,7 +885,7 @@ async fn remote_copy(
}
tracing::info!("Deleting orphan layer file to make way for hard linking");
// Delete orphan layer file and try again, to ensure this layer has a well understood source
std::fs::remove_file(adopted_path)
std::fs::remove_file(&adoptee_path)
.map_err(|e| Error::launder(e.into(), Error::Prepare))?;
std::fs::hard_link(adopted_path, &adoptee_path)
.map_err(|e| Error::launder(e.into(), Error::Prepare))?;

View File

@@ -78,16 +78,6 @@ pub(crate) trait ReportableError: fmt::Display + Send + 'static {
fn get_error_kind(&self) -> ErrorKind;
}
impl ReportableError for postgres_client::error::Error {
fn get_error_kind(&self) -> ErrorKind {
if self.as_db_error().is_some() {
ErrorKind::Postgres
} else {
ErrorKind::Compute
}
}
}
/// Flattens `Result<Result<T>>` into `Result<T>`.
pub fn flatten_err<T>(r: Result<anyhow::Result<T>, JoinError>) -> anyhow::Result<T> {
r.context("join error").and_then(|x| x)

View File

@@ -404,7 +404,15 @@ impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
HttpConnError::PostgresConnectionError(p) => {
if p.as_db_error().is_some() {
// postgres rejected the connection
ErrorKind::Postgres
} else {
// couldn't even reach postgres
ErrorKind::Compute
}
}
HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
HttpConnError::ComputeCtl(_) => ErrorKind::Service,
HttpConnError::JwtPayloadError(_) => ErrorKind::User,

View File

@@ -22,7 +22,7 @@ use serde_json::Value;
use serde_json::value::RawValue;
use tokio::time::{self, Instant};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use tracing::{Level, debug, error, info};
use typed_json::json;
use url::Url;
use uuid::Uuid;
@@ -390,12 +390,35 @@ pub(crate) async fn handle(
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
let routine = get(db_error, |db| db.routine());
tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
match &e {
SqlOverHttpError::Postgres(e)
if e.as_db_error().is_some() && error_kind == ErrorKind::User =>
{
// this error contains too much info, and it's not an error we care about.
if tracing::enabled!(Level::DEBUG) {
tracing::debug!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
} else {
tracing::info!(
kind = error_kind.to_metric_label(),
error = "bad query",
"forwarding error to user"
);
}
}
_ => {
tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
}
}
json_response(
e.get_http_status_code(),
@@ -460,7 +483,15 @@ impl ReportableError for SqlOverHttpError {
SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User,
SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
SqlOverHttpError::Postgres(p) => p.get_error_kind(),
// customer initiated SQL errors.
SqlOverHttpError::Postgres(p) => {
if p.as_db_error().is_some() {
ErrorKind::User
} else {
ErrorKind::Compute
}
}
// proxy initiated SQL errors.
SqlOverHttpError::InternalPostgres(p) => {
if p.as_db_error().is_some() {
ErrorKind::Service
@@ -468,6 +499,7 @@ impl ReportableError for SqlOverHttpError {
ErrorKind::Compute
}
}
// postgres returned a bad row format that we couldn't parse.
SqlOverHttpError::JsonConversion(_) => ErrorKind::Postgres,
SqlOverHttpError::Cancelled(c) => c.get_error_kind(),
}

View File

@@ -52,7 +52,7 @@ pub trait ResponseErrorMessageExt: Sized {
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
if status.is_success() {
return Ok(self);
}

View File

@@ -20,6 +20,7 @@ camino.workspace = true
chrono.workspace = true
clap.workspace = true
clashmap.workspace = true
compute_api.workspace = true
cron.workspace = true
fail.workspace = true
futures.workspace = true

View File

@@ -5,7 +5,8 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol};
use compute_api::spec::PageserverProtocol;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
@@ -13,11 +14,12 @@ use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span};
use utils::backoff::{self};
use utils::id::{NodeId, TenantId};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::service::Config;
@@ -35,7 +37,7 @@ struct UnshardedComputeHookTenant {
preferred_az: Option<AvailabilityZone>,
// Must hold this lock to send a notification.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
}
struct ShardedComputeHookTenant {
stripe_size: ShardStripeSize,
@@ -48,7 +50,7 @@ struct ShardedComputeHookTenant {
// Must hold this lock to send a notification. The contents represent
// the last successfully sent notification, and are used to coalesce multiple
// updates by only sending when there is a chance since our last successful send.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
}
/// Represents our knowledge of the compute's state: we can update this when we get a
@@ -56,9 +58,9 @@ struct ShardedComputeHookTenant {
///
/// Should be wrapped in an Option<>, as we cannot always know the remote state.
#[derive(PartialEq, Eq, Debug)]
struct ComputeRemoteState {
struct ComputeRemoteState<R> {
// The request body which was acked by the compute
request: ComputeHookNotifyRequest,
request: R,
// Whether the cplane indicated that the state was applied to running computes, or just
// persisted. In the Neon control plane, this is the difference between a 423 response (meaning
@@ -66,6 +68,36 @@ struct ComputeRemoteState {
applied: bool,
}
type ComputeRemoteTenantState = ComputeRemoteState<NotifyAttachRequest>;
type ComputeRemoteTimelineState = ComputeRemoteState<NotifySafekeepersRequest>;
/// The trait which define the handler-specific types and methods.
/// We have two implementations of this trait so far:
/// - [`ComputeHookTenant`] for tenant attach notifications ("/notify-attach")
/// - [`ComputeHookTimeline`] for safekeeper change notifications ("/notify-safekeepers")
trait ApiMethod {
/// Type of the key which identifies the resource.
/// It's either TenantId for tenant attach notifications,
/// or TenantTimelineId for safekeeper change notifications.
type Key: std::cmp::Eq + std::hash::Hash + Clone;
type Request: serde::Serialize + std::fmt::Debug;
const API_PATH: &'static str;
fn maybe_send(
&self,
key: Self::Key,
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<Self::Request>>>>,
) -> MaybeSendResult<Self::Request, Self::Key>;
async fn notify_local(
env: &LocalEnv,
cplane: &ComputeControlPlane,
req: &Self::Request,
) -> Result<(), NotifyError>;
}
enum ComputeHookTenant {
Unsharded(UnshardedComputeHookTenant),
Sharded(ShardedComputeHookTenant),
@@ -96,7 +128,7 @@ impl ComputeHookTenant {
}
}
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>> {
match self {
Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
@@ -190,19 +222,136 @@ impl ComputeHookTenant {
}
}
/// The state of a timeline we need to notify the compute about.
struct ComputeHookTimeline {
generation: SafekeeperGeneration,
safekeepers: Vec<SafekeeperInfo>,
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTimelineState>>>,
}
impl ComputeHookTimeline {
/// Construct a new ComputeHookTimeline with the given safekeepers and generation.
fn new(generation: SafekeeperGeneration, safekeepers: Vec<SafekeeperInfo>) -> Self {
Self {
generation,
safekeepers,
send_lock: Arc::default(),
}
}
/// Update the state with a new SafekeepersUpdate.
/// Noop if the update generation is not greater than the current generation.
fn update(&mut self, sk_update: SafekeepersUpdate) {
if sk_update.generation > self.generation {
self.generation = sk_update.generation;
self.safekeepers = sk_update.safekeepers;
}
}
}
impl ApiMethod for ComputeHookTimeline {
type Key = TenantTimelineId;
type Request = NotifySafekeepersRequest;
const API_PATH: &'static str = "notify-safekeepers";
fn maybe_send(
&self,
ttid: TenantTimelineId,
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTimelineState>>>,
) -> MaybeSendNotifySafekeepersResult {
let locked = match lock {
Some(already_locked) => already_locked,
None => {
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::timelines`] lock.
let Ok(locked) = self.send_lock.clone().try_lock_owned() else {
return MaybeSendResult::AwaitLock((ttid, self.send_lock.clone()));
};
locked
}
};
if locked
.as_ref()
.is_some_and(|s| s.request.generation >= self.generation)
{
return MaybeSendResult::Noop;
}
MaybeSendResult::Transmit((
NotifySafekeepersRequest {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
generation: self.generation,
safekeepers: self.safekeepers.clone(),
},
locked,
))
}
async fn notify_local(
_env: &LocalEnv,
cplane: &ComputeControlPlane,
req: &NotifySafekeepersRequest,
) -> Result<(), NotifyError> {
let NotifySafekeepersRequest {
tenant_id,
timeline_id,
generation,
safekeepers,
} = req;
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == *tenant_id
&& endpoint.timeline_id == *timeline_id
&& endpoint.status() == EndpointStatus::Running
{
tracing::info!("Reconfiguring safekeepers for endpoint {endpoint_name}");
let safekeepers = safekeepers.iter().map(|sk| sk.id).collect::<Vec<_>>();
endpoint
.reconfigure_safekeepers(safekeepers, *generation)
.await
.map_err(NotifyError::NeonLocal)?;
}
}
Ok(())
}
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequestShard {
struct NotifyAttachRequestShard {
node_id: NodeId,
shard_number: ShardNumber,
}
/// Request body that we send to the control plane to notify it of where a tenant is attached
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequest {
struct NotifyAttachRequest {
tenant_id: TenantId,
preferred_az: Option<String>,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
shards: Vec<NotifyAttachRequestShard>,
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
pub(crate) struct SafekeeperInfo {
pub id: NodeId,
/// Hostname of the safekeeper.
/// It exists for better debuggability. Might be missing.
/// Should not be used for anything else.
pub hostname: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifySafekeepersRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
generation: SafekeeperGeneration,
safekeepers: Vec<SafekeeperInfo>,
}
/// Error type for attempts to call into the control plane compute notification hook
@@ -234,42 +383,50 @@ pub(crate) enum NotifyError {
NeonLocal(anyhow::Error),
}
enum MaybeSendResult {
enum MaybeSendResult<R, K> {
// Please send this request while holding the lock, and if you succeed then write
// the request into the lock.
Transmit(
(
ComputeHookNotifyRequest,
tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
R,
tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<R>>>,
),
),
// Something requires sending, but you must wait for a current sender then call again
AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
AwaitLock((K, Arc<tokio::sync::Mutex<Option<ComputeRemoteState<R>>>>)),
// Nothing requires sending
Noop,
}
impl ComputeHookTenant {
type MaybeSendNotifyAttachResult = MaybeSendResult<NotifyAttachRequest, TenantId>;
type MaybeSendNotifySafekeepersResult = MaybeSendResult<NotifySafekeepersRequest, TenantTimelineId>;
impl ApiMethod for ComputeHookTenant {
type Key = TenantId;
type Request = NotifyAttachRequest;
const API_PATH: &'static str = "notify-attach";
fn maybe_send(
&self,
tenant_id: TenantId,
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
) -> MaybeSendResult {
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTenantState>>>,
) -> MaybeSendNotifyAttachResult {
let locked = match lock {
Some(already_locked) => already_locked,
None => {
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::tenants`] lock.
let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
return MaybeSendResult::AwaitLock((tenant_id, self.get_send_lock().clone()));
};
locked
}
};
let request = match self {
Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
Self::Unsharded(unsharded_tenant) => Some(NotifyAttachRequest {
tenant_id,
shards: vec![ComputeHookNotifyRequestShard {
shards: vec![NotifyAttachRequestShard {
shard_number: ShardNumber(0),
node_id: unsharded_tenant.node_id,
}],
@@ -282,12 +439,12 @@ impl ComputeHookTenant {
Self::Sharded(sharded_tenant)
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
{
Some(ComputeHookNotifyRequest {
Some(NotifyAttachRequest {
tenant_id,
shards: sharded_tenant
.shards
.iter()
.map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
.map(|(shard_number, node_id)| NotifyAttachRequestShard {
shard_number: *shard_number,
node_id: *node_id,
})
@@ -332,98 +489,22 @@ impl ComputeHookTenant {
}
}
}
}
/// The compute hook is a destination for notifications about changes to tenant:pageserver
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
authorization_header: Option<String>,
// Concurrency limiter, so that we do not overload the cloud control plane when updating
// large numbers of tenants (e.g. when failing over after a node failure)
api_concurrency: tokio::sync::Semaphore,
// This lock is only used in testing enviroments, to serialize calls into neon_lock
neon_local_lock: tokio::sync::Mutex<()>,
// We share a client across all notifications to enable connection re-use etc when
// sending large numbers of notifications
client: reqwest::Client,
}
/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background. This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) node_id: NodeId,
pub(crate) stripe_size: ShardStripeSize,
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}
impl ComputeHook {
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {jwt}"));
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
for cert in &config.ssl_ca_certs {
client = client.add_root_certificate(cert.clone());
}
let client = client
.build()
.context("Failed to build http client for compute hook")?;
Ok(Self {
state: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
client,
})
}
/// For test environments: use neon_local's LocalEnv to update compute
async fn do_notify_local(
&self,
reconfigure_request: &ComputeHookNotifyRequest,
async fn notify_local(
env: &LocalEnv,
cplane: &ComputeControlPlane,
req: &NotifyAttachRequest,
) -> Result<(), NotifyError> {
// neon_local updates are not safe to call concurrently, use a lock to serialize
// all calls to this function
let _locked = self.neon_local_lock.lock().await;
let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
tracing::warn!(
"neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
);
return Ok(());
};
let env = match LocalEnv::load_config(repo_dir) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
return Ok(());
}
};
let cplane =
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
let ComputeHookNotifyRequest {
let NotifyAttachRequest {
tenant_id,
shards,
stripe_size,
preferred_az: _preferred_az,
} = reconfigure_request;
} = req;
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {endpoint_name}");
tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
let pageservers = shards
.iter()
@@ -445,7 +526,7 @@ impl ComputeHook {
.collect::<Vec<_>>();
endpoint
.reconfigure(pageservers, *stripe_size, None)
.reconfigure_pageservers(pageservers, *stripe_size)
.await
.map_err(NotifyError::NeonLocal)?;
}
@@ -453,11 +534,102 @@ impl ComputeHook {
Ok(())
}
}
async fn do_notify_iteration(
/// The compute hook is a destination for notifications about changes to tenant:pageserver
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
tenants: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
timelines: std::sync::Mutex<HashMap<TenantTimelineId, ComputeHookTimeline>>,
authorization_header: Option<String>,
// Concurrency limiter, so that we do not overload the cloud control plane when updating
// large numbers of tenants (e.g. when failing over after a node failure)
api_concurrency: tokio::sync::Semaphore,
// This lock is only used in testing enviroments, to serialize calls into neon_local
neon_local_lock: tokio::sync::Mutex<()>,
// We share a client across all notifications to enable connection re-use etc when
// sending large numbers of notifications
client: reqwest::Client,
}
/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background. This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) node_id: NodeId,
pub(crate) stripe_size: ShardStripeSize,
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}
pub(crate) struct SafekeepersUpdate {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
pub(crate) generation: SafekeeperGeneration,
pub(crate) safekeepers: Vec<SafekeeperInfo>,
}
impl ComputeHook {
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {jwt}"));
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
for cert in &config.ssl_ca_certs {
client = client.add_root_certificate(cert.clone());
}
let client = client
.build()
.context("Failed to build http client for compute hook")?;
Ok(Self {
tenants: Default::default(),
timelines: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
client,
})
}
/// For test environments: use neon_local's LocalEnv to update compute
async fn do_notify_local<M: ApiMethod>(&self, req: &M::Request) -> Result<(), NotifyError> {
// neon_local updates are not safe to call concurrently, use a lock to serialize
// all calls to this function
let _locked = self.neon_local_lock.lock().await;
let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
tracing::warn!(
"neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
);
return Ok(());
};
let env = match LocalEnv::load_config(repo_dir) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
return Ok(());
}
};
let cplane =
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
M::notify_local(&env, &cplane, req).await
}
async fn do_notify_iteration<Req: serde::Serialize + std::fmt::Debug>(
&self,
url: &String,
reconfigure_request: &ComputeHookNotifyRequest,
reconfigure_request: &Req,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let req = self.client.request(reqwest::Method::PUT, url);
@@ -479,9 +651,7 @@ impl ComputeHook {
};
// Treat all 2xx responses as success
if response.status() >= reqwest::StatusCode::OK
&& response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
{
if response.status().is_success() {
if response.status() != reqwest::StatusCode::OK {
// Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
// log a warning.
@@ -532,10 +702,10 @@ impl ComputeHook {
}
}
async fn do_notify(
async fn do_notify<R: serde::Serialize + std::fmt::Debug>(
&self,
url: &String,
reconfigure_request: &ComputeHookNotifyRequest,
reconfigure_request: &R,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
// We hold these semaphore units across all retries, rather than only across each
@@ -567,13 +737,13 @@ impl ComputeHook {
}
/// Synchronous phase: update the per-tenant state for the next intended notification
fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
let mut state_locked = self.state.lock().unwrap();
fn notify_attach_prepare(&self, shard_update: ShardUpdate) -> MaybeSendNotifyAttachResult {
let mut tenants_locked = self.tenants.lock().unwrap();
use std::collections::hash_map::Entry;
let tenant_shard_id = shard_update.tenant_shard_id;
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
let tenant = match tenants_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => {
let ShardUpdate {
tenant_shard_id,
@@ -597,10 +767,37 @@ impl ComputeHook {
tenant.maybe_send(tenant_shard_id.tenant_id, None)
}
async fn notify_execute(
fn notify_safekeepers_prepare(
&self,
maybe_send_result: MaybeSendResult,
tenant_shard_id: TenantShardId,
safekeepers_update: SafekeepersUpdate,
) -> MaybeSendNotifySafekeepersResult {
let mut timelines_locked = self.timelines.lock().unwrap();
let ttid = TenantTimelineId {
tenant_id: safekeepers_update.tenant_id,
timeline_id: safekeepers_update.timeline_id,
};
use std::collections::hash_map::Entry;
let timeline = match timelines_locked.entry(ttid) {
Entry::Vacant(e) => e.insert(ComputeHookTimeline::new(
safekeepers_update.generation,
safekeepers_update.safekeepers,
)),
Entry::Occupied(e) => {
let timeline = e.into_mut();
timeline.update(safekeepers_update);
timeline
}
};
timeline.maybe_send(ttid, None)
}
async fn notify_execute<M: ApiMethod>(
&self,
state: &std::sync::Mutex<HashMap<M::Key, M>>,
maybe_send_result: MaybeSendResult<M::Request, M::Key>,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
// Process result: we may get an update to send, or we may have to wait for a lock
@@ -609,7 +806,7 @@ impl ComputeHook {
MaybeSendResult::Noop => {
return Ok(());
}
MaybeSendResult::AwaitLock(send_lock) => {
MaybeSendResult::AwaitLock((key, send_lock)) => {
let send_locked = tokio::select! {
guard = send_lock.lock_owned() => {guard},
_ = cancel.cancelled() => {
@@ -620,11 +817,11 @@ impl ComputeHook {
// Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
// we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
// try_lock.
let state_locked = self.state.lock().unwrap();
let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
let state_locked = state.lock().unwrap();
let Some(resource_state) = state_locked.get(&key) else {
return Ok(());
};
match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
match resource_state.maybe_send(key, Some(send_locked)) {
MaybeSendResult::AwaitLock(_) => {
unreachable!("We supplied lock guard")
}
@@ -643,14 +840,18 @@ impl ComputeHook {
.control_plane_url
.as_ref()
.map(|control_plane_url| {
format!("{}/notify-attach", control_plane_url.trim_end_matches('/'))
format!(
"{}/{}",
control_plane_url.trim_end_matches('/'),
M::API_PATH
)
});
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {
self.do_notify_local::<M>(&request).await.map_err(|e| {
// This path is for testing only, so munge the error into our prod-style error type.
tracing::error!("neon_local notification hook failed: {e}");
NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
@@ -686,7 +887,7 @@ impl ComputeHook {
/// Infallible synchronous fire-and-forget version of notify(), that sends its results to
/// a channel. Something should consume the channel and arrange to try notifying again
/// if something failed.
pub(super) fn notify_background(
pub(super) fn notify_attach_background(
self: &Arc<Self>,
notifications: Vec<ShardUpdate>,
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
@@ -695,7 +896,7 @@ impl ComputeHook {
let mut maybe_sends = Vec::new();
for shard_update in notifications {
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
let maybe_send_result = self.notify_attach_prepare(shard_update);
maybe_sends.push((tenant_shard_id, maybe_send_result))
}
@@ -714,10 +915,10 @@ impl ComputeHook {
async move {
this
.notify_execute(maybe_send_result, tenant_shard_id, &cancel)
.notify_execute(&this.tenants, maybe_send_result, &cancel)
.await.map_err(|e| (tenant_shard_id, e))
}.instrument(info_span!(
"notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
"notify_attach_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
))
})
.buffered(API_CONCURRENCY);
@@ -760,14 +961,23 @@ impl ComputeHook {
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify<'a>(
pub(super) async fn notify_attach<'a>(
&self,
shard_update: ShardUpdate<'a>,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
let maybe_send_result = self.notify_attach_prepare(shard_update);
self.notify_execute(&self.tenants, maybe_send_result, cancel)
.await
}
pub(super) async fn notify_safekeepers(
&self,
safekeepers_update: SafekeepersUpdate,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let maybe_send_result = self.notify_safekeepers_prepare(safekeepers_update);
self.notify_execute(&self.timelines, maybe_send_result, cancel)
.await
}
@@ -783,8 +993,8 @@ impl ComputeHook {
) {
use std::collections::hash_map::Entry;
let mut state_locked = self.state.lock().unwrap();
match state_locked.entry(tenant_shard_id.tenant_id) {
let mut tenants_locked = self.tenants.lock().unwrap();
match tenants_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(_) => {
// This is a valid but niche case, where the tenant was previously attached
// as a Secondary location and then detached, so has no previously notified

View File

@@ -65,7 +65,7 @@ pub(super) struct Reconciler {
pub(crate) compute_hook: Arc<ComputeHook>,
/// To avoid stalling if the cloud control plane is unavailable, we may proceed
/// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
/// past failures in [`ComputeHook::notify_attach`], but we _must_ remember that we failed
/// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
pub(crate) compute_notify_failure: bool,
@@ -1023,7 +1023,7 @@ impl Reconciler {
if let Some(node) = &self.intent.attached {
let result = self
.compute_hook
.notify(
.notify_attach(
compute_hook::ShardUpdate {
tenant_shard_id: self.tenant_shard_id,
node_id: node.get_id(),

View File

@@ -878,18 +878,18 @@ impl Service {
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
// will emit compute hook notifications when they reconcile.
//
// Ordering: our calls to notify_background synchronously establish a relative order for these notifications vs. any later
// Ordering: our calls to notify_attach_background synchronously establish a relative order for these notifications vs. any later
// calls into the ComputeHook for the same tenant: we can leave these to run to completion in the background and any later
// calls will be correctly ordered wrt these.
//
// Concurrency: we call notify_background for all tenants, which will create O(N) tokio tasks, but almost all of them
// Concurrency: we call notify_attach_background for all tenants, which will create O(N) tokio tasks, but almost all of them
// will just wait on the ComputeHook::API_CONCURRENCY semaphore immediately, so very cheap until they get that semaphore
// unit and start doing I/O.
tracing::info!(
"Sending {} compute notifications",
compute_notifications.len()
);
self.compute_hook.notify_background(
self.compute_hook.notify_attach_background(
compute_notifications,
bg_compute_notify_result_tx.clone(),
&self.cancel,
@@ -6281,7 +6281,7 @@ impl Service {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = self
.compute_hook
.notify(
.notify_attach(
compute_hook::ShardUpdate {
tenant_shard_id: child_id,
node_id: child_ps,

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use super::safekeeper_reconciler::ScheduleRequest;
use crate::compute_hook;
use crate::heartbeater::SafekeeperState;
use crate::id_lock_map::trace_shared_lock;
use crate::metrics;
@@ -1198,7 +1199,11 @@ impl Service {
// 4. Call PUT configuration on safekeepers from the current set,
// delivering them joint_conf.
// TODO(diko): need to notify cplane with an updated set of safekeepers.
// Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
// This way the compute will know about new safekeepers from joint_config before we require to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
.await?;
let results = self
.tenant_timeline_set_membership_quorum(
@@ -1305,8 +1310,55 @@ impl Service {
)
.await?;
// TODO(diko): need to notify cplane with an updated set of safekeepers.
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
.await?;
Ok(())
}
/// Notify cplane about safekeeper membership change.
/// The cplane will receive a joint set of safekeepers as a safekeeper list.
async fn cplane_notify_safekeepers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
mconf: &membership::Configuration,
) -> Result<(), ApiError> {
let mut safekeepers = Vec::new();
let mut ids: HashSet<_> = HashSet::new();
for member in mconf
.members
.m
.iter()
.chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
{
if ids.insert(member.id) {
safekeepers.push(compute_hook::SafekeeperInfo {
id: member.id,
hostname: Some(member.host.clone()),
});
}
}
self.compute_hook
.notify_safekeepers(
compute_hook::SafekeepersUpdate {
tenant_id,
timeline_id,
generation: mconf.generation,
safekeepers,
},
&self.cancel,
)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"failed to notify cplane about safekeeper membership change: {err}"
))
})
}
}

View File

@@ -724,15 +724,21 @@ class NeonEnvBuilder:
shutil.copytree(storcon_db_from_dir, storcon_db_to_dir, ignore=ignore_postgres_log)
assert not (storcon_db_to_dir / "postgres.log").exists()
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
# However, in this new NeonEnv, the pageservers listen on different ports, and the storage controller
# will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
# controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# So, from_repo_dir patches up the the storcon database.
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
assert not patch_script_path.exists()
patch_script = ""
for ps in self.env.pageservers:
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';\n"
for sk in self.env.safekeepers:
patch_script += f"UPDATE safekeepers SET http_port={sk.port.http}, port={sk.port.pg} WHERE id = '{sk.id}';\n"
patch_script_path.write_text(patch_script)
# Update the config with info about tenants and timelines

View File

@@ -76,6 +76,7 @@ if TYPE_CHECKING:
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
#
# # Build previous version of binaries and store them somewhere:
# rm -rf pg_install target
@@ -102,6 +103,7 @@ if TYPE_CHECKING:
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
# export NEON_BIN=target/${BUILD_TYPE}
# export POSTGRES_DISTRIB_DIR=pg_install
#

View File

@@ -399,7 +399,7 @@ def test_tx_abort_with_many_relations(
# How many relations: this number is tuned to be long enough to take tens of seconds
# if the rollback code path is buggy, tripping the test's timeout.
n = 5000
step = 2500
step = 500
def create():
# Create many relations

View File

@@ -32,10 +32,13 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
)
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
# We specify all safekeepers, so compute will connect to all of them.
# Only those from the current membership configuration will be used.
# TODO(diko): set only current safekeepers when cplane notify is implemented.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] is None
assert len(mconf["sk_set"]) == 1
assert mconf["generation"] == 1
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE t(a int)")
@@ -58,7 +61,16 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
# 1 initial generation + 2 migrations on each loop iteration.
expected_gen = 1 + 2 * 3
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == expected_gen
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith(f"g#{expected_gen}:")
# Restart and check again to make sure data is persistent.
ep.stop()
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.start(safekeeper_generation=1, safekeepers=[3])
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]