mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Compare commits
35 Commits
skyzh/add-
...
conrad/ove
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5dd600f9f9 | ||
|
|
2122f962d5 | ||
|
|
9971fba584 | ||
|
|
a77919f4b2 | ||
|
|
a618056770 | ||
|
|
307e1e64c8 | ||
|
|
a537b2ffd0 | ||
|
|
64353b48db | ||
|
|
79ddc803af | ||
|
|
f5070f6aa4 | ||
|
|
3b7cc4234c | ||
|
|
33abfc2b74 | ||
|
|
93b964f829 | ||
|
|
d0aaec2abb | ||
|
|
d0dc65da12 | ||
|
|
03d635b916 | ||
|
|
5cd7f936f9 | ||
|
|
101e115b38 | ||
|
|
b37bb7d7ed | ||
|
|
bef5954fd7 | ||
|
|
8477d15f95 | ||
|
|
622b3b2993 | ||
|
|
659366060d | ||
|
|
42d93031a1 | ||
|
|
d22377c754 | ||
|
|
6c70789cfd | ||
|
|
7e55497e13 | ||
|
|
40f32ea326 | ||
|
|
1d1502bc16 | ||
|
|
7eb85c56ac | ||
|
|
24d62c647f | ||
|
|
4d2e4b19c3 | ||
|
|
0691b73f53 | ||
|
|
3cf5e1386c | ||
|
|
608afc3055 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1303,6 +1303,7 @@ dependencies = [
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"indexmap 2.0.1",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
|
||||
@@ -1971,7 +1971,8 @@ COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/sql
|
||||
COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/neon_collector_autoscaling.yml /etc/neon_collector_autoscaling.yml
|
||||
|
||||
# Make the libraries we built available
|
||||
RUN echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
|
||||
COPY --chmod=0666 compute/etc/ld.so.conf.d/00-neon.conf /etc/ld.so.conf.d/00-neon.conf
|
||||
RUN /sbin/ldconfig
|
||||
|
||||
# rsyslog config permissions
|
||||
# directory for rsyslogd pid file
|
||||
|
||||
1
compute/etc/ld.so.conf.d/00-neon.conf
Normal file
1
compute/etc/ld.so.conf.d/00-neon.conf
Normal file
@@ -0,0 +1 @@
|
||||
/usr/local/lib
|
||||
@@ -28,6 +28,7 @@ flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
indexmap.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
|
||||
@@ -60,12 +60,16 @@ use utils::failpoint_support;
|
||||
// Compatibility hack: if the control plane specified any remote-ext-config
|
||||
// use the default value for extension storage proxy gateway.
|
||||
// Remove this once the control plane is updated to pass the gateway URL
|
||||
fn parse_remote_ext_config(arg: &str) -> Result<String> {
|
||||
if arg.starts_with("http") {
|
||||
Ok(arg.trim_end_matches('/').to_string())
|
||||
fn parse_remote_ext_base_url(arg: &str) -> Result<String> {
|
||||
const FALLBACK_PG_EXT_GATEWAY_BASE_URL: &str =
|
||||
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local";
|
||||
|
||||
Ok(if arg.starts_with("http") {
|
||||
arg
|
||||
} else {
|
||||
Ok("http://pg-ext-s3-gateway".to_string())
|
||||
FALLBACK_PG_EXT_GATEWAY_BASE_URL
|
||||
}
|
||||
.to_owned())
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -74,8 +78,10 @@ struct Cli {
|
||||
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
|
||||
pub pgbin: String,
|
||||
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
|
||||
pub remote_ext_config: Option<String>,
|
||||
/// The base URL for the remote extension storage proxy gateway.
|
||||
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_base_url, alias = "remote-ext-config")]
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
|
||||
/// The port to bind the external listening HTTP server to. Clients running
|
||||
/// outside the compute will talk to the compute through this port. Keep
|
||||
@@ -164,7 +170,7 @@ fn main() -> Result<()> {
|
||||
pgversion: get_pg_version_string(&cli.pgbin),
|
||||
external_http_port: cli.external_http_port,
|
||||
internal_http_port: cli.internal_http_port,
|
||||
ext_remote_storage: cli.remote_ext_config.clone(),
|
||||
remote_ext_base_url: cli.remote_ext_base_url.clone(),
|
||||
resize_swap_on_bind: cli.resize_swap_on_bind,
|
||||
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -265,4 +271,18 @@ mod test {
|
||||
fn verify_cli() {
|
||||
Cli::command().debug_assert()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_pg_ext_gateway_base_url() {
|
||||
let arg = "http://pg-ext-s3-gateway2";
|
||||
let result = super::parse_remote_ext_base_url(arg).unwrap();
|
||||
assert_eq!(result, arg);
|
||||
|
||||
let arg = "pg-ext-s3-gateway";
|
||||
let result = super::parse_remote_ext_base_url(arg).unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -348,6 +348,7 @@ async fn run_dump_restore(
|
||||
"--no-security-labels".to_string(),
|
||||
"--no-subscriptions".to_string(),
|
||||
"--no-tablespaces".to_string(),
|
||||
"--no-event-triggers".to_string(),
|
||||
// format
|
||||
"--format".to_string(),
|
||||
"directory".to_string(),
|
||||
|
||||
@@ -11,6 +11,7 @@ use compute_api::spec::{
|
||||
use futures::StreamExt;
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use itertools::Itertools;
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::Pid;
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -18,7 +19,7 @@ use postgres;
|
||||
use postgres::NoTls;
|
||||
use postgres::error::SqlState;
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::Path;
|
||||
@@ -95,7 +96,7 @@ pub struct ComputeNodeParams {
|
||||
pub internal_http_port: u16,
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub ext_remote_storage: Option<String>,
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
}
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
@@ -329,11 +330,39 @@ struct StartVmMonitorResult {
|
||||
impl ComputeNode {
|
||||
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
let conn_conf = postgres::config::Config::from_str(connstr)
|
||||
let mut conn_conf = postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
|
||||
let mut tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build tokio postgres config from connstr")?;
|
||||
|
||||
// Users can set some configuration parameters per database with
|
||||
// ALTER DATABASE ... SET ...
|
||||
//
|
||||
// There are at least these parameters:
|
||||
//
|
||||
// - role=some_other_role
|
||||
// - default_transaction_read_only=on
|
||||
// - statement_timeout=1, i.e., 1ms, which will cause most of the queries to fail
|
||||
// - search_path=non_public_schema, this should be actually safe because
|
||||
// we don't call any functions in user databases, but better to always reset
|
||||
// it to public.
|
||||
//
|
||||
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
|
||||
// Unset them via connection string options before connecting to the database.
|
||||
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
|
||||
//
|
||||
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
|
||||
// as well. After rolling out this code, we can remove this parameter from control plane.
|
||||
// In the meantime, double-passing is fine, the last value is applied.
|
||||
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
let options = match conn_conf.get_options() {
|
||||
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
|
||||
None => EXTRA_OPTIONS.to_string(),
|
||||
};
|
||||
conn_conf.options(&options);
|
||||
tokio_conn_conf.options(&options);
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
if let Some(spec) = config.spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
@@ -1449,15 +1478,20 @@ impl ComputeNode {
|
||||
Err(e) => match e.code() {
|
||||
Some(&SqlState::INVALID_PASSWORD)
|
||||
| Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => {
|
||||
// Connect with zenith_admin if cloud_admin could not authenticate
|
||||
// Connect with `zenith_admin` if `cloud_admin` could not authenticate
|
||||
info!(
|
||||
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
|
||||
"cannot connect to Postgres: {}, retrying with 'zenith_admin' username",
|
||||
e
|
||||
);
|
||||
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
|
||||
zenith_admin_conf.application_name("compute_ctl:apply_config");
|
||||
zenith_admin_conf.user("zenith_admin");
|
||||
|
||||
// It doesn't matter what were the options before, here we just want
|
||||
// to connect and create a new superuser role.
|
||||
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
zenith_admin_conf.options(ZENITH_OPTIONS);
|
||||
|
||||
let mut client =
|
||||
zenith_admin_conf.connect(NoTls)
|
||||
.context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?;
|
||||
@@ -1623,9 +1657,7 @@ impl ComputeNode {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
let mut conf =
|
||||
tokio_postgres::Config::from_str(self.params.connstr.as_str()).unwrap();
|
||||
conf.application_name("apply_config");
|
||||
let conf = self.get_tokio_conn_conf(Some("compute_ctl:reconfigure"));
|
||||
let conf = Arc::new(conf);
|
||||
|
||||
let spec = Arc::new(spec.clone());
|
||||
@@ -1865,9 +1897,9 @@ LIMIT 100",
|
||||
real_ext_name: String,
|
||||
ext_path: RemotePath,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let ext_remote_storage =
|
||||
let remote_ext_base_url =
|
||||
self.params
|
||||
.ext_remote_storage
|
||||
.remote_ext_base_url
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
@@ -1929,7 +1961,7 @@ LIMIT 100",
|
||||
let download_size = extension_server::download_extension(
|
||||
&real_ext_name,
|
||||
&ext_path,
|
||||
ext_remote_storage,
|
||||
remote_ext_base_url,
|
||||
&self.params.pgbin,
|
||||
)
|
||||
.await
|
||||
@@ -1964,23 +1996,40 @@ LIMIT 100",
|
||||
tokio::spawn(conn);
|
||||
|
||||
// TODO: support other types of grants apart from schemas?
|
||||
let query = format!(
|
||||
"GRANT {} ON SCHEMA {} TO {}",
|
||||
privileges
|
||||
.iter()
|
||||
// should not be quoted as it's part of the command.
|
||||
// is already sanitized so it's ok
|
||||
.map(|p| p.as_str())
|
||||
.collect::<Vec<&'static str>>()
|
||||
.join(", "),
|
||||
// quote the schema and role name as identifiers to sanitize them.
|
||||
schema_name.pg_quote(),
|
||||
role_name.pg_quote(),
|
||||
);
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
|
||||
// check the role grants first - to gracefully handle read-replicas.
|
||||
let select = "SELECT privilege_type
|
||||
FROM pg_namespace
|
||||
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
|
||||
JOIN pg_user users ON acl.grantee = users.usesysid
|
||||
WHERE users.usename = $1
|
||||
AND nspname = $2";
|
||||
let rows = db_client
|
||||
.query(select, &[role_name, schema_name])
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
.with_context(|| format!("Failed to execute query: {select}"))?;
|
||||
|
||||
let already_granted: HashSet<String> = rows.into_iter().map(|row| row.get(0)).collect();
|
||||
|
||||
let grants = privileges
|
||||
.iter()
|
||||
.filter(|p| !already_granted.contains(p.as_str()))
|
||||
// should not be quoted as it's part of the command.
|
||||
// is already sanitized so it's ok
|
||||
.map(|p| p.as_str())
|
||||
.join(", ");
|
||||
|
||||
if !grants.is_empty() {
|
||||
// quote the schema and role name as identifiers to sanitize them.
|
||||
let schema_name = schema_name.pg_quote();
|
||||
let role_name = role_name.pg_quote();
|
||||
|
||||
let query = format!("GRANT {grants} ON SCHEMA {schema_name} TO {role_name}",);
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2038,7 +2087,7 @@ LIMIT 100",
|
||||
&self,
|
||||
spec: &ComputeSpec,
|
||||
) -> Result<RemoteExtensionMetrics> {
|
||||
if self.params.ext_remote_storage.is_none() {
|
||||
if self.params.remote_ext_base_url.is_none() {
|
||||
return Ok(RemoteExtensionMetrics {
|
||||
num_ext_downloaded: 0,
|
||||
largest_ext_size: 0,
|
||||
|
||||
@@ -224,7 +224,10 @@ pub fn write_postgres_conf(
|
||||
writeln!(file, "pgaudit.log_rotation_age=5")?;
|
||||
|
||||
// Enable audit logs for pg_session_jwt extension
|
||||
writeln!(file, "pg_session_jwt.audit_log=on")?;
|
||||
// TODO: Consider a good approach for shipping pg_session_jwt logs to the same sink as
|
||||
// pgAudit - additional context in https://github.com/neondatabase/cloud/issues/28863
|
||||
//
|
||||
// writeln!(file, "pg_session_jwt.audit_log=on")?;
|
||||
|
||||
// Add audit shared_preload_libraries, if they are not present.
|
||||
//
|
||||
|
||||
@@ -158,14 +158,14 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
|
||||
pub async fn download_extension(
|
||||
ext_name: &str,
|
||||
ext_path: &RemotePath,
|
||||
ext_remote_storage: &str,
|
||||
remote_ext_base_url: &str,
|
||||
pgbin: &str,
|
||||
) -> Result<u64> {
|
||||
info!("Download extension {:?} from {:?}", ext_name, ext_path);
|
||||
|
||||
// TODO add retry logic
|
||||
let download_buffer =
|
||||
match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
|
||||
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
|
||||
Ok(buffer) => buffer,
|
||||
Err(error_message) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
@@ -272,8 +272,8 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
|
||||
// Do request to extension storage proxy, e.g.,
|
||||
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
|
||||
// using HTTP GET and return the response body as bytes.
|
||||
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", ext_remote_storage, ext_path);
|
||||
async fn download_extension_tar(remote_ext_base_url: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", remote_ext_base_url, ext_path);
|
||||
let filename = Path::new(ext_path)
|
||||
.file_name()
|
||||
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
|
||||
|
||||
@@ -22,7 +22,7 @@ pub(in crate::http) async fn download_extension(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
// Don't even try to download extensions if no remote storage is configured
|
||||
if compute.params.ext_remote_storage.is_none() {
|
||||
if compute.params.remote_ext_base_url.is_none() {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"remote storage is not configured",
|
||||
|
||||
@@ -644,9 +644,10 @@ struct EndpointStartCmdArgs {
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
help = "Configure the remote extensions storage proxy gateway to request for extensions."
|
||||
help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
|
||||
alias = "remote-ext-config"
|
||||
)]
|
||||
remote_ext_config: Option<String>,
|
||||
remote_ext_base_url: Option<String>,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
@@ -1414,9 +1415,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
EndpointCmd::Start(args) => {
|
||||
let endpoint_id = &args.endpoint_id;
|
||||
let pageserver_id = args.endpoint_pageserver_id;
|
||||
let remote_ext_config = &args.remote_ext_config;
|
||||
let remote_ext_base_url = &args.remote_ext_base_url;
|
||||
|
||||
let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
|
||||
let default_generation = env
|
||||
.storage_controller
|
||||
.timelines_onto_safekeepers
|
||||
.then_some(1);
|
||||
let safekeepers_generation = args
|
||||
.safekeepers_generation
|
||||
.or(default_generation)
|
||||
.map(SafekeeperGeneration::new);
|
||||
// If --safekeepers argument is given, use only the listed
|
||||
// safekeeper nodes; otherwise all from the env.
|
||||
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
|
||||
@@ -1510,7 +1518,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
safekeepers_generation,
|
||||
safekeepers,
|
||||
pageservers,
|
||||
remote_ext_config.as_ref(),
|
||||
remote_ext_base_url.as_ref(),
|
||||
stripe_size.0 as usize,
|
||||
args.create_test_user,
|
||||
args.start_timeout,
|
||||
|
||||
@@ -655,7 +655,7 @@ impl Endpoint {
|
||||
safekeepers_generation: Option<SafekeeperGeneration>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
remote_ext_config: Option<&String>,
|
||||
remote_ext_base_url: Option<&String>,
|
||||
shard_stripe_size: usize,
|
||||
create_test_user: bool,
|
||||
start_timeout: Duration,
|
||||
@@ -825,8 +825,8 @@ impl Endpoint {
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
|
||||
if let Some(remote_ext_config) = remote_ext_config {
|
||||
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||
if let Some(remote_ext_base_url) = remote_ext_base_url {
|
||||
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
|
||||
}
|
||||
|
||||
let child = cmd.spawn()?;
|
||||
|
||||
@@ -10,7 +10,8 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hyper0::Uri;
|
||||
use nix::unistd::Pid;
|
||||
use pageserver_api::controller_api::{
|
||||
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
|
||||
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest,
|
||||
SafekeeperSchedulingPolicyRequest, SkSchedulingPolicy, TenantCreateRequest,
|
||||
TenantCreateResponse, TenantLocateResponse,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
@@ -20,7 +21,7 @@ use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
use pem::Pem;
|
||||
use postgres_backend::AuthType;
|
||||
use reqwest::Method;
|
||||
use reqwest::{Method, Response};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::process::Command;
|
||||
@@ -570,6 +571,11 @@ impl StorageController {
|
||||
let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
|
||||
.expect("failed to generate jwt token");
|
||||
args.push(format!("--peer-jwt-token={peer_jwt_token}"));
|
||||
|
||||
let claims = Claims::new(None, Scope::SafekeeperData);
|
||||
let jwt_token =
|
||||
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
|
||||
args.push(format!("--safekeeper-jwt-token={jwt_token}"));
|
||||
}
|
||||
|
||||
if let Some(public_key) = &self.public_key {
|
||||
@@ -614,6 +620,10 @@ impl StorageController {
|
||||
self.env.base_data_dir.display()
|
||||
));
|
||||
|
||||
if self.env.safekeepers.iter().any(|sk| sk.auth_enabled) && self.private_key.is_none() {
|
||||
anyhow::bail!("Safekeeper set up for auth but no private key specified");
|
||||
}
|
||||
|
||||
if self.config.timelines_onto_safekeepers {
|
||||
args.push("--timelines-onto-safekeepers".to_string());
|
||||
}
|
||||
@@ -640,6 +650,10 @@ impl StorageController {
|
||||
)
|
||||
.await?;
|
||||
|
||||
if self.config.timelines_onto_safekeepers {
|
||||
self.register_safekeepers().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -743,6 +757,23 @@ impl StorageController {
|
||||
where
|
||||
RQ: Serialize + Sized,
|
||||
RS: DeserializeOwned + Sized,
|
||||
{
|
||||
let response = self.dispatch_inner(method, path, body).await?;
|
||||
Ok(response
|
||||
.json()
|
||||
.await
|
||||
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
|
||||
}
|
||||
|
||||
/// Simple HTTP request wrapper for calling into storage controller
|
||||
async fn dispatch_inner<RQ>(
|
||||
&self,
|
||||
method: reqwest::Method,
|
||||
path: String,
|
||||
body: Option<RQ>,
|
||||
) -> anyhow::Result<Response>
|
||||
where
|
||||
RQ: Serialize + Sized,
|
||||
{
|
||||
// In the special case of the `storage_controller start` subcommand, we wish
|
||||
// to use the API endpoint of the newly started storage controller in order
|
||||
@@ -785,10 +816,31 @@ impl StorageController {
|
||||
let response = builder.send().await?;
|
||||
let response = response.error_from_body().await?;
|
||||
|
||||
Ok(response
|
||||
.json()
|
||||
.await
|
||||
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Register the safekeepers in the storage controller
|
||||
#[instrument(skip(self))]
|
||||
async fn register_safekeepers(&self) -> anyhow::Result<()> {
|
||||
for sk in self.env.safekeepers.iter() {
|
||||
let sk_id = sk.id;
|
||||
let body = serde_json::json!({
|
||||
"id": sk_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "127.0.0.1",
|
||||
"port": sk.pg_port,
|
||||
"http_port": sk.http_port,
|
||||
"https_port": sk.https_port,
|
||||
"version": 5957,
|
||||
"availability_zone_id": format!("us-east-2b-{sk_id}"),
|
||||
});
|
||||
self.upsert_safekeeper(sk_id, body).await?;
|
||||
self.safekeeper_scheduling_policy(sk_id, SkSchedulingPolicy::Active)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call into the attach_hook API, for use before handing out attachments to pageservers
|
||||
@@ -816,6 +868,42 @@ impl StorageController {
|
||||
Ok(response.generation)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn upsert_safekeeper(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
request: serde_json::Value,
|
||||
) -> anyhow::Result<()> {
|
||||
let resp = self
|
||||
.dispatch_inner::<serde_json::Value>(
|
||||
Method::POST,
|
||||
format!("control/v1/safekeeper/{node_id}"),
|
||||
Some(request),
|
||||
)
|
||||
.await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!(
|
||||
"setting scheduling policy unsuccessful for safekeeper {node_id}: {}",
|
||||
resp.status()
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn safekeeper_scheduling_policy(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
scheduling_policy: SkSchedulingPolicy,
|
||||
) -> anyhow::Result<()> {
|
||||
self.dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
|
||||
Method::POST,
|
||||
format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
|
||||
Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn inspect(
|
||||
&self,
|
||||
|
||||
@@ -14,6 +14,14 @@ PG_VERSION=${PG_VERSION:-14}
|
||||
CONFIG_FILE_ORG=/var/db/postgres/configs/config.json
|
||||
CONFIG_FILE=/tmp/config.json
|
||||
|
||||
# Test that the first library path that the dynamic loader looks in is the path
|
||||
# that we use for custom compiled software
|
||||
first_path="$(ldconfig --verbose 2>/dev/null \
|
||||
| grep --invert-match ^$'\t' \
|
||||
| cut --delimiter=: --fields=1 \
|
||||
| head --lines=1)"
|
||||
test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat.
|
||||
|
||||
echo "Waiting pageserver become ready."
|
||||
while ! nc -z pageserver 6400; do
|
||||
sleep 1;
|
||||
|
||||
@@ -5,3 +5,4 @@ listen_http_addr='0.0.0.0:9898'
|
||||
remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' }
|
||||
control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address
|
||||
control_plane_emergency_mode=true
|
||||
virtual_file_io_mode="buffered" # the CI runners where we run the docker compose tests have slow disks
|
||||
|
||||
@@ -7,6 +7,8 @@ Author: Christian Schwarz
|
||||
|
||||
A brief RFC / GitHub Epic describing a vectored version of the `Timeline::get` method that is at the heart of Pageserver.
|
||||
|
||||
**EDIT**: the implementation of this feature is described in [Vlad's (internal) tech talk](https://drive.google.com/file/d/1vfY24S869UP8lEUUDHRWKF1AJn8fpWoJ/view?usp=drive_link).
|
||||
|
||||
# Motivation
|
||||
|
||||
During basebackup, we issue many `Timeline::get` calls for SLRU pages that are *adjacent* in key space.
|
||||
|
||||
362
docs/rfcs/2025-04-30-direct-io-for-pageserver.md
Normal file
362
docs/rfcs/2025-04-30-direct-io-for-pageserver.md
Normal file
@@ -0,0 +1,362 @@
|
||||
# Direct IO For Pageserver
|
||||
|
||||
Date: Apr 30, 2025
|
||||
|
||||
## Summary
|
||||
|
||||
This document is a retroactive RFC. It
|
||||
- provides some background on what direct IO is,
|
||||
- motivates why Pageserver should be using it for its IO, and
|
||||
- describes how we changed Pageserver to use it.
|
||||
|
||||
The [initial proposal](https://github.com/neondatabase/neon/pull/8240) that kicked off the work can be found in this closed GitHub PR.
|
||||
|
||||
People primarily involved in this project were:
|
||||
- Yuchen Liang <yuchen@neon.tech>
|
||||
- Vlad Lazar <vlad@neon.tech>
|
||||
- Christian Schwarz <christian@neon.tech>
|
||||
|
||||
## Timeline
|
||||
|
||||
For posterity, here is the rough timeline of the development work that got us to where we are today.
|
||||
|
||||
- Jan 2024: [integrate `tokio-epoll-uring`](https://github.com/neondatabase/neon/pull/5824) along with owned buffers API
|
||||
- March 2024: `tokio-epoll-uring` enabled in all regions in buffered IO mode
|
||||
- Feb 2024 to June 2024: PS PageCache Bypass For Data Blocks
|
||||
- Feb 2024: [Vectored Get Implementation](https://github.com/neondatabase/neon/pull/6576) bypasses delta & image layer blocks for page requests
|
||||
- Apr to June 2024: [Epic: bypass PageCache for use data blocks](https://github.com/neondatabase/neon/issues/7386) addresses remaining users
|
||||
- Aug to Nov 2024: direct IO: first code; preliminaries; read path coding; BufferedWriter; benchmarks show perf regressions too high, no-go.
|
||||
- Nov 2024 to Jan 2025: address perf regressions by developing page_service pipelining (aka batching) and concurrent IO ([Epic](https://github.com/neondatabase/neon/issues/9376))
|
||||
- Feb to March 2024: rollout batching, then concurrent+direct IO => read path and InMemoryLayer is now direct IO
|
||||
- Apr 2025: develop & roll out direct IO for the write path
|
||||
|
||||
## Background: Terminology & Glossary
|
||||
|
||||
**kernel page cache**: the Linux kernel's page cache is a write-back cache for filesystem contents.
|
||||
The cached unit is memory-page-sized & aligned chunks of the files that are being cached (typically 4k).
|
||||
The cache lives in kernel memory and is not directly accessible through userspace.
|
||||
|
||||
**Buffered IO**: an application's read/write system calls go through the kernel page cache.
|
||||
For example, a 10 byte sized read or write to offset 5000 in a file will load the file contents
|
||||
at offset `[4096,8192)` into a free page in the kernel page cache. If necessary, it will evict
|
||||
a page to make room (cf eviction). Then, the kernel performs a memory-to-memory copy of 10 bytes
|
||||
from/to the offset `4` (`5000 = 4096 + 4`) within the cached page. If it's a write, the kernel keeps
|
||||
track of the fact that the page is now "dirty" in some ancillary structure.
|
||||
|
||||
**Writeback**: a buffered read/write syscall returns after the memory-to-memory copy. The modifications
|
||||
made by e.g. write system calls are not even *issued* to disk, let alone durable. Instead, the kernel
|
||||
asynchronously writes back dirtied pages based on a variety of conditions. For us, the most relevant
|
||||
ones are a) explicit request by userspace (`fsync`) and b) memory pressure.
|
||||
|
||||
**Memory pressure**: the kernel page cache is a best effort service and a user of spare memory capacity.
|
||||
If there is no free memory, the kernel page allocator will take pages used by page cache to satisfy allocations.
|
||||
Before reusing a page like that, the page has to be written back (writeback, see above).
|
||||
The far-reaching consequence of this is that **any allocation of anonymous memory can do IO** if the only
|
||||
way to get that memory is by eviction & re-using a dirty page cache page.
|
||||
Notably, this includes a simple `malloc` in userspace, because eventually that boils down to `mmap(..., MAP_ANON, ...)`.
|
||||
I refer to this effect as the "malloc latency backscatter" caused by buffered IO.
|
||||
|
||||
**Direct IO** allows application's read/write system calls to bypass the kernel page cache. The filesystem
|
||||
is still involved because it is ultimately in charge of mapping the concept of files & offsets within them
|
||||
to sectors on block devices. Typically, the filesystem poses size and alignment requirements for memory buffers
|
||||
and file offsets (statx `Dio_mem_align` / `Dio_offset_align`), see [this gist](https://gist.github.com/problame/1c35cac41b7cd617779f8aae50f97155).
|
||||
The IO operations will fail at runtime with EINVAL if the alignment requirements are not met.
|
||||
|
||||
**"buffered" vs "direct"**: the central distinction between buffered and direct IO is about who allocates and
|
||||
fills the IO buffers, and who controls when exactly the IOs are issued. In buffered IO, it's the syscall handlers,
|
||||
kernel page cache, and memory management subsystems (cf "writeback"). In direct IO, all of it is done by
|
||||
the application.
|
||||
It takes more effort by the application to program with direct instead of buffered IO.
|
||||
The return is precise control over and a clear distinction between consumption/modification of memory vs disk.
|
||||
|
||||
**Pageserver PageCache**: Pageserver has an additional `PageCache` (referred to as PS PageCache from here on, as opposed to "kernel page cache").
|
||||
Its caching unit is 8KiB blocks of the layer files written by Pageserver.
|
||||
A miss in PageCache is filled by reading from the filesystem, through the `VirtualFile` abstraction layer.
|
||||
The default size is tiny (64MiB), very much like Postgres's `shared_buffers`.
|
||||
We ran production at 128MiB for a long time but gradually moved it up to 2GiB over the past ~year.
|
||||
|
||||
**VirtualFile** is Pageserver's abstraction for file IO, very similar to the facility in Postgres that bears the same name.
|
||||
Its historical purpose appears to be working around open file descriptor limitations, which is practically irrelevant on Linux.
|
||||
However, the facility in Pageserver is useful as an intermediary layer for metrics and abstracts over the different kinds of
|
||||
IO engines that Pageserver supports (`std-fs` vs `tokio-epoll-uring`).
|
||||
|
||||
## Background: History Of Caching In Pageserver
|
||||
|
||||
For multiple years, Pageserver's `PageCache` was on the path of all read _and write_ IO.
|
||||
It performed write-back to the kernel using buffered IO.
|
||||
|
||||
We converted it into a read-only cache of immutable data in [PR 4994](https://github.com/neondatabase/neon/pull/4994).
|
||||
|
||||
The introduction of `tokio-epoll-uring` required converting the code base to used owned IO buffers.
|
||||
The `PageCache` pages are usable as owned IO buffers.
|
||||
|
||||
We then started bypassing PageCache for user data blocks.
|
||||
Data blocks are the 8k blocks of data in layer files that hold the multiple `Value`s, as opposed to the disk btree index blocks that tell us which values exist in a file at what offsets.
|
||||
The disk btree embedded in delta & image layers remains `PageCache`'d.
|
||||
Epics for that work were:
|
||||
- Vectored `Timeline::get` (cf RFC 30) skipped delta and image layer data block `PageCache`ing outright.
|
||||
- Epic https://github.com/neondatabase/neon/issues/7386 took care of the remaining users for data blocks:
|
||||
- Materialized page cache (cached materialized pages; shown to be ~0% hit rate in practice)
|
||||
- InMemoryLayer
|
||||
- Compaction
|
||||
|
||||
The outcome of the above:
|
||||
1. All data blocks are always read through the `VirtualFile` APIs, hitting the kernel buffered read path (=> kernel page cache).
|
||||
2. Indirect blocks (=disk btree blocks) would be cached in the PS `PageCache`.
|
||||
|
||||
In production we size the PS `PageCache` to be 2GiB.
|
||||
Thus drives hit rate up to ~99.95% and the eviction rate / replacement rates down to less than 200/second on a 1-minute average, on the busiest machines.
|
||||
High baseline replacement rates are treated as a signal of resource exhaustion (page cache insufficient to host working set of the PS).
|
||||
The response to this is to migrate tenants away, or increase PS `PageCache` size.
|
||||
It is currently manual but could be automated, e.g., in Storage Controller.
|
||||
|
||||
In the future, we may eliminate the `PageCache` even for indirect blocks.
|
||||
For example with an LRU cache that has as unit the entire disk btree content
|
||||
instead of individual blocks.
|
||||
|
||||
## High-Level Design
|
||||
|
||||
So, before work on this project started, all data block reads and the entire write path of Pageserver were using kernel-buffered IO, i.e., the kernel page cache.
|
||||
We now want to get the kernel page cache out of the picture by using direct IO for all interaction with the filesystem.
|
||||
This achieves the following system properties:
|
||||
|
||||
**Predictable VirtualFile latencies**
|
||||
* With buffered IO, reads are sometimes fast, sometimes slow, depending on kernel page cache hit/miss.
|
||||
* With buffered IO, appends when writing out new layer files during ingest or compaction are sometimes fast, sometimes slow because of write-back backpressure.
|
||||
* With buffered IO, the "malloc backscatter" phenomenon pointed out in the Glossary section is not something we actively observe.
|
||||
But we do have occasional spikes in Dirty memory amount and Memory PSI graphs, so it may already be affecting to some degree.
|
||||
* By switching to direct IO, above operations will have the (predictable) device latency -- always.
|
||||
Reads and appends always go to disk.
|
||||
And malloc will not have to write back dirty data.
|
||||
|
||||
**Explicitness & Tangibility of resource usage**
|
||||
* In a multi-tenant system, it is generally desirable and valuable to be *explicit* about the main resources we use for each tenant.
|
||||
* By using direct IO, we become explicit about the resources *disk IOPs* and *memory capacity* in a way that was previously being conflated through the kernel page cache, outside our immediate control.
|
||||
* We will be able to build per-tenant observability of resource usage ("what tenant is causing the actual IOs that are sent to the disk?").
|
||||
* We will be able to build accounting & QoS by implementing an IO scheduler that is tenant aware. The kernel is not tenant-aware and can't do that.
|
||||
|
||||
**CPU Efficiency**
|
||||
* The involvement of the kernel page cache means one additional memory-to-memory copy on read and write path.
|
||||
* Direct IO will eliminate that memory-to-memory copy, if we can make the userspace buffers used for the IO calls satisfy direct IO alignment requirements.
|
||||
|
||||
The **trade-off** is that we no longer get the theoretical benefits of the kernel page cache. These are:
|
||||
- read latency improvements for repeat reads of the same data ("locality of reference")
|
||||
- asterisk: only if that state is still cache-resident by time of next access
|
||||
- write throughput by having kernel page cache batch small VFS writes into bigger disk writes
|
||||
- asterisk: only if memory pressure is low enough that the kernel can afford to delay writeback
|
||||
|
||||
We are **happy to make this trade-off**:
|
||||
- Because of the advantages listed above.
|
||||
- Because we empirically have enough DRAM on Pageservers to serve metadata (=index blocks) from PS PageCache.
|
||||
(At just 2GiB PS PageCache size, we average a 99.95% hit rate).
|
||||
So, the latency of going to disk is only for data block reads, not the index traversal.
|
||||
- Because **the kernel page cache is ineffective** at high tenant density anyway (#tenants/pageserver instance).
|
||||
And because dense packing of tenants will always be desirable to drive COGS down, we should design the system for it.
|
||||
(See the appendix for a more detailed explanation why this is).
|
||||
- So, we accept that some reads that used to be fast by circumstance will have higher but **predictable** latency than before.
|
||||
|
||||
### Desired End State
|
||||
|
||||
The desired end state of the project is as follows, and with some asterisks, we have achieved it.
|
||||
|
||||
All IOs of the Pageserver data path use direct IO, thereby bypassing the kernel page cache.
|
||||
|
||||
In particular, the "data path" includes
|
||||
- the wal ingest path
|
||||
- compaction
|
||||
- anything on the `Timeline::get` / `Timeline::get_vectored` path.
|
||||
|
||||
The production Pageserver config is tuned such that virtually all non-data blocks are cached in the PS PageCache.
|
||||
Hit rate target is 99.95%.
|
||||
|
||||
There are no regressions to ingest latency.
|
||||
|
||||
The total "wait-for-disk time" contribution to random getpage request latency is `O(1 read IOP latency)`.
|
||||
We accomplish that by having a near 100% PS PageCache hit rate so that layer index traversal effectively never needs not wait for IO.
|
||||
Thereby, it can issue all the data blocks as it traverses the index, and only wait at the end of it (concurrent IO).
|
||||
|
||||
The amortized "wait-for-disk time" contribution of this direct IO proposal to a series of sequential getpage requests is `1/32 * read IOP latency` for each getpage request.
|
||||
We accomplish this by server-side batching of up to 32 reads into a single `Timeline::get_vectored` call.
|
||||
(This is an ideal world where our batches are full - that's not the case in prod today because of lack of queue depth).
|
||||
|
||||
## Design & Implementation
|
||||
|
||||
### Prerequisites
|
||||
|
||||
A lot of prerequisite work had to happen to enable use of direct IO.
|
||||
|
||||
To meet the "wait-for-disk time" requirements from the DoD, we implement for the read path:
|
||||
- page_service level server-side batching (config field `page_service_pipelining`)
|
||||
- concurrent IO (config field `get_vectored_concurrent_io`)
|
||||
The work for both of these these was tracked [in the epic](https://github.com/neondatabase/neon/issues/9376).
|
||||
Server-side batching will likely be obsoleted by the [#proj-compute-communicator](https://github.com/neondatabase/neon/pull/10799).
|
||||
The Concurrent IO work is described in retroactive RFC `2025-04-30-pageserver-concurrent-io-on-read-path.md`.
|
||||
The implementation is relatively brittle and needs further investment, see the `Future Work` section in that RFC.
|
||||
|
||||
For the write path, and especially WAL ingest, we need to hide write latency.
|
||||
We accomplish this by implementing a (`BufferedWriter`) type that does double-buffering: flushes of the filled
|
||||
buffer happen in a sidecar tokio task while new writes fill a new buffer.
|
||||
We refactor InMemoryLayer as well as BlobWriter (=> delta and image layer writers) to use this new `BufferedWriter`.
|
||||
The most comprehensive write-up of this work is in [the PR description](https://github.com/neondatabase/neon/pull/11558).
|
||||
|
||||
### Ensuring Adherence to Alignment Requirements
|
||||
|
||||
Direct IO puts requirements on
|
||||
- memory buffer alignment
|
||||
- io size (=memory buffer size)
|
||||
- file offset alignment
|
||||
|
||||
The requirements are specific to a combination of filesystem/block-device/architecture(hardware page size!).
|
||||
|
||||
In Neon production environments we currently use ext4 with Linux 6.1.X on AWS and Azure storage-optimized instances (locally attached NVMe).
|
||||
Instead of dynamic discovery using `statx`, we statically hard-code 512 bytes as the buffer/offset alignment and size-multiple.
|
||||
We made this decision because:
|
||||
- a) it is compatible with all the environments we need to run in
|
||||
- b) our primary workload can be small-random-read-heavy (we do merge adjacent reads if possible, but the worst case is that all `Value`s that needs to be read are far apart)
|
||||
- c) 512-byte tail latency on the production instance types is much better than 4k (p99.9: 3x lower, p99.99 5x lower).
|
||||
- d) hard-coding at compile-time allows us to use the Rust type system to enforce the use of only aligned IO buffers, eliminating a source of runtime errors typically associated with direct IO.
|
||||
|
||||
This was [discussed here](https://neondb.slack.com/archives/C07BZ38E6SD/p1725036790965549?thread_ts=1725026845.455259&cid=C07BZ38E6SD).
|
||||
|
||||
The new `IoBufAligned` / `IoBufAlignedMut` marker traits indicate that a given buffer meets memory alignment requirements.
|
||||
All `VirtualFile` APIs and several software layers built on top of them only accept buffers that implement those traits.
|
||||
Implementors of the marker traits are:
|
||||
- `IoBuffer` / `IoBufferMut`: used for most reads and writes
|
||||
- `PageWriteGuardBuf`: for filling PS PageCache pages (index blocks!)
|
||||
|
||||
The alignment requirement is infectious; it permeates bottom-up throughout the code base.
|
||||
We stop the infection at roughly the same layers in the code base where we stopped permeating the
|
||||
use of owned-buffers-style API for tokio-epoll-uring. The way the stopping works is by introducing
|
||||
a memory-to-memory copy from/to some unaligned memory location on the stack/current/heap.
|
||||
The places where we currently stop permeating are sort of arbitrary. For example, it would probably
|
||||
make sense to replace more usage of `Bytes` that we know holds 8k pages with 8k-sized `IoBuffer`s.
|
||||
|
||||
The `IoBufAligned` / `IoBufAlignedMut` types do not protect us from the following types of runtime errors:
|
||||
- non-adherence to file offset alignment requirements
|
||||
- non-adherence to io size requirements
|
||||
|
||||
The following higher-level constructs ensure we meet the requirements:
|
||||
- read path: the `ChunkedVectoredReadBuilder` and `mod vectored_dio_read` ensure reads happen at aligned offsets and in appropriate size multiples.
|
||||
- write path: `BufferedWriter` only writes in multiples of the capacity, at offsets that are `start_offset+N*capacity`; see its doc comment.
|
||||
|
||||
Note that these types are used always, regardless of whether direct IO is enabled or not.
|
||||
There are some cases where this adds unnecessary overhead to buffered IO (e.g. all memcpy's inflated to multiples of 512).
|
||||
But we could not identify meaningful impact in practice when we shipped these changes while we were still using buffered IO.
|
||||
|
||||
### Configuration / Feature Flagging
|
||||
|
||||
In the previous section we described how all users of VirtualFile were changed to always adhere to direct IO alignment and size-multiple requirements.
|
||||
To actually enable direct IO, all we need to do is set the `O_DIRECT` flag in `open` syscalls / io_uring operations.
|
||||
|
||||
We set `O_DIRECT` based on:
|
||||
- the VirtualFile API used to create/open the VirtualFile instance
|
||||
- the `virtual_file_io_mode` configuration flag
|
||||
- the OpenOptions `read` and/or `write` flags.
|
||||
|
||||
The VirtualFile APIs suffixed with `_v2` are the only ones that _may_ open with `O_DIRECT` depending on the other two factors in above list.
|
||||
Other APIs never use `O_DIRECT`.
|
||||
(The name is bad and should really be `_maybe_direct_io`.)
|
||||
|
||||
The reason for having new APIs is because all code used VirtualFile but implementation and rollout happened in consecutive phases (read path, InMemoryLayer, write path).
|
||||
At the VirtualFile level, context on whether an instance of VirtualFile is on read path, InMemoryLayer, or write path is not available.
|
||||
|
||||
The `_v2` APIs then check make the decision to set `O_DIRECT` based on the `virtual_file_io_mode` flag and the OpenOptions `read`/`write` flags.
|
||||
The result is the following runtime behavior:
|
||||
|
||||
|what|OpenOptions|`v_f_io_mode`<br/>=`buffered`|`v_f_io_mode`<br/>=`direct`|`v_f_io_mode`<br/>=`direct-rw`|
|
||||
|-|-|-|-|-|
|
||||
|`DeltaLayerInner`|read|()|O_DIRECT|O_DIRECT|
|
||||
|`ImageLayerInner`|read|()|O_DIRECT|O_DIRECT|
|
||||
|`InMemoryLayer`|read + write|()|()*|O_DIRECT|
|
||||
|`DeltaLayerWriter`| write | () | () | O_DIRECT |
|
||||
|`ImageLayerWriter`| write | () | () | O_DIRECT |
|
||||
|`download_layer_file`|write |()|()|O_DIRECT|
|
||||
|
||||
The `InMemoryLayer` is marked with `*` because there was a period when it *did* use O_DIRECT under `=direct`.
|
||||
That period was when we implemented and shipped the first version of `BufferedWriter`.
|
||||
We used it in `InMemoryLayer` and `download_layer_file` but it was only sensitive to `v_f_io_mode` in `InMemoryLayer`.
|
||||
The introduction of `=direct-rw`, and the switch of the remaining write path to `BufferedWriter`, happened later,
|
||||
in https://github.com/neondatabase/neon/pull/11558.
|
||||
|
||||
Note that this way of feature flagging inside VirtualFile makes it less and less a general purpose POSIX file access abstraction.
|
||||
For example, with `=direct-rw` enabled, it is no longer possible to open a `VirtualFile` without `O_DIRECT`. It'll always be set.
|
||||
|
||||
## Correctness Validation
|
||||
|
||||
The correctness risks with this project were:
|
||||
- Memory safety issues in the `IoBuffer` / `IoBufferMut` implementation.
|
||||
These types expose an API that is largely identical to that of the `bytes` crate and/or Vec.
|
||||
- Runtime errors (=> downtime / unavailability) because of non-adherence to alignment/size-multiple requirements, resulting in EINVAL on the read path.
|
||||
|
||||
We sadly do not have infrastructure to run pageserver under `cargo miri`.
|
||||
So for memory safety issues, we relied on careful peer review.
|
||||
|
||||
We do assert the production-like alignment requirements in testing builds.
|
||||
However, these asserts were added retroactively.
|
||||
The actual validation before rollout happened in staging and pre-prod.
|
||||
We eventually enabled `=direct`/`=direct-rw` for Rust unit tests and the regression test suite.
|
||||
I cannot recall a single instance of staging/pre-prod/production errors caused by non-adherence to alignment/size-multiple requirements.
|
||||
Evidently developer testing was good enough.
|
||||
|
||||
## Performance Validation
|
||||
|
||||
The read path went through a lot of iterations of benchmarking in staging and pre-prod.
|
||||
The benchmarks in those environments demonstrated performance regressions early in the implementation.
|
||||
It was actually this performance testing that made us implement batching and concurrent IO to avoid unacceptable regressions.
|
||||
|
||||
The write path was much quicker to validate because `bench_ingest` covered all of the (less numerous) access patterns.
|
||||
|
||||
## Future Work
|
||||
|
||||
There is minor and major follow-up work that can be considered in the future.
|
||||
Check the (soon-to-be-closed) Epic https://github.com/neondatabase/neon/issues/8130's "Follow-Ups" section for a current list.
|
||||
|
||||
Read Path:
|
||||
- PS PageCache hit rate is crucial to unlock concurrent IO and reasonable latency for random reads generally.
|
||||
Instead of reactively sizing PS PageCache, we should estimate the required PS PageCache size
|
||||
and potentially also use that to drive placement decisions of shards from StorageController
|
||||
https://github.com/neondatabase/neon/issues/9288
|
||||
- ... unless we get rid of PS PageCache entirely and cache the index block in a more specialized cache.
|
||||
But even then, an estimation of the working set would be helpful to figure out caching strategy.
|
||||
|
||||
Write Path:
|
||||
- BlobWriter and its users could switch back to a borrowed API https://github.com/neondatabase/neon/issues/10129
|
||||
- ... unless we want to implement bypass mode for large writes https://github.com/neondatabase/neon/issues/10101
|
||||
- The `TempVirtualFile` introduced as part of this project could internalize more of the common usage pattern: https://github.com/neondatabase/neon/issues/11692
|
||||
- Reduce conditional compilation around `virtual_file_io_mode`: https://github.com/neondatabase/neon/issues/11676
|
||||
|
||||
Both:
|
||||
- A performance simulation mode that pads VirtualFile op latencies to typical NVMe latencies, even if the underlying storage is faster.
|
||||
This would avoid misleadingly good performance on developer systems and in benchmarks on systems that are less busy than production hosts.
|
||||
However, padding latencies at microsecond scale is non-trivial.
|
||||
|
||||
Misc:
|
||||
- We should finish trimming VirtualFile's scope to be truly limited to core data path read & write.
|
||||
Abstractions for reading & writing pageserver config, location config, heatmaps, etc, should use
|
||||
APIs in a different package (`VirtualFile::crashsafe_overwrite` and `VirtualFile::read_to_string`
|
||||
are good entrypoints for cleanup.) https://github.com/neondatabase/neon/issues/11809
|
||||
|
||||
# Appendix
|
||||
|
||||
## Why Kernel Page Cache Is Ineffective At Tenant High Density
|
||||
|
||||
In the Motivation section, we stated:
|
||||
|
||||
> - **The kernel page cache ineffective** at high tenant density anyways (#tenants/pageserver instance).
|
||||
|
||||
The reason is that the Pageserver workload sent from Computes is whatever is a Compute cache(s) miss.
|
||||
That's either sequential scans or random reads.
|
||||
A random read workload simply causes cache thrashing because a packed Pageserver NVMe drive (`im4gn.2xlarge`) has ~100x more capacity than DRAM available.
|
||||
It is complete waste to have the kernel page cache cache data blocks in this case.
|
||||
Sequential read workloads *can* benefit iff those pages have been updated recently (=no image layer yet) and together in time/LSN space.
|
||||
In such cases, the WAL records of those updates likely sit on the same delta layer block.
|
||||
When Compute does a sequential scan, it sends a series of single-page requests for these individual pages.
|
||||
When Pageserver processes the second request in such a series, it goes to the same delta layer block and have a kernel page cache hit.
|
||||
This dependence on kernel page cache for sequential scan performance is significant, but the solution is at a higher level than generic data block caching.
|
||||
We can either add a small per-connection LRU cache for such delta layer blocks.
|
||||
Or we can merge those sequential requests into a larger vectored get request, which is designed to never read a block twice.
|
||||
This amortizes the read latency for our delta layer block across the vectored get batch size (which currently is up to 32).
|
||||
|
||||
There are Pageserver-internal workloads that do sequential access (compaction, image layer generation), but these
|
||||
1. are not latency-critical and can do batched access outside of the `page_service` protocol constraints (image layer generation)
|
||||
2. don't actually need to reconstruct images and therefore can use totally different access methods (=> compaction can use k-way merge iterators with their own internal buffering / prefetching).
|
||||
251
docs/rfcs/2025-04-30-pageserver-concurrent-io-on-read-path.md
Normal file
251
docs/rfcs/2025-04-30-pageserver-concurrent-io-on-read-path.md
Normal file
@@ -0,0 +1,251 @@
|
||||
# Concurrent IO for Pageserver Read Path
|
||||
|
||||
Date: May 6, 2025
|
||||
|
||||
## Summary
|
||||
|
||||
This document is a retroactive RFC on the Pageserver Concurrent IO work that happened in late 2024 / early 2025.
|
||||
|
||||
The gist of it is that Pageserver's `Timeline::get_vectored` now _issues_ the data block read operations against layer files
|
||||
_as it traverses the layer map_ and only _wait_ once, for all of them, after traversal is complete.
|
||||
|
||||
Assuming a good PS PageCache hits on the index blocks during traversal, this drives down the "wait-for-disk" time
|
||||
contribution down from `random_read_io_latency * O(number_of_values)` to `random_read_io_latency * O(1 + traversal)`.
|
||||
|
||||
The motivation for why this work had to happen when it happened was the switch of Pageserver to
|
||||
- not cache user data blocks in PS PageCache and
|
||||
- switch to use direct IO.
|
||||
More context on this are given in complimentary RFC `./rfcs/2025-04-30-direct-io-for-pageserver.md`.
|
||||
|
||||
### Refs
|
||||
|
||||
- Epic: https://github.com/neondatabase/neon/issues/9378
|
||||
- Prototyping happened during the Lisbon 2024 Offsite hackathon: https://github.com/neondatabase/neon/pull/9002
|
||||
- Main implementation PR with good description: https://github.com/neondatabase/neon/issues/9378
|
||||
|
||||
Design and implementation by:
|
||||
- Vlad Lazar <vlad@neon.tech>
|
||||
- Christian Schwarz <christian@neon.tech>
|
||||
|
||||
## Background & Motivation
|
||||
|
||||
The Pageserver read path (`Timeline::get_vectored`) consists of two high-level steps:
|
||||
- Retrieve the delta and image `Value`s required to reconstruct the requested Page@LSN (`Timeline::get_values_reconstruct_data`).
|
||||
- Pass these values to walredo to reconstruct the page images.
|
||||
|
||||
The read path used to be single-key but has been made multi-key some time ago.
|
||||
([Internal tech talk by Vlad](https://drive.google.com/file/d/1vfY24S869UP8lEUUDHRWKF1AJn8fpWoJ/view?usp=drive_link))
|
||||
However, for simplicity, most of this doc will explain things in terms of a single key being requested.
|
||||
|
||||
The `Value` retrieval step above can be broken down into the following functions:
|
||||
- **Traversal** of the layer map to figure out which `Value`s from which layer files are required for the page reconstruction.
|
||||
- **Read IO Planning**: planning of the read IOs that need to be issued to the layer files / filesystem / disk.
|
||||
The main job here is to coalesce the small value reads into larger filesystem-level read operations.
|
||||
This layer also takes care of direct IO alignment and size-multiple requirements (cf the RFC for details.)
|
||||
Check `struct VectoredReadPlanner` and `mod vectored_dio_read` for how it's done.
|
||||
- **Perform the read IO** using `tokio-epoll-uring`.
|
||||
|
||||
Before this project, above functions were sequentially interleaved, meaning:
|
||||
1. we would advance traversal, ...
|
||||
2. discover, that we need to read a value, ...
|
||||
3. read it from disk using `tokio-epoll-uring`, ...
|
||||
4. goto 1 unless we're done.
|
||||
|
||||
This meant that if N `Value`s need to be read to reconstruct a page,
|
||||
the time we spend waiting for disk will be we `random_read_io_latency * O(number_of_values)`.
|
||||
|
||||
## Design
|
||||
|
||||
The **traversal** and **read IO Planning** jobs still happen sequentially, layer by layer, as before.
|
||||
But instead of performing the read IOs inline, we submit the IOs to a concurrent tokio task for execution.
|
||||
After the last read from the last layer is submitted, we wait for the IOs to complete.
|
||||
|
||||
Assuming the filesystem / disk is able to actually process the submitted IOs without queuing,
|
||||
we arrive at _time spent waiting for disk_ ~ `random_read_io_latency * O(1 + traversal)`.
|
||||
|
||||
Note this whole RFC is concerned with the steady state where all layer files required for reconstruction are resident on local NVMe.
|
||||
Traversal will stall on on-demand layer download if a layer is not yet resident.
|
||||
It cannot proceed without the layer being resident beccause its next step depends on the contents of the layer index.
|
||||
|
||||
### Avoiding Waiting For IO During Traversal
|
||||
|
||||
The `traversal` component in above time-spent-waiting-for-disk estimation is dominant and needs to be minimized.
|
||||
|
||||
Before this project, traversal needed to perform IOs for the following:
|
||||
1. The time we are waiting on PS PageCache to page in the visited layers' disk btree index blocks.
|
||||
2. When visiting a delta layer, reading the data block that contains a `Value` for a requested key,
|
||||
to determine whether the `Value::will_init` the page and therefore traversal can stop for this key.
|
||||
|
||||
The solution for (1) is to raise the PS PageCache size such that the hit rate is practically 100%.
|
||||
(Check out the `Background: History Of Caching In Pageserver` section in the RFC on Direct IO for more details.)
|
||||
|
||||
The solution for (2) is source `will_init` from the disk btree index keys, which fortunately
|
||||
already encode this bit of information since the introduction of the current storage/layer format.
|
||||
|
||||
### Concurrent IOs, Submission & Completion
|
||||
|
||||
To separate IO submission from waiting for its completion,
|
||||
we introduce the notion of an `IoConcurrency` struct through which IOs are issued.
|
||||
|
||||
An IO is an opaque future that
|
||||
- captures the `tx` side of a `oneshot` channel
|
||||
- performs the read IO by calling `VirtualFile::read_exact_at().await`
|
||||
- sending the result into the `tx`
|
||||
|
||||
Issuing an IO means `Box`ing the future above and handing that `Box` over to the `IoConcurrency` struct.
|
||||
|
||||
The traversal code that submits the IO stores the the corresponding `oneshot::Receiver`
|
||||
in the `VectoredValueReconstructState`, in the the place where we previously stored
|
||||
the sequentially read `img` and `records` fields.
|
||||
|
||||
When we're done with traversal, we wait for all submitted IOs:
|
||||
for each key, there is a future that awaits all the `oneshot::Receiver`s
|
||||
for that key, and then calls into walredo to reconstruct the page image.
|
||||
Walredo is now invoked concurrently for each value instead of sequentially.
|
||||
Walredo itself remains unchanged.
|
||||
|
||||
The spawned IO futures are driven to completion by a sidecar tokio task that
|
||||
is separate from the task that performs all the layer visiting and spawning of IOs.
|
||||
That tasks receives the IO futures via an unbounded mpsc channel and
|
||||
drives them to completion inside a `FuturedUnordered`.
|
||||
|
||||
### Error handling, Panics, Cancellation-Safety
|
||||
|
||||
There are two error classes during reconstruct data retrieval:
|
||||
* traversal errors: index lookup, move to next layer, and the like
|
||||
* value read IO errors
|
||||
|
||||
A traversal error fails the entire `get_vectored` request, as before this PR.
|
||||
A value read error only fails reconstruction of that value.
|
||||
|
||||
Panics and dropping of the `get_vectored` future before it completes
|
||||
leaves the sidecar task running and does not cancel submitted IOs
|
||||
(see next section for details on sidecar task lifecycle).
|
||||
All of this is safe, but, today's preference in the team is to close out
|
||||
all resource usage explicitly if possible, rather than cancelling + forgetting
|
||||
about it on drop. So, there is warning if we drop a
|
||||
`VectoredValueReconstructState`/`ValuesReconstructState` that still has uncompleted IOs.
|
||||
|
||||
### Sidecar Task Lifecycle
|
||||
|
||||
The sidecar tokio task is spawned as part of the `IoConcurrency::spawn_from_conf` struct.
|
||||
The `IoConcurrency` object acts as a handle through which IO futures are submitted.
|
||||
|
||||
The spawned tokio task holds the `Timeline::gate` open.
|
||||
It is _not_ sensitive to `Timeline::cancel`, but instead to the `IoConcurrency` object being dropped.
|
||||
|
||||
Once the `IoConcurrency` struct is dropped, no new IO futures can come in
|
||||
but already submitted IO futures will be driven to completion regardless.
|
||||
We _could_ safely stop polling these futures because `tokio-epoll-uring` op futures are cancel-safe.
|
||||
But the underlying kernel and hardware resources are not magically freed up by that.
|
||||
So, again, in the interest of closing out all outstanding resource usage, we make timeline shutdown wait for sidecar tasks and their IOs to complete.
|
||||
Under normal conditions, this should be in the low hundreds of microseconds.
|
||||
|
||||
It is advisable to make the `IoConcurrency` as long-lived as possible to minimize the amount of
|
||||
tokio task churn (=> lower pressure on tokio). Generally this means creating it "high up" in the call stack.
|
||||
The pain with this is that the `IoConcurrency` reference needs to be propagated "down" to
|
||||
the (short-lived) functions/scope where we issue the IOs.
|
||||
We would like to use `RequestContext` for this propagation in the future (issue [here](https://github.com/neondatabase/neon/issues/10460)).
|
||||
For now, we just add another argument to the relevant code paths.
|
||||
|
||||
### Feature Gating
|
||||
|
||||
The `IoConcurrency` is an `enum` with two variants: `Sequential` and `SidecarTask`.
|
||||
|
||||
The behavior from before this project is available through `IoConcurrency::Sequential`,
|
||||
which awaits the IO futures in place, without "spawning" or "submitting" them anywhere.
|
||||
|
||||
The `get_vectored_concurrent_io` pageserver config variable determines the runtime value,
|
||||
**except** for the places that use `IoConcurrency::sequential` to get an `IoConcurrency` object.
|
||||
|
||||
### Alternatives Explored & Caveats Encountered
|
||||
|
||||
A few words on the rationale behind having a sidecar *task* and what
|
||||
alternatives were considered but abandoned.
|
||||
|
||||
#### Why We Need A Sidecar *Task* / Why Just `FuturesUnordered` Doesn't Work
|
||||
|
||||
We explored to not have a sidecar task, and instead have a `FuturesUnordered` per
|
||||
`Timeline::get_vectored`. We would queue all IO futures in it and poll it for the
|
||||
first time after traversal is complete (i.e., at `collect_pending_ios`).
|
||||
|
||||
The obvious disadvantage, but not showstopper, is that we wouldn't be submitting
|
||||
IOs until traversal is complete.
|
||||
|
||||
The showstopper however, is that deadlocks happen if we don't drive the
|
||||
IO futures to completion independently of the traversal task.
|
||||
The reason is that both the IO futures and the traversal task may hold _some_,
|
||||
_and_ try to acquire _more_, shared limited resources.
|
||||
For example, both the travseral task and IO future may try to acquire
|
||||
* a `VirtualFile` file descriptor cache slot async mutex (observed during impl)
|
||||
* a `tokio-epoll-uring` submission slot (observed during impl)
|
||||
* a `PageCache` slot (currently this is not the case but we may move more code into the IO futures in the future)
|
||||
|
||||
#### Why We Don't Do `tokio::task`-per-IO-future
|
||||
|
||||
Another option is to spawn a short-lived `tokio::task` for each IO future.
|
||||
We implemented and benchmarked it during development, but found little
|
||||
throughput improvement and moderate mean & tail latency degradation.
|
||||
Concerns about pressure on the tokio scheduler led us to abandon this variant.
|
||||
|
||||
## Future Work
|
||||
|
||||
In addition to what is listed here, also check the "Punted" list in the epic:
|
||||
https://github.com/neondatabase/neon/issues/9378
|
||||
|
||||
### Enable `Timeline::get`
|
||||
|
||||
The only major code path that still uses `IoConcurrency::sequential` is `Timeline::get`.
|
||||
The impact is that roughly the following parts of pageserver do not benefit yet:
|
||||
- parts of basebackup
|
||||
- reads performed by the ingest path
|
||||
- most internal operations that read metadata keys (e.g. `collect_keyspace`!)
|
||||
|
||||
The solution is to propagate `IoConcurrency` via `RequestContext`:https://github.com/neondatabase/neon/issues/10460
|
||||
|
||||
The tricky part is to figure out at which level of the code the `IoConcurrency` is spawned (and added to the RequestContext).
|
||||
|
||||
Also, propagation via `RequestContext` makes makes it harder to tell during development whether a given
|
||||
piece of code uses concurrent vs sequential mode: one has to recurisvely walk up the call tree to find the
|
||||
place that puts the `IoConcurrency` into the `RequestContext`.
|
||||
We'd have to use `::Sequential` as the conservative default value in a fresh `RequestContext`, and add some
|
||||
observability to weed out places that fail to enrich with a properly spanwed `IoConcurrency::spawn_from_conf`.
|
||||
|
||||
### Concurrent On-Demand Downloads enabled by Detached Indices
|
||||
|
||||
As stated earlier, traversal stalls on on-demand download because its next step depends on the contents of the layer index.
|
||||
Once we have separated indices from data blocks (=> https://github.com/neondatabase/neon/issues/11695)
|
||||
we will only need to stall if the index is not resident. The download of the data blocks can happen concurrently or in the background. For example:
|
||||
- Move the `Layer::get_or_maybe_download().await` inside the IO futures.
|
||||
This goes in the opposite direction of the next "future work" item below, but it's easy to do.
|
||||
- Serve the IO future directly from object storage and dispatch the layer download
|
||||
to some other actor, e.g., an actor that is responsible for both downloads & eviction.
|
||||
|
||||
### New `tokio-epoll-uring` API That Separates Submission & Wait-For-Completion
|
||||
|
||||
Instead of `$op().await` style API, it would be useful to have a different `tokio-epoll-uring` API
|
||||
that separates enqueuing (without necessarily `io_uring_enter`ing the kernel each time), submission,
|
||||
and then wait for completion.
|
||||
|
||||
The `$op().await` API is too opaque, so we _have_ to stuff it into a `FuturesUnordered`.
|
||||
|
||||
A split API as sketched above would allow traversal to ensure an IO operation is enqueued to the kernel/disk (and get back-pressure iff the io_uring squeue is full).
|
||||
While avoiding spending of CPU cycles on processing of completions while we're still traversing.
|
||||
|
||||
The idea gets muddied by the fact that we may self-deadlock if we submit too much without completing.
|
||||
So, the submission part of the split API needs to process completions if squeue is full.
|
||||
|
||||
In any way, this split API is precondition for the bigger issue with the design presented here,
|
||||
which we dicsuss in the next section.
|
||||
|
||||
### Opaque Futures Are Brittle
|
||||
|
||||
The use of opaque futures to represent submitted IOs is a clever hack to minimize changes & allow for near-perfect feature-gating.
|
||||
However, we take on **brittleness** because callers must guarantee that the submitted futures are independent.
|
||||
By our experience, it is non-trivial to identify or rule out the interdependencies.
|
||||
See the lengthy doc comment on the `IoConcurrency::spawn_io` method for more details.
|
||||
|
||||
The better interface and proper subsystem boundary is a _descriptive_ struct of what needs to be done ("read this range from this VirtualFile into this buffer")
|
||||
and get back a means to wait for completion.
|
||||
The subsystem can thereby reason by its own how operations may be related;
|
||||
unlike today, where the submitted opaque future can do just about anything.
|
||||
@@ -182,6 +182,7 @@ pub struct ConfigToml {
|
||||
pub tracing: Option<Tracing>,
|
||||
pub enable_tls_page_service_api: bool,
|
||||
pub dev_mode: bool,
|
||||
pub timeline_import_config: TimelineImportConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -300,6 +301,12 @@ impl From<OtelExporterProtocol> for tracing_utils::Protocol {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct TimelineImportConfig {
|
||||
pub import_job_concurrency: NonZeroUsize,
|
||||
pub import_job_soft_size_limit: NonZeroUsize,
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -659,6 +666,10 @@ impl Default for ConfigToml {
|
||||
tracing: None,
|
||||
enable_tls_page_service_api: false,
|
||||
dev_mode: false,
|
||||
timeline_import_config: TimelineImportConfig {
|
||||
import_job_concurrency: NonZeroUsize::new(128).unwrap(),
|
||||
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1832,6 +1832,7 @@ pub mod virtual_file {
|
||||
Eq,
|
||||
Hash,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::EnumIter,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
@@ -1843,10 +1844,8 @@ pub mod virtual_file {
|
||||
/// Uses buffered IO.
|
||||
Buffered,
|
||||
/// Uses direct IO for reads only.
|
||||
#[cfg(target_os = "linux")]
|
||||
Direct,
|
||||
/// Use direct IO for reads and writes.
|
||||
#[cfg(target_os = "linux")]
|
||||
DirectRw,
|
||||
}
|
||||
|
||||
@@ -1854,26 +1853,13 @@ pub mod virtual_file {
|
||||
pub fn preferred() -> Self {
|
||||
// The default behavior when running Rust unit tests without any further
|
||||
// flags is to use the newest behavior (DirectRw).
|
||||
// The CI uses the following environment variable to unit tests for all
|
||||
// different modes.
|
||||
// The CI uses the environment variable to unit tests for all different modes.
|
||||
// NB: the Python regression & perf tests have their own defaults management
|
||||
// that writes pageserver.toml; they do not use this variable.
|
||||
if cfg!(test) {
|
||||
static CACHED: LazyLock<IoMode> = LazyLock::new(|| {
|
||||
utils::env::var_serde_json_string(
|
||||
"NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
|
||||
)
|
||||
.unwrap_or(
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::DirectRw,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
IoMode::Buffered,
|
||||
)
|
||||
});
|
||||
*CACHED
|
||||
} else {
|
||||
IoMode::Buffered
|
||||
}
|
||||
static ENV_OVERRIDE: LazyLock<Option<IoMode>> = LazyLock::new(|| {
|
||||
utils::env::var_serde_json_string("NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE")
|
||||
});
|
||||
ENV_OVERRIDE.unwrap_or(IoMode::DirectRw)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1883,9 +1869,7 @@ pub mod virtual_file {
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoMode::Direct as u8) => IoMode::Direct,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw,
|
||||
x => return Err(x),
|
||||
})
|
||||
|
||||
@@ -299,6 +299,7 @@ pub struct PullTimelineRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub http_hosts: Vec<String>,
|
||||
pub ignore_tombstone: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
||||
@@ -17,7 +17,7 @@ impl std::fmt::Display for RateLimitStats {
|
||||
}
|
||||
|
||||
impl RateLimit {
|
||||
pub fn new(interval: Duration) -> Self {
|
||||
pub const fn new(interval: Duration) -> Self {
|
||||
Self {
|
||||
last: None,
|
||||
interval,
|
||||
|
||||
@@ -14,6 +14,7 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -244,13 +245,7 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
];
|
||||
let exploded_parameters = {
|
||||
let mut out = Vec::new();
|
||||
for io_mode in [
|
||||
IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::Direct,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::DirectRw,
|
||||
] {
|
||||
for io_mode in IoMode::iter() {
|
||||
for param in expect.clone() {
|
||||
let HandPickedParameters {
|
||||
volume_mib,
|
||||
|
||||
@@ -230,6 +230,8 @@ pub struct PageServerConf {
|
||||
/// such as authentication requirements for HTTP and PostgreSQL APIs.
|
||||
/// This is insecure and should only be used in development environments.
|
||||
pub dev_mode: bool,
|
||||
|
||||
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -404,6 +406,7 @@ impl PageServerConf {
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
timeline_import_config,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -457,6 +460,7 @@ impl PageServerConf {
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
timeline_import_config,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -1038,21 +1038,23 @@ impl PageServerHandler {
|
||||
tracing::info_span!(
|
||||
parent: &parent_span,
|
||||
"handle_get_page_request",
|
||||
request_id = %req.hdr.reqid,
|
||||
rel = %req.rel,
|
||||
blkno = %req.blkno,
|
||||
req_lsn = %req.hdr.request_lsn,
|
||||
not_modified_since_lsn = %req.hdr.not_modified_since
|
||||
not_modified_since_lsn = %req.hdr.not_modified_since,
|
||||
)
|
||||
}};
|
||||
($shard_id:expr) => {{
|
||||
tracing::info_span!(
|
||||
parent: &parent_span,
|
||||
"handle_get_page_request",
|
||||
request_id = %req.hdr.reqid,
|
||||
rel = %req.rel,
|
||||
blkno = %req.blkno,
|
||||
req_lsn = %req.hdr.request_lsn,
|
||||
not_modified_since_lsn = %req.hdr.not_modified_since,
|
||||
shard_id = %$shard_id
|
||||
shard_id = %$shard_id,
|
||||
)
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext};
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
@@ -275,24 +275,30 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let nblocks = match self
|
||||
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%lsn,
|
||||
)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
Err(err) => {
|
||||
result_slots[response_slot_idx].write(Err(err));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
let nblocks = {
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
match self
|
||||
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
Err(err) => {
|
||||
result_slots[response_slot_idx].write(Err(err));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -308,6 +314,17 @@ impl Timeline {
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_BATCH",
|
||||
batch_size = %page_count,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push((response_slot_idx, ctx));
|
||||
|
||||
@@ -323,14 +340,7 @@ impl Timeline {
|
||||
let query = VersionedKeySpaceQuery::scattered(query);
|
||||
let res = self
|
||||
.get_vectored(query, io_concurrency, ctx)
|
||||
.maybe_perf_instrument(ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"GET_BATCH",
|
||||
batch_size = %page_count,
|
||||
)
|
||||
})
|
||||
.maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await;
|
||||
|
||||
match res {
|
||||
|
||||
@@ -94,10 +94,23 @@ impl Header {
|
||||
pub enum WriteBlobError {
|
||||
#[error(transparent)]
|
||||
Flush(FlushTaskError),
|
||||
#[error("blob too large ({len} bytes)")]
|
||||
BlobTooLarge { len: usize },
|
||||
#[error(transparent)]
|
||||
WriteBlobRaw(anyhow::Error),
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl WriteBlobError {
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
WriteBlobError::Flush(e) => e.is_cancel(),
|
||||
WriteBlobError::Other(_) => false,
|
||||
}
|
||||
}
|
||||
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
WriteBlobError::Flush(e) => e.into_anyhow(),
|
||||
WriteBlobError::Other(e) => e,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockCursor<'_> {
|
||||
@@ -327,7 +340,9 @@ where
|
||||
return (
|
||||
(
|
||||
io_buf.slice_len(),
|
||||
Err(WriteBlobError::BlobTooLarge { len }),
|
||||
Err(WriteBlobError::Other(anyhow::anyhow!(
|
||||
"blob too large ({len} bytes)"
|
||||
))),
|
||||
),
|
||||
srcbuf,
|
||||
);
|
||||
@@ -391,7 +406,7 @@ where
|
||||
// Verify the header, to ensure we don't write invalid/corrupt data.
|
||||
let header = match Header::decode(&raw_with_header)
|
||||
.context("decoding blob header")
|
||||
.map_err(WriteBlobError::WriteBlobRaw)
|
||||
.map_err(WriteBlobError::Other)
|
||||
{
|
||||
Ok(header) => header,
|
||||
Err(err) => return (raw_with_header, Err(err)),
|
||||
@@ -401,7 +416,7 @@ where
|
||||
let raw_len = raw_with_header.len();
|
||||
return (
|
||||
raw_with_header,
|
||||
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
|
||||
Err(WriteBlobError::Other(anyhow::anyhow!(
|
||||
"header length mismatch: {header_total_len} != {raw_len}"
|
||||
))),
|
||||
);
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
pub mod batch_split_writer;
|
||||
pub mod delta_layer;
|
||||
pub mod errors;
|
||||
pub mod filter_iterator;
|
||||
pub mod image_layer;
|
||||
pub mod inmemory_layer;
|
||||
|
||||
@@ -10,6 +10,7 @@ use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::layer::S3_UPLOAD_LIMIT;
|
||||
use super::{
|
||||
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
|
||||
@@ -235,7 +236,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
@@ -253,7 +254,8 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(PutError::Other)?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.batches.add_unfinished_image_writer(
|
||||
prev_image_writer,
|
||||
@@ -346,7 +348,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
//
|
||||
@@ -366,7 +368,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.map_err(PutError::Other)?,
|
||||
));
|
||||
}
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
@@ -386,7 +389,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(PutError::Other)?;
|
||||
let (start_key, prev_delta_writer) =
|
||||
self.inner.replace((key, next_delta_writer)).unwrap();
|
||||
self.batches.add_unfinished_delta_writer(
|
||||
@@ -396,11 +400,11 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
);
|
||||
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
anyhow::bail!(
|
||||
return Err(PutError::Other(anyhow::anyhow!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
inner.estimated_size()
|
||||
);
|
||||
)));
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
|
||||
@@ -55,6 +55,7 @@ use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::{
|
||||
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
|
||||
ValuesReconstructState,
|
||||
@@ -477,12 +478,15 @@ impl DeltaLayerWriterInner {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
let (_, res) = self
|
||||
.put_value_bytes(
|
||||
key,
|
||||
lsn,
|
||||
Value::ser(&val)?.slice_len(),
|
||||
Value::ser(&val)
|
||||
.map_err(anyhow::Error::new)
|
||||
.map_err(PutError::Other)?
|
||||
.slice_len(),
|
||||
val.will_init(),
|
||||
ctx,
|
||||
)
|
||||
@@ -497,7 +501,7 @@ impl DeltaLayerWriterInner {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), PutError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
@@ -513,19 +517,24 @@ impl DeltaLayerWriterInner {
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(val, ctx, compression)
|
||||
.await;
|
||||
let res = res.map_err(PutError::WriteBlob);
|
||||
let off = match res {
|
||||
Ok((off, _)) => off,
|
||||
Err(e) => return (val, Err(anyhow::anyhow!(e))),
|
||||
Err(e) => return (val, Err(e)),
|
||||
};
|
||||
|
||||
let blob_ref = BlobRef::new(off, will_init);
|
||||
|
||||
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
|
||||
let res = self.tree.append(&delta_key.0, blob_ref.0);
|
||||
let res = self
|
||||
.tree
|
||||
.append(&delta_key.0, blob_ref.0)
|
||||
.map_err(anyhow::Error::new)
|
||||
.map_err(PutError::Other);
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
(val, res)
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
@@ -694,7 +703,7 @@ impl DeltaLayerWriter {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -709,7 +718,7 @@ impl DeltaLayerWriter {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), PutError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
|
||||
24
pageserver/src/tenant/storage_layer/errors.rs
Normal file
24
pageserver/src/tenant/storage_layer/errors.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use crate::tenant::blob_io::WriteBlobError;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum PutError {
|
||||
#[error(transparent)]
|
||||
WriteBlob(WriteBlobError),
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl PutError {
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
PutError::WriteBlob(e) => e.is_cancel(),
|
||||
PutError::Other(_) => false,
|
||||
}
|
||||
}
|
||||
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
PutError::WriteBlob(e) => e.into_anyhow(),
|
||||
PutError::Other(e) => e,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -53,6 +53,7 @@ use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::layer_name::ImageLayerName;
|
||||
use super::{
|
||||
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
|
||||
@@ -842,8 +843,14 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
) -> Result<(), PutError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(PutError::Other(anyhow::anyhow!(
|
||||
"key {:?} not in range {:?}",
|
||||
key,
|
||||
self.key_range
|
||||
)));
|
||||
}
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
@@ -853,7 +860,7 @@ impl ImageLayerWriterInner {
|
||||
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let (off, compression_info) = res?;
|
||||
let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
|
||||
if compression_info.compressed_size.is_some() {
|
||||
// The image has been considered for compression at least
|
||||
self.uncompressed_bytes_eligible += uncompressed_len;
|
||||
@@ -865,7 +872,10 @@ impl ImageLayerWriterInner {
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
self.tree
|
||||
.append(&keybuf, off)
|
||||
.map_err(anyhow::Error::new)
|
||||
.map_err(PutError::Other)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -1085,7 +1095,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ use super::{
|
||||
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::context::{RequestContext, RequestContextBuilder};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
@@ -1076,24 +1076,17 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let ctx = if ctx.has_perf_span() {
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::LayerDownload)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason
|
||||
)
|
||||
})
|
||||
.detached_child();
|
||||
ctx.perf_follows_from(&dl_ctx);
|
||||
dl_ctx
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
@@ -1101,7 +1094,7 @@ impl LayerInner {
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
@@ -1709,7 +1702,7 @@ impl DownloadError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub(crate) enum NeedsDownload {
|
||||
NotFound,
|
||||
NotFile(std::fs::FileType),
|
||||
|
||||
@@ -340,7 +340,7 @@ pub(crate) fn log_compaction_error(
|
||||
} else {
|
||||
match level {
|
||||
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
|
||||
Level::ERROR => error!("Compaction failed: {err:#}"),
|
||||
Level::ERROR => error!("Compaction failed: {err:?}"),
|
||||
Level::INFO => info!("Compaction failed: {err:#}"),
|
||||
level => unimplemented!("unexpected level {level:?}"),
|
||||
}
|
||||
|
||||
@@ -987,6 +987,16 @@ impl From<PageReconstructError> for CreateImageLayersError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<super::storage_layer::errors::PutError> for CreateImageLayersError {
|
||||
fn from(e: super::storage_layer::errors::PutError) -> Self {
|
||||
if e.is_cancel() {
|
||||
CreateImageLayersError::Cancelled
|
||||
} else {
|
||||
CreateImageLayersError::Other(e.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for CreateImageLayersError {
|
||||
fn from(e: GetVectoredError) -> Self {
|
||||
match e {
|
||||
@@ -2117,22 +2127,14 @@ impl Timeline {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Regardless of whether we're going to try_freeze_and_flush
|
||||
// or not, stop ingesting any more data. Walreceiver only provides
|
||||
// cancellation but no "wait until gone", because it uses the Timeline::gate.
|
||||
// So, only after the self.gate.close() below will we know for sure that
|
||||
// no walreceiver tasks are left.
|
||||
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
|
||||
// data during the call to `self.freeze_and_flush()` below.
|
||||
// That's not ideal, but, we don't have the concept of a ChildGuard,
|
||||
// which is what we'd need to properly model early shutdown of the walreceiver
|
||||
// task sub-tree before the other Timeline task sub-trees.
|
||||
// or not, stop ingesting any more data.
|
||||
let walreceiver = self.walreceiver.lock().unwrap().take();
|
||||
tracing::debug!(
|
||||
is_some = walreceiver.is_some(),
|
||||
"Waiting for WalReceiverManager..."
|
||||
);
|
||||
if let Some(walreceiver) = walreceiver {
|
||||
walreceiver.cancel();
|
||||
walreceiver.shutdown().await;
|
||||
}
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
@@ -5923,6 +5925,16 @@ impl From<layer_manager::Shutdown> for CompactionError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<super::storage_layer::errors::PutError> for CompactionError {
|
||||
fn from(e: super::storage_layer::errors::PutError) -> Self {
|
||||
if e.is_cancel() {
|
||||
CompactionError::ShuttingDown
|
||||
} else {
|
||||
CompactionError::Other(e.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(serde::Serialize)]
|
||||
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
|
||||
|
||||
@@ -1277,6 +1277,8 @@ impl Timeline {
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
|
||||
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
@@ -1287,7 +1289,7 @@ impl Timeline {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
@@ -1341,6 +1343,10 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(_) => {
|
||||
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
Err(err) if err.is_cancel() => {}
|
||||
@@ -2204,8 +2210,7 @@ impl Timeline {
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.put_value(key, lsn, value, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.await?;
|
||||
} else {
|
||||
let owner = self.shard_identity.get_shard_number(&key);
|
||||
|
||||
@@ -3607,6 +3612,13 @@ impl Timeline {
|
||||
last_key = Some(key);
|
||||
}
|
||||
accumulated_values.push((key, lsn, val));
|
||||
|
||||
if accumulated_values.len() >= 65536 {
|
||||
// Assume all of them are images, that would be 512MB of data in memory for a single key.
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"too many values for a single key, giving up gc-compaction"
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
let last_key: &mut Key = last_key.as_mut().unwrap();
|
||||
stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
|
||||
|
||||
@@ -149,14 +149,7 @@ pub async fn doit(
|
||||
}
|
||||
.await?;
|
||||
|
||||
flow::run(
|
||||
timeline.clone(),
|
||||
base_lsn,
|
||||
control_file,
|
||||
storage.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
|
||||
|
||||
//
|
||||
// Communicate that shard is done.
|
||||
|
||||
@@ -34,7 +34,9 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::{bail, ensure};
|
||||
use bytes::Bytes;
|
||||
use futures::stream::FuturesOrdered;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::TimelineImportConfig;
|
||||
use pageserver_api::key::{
|
||||
CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
|
||||
rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -46,8 +48,9 @@ use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::relfile_utils::parse_relfilename;
|
||||
use postgres_ffi::{BLCKSZ, pg_constants};
|
||||
use remote_storage::RemotePath;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{Instrument, debug, info_span, instrument};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::{debug, instrument};
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -63,37 +66,39 @@ use crate::tenant::storage_layer::{ImageLayerWriter, Layer};
|
||||
|
||||
pub async fn run(
|
||||
timeline: Arc<Timeline>,
|
||||
pgdata_lsn: Lsn,
|
||||
control_file: ControlFile,
|
||||
storage: RemoteStorageWrapper,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
Flow {
|
||||
timeline,
|
||||
pgdata_lsn,
|
||||
let planner = Planner {
|
||||
control_file,
|
||||
tasks: Vec::new(),
|
||||
storage,
|
||||
}
|
||||
.run(ctx)
|
||||
.await
|
||||
storage: storage.clone(),
|
||||
shard: timeline.shard_identity,
|
||||
tasks: Vec::default(),
|
||||
};
|
||||
|
||||
let import_config = &timeline.conf.timeline_import_config;
|
||||
let plan = planner.plan(import_config).await?;
|
||||
plan.execute(timeline, import_config, ctx).await
|
||||
}
|
||||
|
||||
struct Flow {
|
||||
timeline: Arc<Timeline>,
|
||||
pgdata_lsn: Lsn,
|
||||
struct Planner {
|
||||
control_file: ControlFile,
|
||||
tasks: Vec<AnyImportTask>,
|
||||
storage: RemoteStorageWrapper,
|
||||
shard: ShardIdentity,
|
||||
tasks: Vec<AnyImportTask>,
|
||||
}
|
||||
|
||||
impl Flow {
|
||||
/// Perform the ingestion into [`Self::timeline`].
|
||||
/// Assumes the timeline is empty (= no layers).
|
||||
pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
|
||||
struct Plan {
|
||||
jobs: Vec<ChunkProcessingJob>,
|
||||
}
|
||||
|
||||
self.pgdata_lsn = pgdata_lsn;
|
||||
impl Planner {
|
||||
/// Creates an import plan
|
||||
///
|
||||
/// This function is and must remain pure: given the same input, it will generate the same import plan.
|
||||
async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
|
||||
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
|
||||
|
||||
let datadir = PgDataDir::new(&self.storage).await?;
|
||||
|
||||
@@ -115,7 +120,7 @@ impl Flow {
|
||||
}
|
||||
|
||||
// Import SLRUs
|
||||
if self.timeline.tenant_shard_id.is_shard_zero() {
|
||||
if self.shard.is_shard_zero() {
|
||||
// pg_xact (01:00 keyspace)
|
||||
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
|
||||
.await?;
|
||||
@@ -166,14 +171,16 @@ impl Flow {
|
||||
let mut last_end_key = Key::MIN;
|
||||
let mut current_chunk = Vec::new();
|
||||
let mut current_chunk_size: usize = 0;
|
||||
let mut parallel_jobs = Vec::new();
|
||||
let mut jobs = Vec::new();
|
||||
for task in std::mem::take(&mut self.tasks).into_iter() {
|
||||
if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
|
||||
if current_chunk_size + task.total_size()
|
||||
> import_config.import_job_soft_size_limit.into()
|
||||
{
|
||||
let key_range = last_end_key..task.key_range().start;
|
||||
parallel_jobs.push(ChunkProcessingJob::new(
|
||||
jobs.push(ChunkProcessingJob::new(
|
||||
key_range.clone(),
|
||||
std::mem::take(&mut current_chunk),
|
||||
&self,
|
||||
pgdata_lsn,
|
||||
));
|
||||
last_end_key = key_range.end;
|
||||
current_chunk_size = 0;
|
||||
@@ -181,45 +188,13 @@ impl Flow {
|
||||
current_chunk_size += task.total_size();
|
||||
current_chunk.push(task);
|
||||
}
|
||||
parallel_jobs.push(ChunkProcessingJob::new(
|
||||
jobs.push(ChunkProcessingJob::new(
|
||||
last_end_key..Key::MAX,
|
||||
current_chunk,
|
||||
&self,
|
||||
pgdata_lsn,
|
||||
));
|
||||
|
||||
// Start all jobs simultaneosly
|
||||
let mut work = JoinSet::new();
|
||||
// TODO: semaphore?
|
||||
for job in parallel_jobs {
|
||||
let ctx: RequestContext =
|
||||
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
|
||||
work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
|
||||
}
|
||||
let mut results = Vec::new();
|
||||
while let Some(result) = work.join_next().await {
|
||||
match result {
|
||||
Ok(res) => {
|
||||
results.push(res);
|
||||
}
|
||||
Err(_joinset_err) => {
|
||||
results.push(Err(anyhow::anyhow!(
|
||||
"parallel job panicked or cancelled, check pageserver logs"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if results.iter().all(|r| r.is_ok()) {
|
||||
Ok(())
|
||||
} else {
|
||||
let mut msg = String::new();
|
||||
for result in results {
|
||||
if let Err(err) = result {
|
||||
msg.push_str(&format!("{err:?}\n\n"));
|
||||
}
|
||||
}
|
||||
bail!("Some parallel jobs failed:\n\n{msg}");
|
||||
}
|
||||
Ok(Plan { jobs })
|
||||
}
|
||||
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
|
||||
@@ -266,7 +241,7 @@ impl Flow {
|
||||
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
|
||||
self.tasks
|
||||
.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
|
||||
*self.timeline.get_shard_identity(),
|
||||
self.shard,
|
||||
start_key..end_key,
|
||||
&file.path,
|
||||
self.storage.clone(),
|
||||
@@ -289,7 +264,7 @@ impl Flow {
|
||||
}
|
||||
|
||||
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
|
||||
assert!(self.timeline.tenant_shard_id.is_shard_zero());
|
||||
assert!(self.shard.is_shard_zero());
|
||||
|
||||
let segments = self.storage.listfilesindir(path).await?;
|
||||
let segments: Vec<(String, u32, usize)> = segments
|
||||
@@ -344,6 +319,68 @@ impl Flow {
|
||||
}
|
||||
}
|
||||
|
||||
impl Plan {
|
||||
async fn execute(
|
||||
self,
|
||||
timeline: Arc<Timeline>,
|
||||
import_config: &TimelineImportConfig,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut work = FuturesOrdered::new();
|
||||
let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
|
||||
|
||||
let jobs_in_plan = self.jobs.len();
|
||||
|
||||
let mut jobs = self.jobs.into_iter().enumerate().peekable();
|
||||
let mut results = Vec::new();
|
||||
|
||||
// Run import jobs concurrently up to the limit specified by the pageserver configuration.
|
||||
// Note that we process completed futures in the oreder of insertion. This will be the
|
||||
// building block for resuming imports across pageserver restarts or tenant migrations.
|
||||
while results.len() < jobs_in_plan {
|
||||
tokio::select! {
|
||||
permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
|
||||
let permit = permit.expect("never closed");
|
||||
let (job_idx, job) = jobs.next().expect("we peeked");
|
||||
let job_timeline = timeline.clone();
|
||||
let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
|
||||
|
||||
work.push_back(tokio::task::spawn(async move {
|
||||
let _permit = permit;
|
||||
let res = job.run(job_timeline, &ctx).await;
|
||||
(job_idx, res)
|
||||
}));
|
||||
},
|
||||
maybe_complete_job_idx = work.next() => {
|
||||
match maybe_complete_job_idx {
|
||||
Some(Ok((_job_idx, res))) => {
|
||||
results.push(res);
|
||||
},
|
||||
Some(Err(_)) => {
|
||||
results.push(Err(anyhow::anyhow!(
|
||||
"parallel job panicked or cancelled, check pageserver logs"
|
||||
)));
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if results.iter().all(|r| r.is_ok()) {
|
||||
Ok(())
|
||||
} else {
|
||||
let mut msg = String::new();
|
||||
for result in results {
|
||||
if let Err(err) = result {
|
||||
msg.push_str(&format!("{err:?}\n\n"));
|
||||
}
|
||||
}
|
||||
bail!("Some parallel jobs failed:\n\n{msg}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// dbdir iteration tools
|
||||
//
|
||||
@@ -713,7 +750,6 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
|
||||
}
|
||||
|
||||
struct ChunkProcessingJob {
|
||||
timeline: Arc<Timeline>,
|
||||
range: Range<Key>,
|
||||
tasks: Vec<AnyImportTask>,
|
||||
|
||||
@@ -721,25 +757,24 @@ struct ChunkProcessingJob {
|
||||
}
|
||||
|
||||
impl ChunkProcessingJob {
|
||||
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
|
||||
assert!(env.pgdata_lsn.is_valid());
|
||||
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
|
||||
assert!(pgdata_lsn.is_valid());
|
||||
Self {
|
||||
timeline: env.timeline.clone(),
|
||||
range,
|
||||
tasks,
|
||||
pgdata_lsn: env.pgdata_lsn,
|
||||
pgdata_lsn,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
let mut writer = ImageLayerWriter::new(
|
||||
self.timeline.conf,
|
||||
self.timeline.timeline_id,
|
||||
self.timeline.tenant_shard_id,
|
||||
timeline.conf,
|
||||
timeline.timeline_id,
|
||||
timeline.tenant_shard_id,
|
||||
&self.range,
|
||||
self.pgdata_lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -751,24 +786,20 @@ impl ChunkProcessingJob {
|
||||
|
||||
let resident_layer = if nimages > 0 {
|
||||
let (desc, path) = writer.finish(ctx).await?;
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
|
||||
Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
|
||||
} else {
|
||||
// dropping the writer cleans up
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// this is sharing the same code as create_image_layers
|
||||
let mut guard = self.timeline.layers.write().await;
|
||||
let mut guard = timeline.layers.write().await;
|
||||
guard
|
||||
.open_mut()?
|
||||
.track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
|
||||
.track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
|
||||
crate::tenant::timeline::drop_wlock(guard);
|
||||
|
||||
// Schedule the layer for upload but don't add barriers such as
|
||||
// wait for completion or index upload, so we don't inhibit upload parallelism.
|
||||
// TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
|
||||
// TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
|
||||
self.timeline
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_layer_file_upload(resident_layer)?;
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ pub struct WalReceiver {
|
||||
/// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token.
|
||||
/// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
|
||||
cancel: CancellationToken,
|
||||
task: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl WalReceiver {
|
||||
@@ -79,7 +80,7 @@ impl WalReceiver {
|
||||
let loop_status = Arc::new(std::sync::RwLock::new(None));
|
||||
let manager_status = Arc::clone(&loop_status);
|
||||
let cancel = timeline.cancel.child_token();
|
||||
WALRECEIVER_RUNTIME.spawn({
|
||||
let task = WALRECEIVER_RUNTIME.spawn({
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -120,14 +121,25 @@ impl WalReceiver {
|
||||
Self {
|
||||
manager_status,
|
||||
cancel,
|
||||
task,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = tracing::Level::DEBUG)]
|
||||
pub fn cancel(&self) {
|
||||
pub async fn shutdown(self) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
debug!("cancelling walreceiver tasks");
|
||||
self.cancel.cancel();
|
||||
match self.task.await {
|
||||
Ok(()) => debug!("Shutdown success"),
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
// already logged by panic hook
|
||||
}
|
||||
Err(je) => {
|
||||
error!("shutdown walreceiver task join error: {je}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
|
||||
|
||||
@@ -74,6 +74,8 @@ pub struct VirtualFile {
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
///
|
||||
/// Insensitive to `virtual_file_io_mode` setting.
|
||||
pub async fn open<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
@@ -95,31 +97,20 @@ impl VirtualFile {
|
||||
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
/// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
|
||||
mut open_options: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let mode = get_io_mode();
|
||||
let set_o_direct = match (mode, open_options.is_write()) {
|
||||
let direct = match (mode, open_options.is_write()) {
|
||||
(IoMode::Buffered, _) => false,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::Direct, false) => true,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::Direct, true) => false,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::DirectRw, _) => true,
|
||||
};
|
||||
if set_o_direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
unreachable!(
|
||||
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
|
||||
);
|
||||
}
|
||||
open_options = open_options.direct(direct);
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile { inner, _mode: mode })
|
||||
}
|
||||
@@ -791,6 +782,12 @@ impl VirtualFileInner {
|
||||
where
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
{
|
||||
self.validate_direct_io(
|
||||
Slice::stable_ptr(&buf).addr(),
|
||||
Slice::bytes_total(&buf),
|
||||
offset,
|
||||
);
|
||||
|
||||
let file_guard = match self
|
||||
.lock_file()
|
||||
.await
|
||||
@@ -816,6 +813,8 @@ impl VirtualFileInner {
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
self.validate_direct_io(buf.as_ptr().addr(), buf.len(), offset);
|
||||
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
@@ -830,6 +829,64 @@ impl VirtualFileInner {
|
||||
(buf, result)
|
||||
})
|
||||
}
|
||||
|
||||
/// Validate all reads and writes to adhere to the O_DIRECT requirements of our production systems.
|
||||
///
|
||||
/// Validating it iin userspace sets a consistent bar, independent of what actual OS/filesystem/block device is in use.
|
||||
fn validate_direct_io(&self, addr: usize, size: usize, offset: u64) {
|
||||
// TODO: eventually enable validation in the builds we use in real environments like staging, preprod, and prod.
|
||||
if !(cfg!(feature = "testing") || cfg!(test)) {
|
||||
return;
|
||||
}
|
||||
if !self.open_options.is_direct() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Validate buffer memory alignment.
|
||||
//
|
||||
// What practically matters as of Linux 6.1 is bdev_dma_alignment()
|
||||
// which is practically between 512 and 4096.
|
||||
// On our production systems, the value is 512.
|
||||
// The IoBuffer/IoBufferMut hard-code that value.
|
||||
//
|
||||
// Because the alloctor might return _more_ aligned addresses than requested,
|
||||
// there is a chance that testing would not catch violations of a runtime requirement stricter than 512.
|
||||
{
|
||||
let requirement = 512;
|
||||
let remainder = addr % requirement;
|
||||
assert!(
|
||||
remainder == 0,
|
||||
"Direct I/O buffer must be aligned: buffer_addr=0x{addr:x} % 0x{requirement:x} = 0x{remainder:x}"
|
||||
);
|
||||
}
|
||||
|
||||
// Validate offset alignment.
|
||||
//
|
||||
// We hard-code 512 throughout the code base.
|
||||
// So enforce just that and not anything more restrictive.
|
||||
// Even the shallowest testing will expose more restrictive requirements if those ever arise.
|
||||
{
|
||||
let requirement = 512;
|
||||
let remainder = offset % requirement;
|
||||
assert!(
|
||||
remainder == 0,
|
||||
"Direct I/O offset must be aligned: offset=0x{offset:x} % 0x{requirement:x} = 0x{remainder:x}"
|
||||
);
|
||||
}
|
||||
|
||||
// Validate buffer size multiple requirement.
|
||||
//
|
||||
// The requirement in Linux 6.1 is bdev_logical_block_size().
|
||||
// On our production systems, that is 512.
|
||||
{
|
||||
let requirement = 512;
|
||||
let remainder = size % requirement;
|
||||
assert!(
|
||||
remainder == 0,
|
||||
"Direct I/O buffer size must be a multiple of {requirement}: size=0x{size:x} % 0x{requirement:x} = 0x{remainder:x}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
@@ -1218,7 +1275,6 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use owned_buffers_io::slice::SliceMutExt;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::{Rng, thread_rng};
|
||||
|
||||
@@ -1226,162 +1282,38 @@ mod tests {
|
||||
use crate::context::DownloadBehavior;
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
enum MaybeVirtualFile {
|
||||
VirtualFile(VirtualFile),
|
||||
File(File),
|
||||
}
|
||||
|
||||
impl From<VirtualFile> for MaybeVirtualFile {
|
||||
fn from(vf: VirtualFile) -> Self {
|
||||
MaybeVirtualFile::VirtualFile(vf)
|
||||
}
|
||||
}
|
||||
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(
|
||||
&self,
|
||||
mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
|
||||
MaybeVirtualFile::File(file) => {
|
||||
let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
|
||||
file.read_exact_at(rust_slice, offset).map(|()| slice)
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => {
|
||||
let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
|
||||
res
|
||||
}
|
||||
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
async fn read_string_at(
|
||||
&mut self,
|
||||
pos: u64,
|
||||
len: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<String, Error> {
|
||||
let slice = IoBufferMut::with_capacity(len).slice_full();
|
||||
assert_eq!(slice.bytes_total(), len);
|
||||
let slice = self.read_exact_at(slice, pos, ctx).await?;
|
||||
let buf = slice.into_inner();
|
||||
assert_eq!(buf.len(), len);
|
||||
|
||||
Ok(String::from_utf8(buf.to_vec()).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_virtual_files() -> anyhow::Result<()> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
// but this allows us to verify that the operations return the same
|
||||
// results with VirtualFiles as with native Files. (Except that with
|
||||
// native files, you will run out of file descriptors if the ulimit
|
||||
// is low enough.)
|
||||
struct A;
|
||||
|
||||
impl Adapter for A {
|
||||
async fn open(
|
||||
path: Utf8PathBuf,
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
|
||||
Ok(MaybeVirtualFile::VirtualFile(vf))
|
||||
}
|
||||
}
|
||||
test_files::<A>("virtual_files").await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_physical_files() -> anyhow::Result<()> {
|
||||
struct B;
|
||||
|
||||
impl Adapter for B {
|
||||
async fn open(
|
||||
path: Utf8PathBuf,
|
||||
opts: OpenOptions,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
Ok(MaybeVirtualFile::File({
|
||||
let owned_fd = opts.open(path.as_std_path()).await?;
|
||||
File::from(owned_fd)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
test_files::<B>("physical_files").await
|
||||
}
|
||||
|
||||
/// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
|
||||
/// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
|
||||
/// in trait which benefits from the new lifetime capture rules already.
|
||||
trait Adapter {
|
||||
async fn open(
|
||||
path: Utf8PathBuf,
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error>;
|
||||
}
|
||||
|
||||
async fn test_files<A>(testname: &str) -> anyhow::Result<()>
|
||||
where
|
||||
A: Adapter,
|
||||
{
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("test_virtual_files");
|
||||
std::fs::create_dir_all(&testdir)?;
|
||||
|
||||
let zeropad512 = |content: &[u8]| {
|
||||
let mut buf = IoBufferMut::with_capacity_zeroed(512);
|
||||
buf[..content.len()].copy_from_slice(content);
|
||||
buf.freeze().slice_len()
|
||||
};
|
||||
|
||||
let path_a = testdir.join("file_a");
|
||||
let mut file_a = A::open(
|
||||
let file_a = VirtualFile::open_with_options_v2(
|
||||
path_a.clone(),
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
// set create & truncate flags to ensure when we trigger a reopen later in this test,
|
||||
// the reopen_options must have masked out those flags; if they don't, then
|
||||
// the after reopen we will fail to read the `content_a` that we write here.
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.to_owned(),
|
||||
.truncate(true),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let (_, res) = file_a.write_all_at(zeropad512(b"content_a"), 0, &ctx).await;
|
||||
res?;
|
||||
|
||||
file_a
|
||||
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
|
||||
.await?;
|
||||
|
||||
// cannot read from a file opened in write-only mode
|
||||
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a
|
||||
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// Try simple read
|
||||
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
|
||||
|
||||
// Create another test file, and try FileExt functions on it.
|
||||
let path_b = testdir.join("file_b");
|
||||
let mut file_b = A::open(
|
||||
let file_b = VirtualFile::open_with_options_v2(
|
||||
path_b.clone(),
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
@@ -1391,37 +1323,44 @@ mod tests {
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
file_b
|
||||
.write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
|
||||
.await?;
|
||||
file_b
|
||||
.write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
|
||||
.await?;
|
||||
let (_, res) = file_b.write_all_at(zeropad512(b"content_b"), 0, &ctx).await;
|
||||
res?;
|
||||
|
||||
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
|
||||
let assert_first_512_eq = async |vfile: &VirtualFile, expect: &[u8]| {
|
||||
let buf = vfile
|
||||
.read_exact_at(IoBufferMut::with_capacity_zeroed(512).slice_full(), 0, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(&buf[..], &zeropad512(expect)[..]);
|
||||
};
|
||||
|
||||
// Open a lot of files, enough to cause some evictions. (Or to be precise,
|
||||
// open the same file many times. The effect is the same.)
|
||||
// Open a lot of file descriptors / VirtualFile instances.
|
||||
// Enough to cause some evictions in the fd cache.
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
let mut file_b_dupes = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
|
||||
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
|
||||
vfiles.push(vfile);
|
||||
let vfile = VirtualFile::open_with_options_v2(
|
||||
path_b.clone(),
|
||||
OpenOptions::new().read(true),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
assert_first_512_eq(&vfile, b"content_b").await;
|
||||
file_b_dupes.push(vfile);
|
||||
}
|
||||
|
||||
// make sure we opened enough files to definitely cause evictions.
|
||||
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
|
||||
assert!(file_b_dupes.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
|
||||
|
||||
// The underlying file descriptor for 'file_a' should be closed now. Try to read
|
||||
// from it again.
|
||||
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
|
||||
// from it again. The VirtualFile reopens the file internally.
|
||||
assert_first_512_eq(&file_a, b"content_a").await;
|
||||
|
||||
// Check that all the other FDs still work too. Use them in random order for
|
||||
// good measure.
|
||||
vfiles.as_mut_slice().shuffle(&mut thread_rng());
|
||||
for vfile in vfiles.iter_mut() {
|
||||
assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
|
||||
file_b_dupes.as_mut_slice().shuffle(&mut thread_rng());
|
||||
for vfile in file_b_dupes.iter_mut() {
|
||||
assert_first_512_eq(vfile, b"content_b").await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1452,7 +1391,7 @@ mod tests {
|
||||
// Open the file many times.
|
||||
let mut files = Vec::new();
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
let f = VirtualFile::open_with_options_v2(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true),
|
||||
&ctx,
|
||||
@@ -1497,8 +1436,6 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_atomic_overwrite_basic() {
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
|
||||
std::fs::create_dir_all(&testdir).unwrap();
|
||||
|
||||
@@ -1508,26 +1445,22 @@ mod tests {
|
||||
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
|
||||
let post = std::fs::read_to_string(&path).unwrap();
|
||||
assert_eq!(post, "foo");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
|
||||
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
|
||||
let post = std::fs::read_to_string(&path).unwrap();
|
||||
assert_eq!(post, "bar");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_atomic_overwrite_preexisting_tmp() {
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let testdir =
|
||||
crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
|
||||
std::fs::create_dir_all(&testdir).unwrap();
|
||||
@@ -1542,10 +1475,8 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
let post = std::fs::read_to_string(&path).unwrap();
|
||||
assert_eq!(post, "foo");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,13 +111,17 @@ pub(crate) fn get() -> IoEngine {
|
||||
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::atomic::{AtomicU8, Ordering};
|
||||
#[cfg(target_os = "linux")]
|
||||
use {std::time::Duration, tracing::info};
|
||||
|
||||
use super::owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use super::owned_buffers_io::slice::SliceMutExt;
|
||||
use super::{FileGuard, Metadata};
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
|
||||
pub(super) fn epoll_uring_error_to_std(
|
||||
e: tokio_epoll_uring::Error<std::io::Error>,
|
||||
) -> std::io::Error {
|
||||
match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
@@ -149,7 +153,11 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.read(file_guard, offset, slice).await;
|
||||
let (resources, res) =
|
||||
retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async {
|
||||
system.read(file_guard, offset, slice).await
|
||||
})
|
||||
.await;
|
||||
(resources, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
@@ -164,7 +172,10 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.fsync(file_guard).await;
|
||||
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
|
||||
system.fsync(file_guard).await
|
||||
})
|
||||
.await;
|
||||
(resources, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
@@ -182,7 +193,10 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.fdatasync(file_guard).await;
|
||||
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
|
||||
system.fdatasync(file_guard).await
|
||||
})
|
||||
.await;
|
||||
(resources, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
@@ -201,7 +215,10 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.statx(file_guard).await;
|
||||
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
|
||||
system.statx(file_guard).await
|
||||
})
|
||||
.await;
|
||||
(
|
||||
resources,
|
||||
res.map_err(epoll_uring_error_to_std).map(Metadata::from),
|
||||
@@ -224,6 +241,7 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
// TODO: ftruncate op for tokio-epoll-uring
|
||||
// Don't forget to use retry_ecanceled_once
|
||||
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
||||
(file_guard, res)
|
||||
}
|
||||
@@ -245,8 +263,11 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let ((file_guard, slice), res) =
|
||||
system.write(file_guard, offset, buf.into_raw_slice()).await;
|
||||
let ((file_guard, slice), res) = retry_ecanceled_once(
|
||||
(file_guard, buf.into_raw_slice()),
|
||||
async |(file_guard, buf)| system.write(file_guard, offset, buf).await,
|
||||
)
|
||||
.await;
|
||||
(
|
||||
(file_guard, FullSlice::must_new(slice)),
|
||||
res.map_err(epoll_uring_error_to_std),
|
||||
@@ -282,6 +303,56 @@ impl IoEngine {
|
||||
}
|
||||
}
|
||||
|
||||
/// We observe in tests that stop pageserver with SIGTERM immediately after it was ingesting data,
|
||||
/// occasionally buffered writers fail (and get retried by BufferedWriter) with ECANCELED.
|
||||
/// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals.
|
||||
/// Investigation ticket: <https://github.com/neondatabase/neon/issues/11446>
|
||||
///
|
||||
/// This function retries the operation once if it fails with ECANCELED.
|
||||
/// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations.
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(super) async fn retry_ecanceled_once<F, Fut, T, V>(
|
||||
resources: T,
|
||||
f: F,
|
||||
) -> (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)
|
||||
where
|
||||
F: Fn(T) -> Fut,
|
||||
Fut: std::future::Future<Output = (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)>,
|
||||
T: Send,
|
||||
V: Send,
|
||||
{
|
||||
let (resources, res) = f(resources).await;
|
||||
let Err(e) = res else {
|
||||
return (resources, res);
|
||||
};
|
||||
let tokio_epoll_uring::Error::Op(err) = e else {
|
||||
return (resources, Err(e));
|
||||
};
|
||||
if err.raw_os_error() != Some(nix::libc::ECANCELED) {
|
||||
return (resources, Err(tokio_epoll_uring::Error::Op(err)));
|
||||
}
|
||||
{
|
||||
static RATE_LIMIT: std::sync::Mutex<utils::rate_limit::RateLimit> =
|
||||
std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1)));
|
||||
let mut guard = RATE_LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
info!(
|
||||
%rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited"
|
||||
);
|
||||
});
|
||||
drop(guard);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners
|
||||
let (resources, res) = f(resources).await;
|
||||
(resources, res)
|
||||
}
|
||||
|
||||
pub(super) fn panic_operation_must_be_idempotent() {
|
||||
panic!(
|
||||
"unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)"
|
||||
)
|
||||
}
|
||||
|
||||
pub enum FeatureTestResult {
|
||||
PlatformPreferred(IoEngineKind),
|
||||
Worse {
|
||||
|
||||
@@ -8,7 +8,13 @@ use super::io_engine::IoEngine;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OpenOptions {
|
||||
/// We keep a copy of the write() flag we pass to the `inner`` `OptionOptions`
|
||||
/// to support [`Self::is_write`].
|
||||
write: bool,
|
||||
/// We don't expose + pass through a raw `custom_flags()` style API.
|
||||
/// The only custom flag we support is `O_DIRECT`, which we track here
|
||||
/// and map to `custom_flags()` in the [`Self::open`] method.
|
||||
direct: bool,
|
||||
inner: Inner,
|
||||
}
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -30,6 +36,7 @@ impl Default for OpenOptions {
|
||||
};
|
||||
Self {
|
||||
write: false,
|
||||
direct: false,
|
||||
inner,
|
||||
}
|
||||
}
|
||||
@@ -44,6 +51,10 @@ impl OpenOptions {
|
||||
self.write
|
||||
}
|
||||
|
||||
pub(super) fn is_direct(&self) -> bool {
|
||||
self.direct
|
||||
}
|
||||
|
||||
pub fn read(mut self, read: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
@@ -110,18 +121,48 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
/// Don't use, `O_APPEND` is not supported.
|
||||
pub fn append(&mut self, _append: bool) {
|
||||
super::io_engine::panic_operation_must_be_idempotent();
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match &self.inner {
|
||||
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
|
||||
let mut custom_flags = 0;
|
||||
if self.direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
Inner::TokioEpollUring(x) => {
|
||||
{
|
||||
custom_flags |= nix::libc::O_DIRECT;
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
// Other platforms may be used for development but don't necessarily have a 1:1 equivalent to Linux's O_DIRECT (macOS!).
|
||||
// Just don't set the flag; to catch alignment bugs typical for O_DIRECT,
|
||||
// we have a runtime validation layer inside `VirtualFile::write_at` and `VirtualFile::read_at`.
|
||||
static WARNING: std::sync::Once = std::sync::Once::new();
|
||||
WARNING.call_once(|| {
|
||||
let span = tracing::info_span!(parent: None, "open_options");
|
||||
let _enter = span.enter();
|
||||
tracing::warn!("your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs; this warning is logged once per process");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
match self.inner.clone() {
|
||||
Inner::StdFs(mut x) => x
|
||||
.custom_flags(custom_flags)
|
||||
.open(path)
|
||||
.map(|file| file.into()),
|
||||
#[cfg(target_os = "linux")]
|
||||
Inner::TokioEpollUring(mut x) => {
|
||||
x.custom_flags(custom_flags);
|
||||
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
|
||||
system.open(path, x).await.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async {
|
||||
let res = system.open(path, &x).await;
|
||||
((), res)
|
||||
})
|
||||
.await;
|
||||
res.map_err(super::io_engine::epoll_uring_error_to_std)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -139,16 +180,8 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn custom_flags(mut self, flags: i32) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
}
|
||||
pub fn direct(mut self, direct: bool) -> Self {
|
||||
self.direct = direct;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,6 +247,19 @@ pub enum FlushTaskError {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl FlushTaskError {
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
FlushTaskError::Cancelled => true,
|
||||
}
|
||||
}
|
||||
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
FlushTaskError::Cancelled => anyhow::anyhow!(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Buf, W> FlushBackgroundTask<Buf, W>
|
||||
where
|
||||
Buf: IoBufAligned + Send + Sync,
|
||||
|
||||
@@ -425,15 +425,12 @@ compact_prefetch_buffers(void)
|
||||
* point inside and outside PostgreSQL.
|
||||
*
|
||||
* This still does throw errors when it receives malformed responses from PS.
|
||||
*
|
||||
* When we're not called from CHECK_FOR_INTERRUPTS (indicated by
|
||||
* IsHandlingInterrupts) we also report we've ended prefetch receive work,
|
||||
* just in case state tracking was lost due to an error in the sync getPage
|
||||
* response code.
|
||||
*/
|
||||
void
|
||||
communicator_prefetch_pump_state(bool IsHandlingInterrupts)
|
||||
communicator_prefetch_pump_state(void)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive != MyPState->ring_flush)
|
||||
{
|
||||
NeonResponse *response;
|
||||
@@ -482,9 +479,7 @@ communicator_prefetch_pump_state(bool IsHandlingInterrupts)
|
||||
}
|
||||
}
|
||||
|
||||
/* We never pump the prefetch state while handling other pages */
|
||||
if (!IsHandlingInterrupts)
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
@@ -672,9 +667,10 @@ prefetch_wait_for(uint64 ring_index)
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive <= ring_index)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
entry = GetPrfSlot(MyPState->ring_receive);
|
||||
|
||||
Assert(entry->status == PRFS_REQUESTED);
|
||||
@@ -683,17 +679,18 @@ prefetch_wait_for(uint64 ring_index)
|
||||
result = false;
|
||||
break;
|
||||
}
|
||||
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
if (result)
|
||||
{
|
||||
/* Check that slot is actually received (srver can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS */
|
||||
PrefetchRequest *slot = GetPrfSlot(ring_index);
|
||||
return slot->status == PRFS_RECEIVED;
|
||||
result = slot->status == PRFS_RECEIVED;
|
||||
}
|
||||
return false;
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
return result;
|
||||
;
|
||||
}
|
||||
|
||||
@@ -720,6 +717,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
Assert(readpage_reentrant_guard);
|
||||
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
@@ -802,6 +800,7 @@ communicator_prefetch_receive(BufferTag tag)
|
||||
PrfHashEntry *entry;
|
||||
PrefetchRequest hashkey;
|
||||
|
||||
Assert(readpage_reentrant_guard);
|
||||
hashkey.buftag = tag;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
|
||||
@@ -821,8 +820,12 @@ communicator_prefetch_receive(BufferTag tag)
|
||||
void
|
||||
prefetch_on_ps_disconnect(void)
|
||||
{
|
||||
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
|
||||
/* Prohibit callig of prefetch_pump_state */
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive < MyPState->ring_unused)
|
||||
{
|
||||
PrefetchRequest *slot;
|
||||
@@ -851,6 +854,9 @@ prefetch_on_ps_disconnect(void)
|
||||
MyNeonCounters->getpage_prefetch_discards_total += 1;
|
||||
}
|
||||
|
||||
/* Restore guard */
|
||||
readpage_reentrant_guard = save_readpage_reentrant_guard;
|
||||
|
||||
/*
|
||||
* We can have gone into retry due to network error, so update stats with
|
||||
* the latest available
|
||||
@@ -2509,7 +2515,7 @@ communicator_processinterrupts(void)
|
||||
if (timeout_signaled)
|
||||
{
|
||||
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
|
||||
communicator_prefetch_pump_state(true);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
timeout_signaled = false;
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
|
||||
@@ -44,7 +44,7 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
|
||||
void *buffer);
|
||||
|
||||
extern void communicator_reconfigure_timeout_if_needed(void);
|
||||
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
|
||||
extern void communicator_prefetch_pump_state(void);
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
@@ -433,7 +433,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
|
||||
shard->last_reconnect_time = now;
|
||||
|
||||
/*
|
||||
* Make sure we don't do exponential backoff with a constant multiplier
|
||||
@@ -447,14 +446,23 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
/*
|
||||
* If we did other tasks between reconnect attempts, then we won't
|
||||
* need to wait as long as a full delay.
|
||||
*
|
||||
* This is a loop to protect against interrupted sleeps.
|
||||
*/
|
||||
if (us_since_last_attempt < shard->delay_us)
|
||||
while (us_since_last_attempt < shard->delay_us)
|
||||
{
|
||||
pg_usleep(shard->delay_us - us_since_last_attempt);
|
||||
|
||||
/* At least we should handle cancellations here */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
|
||||
}
|
||||
|
||||
/* update the delay metric */
|
||||
shard->delay_us = Min(shard->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
|
||||
shard->last_reconnect_time = now;
|
||||
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
|
||||
@@ -1179,7 +1179,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
blocknum += iterblocks;
|
||||
}
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -1218,7 +1218,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
|
||||
communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -1262,7 +1262,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
*/
|
||||
neon_log(SmgrTrace, "writeback noop");
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1315,7 +1315,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
}
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
|
||||
|
||||
@@ -1339,7 +1339,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -1449,7 +1449,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
nblocks, PG_IOV_MAX);
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
@@ -1480,7 +1480,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -1665,7 +1665,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
|
||||
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1727,7 +1727,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||
|
||||
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1902,7 +1902,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
|
||||
|
||||
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
|
||||
@@ -12,9 +12,9 @@ use tracing::{debug, warn};
|
||||
use crate::auth::password_hack::parse_endpoint_param;
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, SniKind};
|
||||
use crate::metrics::{Metrics, SniGroup, SniKind};
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::SERVERLESS_DRIVER_SNI;
|
||||
use crate::serverless::{AUTH_BROKER_SNI, SERVERLESS_DRIVER_SNI};
|
||||
use crate::types::{EndpointId, RoleName};
|
||||
|
||||
#[derive(Debug, Error, PartialEq, Eq, Clone)]
|
||||
@@ -65,7 +65,7 @@ pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<
|
||||
if !common_names.contains(common_name) {
|
||||
return None;
|
||||
}
|
||||
if subdomain == SERVERLESS_DRIVER_SNI {
|
||||
if subdomain == SERVERLESS_DRIVER_SNI || subdomain == AUTH_BROKER_SNI {
|
||||
return None;
|
||||
}
|
||||
Some(EndpointId::from(subdomain))
|
||||
@@ -128,22 +128,23 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
|
||||
let metrics = Metrics::get();
|
||||
debug!(%user, "credentials");
|
||||
if sni.is_some() {
|
||||
|
||||
let protocol = ctx.protocol();
|
||||
let kind = if sni.is_some() {
|
||||
debug!("Connection with sni");
|
||||
metrics.proxy.accepted_connections_by_sni.inc(SniKind::Sni);
|
||||
SniKind::Sni
|
||||
} else if endpoint.is_some() {
|
||||
metrics
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniKind::NoSni);
|
||||
debug!("Connection without sni");
|
||||
SniKind::NoSni
|
||||
} else {
|
||||
metrics
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniKind::PasswordHack);
|
||||
debug!("Connection with password hack");
|
||||
}
|
||||
SniKind::PasswordHack
|
||||
};
|
||||
|
||||
metrics
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniGroup { protocol, kind });
|
||||
|
||||
let options = NeonOptions::parse_params(params);
|
||||
|
||||
|
||||
@@ -423,8 +423,8 @@ async fn refresh_config_inner(
|
||||
if let Some(tls_config) = data.tls {
|
||||
let tls_config = tokio::task::spawn_blocking(move || {
|
||||
crate::tls::server_config::configure_tls(
|
||||
&tls_config.key_path,
|
||||
&tls_config.cert_path,
|
||||
tls_config.key_path.as_ref(),
|
||||
tls_config.cert_path.as_ref(),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
/// A stand-alone program that routes connections, e.g. from
|
||||
/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
|
||||
///
|
||||
/// This allows connecting to pods/services running in the same Kubernetes cluster from
|
||||
/// the outside. Similar to an ingress controller for HTTPS.
|
||||
//! A stand-alone program that routes connections, e.g. from
|
||||
//! `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
|
||||
//!
|
||||
//! This allows connecting to pods/services running in the same Kubernetes cluster from
|
||||
//! the outside. Similar to an ingress controller for HTTPS.
|
||||
|
||||
use std::path::Path;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use anyhow::{Context, anyhow, bail, ensure};
|
||||
@@ -86,46 +88,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
args.get_one::<String>("tls-key"),
|
||||
args.get_one::<String>("tls-cert"),
|
||||
) {
|
||||
(Some(key_path), Some(cert_path)) => {
|
||||
let key = {
|
||||
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
|
||||
|
||||
let mut keys =
|
||||
rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec();
|
||||
|
||||
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
|
||||
PrivateKeyDer::Pkcs8(
|
||||
keys.pop()
|
||||
.expect("keys should not be empty")
|
||||
.context(format!("Failed to read TLS keys at '{key_path}'"))?,
|
||||
)
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain: Vec<_> = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
|
||||
})?
|
||||
};
|
||||
|
||||
// needed for channel bindings
|
||||
let first_cert = cert_chain.first().context("missing certificate")?;
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let tls_config =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
|
||||
.context("ring should support TLS1.2 and TLS1.3")?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, key)?
|
||||
.into();
|
||||
|
||||
(tls_config, tls_server_end_point)
|
||||
}
|
||||
(Some(key_path), Some(cert_path)) => parse_tls(key_path.as_ref(), cert_path.as_ref())?,
|
||||
_ => bail!("tls-key and tls-cert must be specified"),
|
||||
};
|
||||
|
||||
@@ -188,7 +151,58 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
match signal {}
|
||||
}
|
||||
|
||||
async fn task_main(
|
||||
pub(super) fn parse_tls(
|
||||
key_path: &Path,
|
||||
cert_path: &Path,
|
||||
) -> anyhow::Result<(Arc<rustls::ServerConfig>, TlsServerEndPoint)> {
|
||||
let key = {
|
||||
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
|
||||
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec();
|
||||
|
||||
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
|
||||
PrivateKeyDer::Pkcs8(
|
||||
keys.pop()
|
||||
.expect("keys should not be empty")
|
||||
.context(format!(
|
||||
"Failed to read TLS keys at '{}'",
|
||||
key_path.display()
|
||||
))?,
|
||||
)
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path).context(format!(
|
||||
"Failed to read TLS cert file at '{}.'",
|
||||
cert_path.display()
|
||||
))?;
|
||||
|
||||
let cert_chain: Vec<_> = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{}'.",
|
||||
cert_path.display()
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
// needed for channel bindings
|
||||
let first_cert = cert_chain.first().context("missing certificate")?;
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let tls_config =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
|
||||
.context("ring should support TLS1.2 and TLS1.3")?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, key)?
|
||||
.into();
|
||||
|
||||
Ok((tls_config, tls_server_end_point))
|
||||
}
|
||||
|
||||
pub(super) async fn task_main(
|
||||
dest_suffix: Arc<String>,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
compute_tls_config: Option<Arc<rustls::ClientConfig>>,
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use anyhow::{bail, ensure};
|
||||
use arc_swap::ArcSwapOption;
|
||||
use futures::future::Either;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
@@ -62,18 +63,18 @@ struct ProxyCliArgs {
|
||||
region: String,
|
||||
/// listen for incoming client connections on ip:port
|
||||
#[clap(short, long, default_value = "127.0.0.1:4432")]
|
||||
proxy: String,
|
||||
proxy: SocketAddr,
|
||||
#[clap(value_enum, long, default_value_t = AuthBackendType::ConsoleRedirect)]
|
||||
auth_backend: AuthBackendType,
|
||||
/// listen for management callback connection on ip:port
|
||||
#[clap(short, long, default_value = "127.0.0.1:7000")]
|
||||
mgmt: String,
|
||||
mgmt: SocketAddr,
|
||||
/// listen for incoming http connections (metrics, etc) on ip:port
|
||||
#[clap(long, default_value = "127.0.0.1:7001")]
|
||||
http: String,
|
||||
http: SocketAddr,
|
||||
/// listen for incoming wss connections on ip:port
|
||||
#[clap(long)]
|
||||
wss: Option<String>,
|
||||
wss: Option<SocketAddr>,
|
||||
/// redirect unauthenticated users to the given uri in case of console redirect auth
|
||||
#[clap(short, long, default_value = "http://localhost:3000/psql_session/")]
|
||||
uri: String,
|
||||
@@ -99,18 +100,18 @@ struct ProxyCliArgs {
|
||||
///
|
||||
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
|
||||
#[clap(short = 'k', long, alias = "ssl-key")]
|
||||
tls_key: Option<String>,
|
||||
tls_key: Option<PathBuf>,
|
||||
/// path to TLS cert for client postgres connections
|
||||
///
|
||||
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
|
||||
#[clap(short = 'c', long, alias = "ssl-cert")]
|
||||
tls_cert: Option<String>,
|
||||
tls_cert: Option<PathBuf>,
|
||||
/// Allow writing TLS session keys to the given file pointed to by the environment variable `SSLKEYLOGFILE`.
|
||||
#[clap(long, alias = "allow-ssl-keylogfile")]
|
||||
allow_tls_keylogfile: bool,
|
||||
/// path to directory with TLS certificates for client postgres connections
|
||||
#[clap(long)]
|
||||
certs_dir: Option<String>,
|
||||
certs_dir: Option<PathBuf>,
|
||||
/// timeout for the TLS handshake
|
||||
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
|
||||
handshake_timeout: tokio::time::Duration,
|
||||
@@ -229,6 +230,9 @@ struct ProxyCliArgs {
|
||||
// TODO: rename to `console_redirect_confirmation_timeout`.
|
||||
#[clap(long, default_value = "2m", value_parser = humantime::parse_duration)]
|
||||
webauth_confirmation_timeout: std::time::Duration,
|
||||
|
||||
#[clap(flatten)]
|
||||
pg_sni_router: PgSniRouterArgs,
|
||||
}
|
||||
|
||||
#[derive(clap::Args, Clone, Copy, Debug)]
|
||||
@@ -277,6 +281,25 @@ struct SqlOverHttpArgs {
|
||||
sql_over_http_max_response_size_bytes: usize,
|
||||
}
|
||||
|
||||
#[derive(clap::Args, Clone, Debug)]
|
||||
struct PgSniRouterArgs {
|
||||
/// listen for incoming client connections on ip:port
|
||||
#[clap(id = "sni-router-listen", long, default_value = "127.0.0.1:4432")]
|
||||
listen: SocketAddr,
|
||||
/// listen for incoming client connections on ip:port, requiring TLS to compute
|
||||
#[clap(id = "sni-router-listen-tls", long, default_value = "127.0.0.1:4433")]
|
||||
listen_tls: SocketAddr,
|
||||
/// path to TLS key for client postgres connections
|
||||
#[clap(id = "sni-router-tls-key", long)]
|
||||
tls_key: Option<PathBuf>,
|
||||
/// path to TLS cert for client postgres connections
|
||||
#[clap(id = "sni-router-tls-cert", long)]
|
||||
tls_cert: Option<PathBuf>,
|
||||
/// append this domain zone to the SNI hostname to get the destination address
|
||||
#[clap(id = "sni-router-destination", long)]
|
||||
dest: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn run() -> anyhow::Result<()> {
|
||||
let _logging_guard = crate::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
@@ -307,73 +330,51 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
|
||||
}
|
||||
info!("Using region: {}", args.aws_region);
|
||||
|
||||
// TODO: untangle the config args
|
||||
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
|
||||
("plain", redis_url) => match redis_url {
|
||||
None => {
|
||||
bail!("plain auth requires redis_notifications to be set");
|
||||
}
|
||||
Some(url) => {
|
||||
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
|
||||
}
|
||||
},
|
||||
("irsa", _) => match (&args.redis_host, args.redis_port) {
|
||||
(Some(host), Some(port)) => Some(
|
||||
ConnectionWithCredentialsProvider::new_with_credentials_provider(
|
||||
host.to_string(),
|
||||
port,
|
||||
elasticache::CredentialsProvider::new(
|
||||
args.aws_region,
|
||||
args.redis_cluster_name,
|
||||
args.redis_user_id,
|
||||
)
|
||||
.await,
|
||||
),
|
||||
),
|
||||
(None, None) => {
|
||||
warn!(
|
||||
"irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"
|
||||
);
|
||||
None
|
||||
}
|
||||
_ => {
|
||||
bail!("redis-host and redis-port must be specified together");
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
bail!("unknown auth type given");
|
||||
}
|
||||
};
|
||||
|
||||
let redis_notifications_client = if let Some(url) = args.redis_notifications {
|
||||
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url))
|
||||
} else {
|
||||
regional_redis_client.clone()
|
||||
};
|
||||
let (regional_redis_client, redis_notifications_client) = configure_redis(&args).await?;
|
||||
|
||||
// Check that we can bind to address before further initialization
|
||||
let http_address: SocketAddr = args.http.parse()?;
|
||||
info!("Starting http on {http_address}");
|
||||
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
|
||||
info!("Starting http on {}", args.http);
|
||||
let http_listener = TcpListener::bind(args.http).await?.into_std()?;
|
||||
|
||||
let mgmt_address: SocketAddr = args.mgmt.parse()?;
|
||||
info!("Starting mgmt on {mgmt_address}");
|
||||
let mgmt_listener = TcpListener::bind(mgmt_address).await?;
|
||||
info!("Starting mgmt on {}", args.mgmt);
|
||||
let mgmt_listener = TcpListener::bind(args.mgmt).await?;
|
||||
|
||||
let proxy_listener = if args.is_auth_broker {
|
||||
None
|
||||
} else {
|
||||
let proxy_address: SocketAddr = args.proxy.parse()?;
|
||||
info!("Starting proxy on {proxy_address}");
|
||||
info!("Starting proxy on {}", args.proxy);
|
||||
Some(TcpListener::bind(args.proxy).await?)
|
||||
};
|
||||
|
||||
Some(TcpListener::bind(proxy_address).await?)
|
||||
let sni_router_listeners = {
|
||||
let args = &args.pg_sni_router;
|
||||
if args.dest.is_some() {
|
||||
ensure!(
|
||||
args.tls_key.is_some(),
|
||||
"sni-router-tls-key must be provided"
|
||||
);
|
||||
ensure!(
|
||||
args.tls_cert.is_some(),
|
||||
"sni-router-tls-cert must be provided"
|
||||
);
|
||||
|
||||
info!(
|
||||
"Starting pg-sni-router on {} and {}",
|
||||
args.listen, args.listen_tls
|
||||
);
|
||||
|
||||
Some((
|
||||
TcpListener::bind(args.listen).await?,
|
||||
TcpListener::bind(args.listen_tls).await?,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: rename the argument to something like serverless.
|
||||
// It now covers more than just websockets, it also covers SQL over HTTP.
|
||||
let serverless_listener = if let Some(serverless_address) = args.wss {
|
||||
let serverless_address: SocketAddr = serverless_address.parse()?;
|
||||
info!("Starting wss on {serverless_address}");
|
||||
Some(TcpListener::bind(serverless_address).await?)
|
||||
} else if args.is_auth_broker {
|
||||
@@ -458,6 +459,37 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// spawn pg-sni-router mode.
|
||||
if let Some((listen, listen_tls)) = sni_router_listeners {
|
||||
let args = args.pg_sni_router;
|
||||
let dest = args.dest.expect("already asserted it is set");
|
||||
let key_path = args.tls_key.expect("already asserted it is set");
|
||||
let cert_path = args.tls_cert.expect("already asserted it is set");
|
||||
|
||||
let (tls_config, tls_server_end_point) =
|
||||
super::pg_sni_router::parse_tls(&key_path, &cert_path)?;
|
||||
|
||||
let dest = Arc::new(dest);
|
||||
|
||||
client_tasks.spawn(super::pg_sni_router::task_main(
|
||||
dest.clone(),
|
||||
tls_config.clone(),
|
||||
None,
|
||||
tls_server_end_point,
|
||||
listen,
|
||||
cancellation_token.clone(),
|
||||
));
|
||||
|
||||
client_tasks.spawn(super::pg_sni_router::task_main(
|
||||
dest,
|
||||
tls_config,
|
||||
Some(config.connect_to_compute.tls.clone()),
|
||||
tls_server_end_point,
|
||||
listen_tls,
|
||||
cancellation_token.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
client_tasks.spawn(crate::context::parquet::worker(
|
||||
cancellation_token.clone(),
|
||||
args.parquet_upload,
|
||||
@@ -565,7 +597,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(
|
||||
key_path,
|
||||
cert_path,
|
||||
args.certs_dir.as_ref(),
|
||||
args.certs_dir.as_deref(),
|
||||
args.allow_tls_keylogfile,
|
||||
)?),
|
||||
(None, None) => None,
|
||||
@@ -811,6 +843,60 @@ fn build_auth_backend(
|
||||
}
|
||||
}
|
||||
|
||||
async fn configure_redis(
|
||||
args: &ProxyCliArgs,
|
||||
) -> anyhow::Result<(
|
||||
Option<ConnectionWithCredentialsProvider>,
|
||||
Option<ConnectionWithCredentialsProvider>,
|
||||
)> {
|
||||
// TODO: untangle the config args
|
||||
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
|
||||
("plain", redis_url) => match redis_url {
|
||||
None => {
|
||||
bail!("plain auth requires redis_notifications to be set");
|
||||
}
|
||||
Some(url) => {
|
||||
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
|
||||
}
|
||||
},
|
||||
("irsa", _) => match (&args.redis_host, args.redis_port) {
|
||||
(Some(host), Some(port)) => Some(
|
||||
ConnectionWithCredentialsProvider::new_with_credentials_provider(
|
||||
host.to_string(),
|
||||
port,
|
||||
elasticache::CredentialsProvider::new(
|
||||
args.aws_region.clone(),
|
||||
args.redis_cluster_name.clone(),
|
||||
args.redis_user_id.clone(),
|
||||
)
|
||||
.await,
|
||||
),
|
||||
),
|
||||
(None, None) => {
|
||||
// todo: upgrade to error?
|
||||
warn!(
|
||||
"irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"
|
||||
);
|
||||
None
|
||||
}
|
||||
_ => {
|
||||
bail!("redis-host and redis-port must be specified together");
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
bail!("unknown auth type given");
|
||||
}
|
||||
};
|
||||
|
||||
let redis_notifications_client = if let Some(url) = &args.redis_notifications {
|
||||
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url))
|
||||
} else {
|
||||
regional_redis_client.clone()
|
||||
};
|
||||
|
||||
Ok((regional_redis_client, redis_notifications_client))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -6,12 +6,12 @@ use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
use postgres_client::CancelToken;
|
||||
use postgres_client::tls::MakeTlsConnect;
|
||||
use pq_proto::CancelKeyData;
|
||||
use redis::{FromRedisValue, Pipeline, Value, pipe};
|
||||
use redis::{Cmd, FromRedisValue, Value};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tracing::{debug, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::{AuthError, check_peer_addr_is_in_list};
|
||||
@@ -56,8 +56,86 @@ pub enum CancelKeyOp {
|
||||
},
|
||||
}
|
||||
|
||||
type Callback = Box<dyn FnOnce(anyhow::Result<&[redis::Value]>) + Send>;
|
||||
pub struct Pipeline {
|
||||
inner: redis::Pipeline,
|
||||
// vec![(number of commands, fn(values))]
|
||||
replies: Vec<(usize, Callback)>,
|
||||
}
|
||||
|
||||
impl Pipeline {
|
||||
fn with_capacity(n: usize) -> Self {
|
||||
Self {
|
||||
inner: redis::Pipeline::with_capacity(n),
|
||||
replies: Vec::with_capacity(n),
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute(&mut self, client: &mut RedisKVClient) {
|
||||
let commands = self.inner.len();
|
||||
let batch_size = self.replies.len();
|
||||
|
||||
match client.query(&self.inner).await {
|
||||
Ok(Value::Array(values)) if values.len() == commands => {
|
||||
debug!(
|
||||
commands,
|
||||
batch_size, "successfully completed cancellation jobs",
|
||||
);
|
||||
let mut values = &*values;
|
||||
for (n, resp) in self.replies.drain(..) {
|
||||
let (v, rest) = values.split_at(n);
|
||||
values = rest;
|
||||
resp(Ok(v));
|
||||
}
|
||||
}
|
||||
Ok(value) => {
|
||||
error!(
|
||||
commands,
|
||||
batch_size,
|
||||
?value,
|
||||
"unexpected redis return value"
|
||||
);
|
||||
for (_n, resp) in self.replies.drain(..) {
|
||||
resp(Err(anyhow!("incorrect response type from redis")));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
for (_n, resp) in self.replies.drain(..) {
|
||||
resp(Err(anyhow!("could not send cmd to redis: {err}")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.inner.clear();
|
||||
self.replies.clear();
|
||||
}
|
||||
|
||||
/// Add a batch of commands to the pipeline, and run the resp fn when they are all done.
|
||||
///
|
||||
/// If multiple commands are provided, the response should be able to decode
|
||||
/// all of the values. You can provide a tuple in that case.
|
||||
fn add_commands<F, T, const N: usize>(&mut self, cmds: [Cmd; N], resp: F)
|
||||
where
|
||||
F: FnOnce(anyhow::Result<T>) + Send + 'static,
|
||||
T: FromRedisValue,
|
||||
{
|
||||
for cmd in cmds {
|
||||
self.inner.add_command(cmd);
|
||||
}
|
||||
let reply = Box::new(move |res: anyhow::Result<&[redis::Value]>| {
|
||||
let res = match res {
|
||||
Ok(v) => T::from_redis_value(&redis::Value::Array(v.to_owned()))
|
||||
.context("could not parse value"),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
resp(res);
|
||||
});
|
||||
self.replies.push((N, reply as Box<_>));
|
||||
}
|
||||
}
|
||||
|
||||
impl CancelKeyOp {
|
||||
fn register(self, pipe: &mut Pipeline) -> Option<CancelReplyOp> {
|
||||
fn register(self, pipe: &mut Pipeline) {
|
||||
#[allow(clippy::used_underscore_binding)]
|
||||
match self {
|
||||
CancelKeyOp::StoreCancelKey {
|
||||
@@ -68,18 +146,30 @@ impl CancelKeyOp {
|
||||
_guard,
|
||||
expire,
|
||||
} => {
|
||||
pipe.hset(&key, field, value);
|
||||
pipe.expire(key, expire);
|
||||
let resp_tx = resp_tx?;
|
||||
Some(CancelReplyOp::StoreCancelKey { resp_tx, _guard })
|
||||
pipe.add_commands(
|
||||
[Cmd::hset(&key, field, value), Cmd::expire(key, expire)],
|
||||
// ignore all results
|
||||
move |res: anyhow::Result<()>| {
|
||||
let _guard = _guard;
|
||||
if let Some(resp_tx) = resp_tx {
|
||||
if resp_tx.send(res).is_err() {
|
||||
tracing::debug!("could not send reply");
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
CancelKeyOp::GetCancelData {
|
||||
key,
|
||||
resp_tx,
|
||||
_guard,
|
||||
} => {
|
||||
pipe.hgetall(key);
|
||||
Some(CancelReplyOp::GetCancelData { resp_tx, _guard })
|
||||
pipe.add_commands([Cmd::hgetall(key)], move |res| {
|
||||
let _guard = _guard;
|
||||
if resp_tx.send(res).is_err() {
|
||||
tracing::debug!("could not send reply");
|
||||
}
|
||||
});
|
||||
}
|
||||
CancelKeyOp::RemoveCancelKey {
|
||||
key,
|
||||
@@ -87,79 +177,14 @@ impl CancelKeyOp {
|
||||
resp_tx,
|
||||
_guard,
|
||||
} => {
|
||||
pipe.hdel(key, field);
|
||||
let resp_tx = resp_tx?;
|
||||
Some(CancelReplyOp::RemoveCancelKey { resp_tx, _guard })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Message types for sending through mpsc channel
|
||||
pub enum CancelReplyOp {
|
||||
StoreCancelKey {
|
||||
resp_tx: oneshot::Sender<anyhow::Result<()>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
},
|
||||
GetCancelData {
|
||||
resp_tx: oneshot::Sender<anyhow::Result<Vec<(String, String)>>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
},
|
||||
RemoveCancelKey {
|
||||
resp_tx: oneshot::Sender<anyhow::Result<()>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
},
|
||||
}
|
||||
|
||||
impl CancelReplyOp {
|
||||
fn send_err(self, e: anyhow::Error) {
|
||||
match self {
|
||||
CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
|
||||
resp_tx
|
||||
.send(Err(e))
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::GetCancelData { resp_tx, _guard } => {
|
||||
resp_tx
|
||||
.send(Err(e))
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
|
||||
resp_tx
|
||||
.send(Err(e))
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_value(self, v: redis::Value) {
|
||||
match self {
|
||||
CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
|
||||
let send =
|
||||
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
|
||||
resp_tx
|
||||
.send(send)
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::GetCancelData { resp_tx, _guard } => {
|
||||
let send =
|
||||
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
|
||||
resp_tx
|
||||
.send(send)
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
|
||||
let send =
|
||||
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
|
||||
resp_tx
|
||||
.send(send)
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
pipe.add_commands([Cmd::hdel(key, field)], move |res| {
|
||||
let _guard = _guard;
|
||||
if let Some(resp_tx) = resp_tx {
|
||||
if resp_tx.send(res).is_err() {
|
||||
tracing::debug!("could not send reply");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -170,8 +195,8 @@ pub async fn handle_cancel_messages(
|
||||
client: &mut RedisKVClient,
|
||||
mut rx: mpsc::Receiver<CancelKeyOp>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut batch = Vec::new();
|
||||
let mut replies = vec![];
|
||||
let mut batch = Vec::with_capacity(BATCH_SIZE);
|
||||
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
|
||||
|
||||
loop {
|
||||
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
|
||||
@@ -182,42 +207,11 @@ pub async fn handle_cancel_messages(
|
||||
let batch_size = batch.len();
|
||||
debug!(batch_size, "running cancellation jobs");
|
||||
|
||||
let mut pipe = pipe();
|
||||
for msg in batch.drain(..) {
|
||||
if let Some(reply) = msg.register(&mut pipe) {
|
||||
replies.push(reply);
|
||||
} else {
|
||||
pipe.ignore();
|
||||
}
|
||||
msg.register(&mut pipeline);
|
||||
}
|
||||
|
||||
let responses = replies.len();
|
||||
|
||||
match client.query(pipe).await {
|
||||
// for each reply, we expect that many values.
|
||||
Ok(Value::Array(values)) if values.len() == responses => {
|
||||
debug!(
|
||||
batch_size,
|
||||
responses, "successfully completed cancellation jobs",
|
||||
);
|
||||
for (value, reply) in std::iter::zip(values, replies.drain(..)) {
|
||||
reply.send_value(value);
|
||||
}
|
||||
}
|
||||
Ok(value) => {
|
||||
debug!(?value, "unexpected redis return value");
|
||||
for reply in replies.drain(..) {
|
||||
reply.send_err(anyhow!("incorrect response type from redis"));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
for reply in replies.drain(..) {
|
||||
reply.send_err(anyhow!("could not send cmd to redis: {err}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
replies.clear();
|
||||
pipeline.execute(client).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -115,8 +115,8 @@ pub struct ProxyMetrics {
|
||||
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
|
||||
pub allowed_vpc_endpoint_ids: Histogram<10>,
|
||||
|
||||
/// Number of connections (per sni).
|
||||
pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
|
||||
/// Number of connections, by the method we used to determine the endpoint.
|
||||
pub accepted_connections_by_sni: CounterVec<SniSet>,
|
||||
|
||||
/// Number of connection failures (per kind).
|
||||
pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
|
||||
@@ -342,11 +342,20 @@ pub enum LatencyExclusions {
|
||||
ClientCplaneComputeRetry,
|
||||
}
|
||||
|
||||
#[derive(LabelGroup)]
|
||||
#[label(set = SniSet)]
|
||||
pub struct SniGroup {
|
||||
pub protocol: Protocol,
|
||||
pub kind: SniKind,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Copy, Clone)]
|
||||
#[label(singleton = "kind")]
|
||||
pub enum SniKind {
|
||||
/// Domain name based routing. SNI for libpq/websockets. Host for HTTP
|
||||
Sni,
|
||||
/// Metadata based routing. `options` for libpq/websockets. Header for HTTP
|
||||
NoSni,
|
||||
/// Metadata based routing, using the password field.
|
||||
PasswordHack,
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ impl RedisKVClient {
|
||||
|
||||
pub(crate) async fn query<T: FromRedisValue>(
|
||||
&mut self,
|
||||
q: impl Queryable,
|
||||
q: &impl Queryable,
|
||||
) -> anyhow::Result<T> {
|
||||
if !self.limiter.check() {
|
||||
tracing::info!("Rate limit exceeded. Skipping query");
|
||||
|
||||
@@ -56,6 +56,7 @@ use crate::serverless::backend::PoolingBackend;
|
||||
use crate::serverless::http_util::{api_error_into_response, json_response};
|
||||
|
||||
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
|
||||
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
|
||||
@@ -38,7 +38,7 @@ use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::http::{ReadBodyError, read_body_with_limit};
|
||||
use crate::metrics::{HttpDirection, Metrics};
|
||||
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
|
||||
use crate::proxy::{NeonOptions, run_until_cancelled};
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
use crate::types::{DbName, RoleName};
|
||||
@@ -227,6 +227,32 @@ fn get_conn_info(
|
||||
}
|
||||
}
|
||||
|
||||
// check the URL that was used, for metrics
|
||||
{
|
||||
let host_endpoint = headers
|
||||
// get the host header
|
||||
.get("host")
|
||||
// extract the domain
|
||||
.and_then(|h| {
|
||||
let (host, _port) = h.to_str().ok()?.split_once(':')?;
|
||||
Some(host)
|
||||
})
|
||||
// get the endpoint prefix
|
||||
.map(|h| h.split_once('.').map_or(h, |(prefix, _)| prefix));
|
||||
|
||||
let kind = if host_endpoint == Some(&*endpoint) {
|
||||
SniKind::Sni
|
||||
} else {
|
||||
SniKind::NoSni
|
||||
};
|
||||
|
||||
let protocol = ctx.protocol();
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniGroup { protocol, kind });
|
||||
}
|
||||
|
||||
ctx.set_user_agent(
|
||||
headers
|
||||
.get(hyper::header::USER_AGENT)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, bail};
|
||||
@@ -21,9 +22,9 @@ pub struct TlsConfig {
|
||||
|
||||
/// Configure TLS for the main endpoint.
|
||||
pub fn configure_tls(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
certs_dir: Option<&String>,
|
||||
key_path: &Path,
|
||||
cert_path: &Path,
|
||||
certs_dir: Option<&Path>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
// add default certificate
|
||||
@@ -39,8 +40,7 @@ pub fn configure_tls(
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver
|
||||
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
|
||||
cert_resolver.add_cert_path(&key_path, &cert_path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -86,7 +86,7 @@ pub struct CertResolver {
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
|
||||
fn parse_new(key_path: &Path, cert_path: &Path) -> anyhow::Result<Self> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
Self::new(priv_key, cert_chain)
|
||||
}
|
||||
@@ -103,7 +103,7 @@ impl CertResolver {
|
||||
Ok(Self { certs, default })
|
||||
}
|
||||
|
||||
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
|
||||
fn add_cert_path(&mut self, key_path: &Path, cert_path: &Path) -> anyhow::Result<()> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
self.add_cert(priv_key, cert_chain)
|
||||
}
|
||||
@@ -124,26 +124,29 @@ impl CertResolver {
|
||||
}
|
||||
|
||||
fn parse_key_cert(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
key_path: &Path,
|
||||
cert_path: &Path,
|
||||
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
.with_context(|| format!("Failed to read TLS keys at '{}'", key_path.display()))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{}'", key_path.display()))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{}'", key_path.display()))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
let cert_chain_bytes = std::fs::read(cert_path).context(format!(
|
||||
"Failed to read TLS cert file at '{}.'",
|
||||
cert_path.display()
|
||||
))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
"Failed to read TLS certificate chain from bytes from file at '{}'.",
|
||||
cert_path.display()
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
@@ -121,6 +121,20 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn switch_timeline_membership(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &models::TimelineMembershipSwitchRequest,
|
||||
) -> Result<models::TimelineMembershipSwitchResponse> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/membership",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.put(&uri, req).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self
|
||||
|
||||
@@ -243,8 +243,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
|
||||
let resp =
|
||||
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
.await?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{SinkExt, StreamExt, TryStreamExt};
|
||||
use http_utils::error::ApiError;
|
||||
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
@@ -30,7 +31,7 @@ use utils::pausable_failpoint;
|
||||
|
||||
use crate::control_file::CONTROL_FILE_NAME;
|
||||
use crate::state::{EvictionState, TimelinePersistentState};
|
||||
use crate::timeline::{Timeline, WalResidentTimeline};
|
||||
use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
|
||||
use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
|
||||
use crate::wal_storage::open_wal_file;
|
||||
use crate::{GlobalTimelines, debug_dump, wal_backup};
|
||||
@@ -395,7 +396,7 @@ pub async fn handle_request(
|
||||
sk_auth_token: Option<SecretString>,
|
||||
ssl_ca_certs: Vec<Certificate>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<PullTimelineResponse> {
|
||||
) -> Result<PullTimelineResponse, ApiError> {
|
||||
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
@@ -411,7 +412,9 @@ pub async fn handle_request(
|
||||
for ssl_ca_cert in ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(ssl_ca_cert);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
let http_client = http_client
|
||||
.build()
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
let http_hosts = request.http_hosts.clone();
|
||||
|
||||
@@ -443,10 +446,10 @@ pub async fn handle_request(
|
||||
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
|
||||
let min_required_successful = (http_hosts.len() - 1).max(1);
|
||||
if statuses.len() < min_required_successful {
|
||||
bail!(
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"only got {} successful status responses. required: {min_required_successful}",
|
||||
statuses.len()
|
||||
)
|
||||
)));
|
||||
}
|
||||
|
||||
// Find the most advanced safekeeper
|
||||
@@ -465,14 +468,32 @@ pub async fn handle_request(
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
pull_timeline(
|
||||
let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
|
||||
|
||||
match pull_timeline(
|
||||
status,
|
||||
safekeeper_host,
|
||||
sk_auth_token,
|
||||
http_client,
|
||||
global_timelines,
|
||||
check_tombstone,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) => Ok(resp),
|
||||
Err(e) => {
|
||||
match e.downcast_ref::<TimelineError>() {
|
||||
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
|
||||
safekeeper_host: None,
|
||||
}),
|
||||
Some(TimelineError::CreationInProgress(_)) => {
|
||||
// We don't return success here because creation might still fail.
|
||||
Err(ApiError::Conflict("Creation in progress".to_owned()))
|
||||
}
|
||||
_ => Err(ApiError::InternalServerError(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn pull_timeline(
|
||||
@@ -481,6 +502,7 @@ async fn pull_timeline(
|
||||
sk_auth_token: Option<SecretString>,
|
||||
http_client: reqwest::Client,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
check_tombstone: bool,
|
||||
) -> Result<PullTimelineResponse> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
@@ -552,7 +574,7 @@ async fn pull_timeline(
|
||||
|
||||
// Finally, load the timeline.
|
||||
let _tli = global_timelines
|
||||
.load_temp_timeline(ttid, &tli_dir_path, false)
|
||||
.load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
|
||||
.await?;
|
||||
|
||||
Ok(PullTimelineResponse {
|
||||
|
||||
@@ -98,6 +98,23 @@ impl SafekeeperClient {
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn switch_timeline_membership(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &models::TimelineMembershipSwitchRequest,
|
||||
) -> Result<models::TimelineMembershipSwitchResponse> {
|
||||
measured_request!(
|
||||
"switch_timeline_membership",
|
||||
crate::metrics::Method::Put,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.switch_timeline_membership(tenant_id, timeline_id, req)
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -3886,10 +3886,10 @@ impl Service {
|
||||
|
||||
None
|
||||
} else if safekeepers {
|
||||
// Note that we do not support creating the timeline on the safekeepers
|
||||
// for imported timelines. The `start_lsn` of the timeline is not known
|
||||
// until the import finshes.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
// Note that for imported timelines, we do not create the timeline on the safekeepers
|
||||
// straight away. Instead, we do it once the import finalized such that we know what
|
||||
// start LSN to provide for the safekeepers. This is done in
|
||||
// [`Self::finalize_timeline_import`].
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
|
||||
@@ -3966,11 +3966,22 @@ impl Service {
|
||||
let active = self.timeline_active_on_all_shards(&import).await?;
|
||||
|
||||
match active {
|
||||
true => {
|
||||
Some(timeline_info) => {
|
||||
tracing::info!("Timeline became active on all shards");
|
||||
|
||||
if self.config.timelines_onto_safekeepers {
|
||||
// Now that we know the start LSN of this timeline, create it on the
|
||||
// safekeepers.
|
||||
self.tenant_timeline_create_safekeepers_until_success(
|
||||
import.tenant_id,
|
||||
timeline_info,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
false => {
|
||||
None => {
|
||||
tracing::info!("Timeline not active on all shards yet");
|
||||
|
||||
tokio::select! {
|
||||
@@ -4004,9 +4015,6 @@ impl Service {
|
||||
.range_mut(TenantShardId::tenant_range(import.tenant_id))
|
||||
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
|
||||
|
||||
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
|
||||
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
tracing::info!(%import_failed, "Timeline import complete");
|
||||
|
||||
Ok(())
|
||||
@@ -4021,10 +4029,16 @@ impl Service {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// If the timeline is active on all shards, returns the [`TimelineInfo`]
|
||||
/// collected from shard 0.
|
||||
///
|
||||
/// An error is returned if the shard layout has changed during the import.
|
||||
/// This is guarded against within the storage controller and the pageserver,
|
||||
/// and, therefore, unexpected.
|
||||
async fn timeline_active_on_all_shards(
|
||||
self: &Arc<Self>,
|
||||
import: &TimelineImport,
|
||||
) -> anyhow::Result<bool> {
|
||||
) -> anyhow::Result<Option<TimelineInfo>> {
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
@@ -4048,13 +4062,17 @@ impl Service {
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
} else {
|
||||
return Ok(false);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
targets
|
||||
};
|
||||
|
||||
if targets.is_empty() {
|
||||
anyhow::bail!("No shards found to finalize import for");
|
||||
}
|
||||
|
||||
let results = self
|
||||
.tenant_for_shards_api(
|
||||
targets,
|
||||
@@ -4070,10 +4088,17 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(results.into_iter().all(|res| match res {
|
||||
let all_active = results.iter().all(|res| match res {
|
||||
Ok(info) => info.state == TimelineState::Active,
|
||||
Err(_) => false,
|
||||
}))
|
||||
});
|
||||
|
||||
if all_active {
|
||||
// Both unwraps are validated above
|
||||
Ok(Some(results.into_iter().next().unwrap().unwrap()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_archival_config(
|
||||
@@ -8485,7 +8510,7 @@ impl Service {
|
||||
// By default, live migrations are generous about the wait time for getting
|
||||
// the secondary location up to speed. When draining, give up earlier in order
|
||||
// to not stall the operation when a cold secondary is encountered.
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
@@ -8818,7 +8843,7 @@ impl Service {
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
str::FromStr,
|
||||
sync::{Arc, atomic::AtomicU64},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use clashmap::{ClashMap, Entry};
|
||||
use safekeeper_api::models::PullTimelineRequest;
|
||||
@@ -169,10 +174,17 @@ pub(crate) struct ScheduleRequest {
|
||||
pub(crate) kind: SafekeeperTimelineOpKind,
|
||||
}
|
||||
|
||||
/// A way to keep ongoing/queued reconcile requests apart
|
||||
#[derive(Copy, Clone, PartialEq, Eq)]
|
||||
struct TokenId(u64);
|
||||
|
||||
type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;
|
||||
|
||||
/// Handle to per safekeeper reconciler.
|
||||
struct ReconcilerHandle {
|
||||
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
|
||||
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
|
||||
tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
|
||||
ongoing_tokens: Arc<OngoingTokens>,
|
||||
token_id_counter: AtomicU64,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
@@ -185,24 +197,28 @@ impl ReconcilerHandle {
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
) -> CancellationToken {
|
||||
) -> (CancellationToken, TokenId) {
|
||||
let token_id = self
|
||||
.token_id_counter
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let token_id = TokenId(token_id);
|
||||
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
|
||||
if let Entry::Occupied(entry) = &entry {
|
||||
let cancel: &CancellationToken = entry.get();
|
||||
let (cancel, _) = entry.get();
|
||||
cancel.cancel();
|
||||
}
|
||||
entry.insert(self.cancel.child_token()).clone()
|
||||
entry.insert((self.cancel.child_token(), token_id)).clone()
|
||||
}
|
||||
/// Cancel an ongoing reconciliation
|
||||
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
|
||||
if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
|
||||
if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
|
||||
cancel.cancel();
|
||||
}
|
||||
}
|
||||
fn schedule_reconcile(&self, req: ScheduleRequest) {
|
||||
let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
|
||||
let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
|
||||
let hostname = req.safekeeper.skp.host.clone();
|
||||
if let Err(err) = self.tx.send((req, cancel)) {
|
||||
if let Err(err) = self.tx.send((req, cancel, token_id)) {
|
||||
tracing::info!("scheduling request onto {hostname} returned error: {err}");
|
||||
}
|
||||
}
|
||||
@@ -211,13 +227,14 @@ impl ReconcilerHandle {
|
||||
pub(crate) struct SafekeeperReconciler {
|
||||
inner: SafekeeperReconcilerInner,
|
||||
concurrency_limiter: Arc<Semaphore>,
|
||||
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
|
||||
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
/// Thin wrapper over `Service` to not clutter its inherent functions
|
||||
#[derive(Clone)]
|
||||
struct SafekeeperReconcilerInner {
|
||||
ongoing_tokens: Arc<OngoingTokens>,
|
||||
service: Arc<Service>,
|
||||
}
|
||||
|
||||
@@ -226,15 +243,20 @@ impl SafekeeperReconciler {
|
||||
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let concurrency = service.config.safekeeper_reconciler_concurrency;
|
||||
let ongoing_tokens = Arc::new(ClashMap::new());
|
||||
let mut reconciler = SafekeeperReconciler {
|
||||
inner: SafekeeperReconcilerInner { service },
|
||||
inner: SafekeeperReconcilerInner {
|
||||
service,
|
||||
ongoing_tokens: ongoing_tokens.clone(),
|
||||
},
|
||||
rx,
|
||||
concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
|
||||
cancel: cancel.clone(),
|
||||
};
|
||||
let handle = ReconcilerHandle {
|
||||
tx,
|
||||
ongoing_tokens: Arc::new(ClashMap::new()),
|
||||
ongoing_tokens,
|
||||
token_id_counter: AtomicU64::new(0),
|
||||
cancel,
|
||||
};
|
||||
tokio::spawn(async move { reconciler.run().await });
|
||||
@@ -246,7 +268,9 @@ impl SafekeeperReconciler {
|
||||
req = self.rx.recv() => req,
|
||||
_ = self.cancel.cancelled() => break,
|
||||
};
|
||||
let Some((req, req_cancel)) = req else { break };
|
||||
let Some((req, req_cancel, req_token_id)) = req else {
|
||||
break;
|
||||
};
|
||||
|
||||
let permit_res = tokio::select! {
|
||||
req = self.concurrency_limiter.clone().acquire_owned() => req,
|
||||
@@ -265,7 +289,7 @@ impl SafekeeperReconciler {
|
||||
let timeline_id = req.timeline_id;
|
||||
let node_id = req.safekeeper.skp.id;
|
||||
inner
|
||||
.reconcile_one(req, req_cancel)
|
||||
.reconcile_one(req, req_cancel, req_token_id)
|
||||
.instrument(tracing::info_span!(
|
||||
"reconcile_one",
|
||||
?kind,
|
||||
@@ -280,8 +304,14 @@ impl SafekeeperReconciler {
|
||||
}
|
||||
|
||||
impl SafekeeperReconcilerInner {
|
||||
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
|
||||
async fn reconcile_one(
|
||||
&self,
|
||||
req: ScheduleRequest,
|
||||
req_cancel: CancellationToken,
|
||||
req_token_id: TokenId,
|
||||
) {
|
||||
let req_host = req.safekeeper.skp.host.clone();
|
||||
let success;
|
||||
match req.kind {
|
||||
SafekeeperTimelineOpKind::Pull => {
|
||||
let Some(timeline_id) = req.timeline_id else {
|
||||
@@ -301,20 +331,24 @@ impl SafekeeperReconcilerInner {
|
||||
http_hosts,
|
||||
tenant_id: req.tenant_id,
|
||||
timeline_id,
|
||||
ignore_tombstone: Some(false),
|
||||
};
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.pull_timeline(&pull_req).await,
|
||||
|resp| {
|
||||
if let Some(host) = resp.safekeeper_host {
|
||||
tracing::info!("pulled timeline from {host} onto {req_host}");
|
||||
} else {
|
||||
tracing::info!("timeline already present on safekeeper on {req_host}");
|
||||
}
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
&req,
|
||||
async |client| client.pull_timeline(&pull_req).await,
|
||||
|resp| {
|
||||
if let Some(host) = resp.safekeeper_host {
|
||||
tracing::info!("pulled timeline from {host} onto {req_host}");
|
||||
} else {
|
||||
tracing::info!(
|
||||
"timeline already present on safekeeper on {req_host}"
|
||||
);
|
||||
}
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
SafekeeperTimelineOpKind::Exclude => {
|
||||
// TODO actually exclude instead of delete here
|
||||
@@ -325,22 +359,23 @@ impl SafekeeperReconcilerInner {
|
||||
);
|
||||
return;
|
||||
};
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
&req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
SafekeeperTimelineOpKind::Delete => {
|
||||
let tenant_id = req.tenant_id;
|
||||
if let Some(timeline_id) = req.timeline_id {
|
||||
let deleted = self
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
req,
|
||||
&req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
@@ -348,13 +383,13 @@ impl SafekeeperReconcilerInner {
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
if deleted {
|
||||
if success {
|
||||
self.delete_timeline_from_db(tenant_id, timeline_id).await;
|
||||
}
|
||||
} else {
|
||||
let deleted = self
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
req,
|
||||
&req,
|
||||
async |client| client.delete_tenant(tenant_id).await,
|
||||
|_resp| {
|
||||
tracing::info!(%tenant_id, "deleted tenant from {req_host}");
|
||||
@@ -362,12 +397,21 @@ impl SafekeeperReconcilerInner {
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
if deleted {
|
||||
if success {
|
||||
self.delete_tenant_timelines_from_db(tenant_id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if success {
|
||||
self.ongoing_tokens.remove_if(
|
||||
&(req.tenant_id, req.timeline_id),
|
||||
|_ttid, (_cancel, token_id)| {
|
||||
// Ensure that this request is indeed the request we just finished and not a new one
|
||||
req_token_id == *token_id
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
|
||||
match self
|
||||
@@ -421,10 +465,10 @@ impl SafekeeperReconcilerInner {
|
||||
self.delete_timeline_from_db(tenant_id, timeline_id).await;
|
||||
}
|
||||
}
|
||||
/// Returns whether the reconciliation happened successfully
|
||||
/// Returns whether the reconciliation happened successfully (or we got cancelled)
|
||||
async fn reconcile_inner<T, F, U>(
|
||||
&self,
|
||||
req: ScheduleRequest,
|
||||
req: &ScheduleRequest,
|
||||
closure: impl Fn(SafekeeperClient) -> F,
|
||||
log_success: impl FnOnce(T) -> U,
|
||||
req_cancel: CancellationToken,
|
||||
|
||||
@@ -323,6 +323,42 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_info: TimelineInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
const BACKOFF: Duration = Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
}
|
||||
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(_) => {
|
||||
tracing::info!("Timeline created on safekeepers");
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to create timeline on safekeepers: {err}");
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
},
|
||||
_ = tokio::time::sleep(BACKOFF) => {}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Directly insert the timeline into the database without reconciling it with safekeepers.
|
||||
///
|
||||
/// Useful if the timeline already exists on the specified safekeepers,
|
||||
|
||||
@@ -165,16 +165,17 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
.head_object(&path, &CancellationToken::new())
|
||||
.await;
|
||||
|
||||
if response.is_err() {
|
||||
if let Err(e) = response {
|
||||
// Object is not present.
|
||||
let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());
|
||||
|
||||
let msg = format!(
|
||||
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
|
||||
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {}) with error: {}",
|
||||
layer,
|
||||
metadata.generation.get_suffix(),
|
||||
metadata.shard,
|
||||
is_l0,
|
||||
e,
|
||||
);
|
||||
|
||||
if is_l0 || ignore_error {
|
||||
@@ -354,6 +355,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
match res {
|
||||
ListTimelineBlobsResult::Ready(data) => Ok(data),
|
||||
ListTimelineBlobsResult::MissingIndexPart(_) => {
|
||||
tracing::warn!("listing raced with removal of an index, retrying");
|
||||
// Retry if listing raced with removal of an index
|
||||
let data = list_timeline_blobs_impl(remote_client, id, root_target)
|
||||
.await?
|
||||
@@ -440,7 +442,7 @@ async fn list_timeline_blobs_impl(
|
||||
}
|
||||
|
||||
if index_part_keys.is_empty() && s3_layers.is_empty() {
|
||||
tracing::debug!("Timeline is empty: expected post-deletion state.");
|
||||
tracing::info!("Timeline is empty: expected post-deletion state.");
|
||||
if initdb_archive {
|
||||
tracing::info!("Timeline is post deletion but initdb archive is still present.");
|
||||
}
|
||||
|
||||
@@ -137,11 +137,10 @@ struct TenantRefAccumulator {
|
||||
impl TenantRefAccumulator {
|
||||
fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
|
||||
let this_shard_idx = ttid.tenant_shard_id.to_index();
|
||||
(*self
|
||||
.shards_seen
|
||||
self.shards_seen
|
||||
.entry(ttid.tenant_shard_id.tenant_id)
|
||||
.or_default())
|
||||
.insert(this_shard_idx);
|
||||
.or_default()
|
||||
.insert(this_shard_idx);
|
||||
|
||||
let mut ancestor_refs = Vec::new();
|
||||
for (layer_name, layer_metadata) in &index_part.layer_metadata {
|
||||
@@ -594,6 +593,7 @@ async fn gc_timeline(
|
||||
index_part_snapshot_time: _,
|
||||
} => (index_part, *index_part_generation, data.unused_index_keys),
|
||||
BlobDataParseResult::Relic => {
|
||||
tracing::info!("Skipping timeline {ttid}, it is a relic");
|
||||
// Post-deletion tenant location: don't try and GC it.
|
||||
return Ok(summary);
|
||||
}
|
||||
@@ -767,10 +767,13 @@ pub async fn pageserver_physical_gc(
|
||||
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
|
||||
);
|
||||
Ok(try_stream! {
|
||||
let mut cnt = 0;
|
||||
while let Some(ttid_res) = timelines.next().await {
|
||||
let ttid = ttid_res?;
|
||||
cnt += 1;
|
||||
yield (ttid, tenant_manifest_arc.clone());
|
||||
}
|
||||
tracing::info!(%tenant_shard_id, "Found {} timelines", cnt);
|
||||
})
|
||||
}
|
||||
});
|
||||
@@ -790,6 +793,7 @@ pub async fn pageserver_physical_gc(
|
||||
&accumulator,
|
||||
tenant_manifest_arc,
|
||||
)
|
||||
.instrument(info_span!("gc_timeline", %ttid))
|
||||
});
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let mut timelines = std::pin::pin!(timelines);
|
||||
|
||||
@@ -153,7 +153,10 @@ pub async fn scan_pageserver_metadata(
|
||||
const CONCURRENCY: usize = 32;
|
||||
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
|
||||
let timelines = tenants.map_ok(|t| {
|
||||
tracing::info!("Found tenant: {}", t);
|
||||
stream_tenant_timelines(&remote_client, &target, t)
|
||||
});
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ pub struct SnapshotDownloader {
|
||||
remote_client: GenericRemoteStorage,
|
||||
#[allow(dead_code)]
|
||||
target: RootTarget,
|
||||
bucket_config: BucketConfig,
|
||||
tenant_id: TenantId,
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
@@ -43,7 +42,6 @@ impl SnapshotDownloader {
|
||||
Ok(Self {
|
||||
remote_client,
|
||||
target,
|
||||
bucket_config,
|
||||
tenant_id,
|
||||
output_path,
|
||||
concurrency,
|
||||
@@ -218,11 +216,9 @@ impl SnapshotDownloader {
|
||||
}
|
||||
|
||||
pub async fn download(&self) -> anyhow::Result<()> {
|
||||
let (remote_client, target) =
|
||||
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
|
||||
// Generate a stream of TenantShardId
|
||||
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
|
||||
let shards =
|
||||
stream_tenant_shards(&self.remote_client, &self.target, self.tenant_id).await?;
|
||||
let shards: Vec<TenantShardId> = shards.try_collect().await?;
|
||||
|
||||
// Only read from shards that have the highest count: avoids redundantly downloading
|
||||
@@ -240,7 +236,8 @@ impl SnapshotDownloader {
|
||||
|
||||
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
|
||||
let timelines =
|
||||
stream_tenant_timelines(&self.remote_client, &self.target, shard).await?;
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn load_timeline_index(
|
||||
@@ -251,8 +248,8 @@ impl SnapshotDownloader {
|
||||
let data = list_timeline_blobs(remote_client, ttid, target).await?;
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines =
|
||||
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
|
||||
let timelines = timelines
|
||||
.map_ok(|ttid| load_timeline_index(&self.remote_client, &self.target, ttid));
|
||||
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
|
||||
|
||||
while let Some(i) = timelines.next().await {
|
||||
|
||||
@@ -557,7 +557,7 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
endpoint_id: str,
|
||||
safekeepers_generation: int | None = None,
|
||||
safekeepers: list[int] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
allow_multiple: bool = False,
|
||||
create_test_user: bool = False,
|
||||
@@ -572,8 +572,8 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
extra_env_vars = env or {}
|
||||
if basebackup_request_tries is not None:
|
||||
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
if remote_ext_base_url is not None:
|
||||
args.extend(["--remote-ext-base-url", remote_ext_base_url])
|
||||
|
||||
if safekeepers_generation is not None:
|
||||
args.extend(["--safekeepers-generation", str(safekeepers_generation)])
|
||||
|
||||
@@ -1274,6 +1274,8 @@ class NeonEnv:
|
||||
|
||||
if self.pageserver_virtual_file_io_engine is not None:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
if self.pageserver_virtual_file_io_mode is not None:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
|
||||
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
||||
tenant_config["compaction_algorithm"] = (
|
||||
@@ -1299,13 +1301,6 @@ class NeonEnv:
|
||||
for key, value in override.items():
|
||||
ps_cfg[key] = value
|
||||
|
||||
if self.pageserver_virtual_file_io_mode is not None:
|
||||
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
|
||||
if not config.test_may_use_compatibility_snapshot_binaries:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
else:
|
||||
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
|
||||
|
||||
if self.pageserver_wal_receiver_protocol is not None:
|
||||
key, value = PageserverWalReceiverProtocol.to_config_key_value(
|
||||
self.pageserver_wal_receiver_protocol
|
||||
@@ -1409,30 +1404,6 @@ class NeonEnv:
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
# Last step: register safekeepers at the storage controller
|
||||
if (
|
||||
self.storage_controller_config is not None
|
||||
and self.storage_controller_config.get("timelines_onto_safekeepers") is True
|
||||
):
|
||||
for sk_id, sk in enumerate(self.safekeepers):
|
||||
# 0 is an invalid safekeeper id
|
||||
sk_id = sk_id + 1
|
||||
body = {
|
||||
"id": sk_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "127.0.0.1",
|
||||
"port": sk.port.pg,
|
||||
"http_port": sk.port.http,
|
||||
"https_port": None,
|
||||
"version": 5957,
|
||||
"availability_zone_id": f"us-east-2b-{sk_id}",
|
||||
}
|
||||
|
||||
self.storage_controller.on_safekeeper_deploy(sk_id, body)
|
||||
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
|
||||
|
||||
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
|
||||
|
||||
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
|
||||
@@ -3636,6 +3607,8 @@ class NeonProxy(PgProtocol):
|
||||
http_port: int,
|
||||
mgmt_port: int,
|
||||
external_http_port: int,
|
||||
router_port: int,
|
||||
router_tls_port: int,
|
||||
auth_backend: NeonProxy.AuthBackend,
|
||||
metric_collection_endpoint: str | None = None,
|
||||
metric_collection_interval: str | None = None,
|
||||
@@ -3652,6 +3625,8 @@ class NeonProxy(PgProtocol):
|
||||
self.test_output_dir = test_output_dir
|
||||
self.proxy_port = proxy_port
|
||||
self.mgmt_port = mgmt_port
|
||||
self.router_port = router_port
|
||||
self.router_tls_port = router_tls_port
|
||||
self.auth_backend = auth_backend
|
||||
self.metric_collection_endpoint = metric_collection_endpoint
|
||||
self.metric_collection_interval = metric_collection_interval
|
||||
@@ -3666,6 +3641,14 @@ class NeonProxy(PgProtocol):
|
||||
key_path = self.test_output_dir / "proxy.key"
|
||||
generate_proxy_tls_certs("*.local.neon.build", key_path, crt_path)
|
||||
|
||||
# generate key for pg-sni-router.
|
||||
# endpoint.namespace.local.neon.build resolves to 127.0.0.1
|
||||
generate_proxy_tls_certs(
|
||||
"endpoint.namespace.local.neon.build",
|
||||
self.test_output_dir / "router.key",
|
||||
self.test_output_dir / "router.crt",
|
||||
)
|
||||
|
||||
args = [
|
||||
str(self.neon_binpath / "proxy"),
|
||||
*["--http", f"{self.host}:{self.http_port}"],
|
||||
@@ -3675,6 +3658,11 @@ class NeonProxy(PgProtocol):
|
||||
*["--sql-over-http-timeout", f"{self.http_timeout_seconds}s"],
|
||||
*["-c", str(crt_path)],
|
||||
*["-k", str(key_path)],
|
||||
*["--sni-router-listen", f"{self.host}:{self.router_port}"],
|
||||
*["--sni-router-listen-tls", f"{self.host}:{self.router_tls_port}"],
|
||||
*["--sni-router-tls-cert", str(self.test_output_dir / "router.crt")],
|
||||
*["--sni-router-tls-key", str(self.test_output_dir / "router.key")],
|
||||
*["--sni-router-destination", "local.neon.build"],
|
||||
*self.auth_backend.extra_args(),
|
||||
]
|
||||
|
||||
@@ -3866,7 +3854,7 @@ class NeonAuthBroker:
|
||||
external_http_port: int,
|
||||
auth_backend: NeonAuthBroker.ProxyV1,
|
||||
):
|
||||
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
|
||||
self.domain = "local.neon.build" # resolves to 127.0.0.1
|
||||
self.host = "127.0.0.1"
|
||||
self.http_port = http_port
|
||||
self.external_http_port = external_http_port
|
||||
@@ -3883,7 +3871,7 @@ class NeonAuthBroker:
|
||||
# generate key of it doesn't exist
|
||||
crt_path = self.test_output_dir / "proxy.crt"
|
||||
key_path = self.test_output_dir / "proxy.key"
|
||||
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
|
||||
generate_proxy_tls_certs(f"apiauth.{self.domain}", key_path, crt_path)
|
||||
|
||||
args = [
|
||||
str(self.neon_binpath / "proxy"),
|
||||
@@ -3927,10 +3915,10 @@ class NeonAuthBroker:
|
||||
|
||||
log.info(f"Executing http query: {query}")
|
||||
|
||||
connstr = f"postgresql://{user}@{self.domain}/postgres"
|
||||
connstr = f"postgresql://{user}@ep-foo-bar-1234.{self.domain}/postgres"
|
||||
async with httpx.AsyncClient(verify=str(self.test_output_dir / "proxy.crt")) as client:
|
||||
response = await client.post(
|
||||
f"https://{self.domain}:{self.external_http_port}/sql",
|
||||
f"https://apiauth.{self.domain}:{self.external_http_port}/sql",
|
||||
json={"query": query, "params": args},
|
||||
headers={
|
||||
"Neon-Connection-String": connstr,
|
||||
@@ -3974,6 +3962,8 @@ def link_proxy(
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
router_port = port_distributor.get_port()
|
||||
router_tls_port = port_distributor.get_port()
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
@@ -3981,6 +3971,8 @@ def link_proxy(
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
router_port=router_port,
|
||||
router_tls_port=router_tls_port,
|
||||
external_http_port=external_http_port,
|
||||
auth_backend=NeonProxy.Link(),
|
||||
) as proxy:
|
||||
@@ -4014,6 +4006,8 @@ def static_proxy(
|
||||
mgmt_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
router_port = port_distributor.get_port()
|
||||
router_tls_port = port_distributor.get_port()
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
@@ -4021,6 +4015,8 @@ def static_proxy(
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
router_port=router_port,
|
||||
router_tls_port=router_tls_port,
|
||||
external_http_port=external_http_port,
|
||||
auth_backend=NeonProxy.Postgres(auth_endpoint),
|
||||
) as proxy:
|
||||
@@ -4226,7 +4222,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
def start(
|
||||
self,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
safekeeper_generation: int | None = None,
|
||||
safekeepers: list[int] | None = None,
|
||||
@@ -4252,7 +4248,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.endpoint_id,
|
||||
safekeepers_generation=safekeeper_generation,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
create_test_user=create_test_user,
|
||||
@@ -4467,7 +4463,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
hot_standby: bool = False,
|
||||
lsn: Lsn | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
allow_multiple: bool = False,
|
||||
basebackup_request_tries: int | None = None,
|
||||
@@ -4486,7 +4482,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
).start(
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
@@ -4570,7 +4566,7 @@ class EndpointFactory:
|
||||
lsn: Lsn | None = None,
|
||||
hot_standby: bool = False,
|
||||
config_lines: list[str] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
basebackup_request_tries: int | None = None,
|
||||
) -> Endpoint:
|
||||
@@ -4590,7 +4586,7 @@ class EndpointFactory:
|
||||
hot_standby=hot_standby,
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
@@ -4644,7 +4640,10 @@ class EndpointFactory:
|
||||
return self
|
||||
|
||||
def new_replica(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
|
||||
self,
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
@@ -4660,7 +4659,10 @@ class EndpointFactory:
|
||||
)
|
||||
|
||||
def new_replica_start(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
|
||||
self,
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
@@ -5477,6 +5479,13 @@ def wait_for_last_flush_lsn(
|
||||
|
||||
if last_flush_lsn is None:
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
# The last_flush_lsn may not correspond to a record boundary.
|
||||
# For example, if the compute flushed WAL on a page boundary,
|
||||
# the remaining part of the record might not be flushed for a long time.
|
||||
# This would prevent the pageserver from reaching last_flush_lsn promptly.
|
||||
# To ensure the rest of the record reaches the pageserver quickly,
|
||||
# we forcibly flush the WAL by using CHECKPOINT.
|
||||
endpoint.safe_psql("CHECKPOINT")
|
||||
|
||||
results = []
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
|
||||
@@ -111,6 +111,13 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
".*stalling layer flushes for compaction backpressure.*",
|
||||
".*layer roll waiting for flush due to compaction backpressure.*",
|
||||
".*BatchSpanProcessor.*",
|
||||
*(
|
||||
[
|
||||
r".*your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs.*"
|
||||
]
|
||||
if sys.platform != "linux"
|
||||
else []
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -122,6 +129,10 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
".*Call to node.*management API.*failed.*Timeout.*",
|
||||
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
|
||||
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
|
||||
# Many tests will take safekeepers offline
|
||||
".*Call to safekeeper.*management API.*failed.*receive body.*",
|
||||
".*Call to safekeeper.*management API.*failed.*ReceiveBody.*",
|
||||
".*Call to safekeeper.*management API.*failed.*Timeout.*",
|
||||
# Many tests will start up with a node offline
|
||||
".*startup_reconcile: Could not scan node.*",
|
||||
# Tests run in dev mode
|
||||
|
||||
@@ -544,3 +544,69 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
|
||||
)
|
||||
role = cursor.fetchone()
|
||||
assert role is None
|
||||
|
||||
|
||||
def test_db_with_custom_settings(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can work with databases that have some custom settings.
|
||||
For example, role=some_other_role, default_transaction_read_only=on,
|
||||
search_path=non_public_schema, statement_timeout=1 (1ms).
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
TEST_ROLE = "some_other_role"
|
||||
TEST_DB = "db_with_custom_settings"
|
||||
TEST_SCHEMA = "non_public_schema"
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": [
|
||||
{
|
||||
"name": TEST_DB,
|
||||
"owner": TEST_ROLE,
|
||||
}
|
||||
],
|
||||
"roles": [
|
||||
{
|
||||
"name": TEST_ROLE,
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
endpoint.reconfigure()
|
||||
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
cursor.execute(f"CREATE SCHEMA {TEST_SCHEMA}")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET role = {TEST_ROLE}")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET default_transaction_read_only = on")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET search_path = {TEST_SCHEMA}")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET statement_timeout = 1")
|
||||
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
cursor.execute("SELECT current_role")
|
||||
role = cursor.fetchone()
|
||||
assert role is not None
|
||||
assert role[0] == TEST_ROLE
|
||||
|
||||
cursor.execute("SHOW default_transaction_read_only")
|
||||
default_transaction_read_only = cursor.fetchone()
|
||||
assert default_transaction_read_only is not None
|
||||
assert default_transaction_read_only[0] == "on"
|
||||
|
||||
cursor.execute("SHOW search_path")
|
||||
search_path = cursor.fetchone()
|
||||
assert search_path is not None
|
||||
assert search_path[0] == TEST_SCHEMA
|
||||
|
||||
# Do not check statement_timeout, because we force it to 2min
|
||||
# in `endpoint.cursor()` fixture.
|
||||
|
||||
endpoint.reconfigure()
|
||||
|
||||
@@ -221,7 +221,7 @@ def test_remote_extensions(
|
||||
|
||||
endpoint.create_remote_extension_spec(spec)
|
||||
|
||||
endpoint.start(remote_ext_config=extensions_endpoint)
|
||||
endpoint.start(remote_ext_base_url=extensions_endpoint)
|
||||
|
||||
with endpoint.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -249,7 +249,7 @@ def test_remote_extensions(
|
||||
# Remove the extension files to force a redownload of the extension.
|
||||
extension.remove(test_output_dir, pg_version)
|
||||
|
||||
endpoint.start(remote_ext_config=extensions_endpoint)
|
||||
endpoint.start(remote_ext_base_url=extensions_endpoint)
|
||||
|
||||
# Test that ALTER EXTENSION UPDATE statements also fetch remote extensions.
|
||||
with endpoint.connect() as conn:
|
||||
|
||||
@@ -24,6 +24,7 @@ from fixtures.utils import (
|
||||
skip_in_debug_build,
|
||||
wait_until,
|
||||
)
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
@@ -97,6 +98,10 @@ def test_pgdata_import_smoke(
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
if neon_env_builder.storage_controller_config is None:
|
||||
neon_env_builder.storage_controller_config = {}
|
||||
neon_env_builder.storage_controller_config["timelines_onto_safekeepers"] = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# The test needs LocalFs support, which is only built in testing mode.
|
||||
@@ -286,34 +291,28 @@ def test_pgdata_import_smoke(
|
||||
#
|
||||
# validate that we can write
|
||||
#
|
||||
rw_endpoint = env.endpoints.create_start(
|
||||
branch_name=import_branch_name,
|
||||
endpoint_id="rw",
|
||||
tenant_id=tenant_id,
|
||||
config_lines=ep_config,
|
||||
)
|
||||
rw_endpoint.safe_psql("create table othertable(values text)")
|
||||
rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
|
||||
workload.init()
|
||||
workload.write_rows(64)
|
||||
workload.validate()
|
||||
|
||||
# TODO: consider using `class Workload` here
|
||||
# to do compaction and whatnot?
|
||||
rw_lsn = Lsn(workload.endpoint().safe_psql_scalar("select pg_current_wal_flush_lsn()"))
|
||||
|
||||
#
|
||||
# validate that we can branch (important use case)
|
||||
#
|
||||
|
||||
# ... at the tip
|
||||
_ = env.create_branch(
|
||||
child_timeline_id = env.create_branch(
|
||||
new_branch_name="br-tip",
|
||||
ancestor_branch_name=import_branch_name,
|
||||
tenant_id=tenant_id,
|
||||
ancestor_start_lsn=rw_lsn,
|
||||
)
|
||||
br_tip_endpoint = env.endpoints.create_start(
|
||||
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id, config_lines=ep_config
|
||||
)
|
||||
validate_vanilla_equivalence(br_tip_endpoint)
|
||||
br_tip_endpoint.safe_psql("select * from othertable")
|
||||
child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
|
||||
child_workload.validate()
|
||||
|
||||
validate_vanilla_equivalence(child_workload.endpoint())
|
||||
|
||||
# ... at the initdb lsn
|
||||
_ = env.create_branch(
|
||||
@@ -330,7 +329,7 @@ def test_pgdata_import_smoke(
|
||||
)
|
||||
validate_vanilla_equivalence(br_initdb_endpoint)
|
||||
with pytest.raises(psycopg2.errors.UndefinedTable):
|
||||
br_initdb_endpoint.safe_psql("select * from othertable")
|
||||
br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason="PG version is irrelevant here")
|
||||
@@ -641,6 +640,55 @@ def test_fast_import_binary(
|
||||
assert res[0][0] == 10
|
||||
|
||||
|
||||
def test_fast_import_event_triggers(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
port_distributor: PortDistributor,
|
||||
fast_import: FastImport,
|
||||
):
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("""
|
||||
CREATE FUNCTION test_event_trigger_for_drops()
|
||||
RETURNS event_trigger LANGUAGE plpgsql AS $$
|
||||
DECLARE
|
||||
obj record;
|
||||
BEGIN
|
||||
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects()
|
||||
LOOP
|
||||
RAISE NOTICE '% dropped object: % %.% %',
|
||||
tg_tag,
|
||||
obj.object_type,
|
||||
obj.schema_name,
|
||||
obj.object_name,
|
||||
obj.object_identity;
|
||||
END LOOP;
|
||||
END
|
||||
$$;
|
||||
|
||||
CREATE EVENT TRIGGER test_event_trigger_for_drops
|
||||
ON sql_drop
|
||||
EXECUTE PROCEDURE test_event_trigger_for_drops();
|
||||
""")
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
p = fast_import.run_pgdata(pg_port=pg_port, source_connection_string=vanilla_pg.connstr())
|
||||
assert p.returncode == 0
|
||||
|
||||
vanilla_pg.stop()
|
||||
|
||||
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
|
||||
with VanillaPostgres(
|
||||
fast_import.workdir / "pgdata", pgbin, pg_port, False
|
||||
) as new_pgdata_vanilla_pg:
|
||||
new_pgdata_vanilla_pg.start()
|
||||
|
||||
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
|
||||
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
|
||||
res = conn.safe_psql("SELECT count(*) FROM pg_event_trigger;")
|
||||
log.info(f"Result: {res}")
|
||||
assert res[0][0] == 0, f"Neon does not support importing event triggers, got: {res[0][0]}"
|
||||
|
||||
|
||||
def test_fast_import_restore_to_connstring(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
|
||||
@@ -52,6 +52,8 @@ def proxy_with_metric_collector(
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
router_port = port_distributor.get_port()
|
||||
router_tls_port = port_distributor.get_port()
|
||||
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
@@ -63,6 +65,8 @@ def proxy_with_metric_collector(
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
router_port=router_port,
|
||||
router_tls_port=router_tls_port,
|
||||
external_http_port=external_http_port,
|
||||
metric_collection_endpoint=metric_collection_endpoint,
|
||||
metric_collection_interval=metric_collection_interval,
|
||||
|
||||
@@ -39,3 +39,10 @@ def test_role_grants(neon_simple_env: NeonEnv):
|
||||
res = cur.fetchall()
|
||||
|
||||
assert res == [(1,)], "select should not succeed"
|
||||
|
||||
# confirm that replicas can also ensure the grants are correctly set.
|
||||
replica = env.endpoints.new_replica_start(endpoint)
|
||||
replica_client = replica.http_client()
|
||||
replica_client.set_role_grants(
|
||||
"test_role_grants", "test_role", "test_schema", ["CREATE", "USAGE"]
|
||||
)
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
import backoff
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import PgProtocol, VanillaPostgres
|
||||
from fixtures.neon_fixtures import NeonProxy, PgProtocol, VanillaPostgres
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
@@ -41,6 +41,7 @@ class PgSniRouter(PgProtocol):
|
||||
self,
|
||||
neon_binpath: Path,
|
||||
port: int,
|
||||
tls_port: int,
|
||||
destination: str,
|
||||
tls_cert: Path,
|
||||
tls_key: Path,
|
||||
@@ -53,6 +54,7 @@ class PgSniRouter(PgProtocol):
|
||||
self.host = host
|
||||
self.neon_binpath = neon_binpath
|
||||
self.port = port
|
||||
self.tls_port = tls_port
|
||||
self.destination = destination
|
||||
self.tls_cert = tls_cert
|
||||
self.tls_key = tls_key
|
||||
@@ -64,6 +66,7 @@ class PgSniRouter(PgProtocol):
|
||||
args = [
|
||||
str(self.neon_binpath / "pg_sni_router"),
|
||||
*["--listen", f"127.0.0.1:{self.port}"],
|
||||
*["--listen-tls", f"127.0.0.1:{self.tls_port}"],
|
||||
*["--tls-cert", str(self.tls_cert)],
|
||||
*["--tls-key", str(self.tls_key)],
|
||||
*["--destination", self.destination],
|
||||
@@ -127,10 +130,12 @@ def test_pg_sni_router(
|
||||
pg_port = vanilla_pg.default_options["port"]
|
||||
|
||||
router_port = port_distributor.get_port()
|
||||
router_tls_port = port_distributor.get_port()
|
||||
|
||||
with PgSniRouter(
|
||||
neon_binpath=neon_binpath,
|
||||
port=router_port,
|
||||
tls_port=router_tls_port,
|
||||
destination="local.neon.build",
|
||||
tls_cert=test_output_dir / "router.crt",
|
||||
tls_key=test_output_dir / "router.key",
|
||||
@@ -146,3 +151,22 @@ def test_pg_sni_router(
|
||||
hostaddr="127.0.0.1",
|
||||
)
|
||||
assert out[0][0] == 1
|
||||
|
||||
|
||||
def test_pg_sni_router_in_proxy(
|
||||
static_proxy: NeonProxy,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
):
|
||||
# static_proxy starts this.
|
||||
assert vanilla_pg.is_running()
|
||||
pg_port = vanilla_pg.default_options["port"]
|
||||
|
||||
out = static_proxy.safe_psql(
|
||||
"select 1",
|
||||
dbname="postgres",
|
||||
sslmode="require",
|
||||
host=f"endpoint--namespace--{pg_port}.local.neon.build",
|
||||
hostaddr="127.0.0.1",
|
||||
port=static_proxy.router_port,
|
||||
)
|
||||
assert out[0][0] == 1
|
||||
|
||||
@@ -1822,7 +1822,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
|
||||
endpoint2.safe_psql(
|
||||
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
|
||||
)
|
||||
lsn3 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
|
||||
lsn3 = wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
|
||||
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
|
||||
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
|
||||
["pg_replslot/test_slot_restore/state"]
|
||||
@@ -1839,7 +1839,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
|
||||
assert all_reparented == set([])
|
||||
|
||||
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
|
||||
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
|
||||
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
|
||||
), "main branch unaffected"
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: c8dab02bfc...06b405bc98
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: b838c8969b...72f83df76c
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 05ddf212e2...d72d76f2cd
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: eab3a37834...0d59c91c1a
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.4",
|
||||
"eab3a37834cac6ec0719bf817ac918a201712d66"
|
||||
"0d59c91c1a23e667f1d1169d5f040b3fa0a0ab44"
|
||||
],
|
||||
"v16": [
|
||||
"16.8",
|
||||
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
|
||||
"d72d76f2cdee4194dd052ce099e9784aca7c794a"
|
||||
],
|
||||
"v15": [
|
||||
"15.12",
|
||||
"b838c8969b7c63f3e637a769656f5f36793b797c"
|
||||
"72f83df76c61ce18d81bd371f0afd2a43d59c052"
|
||||
],
|
||||
"v14": [
|
||||
"14.17",
|
||||
"c8dab02bfc003ae7bd59096919042d7840f3c194"
|
||||
"06b405bc982fd53522689aa4acbfd9c44b7993cf"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user