mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 12:40:36 +00:00
Compare commits
21 Commits
vlad/debug
...
mmeent/rep
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a9d0ec4cab | ||
|
|
efb58f0743 | ||
|
|
d33fbeee06 | ||
|
|
684bf85ca2 | ||
|
|
5ef5bc6c2b | ||
|
|
e084c12ef6 | ||
|
|
9bc82d7ad2 | ||
|
|
279c6c0417 | ||
|
|
dc4388580e | ||
|
|
2d7a3a6f0f | ||
|
|
f7bdc138c2 | ||
|
|
2cb1e43604 | ||
|
|
cdc8057a7e | ||
|
|
a3d88258a7 | ||
|
|
a5d45bceed | ||
|
|
d7cecc485c | ||
|
|
b94054dca0 | ||
|
|
41adde29d7 | ||
|
|
45649ccd62 | ||
|
|
19c5eb53e5 | ||
|
|
397e030fb0 |
@@ -52,7 +52,7 @@ permissions:
|
||||
|
||||
jobs:
|
||||
build-neon:
|
||||
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large-debug-seccomp')) }}
|
||||
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
contents: read
|
||||
|
||||
36
Cargo.lock
generated
36
Cargo.lock
generated
@@ -4321,7 +4321,6 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_compaction",
|
||||
"pageserver_page_api",
|
||||
"pem",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
@@ -4330,7 +4329,6 @@ dependencies = [
|
||||
"postgres_connection",
|
||||
"postgres_ffi",
|
||||
"postgres_initdb",
|
||||
"posthog_client_lite",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"procfs",
|
||||
@@ -4365,8 +4363,6 @@ dependencies = [
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tonic 0.13.1",
|
||||
"tonic-reflection",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"twox-hash",
|
||||
@@ -4459,15 +4455,9 @@ dependencies = [
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"prost 0.13.5",
|
||||
"smallvec",
|
||||
"thiserror 1.0.69",
|
||||
"tonic 0.13.1",
|
||||
"tonic-build",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -4908,16 +4898,11 @@ name = "posthog_client_lite"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -7535,10 +7520,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"h2 0.4.4",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
@@ -7549,7 +7532,6 @@ dependencies = [
|
||||
"pin-project",
|
||||
"prost 0.13.5",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tokio-stream",
|
||||
@@ -7573,19 +7555,6 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-reflection"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.13.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
@@ -8557,8 +8526,6 @@ dependencies = [
|
||||
"ahash",
|
||||
"anstream",
|
||||
"anyhow",
|
||||
"axum",
|
||||
"axum-core",
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.7",
|
||||
"base64ct",
|
||||
@@ -8581,8 +8548,10 @@ dependencies = [
|
||||
"fail",
|
||||
"form_urlencoded",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
"generic-array",
|
||||
"getrandom 0.2.11",
|
||||
@@ -8612,6 +8581,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"p256 0.13.2",
|
||||
"parquet",
|
||||
"percent-encoding",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"prost 0.13.5",
|
||||
|
||||
@@ -199,8 +199,7 @@ tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
|
||||
tonic-reflection = { version = "0.13.1", features = ["server"] }
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "tls-ring", "tls-native-roots"] }
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
|
||||
@@ -247,7 +246,6 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
|
||||
http-utils = { version = "0.1", path = "./libs/http-utils/" }
|
||||
metrics = { version = "0.1", path = "./libs/metrics/" }
|
||||
@@ -260,19 +258,19 @@ postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
|
||||
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
postgres_initdb = { path = "./libs/postgres_initdb" }
|
||||
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
|
||||
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
|
||||
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
|
||||
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
|
||||
safekeeper_client = { path = "./safekeeper/client" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
|
||||
storage_controller_client = { path = "./storage_controller/client" }
|
||||
tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
|
||||
tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
|
||||
utils = { version = "0.1", path = "./libs/utils/" }
|
||||
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
|
||||
wal_decoder = { version = "0.1", path = "./libs/wal_decoder" }
|
||||
walproposer = { version = "0.1", path = "./libs/walproposer/" }
|
||||
wal_decoder = { version = "0.1", path = "./libs/wal_decoder" }
|
||||
|
||||
## Common library dependency
|
||||
workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
|
||||
@@ -1847,7 +1847,7 @@ COPY docker-compose/ext-src/ /ext-src/
|
||||
COPY --from=pg-build /postgres /postgres
|
||||
#COPY --from=postgis-src /ext-src/ /ext-src/
|
||||
COPY --from=plv8-src /ext-src/ /ext-src/
|
||||
COPY --from=h3-pg-src /ext-src/h3-pg-src /ext-src/h3-pg-src
|
||||
#COPY --from=h3-pg-src /ext-src/ /ext-src/
|
||||
COPY --from=postgresql-unit-src /ext-src/ /ext-src/
|
||||
COPY --from=pgvector-src /ext-src/ /ext-src/
|
||||
COPY --from=pgjwt-src /ext-src/ /ext-src/
|
||||
|
||||
@@ -136,10 +136,6 @@ struct Cli {
|
||||
requires = "compute-id"
|
||||
)]
|
||||
pub control_plane_uri: Option<String>,
|
||||
|
||||
/// Interval in seconds for collecting installed extensions statistics
|
||||
#[arg(long, default_value = "3600")]
|
||||
pub installed_extensions_collection_interval: u64,
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
@@ -183,7 +179,6 @@ fn main() -> Result<()> {
|
||||
cgroup: cli.cgroup,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor_addr: cli.vm_monitor_addr,
|
||||
installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
|
||||
},
|
||||
config,
|
||||
)?;
|
||||
|
||||
@@ -97,9 +97,6 @@ pub struct ComputeNodeParams {
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
|
||||
/// Interval for installed extensions collection
|
||||
pub installed_extensions_collection_interval: u64,
|
||||
}
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
@@ -698,18 +695,25 @@ impl ComputeNode {
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
let log_directory_path = log_directory_path.to_string_lossy().to_string();
|
||||
|
||||
// Add project_id,endpoint_id to identify the logs.
|
||||
// Add project_id,endpoint_id tag to identify the logs.
|
||||
//
|
||||
// These ids are passed from cplane,
|
||||
let endpoint_id = pspec.spec.endpoint_id.as_deref().unwrap_or("");
|
||||
let project_id = pspec.spec.project_id.as_deref().unwrap_or("");
|
||||
// for backwards compatibility (old computes that don't have them),
|
||||
// we set them to None.
|
||||
// TODO: Clean up this code when all computes have them.
|
||||
let tag: Option<String> = match (
|
||||
pspec.spec.project_id.as_deref(),
|
||||
pspec.spec.endpoint_id.as_deref(),
|
||||
) {
|
||||
(Some(project_id), Some(endpoint_id)) => {
|
||||
Some(format!("{project_id}/{endpoint_id}"))
|
||||
}
|
||||
(Some(project_id), None) => Some(format!("{project_id}/None")),
|
||||
(None, Some(endpoint_id)) => Some(format!("None,{endpoint_id}")),
|
||||
(None, None) => None,
|
||||
};
|
||||
|
||||
configure_audit_rsyslog(
|
||||
log_directory_path.clone(),
|
||||
endpoint_id,
|
||||
project_id,
|
||||
&remote_endpoint,
|
||||
)?;
|
||||
configure_audit_rsyslog(log_directory_path.clone(), tag, &remote_endpoint)?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
@@ -745,7 +749,17 @@ impl ComputeNode {
|
||||
|
||||
let conf = self.get_tokio_conn_conf(None);
|
||||
tokio::task::spawn(async {
|
||||
let _ = installed_extensions(conf).await;
|
||||
let res = get_installed_extensions(conf).await;
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
info!(
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&extensions)
|
||||
.expect("failed to serialize extensions list")
|
||||
);
|
||||
}
|
||||
Err(err) => error!("could not get installed extensions: {err:?}"),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -775,9 +789,6 @@ impl ComputeNode {
|
||||
// Log metrics so that we can search for slow operations in logs
|
||||
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
|
||||
|
||||
// Spawn the extension stats background task
|
||||
self.spawn_extension_stats_task();
|
||||
|
||||
if pspec.spec.prewarm_lfc_on_startup {
|
||||
self.prewarm_lfc();
|
||||
}
|
||||
@@ -2188,41 +2199,6 @@ LIMIT 100",
|
||||
info!("Pageserver config changed");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_extension_stats_task(&self) {
|
||||
let conf = self.tokio_conn_conf.clone();
|
||||
let installed_extensions_collection_interval =
|
||||
self.params.installed_extensions_collection_interval;
|
||||
tokio::spawn(async move {
|
||||
// An initial sleep is added to ensure that two collections don't happen at the same time.
|
||||
// The first collection happens during compute startup.
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(
|
||||
installed_extensions_collection_interval,
|
||||
))
|
||||
.await;
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
|
||||
installed_extensions_collection_interval,
|
||||
));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = installed_extensions(conf.clone()).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
|
||||
let res = get_installed_extensions(conf).await;
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
info!(
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&extensions).expect("failed to serialize extensions list")
|
||||
);
|
||||
}
|
||||
Err(err) => error!("could not get installed extensions: {err:?}"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn forward_termination_signal() {
|
||||
|
||||
@@ -2,24 +2,10 @@
|
||||
module(load="imfile")
|
||||
|
||||
# Input configuration for log files in the specified directory
|
||||
# The messages can be multiline. The start of the message is a timestamp
|
||||
# in "%Y-%m-%d %H:%M:%S.%3N GMT" (so timezone hardcoded).
|
||||
# Replace log_directory with the directory containing the log files
|
||||
input(type="imfile" File="{log_directory}/*.log"
|
||||
Tag="pgaudit_log" Severity="info" Facility="local5"
|
||||
startmsg.regex="^[[:digit:]]{{4}}-[[:digit:]]{{2}}-[[:digit:]]{{2}} [[:digit:]]{{2}}:[[:digit:]]{{2}}:[[:digit:]]{{2}}.[[:digit:]]{{3}} GMT,")
|
||||
|
||||
# Replace {log_directory} with the directory containing the log files
|
||||
input(type="imfile" File="{log_directory}/*.log" Tag="{tag}" Severity="info" Facility="local0")
|
||||
# the directory to store rsyslog state files
|
||||
global(workDirectory="/var/log/rsyslog")
|
||||
|
||||
# Construct json, endpoint_id and project_id as additional metadata
|
||||
set $.json_log!endpoint_id = "{endpoint_id}";
|
||||
set $.json_log!project_id = "{project_id}";
|
||||
set $.json_log!msg = $msg;
|
||||
|
||||
# Template suitable for rfc5424 syslog format
|
||||
template(name="PgAuditLog" type="string"
|
||||
string="<%PRI%>1 %TIMESTAMP:::date-rfc3339% %HOSTNAME% - - - - %$.json_log%")
|
||||
|
||||
# Forward to remote syslog receiver (@@<hostname>:<port>;format
|
||||
local5.info @@{remote_endpoint};PgAuditLog
|
||||
# Forward logs to remote syslog server
|
||||
*.* @@{remote_endpoint}
|
||||
|
||||
@@ -84,15 +84,13 @@ fn restart_rsyslog() -> Result<()> {
|
||||
|
||||
pub fn configure_audit_rsyslog(
|
||||
log_directory: String,
|
||||
endpoint_id: &str,
|
||||
project_id: &str,
|
||||
tag: Option<String>,
|
||||
remote_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
endpoint_id = endpoint_id,
|
||||
project_id = project_id,
|
||||
tag = tag.unwrap_or("".to_string()),
|
||||
remote_endpoint = remote_endpoint
|
||||
);
|
||||
|
||||
|
||||
@@ -2,10 +2,8 @@
|
||||
[pageserver]
|
||||
listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
listen_grpc_addr = '127.0.0.1:51051'
|
||||
pg_auth_type = 'Trust'
|
||||
http_auth_type = 'Trust'
|
||||
grpc_auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
id = 1
|
||||
|
||||
@@ -4,10 +4,8 @@
|
||||
id=1
|
||||
listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
listen_grpc_addr = '127.0.0.1:51051'
|
||||
pg_auth_type = 'Trust'
|
||||
http_auth_type = 'Trust'
|
||||
grpc_auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
id = 1
|
||||
|
||||
@@ -32,7 +32,6 @@ use control_plane::storage_controller::{
|
||||
};
|
||||
use nix::fcntl::{Flock, FlockArg};
|
||||
use pageserver_api::config::{
|
||||
DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
};
|
||||
@@ -1008,16 +1007,13 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
|
||||
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
|
||||
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
|
||||
let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
|
||||
NeonLocalInitPageserverConf {
|
||||
id: pageserver_id,
|
||||
listen_pg_addr: format!("127.0.0.1:{pg_port}"),
|
||||
listen_http_addr: format!("127.0.0.1:{http_port}"),
|
||||
listen_https_addr: None,
|
||||
listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
grpc_auth_type: AuthType::Trust,
|
||||
other: Default::default(),
|
||||
// Typical developer machines use disks with slow fsync, and we don't care
|
||||
// about data integrity: disable disk syncs.
|
||||
@@ -1279,7 +1275,6 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
|
||||
mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
|
||||
ancestor_timeline_id,
|
||||
ancestor_start_lsn: start_lsn,
|
||||
read_only: false,
|
||||
pg_version: None,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -616,19 +616,16 @@ impl Endpoint {
|
||||
|
||||
/// Map safekeepers ids to the actual connection strings.
|
||||
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in sk_ids {
|
||||
let sk = self
|
||||
.env
|
||||
sk_ids.into_iter()
|
||||
.map(|node_id| {
|
||||
self.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.find(|node| node.id == sk_id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
|
||||
}
|
||||
}
|
||||
Ok(safekeeper_connstrings)
|
||||
.find(|node| node.id == node_id)
|
||||
.map(|node| format!("127.0.0.1:{}", node.get_compute_port()))
|
||||
.ok_or_else(|| anyhow!("safekeeer {node_id} does not exist"))
|
||||
})
|
||||
.collect::<Result<Vec<String>>>()
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
|
||||
@@ -278,10 +278,8 @@ pub struct PageServerConf {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
pub no_sync: bool,
|
||||
}
|
||||
|
||||
@@ -292,10 +290,8 @@ impl Default for PageServerConf {
|
||||
listen_pg_addr: String::new(),
|
||||
listen_http_addr: String::new(),
|
||||
listen_https_addr: None,
|
||||
listen_grpc_addr: None,
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
grpc_auth_type: AuthType::Trust,
|
||||
no_sync: false,
|
||||
}
|
||||
}
|
||||
@@ -310,10 +306,8 @@ pub struct NeonLocalInitPageserverConf {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
pub no_sync: bool,
|
||||
#[serde(flatten)]
|
||||
@@ -327,10 +321,8 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
other: _,
|
||||
} = conf;
|
||||
@@ -339,9 +331,7 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_pg_addr: listen_pg_addr.clone(),
|
||||
listen_http_addr: listen_http_addr.clone(),
|
||||
listen_https_addr: listen_https_addr.clone(),
|
||||
listen_grpc_addr: listen_grpc_addr.clone(),
|
||||
pg_auth_type: *pg_auth_type,
|
||||
grpc_auth_type: *grpc_auth_type,
|
||||
http_auth_type: *http_auth_type,
|
||||
no_sync: *no_sync,
|
||||
}
|
||||
@@ -717,10 +707,8 @@ impl LocalEnv {
|
||||
listen_pg_addr: String,
|
||||
listen_http_addr: String,
|
||||
listen_https_addr: Option<String>,
|
||||
listen_grpc_addr: Option<String>,
|
||||
pg_auth_type: AuthType,
|
||||
http_auth_type: AuthType,
|
||||
grpc_auth_type: AuthType,
|
||||
#[serde(default)]
|
||||
no_sync: bool,
|
||||
}
|
||||
@@ -744,10 +732,8 @@ impl LocalEnv {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
} = config_toml;
|
||||
let IdentityTomlSubset {
|
||||
@@ -764,10 +750,8 @@ impl LocalEnv {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
};
|
||||
pageservers.push(conf);
|
||||
|
||||
@@ -129,9 +129,7 @@ impl PageServerNode {
|
||||
));
|
||||
}
|
||||
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
|
||||
.contains(&AuthType::NeonJWT)
|
||||
{
|
||||
if conf.http_auth_type != AuthType::Trust || conf.pg_auth_type != AuthType::Trust {
|
||||
// Keys are generated in the toplevel repo dir, pageservers' workdirs
|
||||
// are one level below that, so refer to keys with ../
|
||||
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
|
||||
|
||||
@@ -20,7 +20,7 @@ first_path="$(ldconfig --verbose 2>/dev/null \
|
||||
| grep --invert-match ^$'\t' \
|
||||
| cut --delimiter=: --fields=1 \
|
||||
| head --lines=1)"
|
||||
test "$first_path" == '/usr/local/lib'
|
||||
test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat.
|
||||
|
||||
echo "Waiting pageserver become ready."
|
||||
while ! nc -z pageserver 6400; do
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
set -ex
|
||||
cd "$(dirname "${0}")"
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
dropdb --if-exists contrib_regression
|
||||
createdb contrib_regression
|
||||
cd h3_postgis/test
|
||||
psql -d contrib_regression -c "CREATE EXTENSION postgis" -c "CREATE EXTENSION postgis_raster" -c "CREATE EXTENSION h3" -c "CREATE EXTENSION h3_postgis"
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
${PG_REGRESS} --use-existing --dbname contrib_regression ${TESTS}
|
||||
cd ../../h3/test
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
dropdb --if-exists contrib_regression
|
||||
createdb contrib_regression
|
||||
psql -d contrib_regression -c "CREATE EXTENSION h3"
|
||||
${PG_REGRESS} --use-existing --dbname contrib_regression ${TESTS}
|
||||
@@ -1,7 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
cd h3/test
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ${TESTS}
|
||||
@@ -1,6 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname "${0}")"
|
||||
if [ -f Makefile ]; then
|
||||
make installcheck
|
||||
fi
|
||||
@@ -1,9 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
[ -f Makefile ] || exit 0
|
||||
dropdb --if-exist contrib_regression
|
||||
createdb contrib_regression
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ${TESTS}
|
||||
@@ -82,8 +82,7 @@ EXTENSIONS='[
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"},
|
||||
{"extname": "pgtap", "extdir": "pgtap-src"},
|
||||
{"extname": "pg_repack", "extdir": "pg_repack-src"},
|
||||
{"extname": "h3", "extdir": "h3-pg-src"}
|
||||
{"extname": "pg_repack", "extdir": "pg_repack-src"}
|
||||
]'
|
||||
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
|
||||
COMPUTE_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
|
||||
|
||||
@@ -8,8 +8,6 @@ pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
|
||||
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::num::{NonZeroU64, NonZeroUsize};
|
||||
@@ -45,21 +43,6 @@ pub struct NodeMetadata {
|
||||
pub other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
/// PostHog integration config.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct PostHogConfig {
|
||||
/// PostHog project ID
|
||||
pub project_id: String,
|
||||
/// Server-side (private) API key
|
||||
pub server_api_key: String,
|
||||
/// Client-side (public) API key
|
||||
pub client_api_key: String,
|
||||
/// Private API URL
|
||||
pub private_api_url: String,
|
||||
/// Public API URL
|
||||
pub public_api_url: String,
|
||||
}
|
||||
|
||||
/// `pageserver.toml`
|
||||
///
|
||||
/// We use serde derive with `#[serde(default)]` to generate a deserializer
|
||||
@@ -121,7 +104,6 @@ pub struct ConfigToml {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
#[serde(with = "humantime_serde")]
|
||||
@@ -141,7 +123,6 @@ pub struct ConfigToml {
|
||||
pub http_auth_type: AuthType,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub pg_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub tenant_config: TenantConfigToml,
|
||||
@@ -201,8 +182,6 @@ pub struct ConfigToml {
|
||||
pub tracing: Option<Tracing>,
|
||||
pub enable_tls_page_service_api: bool,
|
||||
pub dev_mode: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
pub timeline_import_config: TimelineImportConfig,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
|
||||
@@ -609,7 +588,6 @@ impl Default for ConfigToml {
|
||||
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
listen_https_addr: (None),
|
||||
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
|
||||
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
|
||||
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
@@ -626,7 +604,6 @@ impl Default for ConfigToml {
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
http_auth_type: (AuthType::Trust),
|
||||
pg_auth_type: (AuthType::Trust),
|
||||
grpc_auth_type: (AuthType::Trust),
|
||||
auth_validation_public_key_path: (None),
|
||||
remote_storage: None,
|
||||
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT
|
||||
@@ -718,7 +695,6 @@ impl Default for ConfigToml {
|
||||
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
|
||||
},
|
||||
basebackup_cache_config: None,
|
||||
posthog_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -354,9 +354,6 @@ pub struct ShardImportProgressV1 {
|
||||
pub completed: usize,
|
||||
/// Hash of the plan
|
||||
pub import_plan_hash: u64,
|
||||
/// Soft limit for the job size
|
||||
/// This needs to remain constant throughout the import
|
||||
pub job_soft_size_limit: usize,
|
||||
}
|
||||
|
||||
impl ShardImportStatus {
|
||||
@@ -405,8 +402,6 @@ pub enum TimelineCreateRequestMode {
|
||||
// using a flattened enum, so, it was an accepted field, and
|
||||
// we continue to accept it by having it here.
|
||||
pg_version: Option<u32>,
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
read_only: bool,
|
||||
},
|
||||
ImportPgdata {
|
||||
import_pgdata: TimelineCreateRequestModeImportPgdata,
|
||||
|
||||
@@ -6,14 +6,9 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
sha2.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
|
||||
tokio-util.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
tracing.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
//! A background loop that fetches feature flags from PostHog and updates the feature store.
|
||||
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
|
||||
|
||||
/// A background loop that fetches feature flags from PostHog and updates the feature store.
|
||||
pub struct FeatureResolverBackgroundLoop {
|
||||
posthog_client: PostHogClient,
|
||||
feature_store: ArcSwap<FeatureStore>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl FeatureResolverBackgroundLoop {
|
||||
pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
|
||||
Self {
|
||||
posthog_client: PostHogClient::new(config),
|
||||
feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
|
||||
cancel: shutdown_pageserver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
|
||||
let this = self.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
handle.spawn(async move {
|
||||
tracing::info!("Starting PostHog feature resolver");
|
||||
let mut ticker = tokio::time::interval(refresh_period);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ticker.tick() => {}
|
||||
_ = cancel.cancelled() => break
|
||||
}
|
||||
let resp = match this
|
||||
.posthog_client
|
||||
.get_feature_flags_local_evaluation()
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
tracing::warn!("Cannot get feature flags: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let feature_store = FeatureStore::new_with_flags(resp.flags);
|
||||
this.feature_store.store(Arc::new(feature_store));
|
||||
}
|
||||
tracing::info!("PostHog feature resolver stopped");
|
||||
});
|
||||
}
|
||||
|
||||
pub fn feature_store(&self) -> Arc<FeatureStore> {
|
||||
self.feature_store.load_full()
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,5 @@
|
||||
//! A lite version of the PostHog client that only supports local evaluation of feature flags.
|
||||
|
||||
mod background_loop;
|
||||
|
||||
pub use background_loop::FeatureResolverBackgroundLoop;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -24,7 +20,8 @@ pub enum PostHogEvaluationError {
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct LocalEvaluationResponse {
|
||||
pub flags: Vec<LocalEvaluationFlag>,
|
||||
#[allow(dead_code)]
|
||||
flags: Vec<LocalEvaluationFlag>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -37,7 +34,7 @@ pub struct LocalEvaluationFlag {
|
||||
#[derive(Deserialize)]
|
||||
pub struct LocalEvaluationFlagFilters {
|
||||
groups: Vec<LocalEvaluationFlagFilterGroup>,
|
||||
multivariate: Option<LocalEvaluationFlagMultivariate>,
|
||||
multivariate: LocalEvaluationFlagMultivariate,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -97,12 +94,6 @@ impl FeatureStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
|
||||
let mut store = Self::new();
|
||||
store.set_flags(flags);
|
||||
store
|
||||
}
|
||||
|
||||
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
|
||||
self.flags.clear();
|
||||
for flag in flags {
|
||||
@@ -254,7 +245,7 @@ impl FeatureStore {
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Returns an error if the flag is not available or if there are errors
|
||||
/// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors
|
||||
/// during the evaluation.
|
||||
///
|
||||
/// The parsing logic is as follows:
|
||||
@@ -272,15 +263,10 @@ impl FeatureStore {
|
||||
/// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%).
|
||||
/// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override.
|
||||
/// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_multivariate(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
user_id: &str,
|
||||
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<String, PostHogEvaluationError> {
|
||||
let hash_on_global_rollout_percentage =
|
||||
Self::consistent_hash(user_id, flag_key, "multivariate");
|
||||
@@ -290,39 +276,10 @@ impl FeatureStore {
|
||||
flag_key,
|
||||
hash_on_global_rollout_percentage,
|
||||
hash_on_group_rollout_percentage,
|
||||
properties,
|
||||
&HashMap::new(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Evaluate a boolean feature flag. Returns an error if the flag is not available or if there are errors
|
||||
/// during the evaluation.
|
||||
///
|
||||
/// The parsing logic is as follows:
|
||||
///
|
||||
/// * Generate a consistent hash for the tenant-feature.
|
||||
/// * Match each filter group.
|
||||
/// - If a group is matched, it will first determine whether the user is in the range of the rollout
|
||||
/// percentage.
|
||||
/// - If the hash falls within the group's rollout percentage, return true.
|
||||
/// * Otherwise, continue with the next group until all groups are evaluated and no group is within the
|
||||
/// rollout percentage.
|
||||
/// * If there are no matching groups, return an error.
|
||||
///
|
||||
/// Returns `Ok(())` if the feature flag evaluates to true. In the future, it will return a payload.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_boolean(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
user_id: &str,
|
||||
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
let hash_on_global_rollout_percentage = Self::consistent_hash(user_id, flag_key, "boolean");
|
||||
self.evaluate_boolean_inner(flag_key, hash_on_global_rollout_percentage, properties)
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
|
||||
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
|
||||
/// and avoid duplicate computations.
|
||||
@@ -349,11 +306,6 @@ impl FeatureStore {
|
||||
flag_key
|
||||
)));
|
||||
}
|
||||
let Some(ref multivariate) = flag_config.filters.multivariate else {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"No multivariate available, should use evaluate_boolean?: {flag_key}"
|
||||
)));
|
||||
};
|
||||
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
|
||||
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
|
||||
// does not matter.
|
||||
@@ -362,7 +314,7 @@ impl FeatureStore {
|
||||
GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant),
|
||||
GroupEvaluationResult::MatchedAndEvaluate => {
|
||||
let mut percentage = 0;
|
||||
for variant in &multivariate.variants {
|
||||
for variant in &flag_config.filters.multivariate.variants {
|
||||
percentage += variant.rollout_percentage;
|
||||
if self
|
||||
.evaluate_percentage(hash_on_global_rollout_percentage, percentage)
|
||||
@@ -390,77 +342,6 @@ impl FeatureStore {
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
|
||||
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
|
||||
/// and avoid duplicate computations.
|
||||
///
|
||||
/// Use a different consistent hash for evaluating the group rollout percentage.
|
||||
/// The behavior: if the condition is set to rolling out to 10% of the users, and
|
||||
/// we set the variant A to 20% in the global config, then 2% of the total users will
|
||||
/// be evaluated to variant A.
|
||||
///
|
||||
/// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two
|
||||
/// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users
|
||||
/// will be evaluated (versus 30% if group evaluation is done independently).
|
||||
pub(crate) fn evaluate_boolean_inner(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
hash_on_global_rollout_percentage: f64,
|
||||
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
if let Some(flag_config) = self.flags.get(flag_key) {
|
||||
if !flag_config.active {
|
||||
return Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"The feature flag is not active: {}",
|
||||
flag_key
|
||||
)));
|
||||
}
|
||||
if flag_config.filters.multivariate.is_some() {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"This looks like a multivariate flag, should use evaluate_multivariate?: {flag_key}"
|
||||
)));
|
||||
};
|
||||
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
|
||||
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
|
||||
// does not matter.
|
||||
for group in &flag_config.filters.groups {
|
||||
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
|
||||
GroupEvaluationResult::MatchedAndOverride(_) => {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"Boolean flag cannot have overrides: {}",
|
||||
flag_key
|
||||
)));
|
||||
}
|
||||
GroupEvaluationResult::MatchedAndEvaluate => {
|
||||
return Ok(());
|
||||
}
|
||||
GroupEvaluationResult::Unmatched => continue,
|
||||
}
|
||||
}
|
||||
// If no group is matched, the feature is not available, and up to the caller to decide what to do.
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
} else {
|
||||
// The feature flag is not available yet
|
||||
Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"Not found in the local evaluation spec: {}",
|
||||
flag_key
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostHogClientConfig {
|
||||
/// The server API key.
|
||||
pub server_api_key: String,
|
||||
/// The client API key.
|
||||
pub client_api_key: String,
|
||||
/// The project ID.
|
||||
pub project_id: String,
|
||||
/// The private API URL.
|
||||
pub private_api_url: String,
|
||||
/// The public API URL.
|
||||
pub public_api_url: String,
|
||||
}
|
||||
|
||||
/// A lite PostHog client.
|
||||
@@ -479,16 +360,37 @@ pub struct PostHogClientConfig {
|
||||
/// want to report the feature flag usage back to PostHog. The current plan is to use PostHog only as an UI to
|
||||
/// configure feature flags so it is very likely that the client API will not be used.
|
||||
pub struct PostHogClient {
|
||||
/// The config.
|
||||
config: PostHogClientConfig,
|
||||
/// The server API key.
|
||||
server_api_key: String,
|
||||
/// The client API key.
|
||||
client_api_key: String,
|
||||
/// The project ID.
|
||||
project_id: String,
|
||||
/// The private API URL.
|
||||
private_api_url: String,
|
||||
/// The public API URL.
|
||||
public_api_url: String,
|
||||
/// The HTTP client.
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl PostHogClient {
|
||||
pub fn new(config: PostHogClientConfig) -> Self {
|
||||
pub fn new(
|
||||
server_api_key: String,
|
||||
client_api_key: String,
|
||||
project_id: String,
|
||||
private_api_url: String,
|
||||
public_api_url: String,
|
||||
) -> Self {
|
||||
let client = reqwest::Client::new();
|
||||
Self { config, client }
|
||||
Self {
|
||||
server_api_key,
|
||||
client_api_key,
|
||||
project_id,
|
||||
private_api_url,
|
||||
public_api_url,
|
||||
client,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_us_region(
|
||||
@@ -496,13 +398,13 @@ impl PostHogClient {
|
||||
client_api_key: String,
|
||||
project_id: String,
|
||||
) -> Self {
|
||||
Self::new(PostHogClientConfig {
|
||||
Self::new(
|
||||
server_api_key,
|
||||
client_api_key,
|
||||
project_id,
|
||||
private_api_url: "https://us.posthog.com".to_string(),
|
||||
public_api_url: "https://us.i.posthog.com".to_string(),
|
||||
})
|
||||
"https://us.posthog.com".to_string(),
|
||||
"https://us.i.posthog.com".to_string(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Fetch the feature flag specs from the server.
|
||||
@@ -520,12 +422,12 @@ impl PostHogClient {
|
||||
// with bearer token of self.server_api_key
|
||||
let url = format!(
|
||||
"{}/api/projects/{}/feature_flags/local_evaluation",
|
||||
self.config.private_api_url, self.config.project_id
|
||||
self.private_api_url, self.project_id
|
||||
);
|
||||
let response = self
|
||||
.client
|
||||
.get(url)
|
||||
.bearer_auth(&self.config.server_api_key)
|
||||
.bearer_auth(&self.server_api_key)
|
||||
.send()
|
||||
.await?;
|
||||
let body = response.text().await?;
|
||||
@@ -544,11 +446,11 @@ impl PostHogClient {
|
||||
) -> anyhow::Result<()> {
|
||||
// PUBLIC_URL/capture/
|
||||
// with bearer token of self.client_api_key
|
||||
let url = format!("{}/capture/", self.config.public_api_url);
|
||||
let url = format!("{}/capture/", self.public_api_url);
|
||||
self.client
|
||||
.post(url)
|
||||
.body(serde_json::to_string(&json!({
|
||||
"api_key": self.config.client_api_key,
|
||||
"api_key": self.client_api_key,
|
||||
"distinct_id": distinct_id,
|
||||
"event": event,
|
||||
"properties": properties,
|
||||
@@ -565,162 +467,95 @@ mod tests {
|
||||
|
||||
fn data() -> &'static str {
|
||||
r#"{
|
||||
"flags": [
|
||||
{
|
||||
"id": 141807,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "image-compaction-boundary",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": null,
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
}
|
||||
"flags": [
|
||||
{
|
||||
"id": 132794,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "gc-compaction",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": "enabled-stage-2",
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 50
|
||||
},
|
||||
{
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 80
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": {
|
||||
"variants": [
|
||||
{
|
||||
"key": "disabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 90
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-1",
|
||||
"name": "",
|
||||
"rollout_percentage": 10
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-2",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-3",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 6
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 40
|
||||
},
|
||||
{
|
||||
"variant": null,
|
||||
"properties": [],
|
||||
"rollout_percentage": 10
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": null
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 1
|
||||
},
|
||||
{
|
||||
"id": 135586,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "boolean-flag",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": null,
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 47
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": null
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 1
|
||||
},
|
||||
{
|
||||
"id": 132794,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "gc-compaction",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": "enabled-stage-2",
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 50
|
||||
},
|
||||
{
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 80
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": {
|
||||
"variants": [
|
||||
{
|
||||
"key": "disabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 90
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-1",
|
||||
"name": "",
|
||||
"rollout_percentage": 10
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-2",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-3",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 7
|
||||
}
|
||||
],
|
||||
"group_type_mapping": {},
|
||||
"cohorts": {}
|
||||
}"#
|
||||
"group_type_mapping": {},
|
||||
"cohorts": {}
|
||||
}"#
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -796,125 +631,4 @@ mod tests {
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn evaluate_boolean_1() {
|
||||
// The `boolean-flag` feature flag only has one group that matches on the free user.
|
||||
|
||||
let mut store = FeatureStore::new();
|
||||
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
|
||||
store.set_flags(response.flags);
|
||||
|
||||
// This lacks the required properties and cannot be evaluated.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NotAvailable(_))
|
||||
),);
|
||||
|
||||
let properties_unmatched = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("paid".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// This does not match any group so there will be an error.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties_unmatched);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
|
||||
let properties = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("free".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 0.10, &properties);
|
||||
assert!(variant.is_ok());
|
||||
|
||||
// It matches the group conditions but not the group rollout percentage.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn evaluate_boolean_2() {
|
||||
// The `image-compaction-boundary` feature flag has one group that matches on the free user and a group that matches on all users.
|
||||
|
||||
let mut store = FeatureStore::new();
|
||||
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
|
||||
store.set_flags(response.flags);
|
||||
|
||||
// This lacks the required properties and cannot be evaluated.
|
||||
let variant =
|
||||
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &HashMap::new());
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NotAvailable(_))
|
||||
),);
|
||||
|
||||
let properties_unmatched = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("paid".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// This does not match the filtered group but the all user group.
|
||||
let variant =
|
||||
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties_unmatched);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
let variant =
|
||||
store.evaluate_boolean_inner("image-compaction-boundary", 0.05, &properties_unmatched);
|
||||
assert!(variant.is_ok());
|
||||
|
||||
let properties = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("free".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// It matches the first group as 0.30 <= 0.40 and the properties are matched. Then it gets evaluated to the variant override.
|
||||
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.30, &properties);
|
||||
assert!(variant.is_ok());
|
||||
|
||||
// It matches the group conditions but not the group rollout percentage.
|
||||
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
|
||||
// It matches the second "all" group conditions.
|
||||
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.09, &properties);
|
||||
assert!(variant.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#![allow(clippy::todo)]
|
||||
|
||||
use std::ffi::CString;
|
||||
use std::str::FromStr;
|
||||
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use utils::id::TenantTimelineId;
|
||||
@@ -174,8 +173,6 @@ pub struct Config {
|
||||
pub ttid: TenantTimelineId,
|
||||
/// List of safekeepers in format `host:port`
|
||||
pub safekeepers_list: Vec<String>,
|
||||
/// libpq connection info options
|
||||
pub safekeeper_conninfo_options: String,
|
||||
/// Safekeeper reconnect timeout in milliseconds
|
||||
pub safekeeper_reconnect_timeout: i32,
|
||||
/// Safekeeper connection timeout in milliseconds
|
||||
@@ -205,9 +202,6 @@ impl Wrapper {
|
||||
.into_bytes_with_nul();
|
||||
assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
|
||||
let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
|
||||
let safekeeper_conninfo_options = CString::from_str(&config.safekeeper_conninfo_options)
|
||||
.unwrap()
|
||||
.into_raw();
|
||||
|
||||
let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
|
||||
|
||||
@@ -215,11 +209,11 @@ impl Wrapper {
|
||||
neon_tenant,
|
||||
neon_timeline,
|
||||
safekeepers_list,
|
||||
safekeeper_conninfo_options,
|
||||
safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
|
||||
safekeeper_connection_timeout: config.safekeeper_connection_timeout,
|
||||
wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
|
||||
syncSafekeepers: config.sync_safekeepers,
|
||||
replicaPromote: false,
|
||||
systemId: 0,
|
||||
pgTimeline: 1,
|
||||
proto_version: 3,
|
||||
@@ -583,7 +577,6 @@ mod tests {
|
||||
let config = crate::walproposer::Config {
|
||||
ttid,
|
||||
safekeepers_list: vec!["localhost:5000".to_string()],
|
||||
safekeeper_conninfo_options: String::new(),
|
||||
safekeeper_reconnect_timeout: 1000,
|
||||
safekeeper_connection_timeout: 10000,
|
||||
sync_safekeepers: true,
|
||||
|
||||
@@ -17,69 +17,50 @@ anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
bincode.workspace = true
|
||||
bit_field.workspace = true
|
||||
bincode.workspace = true
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
camino.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
consumption_metrics.workspace = true
|
||||
crc32c.workspace = true
|
||||
either.workspace = true
|
||||
enum-map.workspace = true
|
||||
enumset = { workspace = true, features = ["serde"]}
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
hashlink.workspace = true
|
||||
hex.workspace = true
|
||||
http-utils.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper0.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
num_cpus.workspace = true # hack to get the number of worker threads tokio uses
|
||||
# hack to get the number of worker threads tokio uses
|
||||
num_cpus.workspace = true
|
||||
num-traits.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
|
||||
pageserver_compaction.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
pem.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_initdb.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
postgres-types.workspace = true
|
||||
posthog_client_lite.workspace = true
|
||||
postgres_initdb.workspace = true
|
||||
pprof.workspace = true
|
||||
pq_proto.workspace = true
|
||||
rand.workspace = true
|
||||
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
|
||||
regex.workspace = true
|
||||
remote_storage.workspace = true
|
||||
reqwest.workspace = true
|
||||
rpds.workspace = true
|
||||
rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
send-future.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json = { workspace = true, features = ["raw_value"] }
|
||||
serde_path_to_error.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde.workspace = true
|
||||
smallvec.workspace = true
|
||||
storage_broker.workspace = true
|
||||
strum_macros.workspace = true
|
||||
strum.workspace = true
|
||||
sysinfo.workspace = true
|
||||
tenant_size_model.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
|
||||
@@ -88,18 +69,34 @@ tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = [ "serde" ] }
|
||||
tonic.workspace = true
|
||||
tonic-reflection.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
url.workspace = true
|
||||
utils.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
walkdir.workspace = true
|
||||
metrics.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
|
||||
pageserver_compaction.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
remote_storage.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tenant_size_model.workspace = true
|
||||
http-utils.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
reqwest.workspace = true
|
||||
rpds.workspace = true
|
||||
enum-map.workspace = true
|
||||
enumset = { workspace = true, features = ["serde"]}
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
smallvec.workspace = true
|
||||
twox-hash.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
||||
@@ -5,14 +5,8 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
bytes.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
prost.workspace = true
|
||||
smallvec.workspace = true
|
||||
thiserror.workspace = true
|
||||
tonic.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
|
||||
@@ -54,9 +54,9 @@ service PageService {
|
||||
// RPCs use regular unary requests, since they are not as frequent and
|
||||
// performance-critical, and this simplifies implementation.
|
||||
//
|
||||
// NB: a gRPC status response (e.g. errors) will terminate the stream. The
|
||||
// stream may be shared by multiple Postgres backends, so we avoid this by
|
||||
// sending them as GetPageResponse.status_code instead.
|
||||
// NB: a status response (e.g. errors) will terminate the stream. The stream
|
||||
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
|
||||
// Most errors are therefore sent as GetPageResponse.status instead.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns the size of a relation, as # of blocks.
|
||||
@@ -159,8 +159,8 @@ message GetPageRequest {
|
||||
// A GetPageRequest class. Primarily intended for observability, but may also be
|
||||
// used for prioritization in the future.
|
||||
enum GetPageClass {
|
||||
// Unknown class. For backwards compatibility: used when an older client version sends a class
|
||||
// that a newer server version has removed.
|
||||
// Unknown class. For forwards compatibility: used when the client sends a
|
||||
// class that the server doesn't know about.
|
||||
GET_PAGE_CLASS_UNKNOWN = 0;
|
||||
// A normal request. This is the default.
|
||||
GET_PAGE_CLASS_NORMAL = 1;
|
||||
@@ -180,37 +180,31 @@ message GetPageResponse {
|
||||
// The original request's ID.
|
||||
uint64 request_id = 1;
|
||||
// The response status code.
|
||||
GetPageStatusCode status_code = 2;
|
||||
GetPageStatus status = 2;
|
||||
// A string describing the status, if any.
|
||||
string reason = 3;
|
||||
// The 8KB page images, in the same order as the request. Empty if status_code != OK.
|
||||
// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
repeated bytes page_image = 4;
|
||||
}
|
||||
|
||||
// A GetPageResponse status code.
|
||||
//
|
||||
// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
|
||||
// (potentially shared by many backends), and a gRPC status response would terminate the stream so
|
||||
// we send GetPageResponse messages with these codes instead.
|
||||
enum GetPageStatusCode {
|
||||
// Unknown status. For forwards compatibility: used when an older client version receives a new
|
||||
// status code from a newer server version.
|
||||
GET_PAGE_STATUS_CODE_UNKNOWN = 0;
|
||||
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
|
||||
// want to send errors as gRPC statuses, since this would terminate the stream.
|
||||
enum GetPageStatus {
|
||||
// Unknown status. For forwards compatibility: used when the server sends a
|
||||
// status code that the client doesn't know about.
|
||||
GET_PAGE_STATUS_UNKNOWN = 0;
|
||||
// The request was successful.
|
||||
GET_PAGE_STATUS_CODE_OK = 1;
|
||||
GET_PAGE_STATUS_OK = 1;
|
||||
// The page did not exist. The tenant/timeline/shard has already been
|
||||
// validated during stream setup.
|
||||
GET_PAGE_STATUS_CODE_NOT_FOUND = 2;
|
||||
GET_PAGE_STATUS_NOT_FOUND = 2;
|
||||
// The request was invalid.
|
||||
GET_PAGE_STATUS_CODE_INVALID_REQUEST = 3;
|
||||
// The request failed due to an internal server error.
|
||||
GET_PAGE_STATUS_CODE_INTERNAL_ERROR = 4;
|
||||
GET_PAGE_STATUS_INVALID = 3;
|
||||
// The tenant is rate limited. Slow down and retry later.
|
||||
GET_PAGE_STATUS_CODE_SLOW_DOWN = 5;
|
||||
// NB: shutdown errors are emitted as a gRPC Unavailable status.
|
||||
//
|
||||
// TODO: consider adding a GET_PAGE_STATUS_CODE_LAYER_DOWNLOAD in the case of a layer download.
|
||||
// This could free up the server task to process other requests while the download is in progress.
|
||||
GET_PAGE_STATUS_SLOW_DOWN = 4;
|
||||
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
|
||||
// layer download. This could free up the server task to process other
|
||||
// requests while the layer download is in progress.
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
|
||||
|
||||
@@ -17,7 +17,3 @@ pub mod proto {
|
||||
pub use page_service_client::PageServiceClient;
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
|
||||
mod model;
|
||||
|
||||
pub use model::*;
|
||||
|
||||
@@ -1,595 +0,0 @@
|
||||
//! Structs representing the canonical page service API.
|
||||
//!
|
||||
//! These mirror the autogenerated Protobuf types. The differences are:
|
||||
//!
|
||||
//! - Types that are in fact required by the API are not Options. The protobuf "required"
|
||||
//! attribute is deprecated and 'prost' marks a lot of members as optional because of that.
|
||||
//! (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
|
||||
//!
|
||||
//! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
|
||||
//!
|
||||
//! - Validate protocol invariants, via try_from() and try_into().
|
||||
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::Oid;
|
||||
use smallvec::SmallVec;
|
||||
// TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid
|
||||
// pulling in all of their other crate dependencies when building the client.
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::proto;
|
||||
|
||||
/// A protocol error. Typically returned via try_from() or try_into().
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ProtocolError {
|
||||
#[error("field '{0}' has invalid value '{1}'")]
|
||||
Invalid(&'static str, String),
|
||||
#[error("required field '{0}' is missing")]
|
||||
Missing(&'static str),
|
||||
}
|
||||
|
||||
impl ProtocolError {
|
||||
/// Helper to generate a new ProtocolError::Invalid for the given field and value.
|
||||
pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
|
||||
Self::Invalid(field, format!("{value:?}"))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProtocolError> for tonic::Status {
|
||||
fn from(err: ProtocolError) -> Self {
|
||||
tonic::Status::invalid_argument(format!("{err}"))
|
||||
}
|
||||
}
|
||||
|
||||
/// The LSN a request should read at.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct ReadLsn {
|
||||
/// The request's read LSN.
|
||||
pub request_lsn: Lsn,
|
||||
/// If given, the caller guarantees that the page has not been modified since this LSN. Must be
|
||||
/// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
|
||||
/// without waiting for the request LSN to arrive. Valid for all request types.
|
||||
///
|
||||
/// It is undefined behaviour to make a request such that the page was, in fact, modified
|
||||
/// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
|
||||
/// error, or it might return the old page version or the new page version. Setting
|
||||
/// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
|
||||
/// waiting.
|
||||
pub not_modified_since_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl ReadLsn {
|
||||
/// Validates the ReadLsn.
|
||||
pub fn validate(&self) -> Result<(), ProtocolError> {
|
||||
if self.request_lsn == Lsn::INVALID {
|
||||
return Err(ProtocolError::invalid("request_lsn", self.request_lsn));
|
||||
}
|
||||
if self.not_modified_since_lsn > Some(self.request_lsn) {
|
||||
return Err(ProtocolError::invalid(
|
||||
"not_modified_since_lsn",
|
||||
self.not_modified_since_lsn,
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::ReadLsn> for ReadLsn {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
|
||||
let read_lsn = Self {
|
||||
request_lsn: Lsn(pb.request_lsn),
|
||||
not_modified_since_lsn: match pb.not_modified_since_lsn {
|
||||
0 => None,
|
||||
lsn => Some(Lsn(lsn)),
|
||||
},
|
||||
};
|
||||
read_lsn.validate()?;
|
||||
Ok(read_lsn)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ReadLsn> for proto::ReadLsn {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(read_lsn: ReadLsn) -> Result<Self, Self::Error> {
|
||||
read_lsn.validate()?;
|
||||
Ok(Self {
|
||||
request_lsn: read_lsn.request_lsn.0,
|
||||
not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// RelTag is defined in pageserver_api::reltag.
|
||||
pub type RelTag = pageserver_api::reltag::RelTag;
|
||||
|
||||
impl TryFrom<proto::RelTag> for RelTag {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
spcnode: pb.spc_oid,
|
||||
dbnode: pb.db_oid,
|
||||
relnode: pb.rel_number,
|
||||
forknum: pb
|
||||
.fork_number
|
||||
.try_into()
|
||||
.map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RelTag> for proto::RelTag {
|
||||
fn from(rel_tag: RelTag) -> Self {
|
||||
Self {
|
||||
spc_oid: rel_tag.spcnode,
|
||||
db_oid: rel_tag.dbnode,
|
||||
rel_number: rel_tag.relnode,
|
||||
fork_number: rel_tag.forknum as u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct CheckRelExistsRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type CheckRelExistsResponse = bool;
|
||||
|
||||
impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
|
||||
fn from(pb: proto::CheckRelExistsResponse) -> Self {
|
||||
pb.exists
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
|
||||
fn from(exists: CheckRelExistsResponse) -> Self {
|
||||
Self { exists }
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests a base backup at a given LSN.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetBaseBackupRequest {
|
||||
/// The LSN to fetch a base backup at.
|
||||
pub read_lsn: ReadLsn,
|
||||
/// If true, logical replication slots will not be created.
|
||||
pub replica: bool,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
replica: pb.replica,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetBaseBackupRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
replica: request.replica,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetBaseBackupResponseChunk = Bytes;
|
||||
|
||||
impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
|
||||
if pb.chunk.is_empty() {
|
||||
return Err(ProtocolError::Missing("chunk"));
|
||||
}
|
||||
Ok(pb.chunk)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(chunk: GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
|
||||
if chunk.is_empty() {
|
||||
return Err(ProtocolError::Missing("chunk"));
|
||||
}
|
||||
Ok(Self { chunk })
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetDbSizeRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub db_oid: Oid,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
db_oid: pb.db_oid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetDbSizeRequest> for proto::GetDbSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetDbSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
db_oid: request.db_oid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetDbSizeResponse = u64;
|
||||
|
||||
impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
|
||||
fn from(pb: proto::GetDbSizeResponse) -> Self {
|
||||
pb.num_bytes
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
|
||||
fn from(num_bytes: GetDbSizeResponse) -> Self {
|
||||
Self { num_bytes }
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests one or more pages.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GetPageRequest {
|
||||
/// A request ID. Will be included in the response. Should be unique for in-flight requests on
|
||||
/// the stream.
|
||||
pub request_id: RequestID,
|
||||
/// The request class.
|
||||
pub request_class: GetPageClass,
|
||||
/// The LSN to read at.
|
||||
pub read_lsn: ReadLsn,
|
||||
/// The relation to read from.
|
||||
pub rel: RelTag,
|
||||
/// Page numbers to read. Must belong to the remote shard.
|
||||
///
|
||||
/// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
|
||||
/// costs and parallelizing them. This may increase the latency of any individual request, but
|
||||
/// improves the overall latency and throughput of the batch as a whole.
|
||||
pub block_numbers: SmallVec<[u32; 1]>,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetPageRequest> for GetPageRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
|
||||
if pb.block_number.is_empty() {
|
||||
return Err(ProtocolError::Missing("block_number"));
|
||||
}
|
||||
Ok(Self {
|
||||
request_id: pb.request_id,
|
||||
request_class: pb.request_class.into(),
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
block_numbers: pb.block_number.into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetPageRequest> for proto::GetPageRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetPageRequest) -> Result<Self, Self::Error> {
|
||||
if request.block_numbers.is_empty() {
|
||||
return Err(ProtocolError::Missing("block_number"));
|
||||
}
|
||||
Ok(Self {
|
||||
request_id: request.request_id,
|
||||
request_class: request.request_class.into(),
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
rel: Some(request.rel.into()),
|
||||
block_number: request.block_numbers.into_vec(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A GetPage request ID.
|
||||
pub type RequestID = u64;
|
||||
|
||||
/// A GetPage request class.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum GetPageClass {
|
||||
/// Unknown class. For backwards compatibility: used when an older client version sends a class
|
||||
/// that a newer server version has removed.
|
||||
Unknown,
|
||||
/// A normal request. This is the default.
|
||||
Normal,
|
||||
/// A prefetch request. NB: can only be classified on pg < 18.
|
||||
Prefetch,
|
||||
/// A background request (e.g. vacuum).
|
||||
Background,
|
||||
}
|
||||
|
||||
impl From<proto::GetPageClass> for GetPageClass {
|
||||
fn from(pb: proto::GetPageClass) -> Self {
|
||||
match pb {
|
||||
proto::GetPageClass::Unknown => Self::Unknown,
|
||||
proto::GetPageClass::Normal => Self::Normal,
|
||||
proto::GetPageClass::Prefetch => Self::Prefetch,
|
||||
proto::GetPageClass::Background => Self::Background,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i32> for GetPageClass {
|
||||
fn from(class: i32) -> Self {
|
||||
proto::GetPageClass::try_from(class)
|
||||
.unwrap_or(proto::GetPageClass::Unknown)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageClass> for proto::GetPageClass {
|
||||
fn from(class: GetPageClass) -> Self {
|
||||
match class {
|
||||
GetPageClass::Unknown => Self::Unknown,
|
||||
GetPageClass::Normal => Self::Normal,
|
||||
GetPageClass::Prefetch => Self::Prefetch,
|
||||
GetPageClass::Background => Self::Background,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageClass> for i32 {
|
||||
fn from(class: GetPageClass) -> Self {
|
||||
proto::GetPageClass::from(class).into()
|
||||
}
|
||||
}
|
||||
|
||||
/// A GetPage response.
|
||||
///
|
||||
/// A batch response will contain all of the requested pages. We could eagerly emit individual pages
|
||||
/// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
|
||||
/// batch and we'll only return once the entire batch is ready, so no one can make use of the
|
||||
/// individual pages.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GetPageResponse {
|
||||
/// The original request's ID.
|
||||
pub request_id: RequestID,
|
||||
/// The response status code.
|
||||
pub status_code: GetPageStatusCode,
|
||||
/// A string describing the status, if any.
|
||||
pub reason: Option<String>,
|
||||
/// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
pub page_images: SmallVec<[Bytes; 1]>,
|
||||
}
|
||||
|
||||
impl From<proto::GetPageResponse> for GetPageResponse {
|
||||
fn from(pb: proto::GetPageResponse) -> Self {
|
||||
Self {
|
||||
request_id: pb.request_id,
|
||||
status_code: pb.status_code.into(),
|
||||
reason: Some(pb.reason).filter(|r| !r.is_empty()),
|
||||
page_images: pb.page_image.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageResponse> for proto::GetPageResponse {
|
||||
fn from(response: GetPageResponse) -> Self {
|
||||
Self {
|
||||
request_id: response.request_id,
|
||||
status_code: response.status_code.into(),
|
||||
reason: response.reason.unwrap_or_default(),
|
||||
page_image: response.page_images.into_vec(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A GetPage response status code.
|
||||
///
|
||||
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
|
||||
/// (potentially shared by many backends), and a gRPC status response would terminate the stream so
|
||||
/// we send GetPageResponse messages with these codes instead.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum GetPageStatusCode {
|
||||
/// Unknown status. For forwards compatibility: used when an older client version receives a new
|
||||
/// status code from a newer server version.
|
||||
Unknown,
|
||||
/// The request was successful.
|
||||
Ok,
|
||||
/// The page did not exist. The tenant/timeline/shard has already been validated during stream
|
||||
/// setup.
|
||||
NotFound,
|
||||
/// The request was invalid.
|
||||
InvalidRequest,
|
||||
/// The request failed due to an internal server error.
|
||||
InternalError,
|
||||
/// The tenant is rate limited. Slow down and retry later.
|
||||
SlowDown,
|
||||
}
|
||||
|
||||
impl From<proto::GetPageStatusCode> for GetPageStatusCode {
|
||||
fn from(pb: proto::GetPageStatusCode) -> Self {
|
||||
match pb {
|
||||
proto::GetPageStatusCode::Unknown => Self::Unknown,
|
||||
proto::GetPageStatusCode::Ok => Self::Ok,
|
||||
proto::GetPageStatusCode::NotFound => Self::NotFound,
|
||||
proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
|
||||
proto::GetPageStatusCode::InternalError => Self::InternalError,
|
||||
proto::GetPageStatusCode::SlowDown => Self::SlowDown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i32> for GetPageStatusCode {
|
||||
fn from(status_code: i32) -> Self {
|
||||
proto::GetPageStatusCode::try_from(status_code)
|
||||
.unwrap_or(proto::GetPageStatusCode::Unknown)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageStatusCode> for proto::GetPageStatusCode {
|
||||
fn from(status_code: GetPageStatusCode) -> Self {
|
||||
match status_code {
|
||||
GetPageStatusCode::Unknown => Self::Unknown,
|
||||
GetPageStatusCode::Ok => Self::Ok,
|
||||
GetPageStatusCode::NotFound => Self::NotFound,
|
||||
GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
|
||||
GetPageStatusCode::InternalError => Self::InternalError,
|
||||
GetPageStatusCode::SlowDown => Self::SlowDown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageStatusCode> for i32 {
|
||||
fn from(status_code: GetPageStatusCode) -> Self {
|
||||
proto::GetPageStatusCode::from(status_code).into()
|
||||
}
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
pub struct GetRelSizeRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: proto
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetRelSizeRequest> for proto::GetRelSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetRelSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
rel: Some(request.rel.into()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetRelSizeResponse = u32;
|
||||
|
||||
impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
|
||||
fn from(proto: proto::GetRelSizeResponse) -> Self {
|
||||
proto.num_blocks
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
|
||||
fn from(num_blocks: GetRelSizeResponse) -> Self {
|
||||
Self { num_blocks }
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
pub struct GetSlruSegmentRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub kind: SlruKind,
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
kind: u8::try_from(pb.kind)
|
||||
.ok()
|
||||
.and_then(SlruKind::from_repr)
|
||||
.ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
|
||||
segno: pb.segno,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetSlruSegmentRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
kind: request.kind as u32,
|
||||
segno: request.segno,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetSlruSegmentResponse = Bytes;
|
||||
|
||||
impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
|
||||
if pb.segment.is_empty() {
|
||||
return Err(ProtocolError::Missing("segment"));
|
||||
}
|
||||
Ok(pb.segment)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
|
||||
if segment.is_empty() {
|
||||
return Err(ProtocolError::Missing("segment"));
|
||||
}
|
||||
Ok(Self { segment })
|
||||
}
|
||||
}
|
||||
|
||||
// SlruKind is defined in pageserver_api::reltag.
|
||||
pub type SlruKind = pageserver_api::reltag::SlruKind;
|
||||
@@ -21,7 +21,6 @@ use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
|
||||
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
|
||||
use pageserver::deletion_queue::DeletionQueue;
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
use pageserver::feature_resolver::FeatureResolver;
|
||||
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::{
|
||||
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
|
||||
@@ -389,30 +388,23 @@ fn start_pageserver(
|
||||
// We need to release the lock file only when the process exits.
|
||||
std::mem::forget(lock_file);
|
||||
|
||||
// Bind the HTTP, libpq, and gRPC ports early, to error out if they are
|
||||
// already in use.
|
||||
info!(
|
||||
"Starting pageserver http handler on {} with auth {:#?}",
|
||||
conf.listen_http_addr, conf.http_auth_type
|
||||
);
|
||||
let http_listener = tcp_listener::bind(&conf.listen_http_addr)?;
|
||||
// Bind the HTTP and libpq ports early, so that if they are in use by some other
|
||||
// process, we error out early.
|
||||
let http_addr = &conf.listen_http_addr;
|
||||
info!("Starting pageserver http handler on {http_addr}");
|
||||
let http_listener = tcp_listener::bind(http_addr)?;
|
||||
|
||||
let https_listener = match conf.listen_https_addr.as_ref() {
|
||||
Some(https_addr) => {
|
||||
info!(
|
||||
"Starting pageserver https handler on {https_addr} with auth {:#?}",
|
||||
conf.http_auth_type
|
||||
);
|
||||
info!("Starting pageserver https handler on {https_addr}");
|
||||
Some(tcp_listener::bind(https_addr)?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
info!(
|
||||
"Starting pageserver pg protocol handler on {} with auth {:#?}",
|
||||
conf.listen_pg_addr, conf.pg_auth_type,
|
||||
);
|
||||
let pageserver_listener = tcp_listener::bind(&conf.listen_pg_addr)?;
|
||||
let pg_addr = &conf.listen_pg_addr;
|
||||
info!("Starting pageserver pg protocol handler on {pg_addr}");
|
||||
let pageserver_listener = tcp_listener::bind(pg_addr)?;
|
||||
|
||||
// Enable SO_KEEPALIVE on the socket, to detect dead connections faster.
|
||||
// These are configured via net.ipv4.tcp_keepalive_* sysctls.
|
||||
@@ -421,15 +413,6 @@ fn start_pageserver(
|
||||
// support enabling keepalives while using the default OS sysctls.
|
||||
setsockopt(&pageserver_listener, sockopt::KeepAlive, &true)?;
|
||||
|
||||
let mut grpc_listener = None;
|
||||
if let Some(grpc_addr) = &conf.listen_grpc_addr {
|
||||
info!(
|
||||
"Starting pageserver gRPC handler on {grpc_addr} with auth {:#?}",
|
||||
conf.grpc_auth_type
|
||||
);
|
||||
grpc_listener = Some(tcp_listener::bind(grpc_addr).map_err(|e| anyhow!("{e}"))?);
|
||||
}
|
||||
|
||||
// Launch broker client
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
@@ -457,8 +440,7 @@ fn start_pageserver(
|
||||
// Initialize authentication for incoming connections
|
||||
let http_auth;
|
||||
let pg_auth;
|
||||
let grpc_auth;
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::NeonJWT) {
|
||||
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
|
||||
// unwrap is ok because check is performed when creating config, so path is set and exists
|
||||
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
|
||||
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
|
||||
@@ -466,23 +448,20 @@ fn start_pageserver(
|
||||
let jwt_auth = JwtAuth::from_key_path(key_path)?;
|
||||
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
|
||||
|
||||
http_auth = match conf.http_auth_type {
|
||||
http_auth = match &conf.http_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth.clone()),
|
||||
};
|
||||
pg_auth = match conf.pg_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth.clone()),
|
||||
};
|
||||
grpc_auth = match conf.grpc_auth_type {
|
||||
pg_auth = match &conf.pg_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth),
|
||||
};
|
||||
} else {
|
||||
http_auth = None;
|
||||
pg_auth = None;
|
||||
grpc_auth = None;
|
||||
}
|
||||
info!("Using auth for http API: {:#?}", conf.http_auth_type);
|
||||
info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);
|
||||
|
||||
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
|
||||
{
|
||||
@@ -523,12 +502,6 @@ fn start_pageserver(
|
||||
// Set up remote storage client
|
||||
let remote_storage = BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf))?;
|
||||
|
||||
let feature_resolver = create_feature_resolver(
|
||||
conf,
|
||||
shutdown_pageserver.clone(),
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
)?;
|
||||
|
||||
// Set up deletion queue
|
||||
let (deletion_queue, deletion_workers) = DeletionQueue::new(
|
||||
remote_storage.clone(),
|
||||
@@ -582,7 +555,6 @@ fn start_pageserver(
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
},
|
||||
order,
|
||||
shutdown_pageserver.clone(),
|
||||
@@ -804,27 +776,9 @@ fn start_pageserver(
|
||||
} else {
|
||||
None
|
||||
},
|
||||
basebackup_cache.clone(),
|
||||
basebackup_cache,
|
||||
);
|
||||
|
||||
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
|
||||
// each stream/request.
|
||||
//
|
||||
// TODO: this uses a separate Tokio runtime for the page service. If we want
|
||||
// other gRPC services, they will need their own port and runtime. Is this
|
||||
// necessary?
|
||||
let mut page_service_grpc = None;
|
||||
if let Some(grpc_listener) = grpc_listener {
|
||||
page_service_grpc = Some(page_service::spawn_grpc(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
grpc_auth,
|
||||
otel_guard.as_ref().map(|g| g.dispatch.clone()),
|
||||
grpc_listener,
|
||||
basebackup_cache,
|
||||
)?);
|
||||
}
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
let signal_token = CancellationToken::new();
|
||||
@@ -843,7 +797,6 @@ fn start_pageserver(
|
||||
http_endpoint_listener,
|
||||
https_endpoint_listener,
|
||||
page_service,
|
||||
page_service_grpc,
|
||||
consumption_metrics_tasks,
|
||||
disk_usage_eviction_task,
|
||||
&tenant_manager,
|
||||
@@ -857,14 +810,6 @@ fn start_pageserver(
|
||||
})
|
||||
}
|
||||
|
||||
fn create_feature_resolver(
|
||||
conf: &'static PageServerConf,
|
||||
shutdown_pageserver: CancellationToken,
|
||||
handle: &tokio::runtime::Handle,
|
||||
) -> anyhow::Result<FeatureResolver> {
|
||||
FeatureResolver::spawn(conf, shutdown_pageserver, handle)
|
||||
}
|
||||
|
||||
async fn create_remote_storage_client(
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<GenericRemoteStorage> {
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::time::Duration;
|
||||
use anyhow::{Context, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pem::Pem;
|
||||
@@ -58,16 +58,11 @@ pub struct PageServerConf {
|
||||
pub listen_http_addr: String,
|
||||
/// Example: 127.0.0.1:9899
|
||||
pub listen_https_addr: Option<String>,
|
||||
/// If set, expose a gRPC API on this address.
|
||||
/// Example: 127.0.0.1:51051
|
||||
///
|
||||
/// EXPERIMENTAL: this protocol is unstable and under active development.
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
|
||||
/// Path to a file with certificate's private key for https and gRPC API.
|
||||
/// Path to a file with certificate's private key for https API.
|
||||
/// Default: server.key
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
/// Path to a file with a X509 certificate for https and gRPC API.
|
||||
/// Path to a file with a X509 certificate for https API.
|
||||
/// Default: server.crt
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
/// Period to reload certificate and private key from files.
|
||||
@@ -105,8 +100,6 @@ pub struct PageServerConf {
|
||||
pub http_auth_type: AuthType,
|
||||
/// authentication method for libpq connections from compute
|
||||
pub pg_auth_type: AuthType,
|
||||
/// authentication method for gRPC connections from compute
|
||||
pub grpc_auth_type: AuthType,
|
||||
/// Path to a file or directory containing public key(s) for verifying JWT tokens.
|
||||
/// Used for both mgmt and compute auth, if enabled.
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
@@ -238,9 +231,6 @@ pub struct PageServerConf {
|
||||
/// This is insecure and should only be used in development environments.
|
||||
pub dev_mode: bool,
|
||||
|
||||
/// PostHog integration config.
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
|
||||
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
|
||||
|
||||
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
|
||||
@@ -365,7 +355,6 @@ impl PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
@@ -380,7 +369,6 @@ impl PageServerConf {
|
||||
pg_distrib_dir,
|
||||
http_auth_type,
|
||||
pg_auth_type,
|
||||
grpc_auth_type,
|
||||
auth_validation_public_key_path,
|
||||
remote_storage,
|
||||
broker_endpoint,
|
||||
@@ -424,7 +412,6 @@ impl PageServerConf {
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
posthog_config,
|
||||
timeline_import_config,
|
||||
basebackup_cache_config,
|
||||
} = config_toml;
|
||||
@@ -436,7 +423,6 @@ impl PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
@@ -449,7 +435,6 @@ impl PageServerConf {
|
||||
max_file_descriptors,
|
||||
http_auth_type,
|
||||
pg_auth_type,
|
||||
grpc_auth_type,
|
||||
auth_validation_public_key_path,
|
||||
remote_storage_config: remote_storage,
|
||||
broker_endpoint,
|
||||
@@ -540,16 +525,13 @@ impl PageServerConf {
|
||||
}
|
||||
None => Vec::new(),
|
||||
},
|
||||
posthog_config,
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// custom validation code that covers more than one field in isolation
|
||||
// ------------------------------------------------------------
|
||||
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
|
||||
.contains(&AuthType::NeonJWT)
|
||||
{
|
||||
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
|
||||
let auth_validation_public_key_path = conf
|
||||
.auth_validation_public_key_path
|
||||
.get_or_insert_with(|| workdir.join("auth_public_key.pem"));
|
||||
|
||||
@@ -1,94 +0,0 @@
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
|
||||
use posthog_client_lite::{
|
||||
FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FeatureResolver {
|
||||
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
|
||||
}
|
||||
|
||||
impl FeatureResolver {
|
||||
pub fn new_disabled() -> Self {
|
||||
Self { inner: None }
|
||||
}
|
||||
|
||||
pub fn spawn(
|
||||
conf: &PageServerConf,
|
||||
shutdown_pageserver: CancellationToken,
|
||||
handle: &tokio::runtime::Handle,
|
||||
) -> anyhow::Result<Self> {
|
||||
// DO NOT block in this function: make it return as fast as possible to avoid startup delays.
|
||||
if let Some(posthog_config) = &conf.posthog_config {
|
||||
let inner = FeatureResolverBackgroundLoop::new(
|
||||
PostHogClientConfig {
|
||||
server_api_key: posthog_config.server_api_key.clone(),
|
||||
client_api_key: posthog_config.client_api_key.clone(),
|
||||
project_id: posthog_config.project_id.clone(),
|
||||
private_api_url: posthog_config.private_api_url.clone(),
|
||||
public_api_url: posthog_config.public_api_url.clone(),
|
||||
},
|
||||
shutdown_pageserver,
|
||||
);
|
||||
let inner = Arc::new(inner);
|
||||
// TODO: make this configurable
|
||||
inner.clone().spawn(handle, Duration::from_secs(60));
|
||||
Ok(FeatureResolver { inner: Some(inner) })
|
||||
} else {
|
||||
Ok(FeatureResolver { inner: None })
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_multivariate(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<String, PostHogEvaluationError> {
|
||||
if let Some(inner) = &self.inner {
|
||||
inner.feature_store().evaluate_multivariate(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&HashMap::new(),
|
||||
)
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(
|
||||
"PostHog integration is not enabled".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a boolean feature flag. Currently, we do not support any properties.
|
||||
///
|
||||
/// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_boolean(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
if let Some(inner) = &self.inner {
|
||||
inner.feature_store().evaluate_boolean(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&HashMap::new(),
|
||||
)
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(
|
||||
"PostHog integration is not enabled".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -353,33 +353,6 @@ paths:
|
||||
"200":
|
||||
description: OK
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/mark_invisible:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
put:
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
is_visible:
|
||||
type: boolean
|
||||
default: false
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/location_config:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
@@ -653,8 +626,6 @@ paths:
|
||||
format: hex
|
||||
pg_version:
|
||||
type: integer
|
||||
read_only:
|
||||
type: boolean
|
||||
existing_initdb_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
@@ -370,18 +370,6 @@ impl From<crate::tenant::secondary::SecondaryTenantError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::FinalizeTimelineImportError> for ApiError {
|
||||
fn from(err: crate::tenant::FinalizeTimelineImportError) -> ApiError {
|
||||
use crate::tenant::FinalizeTimelineImportError::*;
|
||||
match err {
|
||||
ImportTaskStillRunning => {
|
||||
ApiError::ResourceUnavailable("Import task still running".into())
|
||||
}
|
||||
ShuttingDown => ApiError::ShuttingDown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to construct a TimelineInfo struct for a timeline
|
||||
async fn build_timeline_info(
|
||||
timeline: &Arc<Timeline>,
|
||||
@@ -584,7 +572,6 @@ async fn timeline_create_handler(
|
||||
TimelineCreateRequestMode::Branch {
|
||||
ancestor_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
read_only: _,
|
||||
pg_version: _,
|
||||
} => tenant::CreateTimelineParams::Branch(tenant::CreateTimelineParamsBranch {
|
||||
new_timeline_id,
|
||||
@@ -3545,7 +3532,10 @@ async fn activate_post_import_handler(
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
tenant.finalize_importing_timeline(timeline_id).await?;
|
||||
tenant
|
||||
.finalize_importing_timeline(timeline_id)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
match tenant.get_timeline(timeline_id, false) {
|
||||
Ok(_timeline) => {
|
||||
|
||||
@@ -10,7 +10,6 @@ pub mod context;
|
||||
pub mod controller_upcall_client;
|
||||
pub mod deletion_queue;
|
||||
pub mod disk_usage_eviction_task;
|
||||
pub mod feature_resolver;
|
||||
pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub mod l0_flush;
|
||||
@@ -85,7 +84,6 @@ pub async fn shutdown_pageserver(
|
||||
http_listener: HttpEndpointListener,
|
||||
https_listener: Option<HttpsEndpointListener>,
|
||||
page_service: page_service::Listener,
|
||||
grpc_task: Option<CancellableTask>,
|
||||
consumption_metrics_worker: ConsumptionMetricsTasks,
|
||||
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
|
||||
tenant_manager: &TenantManager,
|
||||
@@ -179,16 +177,6 @@ pub async fn shutdown_pageserver(
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down the gRPC server task, including request handlers.
|
||||
if let Some(grpc_task) = grpc_task {
|
||||
timed(
|
||||
grpc_task.shutdown(),
|
||||
"shutdown gRPC PageRequestHandler",
|
||||
Duration::from_secs(3),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Shut down all the tenants. This flushes everything to disk and kills
|
||||
// the checkpoint and GC tasks.
|
||||
timed(
|
||||
|
||||
@@ -2234,10 +2234,8 @@ impl BasebackupQueryTimeOngoingRecording<'_> {
|
||||
// If you want to change categorize of a specific error, also change it in `log_query_error`.
|
||||
let metric = match res {
|
||||
Ok(_) => &self.parent.ok,
|
||||
Err(QueryError::Shutdown) | Err(QueryError::Reconnect) => {
|
||||
// Do not observe ok/err for shutdown/reconnect.
|
||||
// Reconnect error might be raised when the operation is waiting for LSN and the tenant shutdown interrupts
|
||||
// the operation. A reconnect error will be issued and the client will retry.
|
||||
Err(QueryError::Shutdown) => {
|
||||
// Do not observe ok/err for shutdown
|
||||
return;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error)))
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
use std::borrow::Cow;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
@@ -13,7 +12,7 @@ use std::{io, str};
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::{FutureExt, Stream};
|
||||
use futures::FutureExt;
|
||||
use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use once_cell::sync::OnceCell;
|
||||
@@ -31,7 +30,6 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api::proto;
|
||||
use postgres_backend::{
|
||||
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
|
||||
};
|
||||
@@ -43,20 +41,19 @@ use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::service::Interceptor as _;
|
||||
use tracing::*;
|
||||
use utils::auth::{Claims, Scope, SwappableJwtAuth};
|
||||
use utils::failpoint_support;
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::logging::log_slow;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::ShardIndex;
|
||||
use utils::simple_rcu::RcuReadGuard;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::sync::spsc_fold;
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::{self, BasebackupError};
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::basebackup_cache::BasebackupCache;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
@@ -78,7 +75,7 @@ use crate::tenant::mgr::{
|
||||
use crate::tenant::storage_layer::IoConcurrency;
|
||||
use crate::tenant::timeline::{self, WaitLsnError};
|
||||
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
|
||||
use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
|
||||
use crate::{basebackup, timed_after_cancellation};
|
||||
|
||||
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which
|
||||
/// is not yet in state [`TenantState::Active`].
|
||||
@@ -89,26 +86,6 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
/// Threshold at which to log slow GetPage requests.
|
||||
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
|
||||
|
||||
/// The idle time before sending TCP keepalive probes for gRPC connections. The
|
||||
/// interval and timeout between each probe is configured via sysctl. This
|
||||
/// allows detecting dead connections sooner.
|
||||
const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
|
||||
/// algorithm, which can cause latency spikes for small messages.
|
||||
const GRPC_TCP_NODELAY: bool = true;
|
||||
|
||||
/// The interval between HTTP2 keepalive pings. This allows shutting down server
|
||||
/// tasks when clients are unresponsive.
|
||||
const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// The timeout for HTTP2 keepalive pings. Should be <= GRPC_KEEPALIVE_INTERVAL.
|
||||
const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
/// Number of concurrent gRPC streams per TCP connection. We expect something
|
||||
/// like 8 GetPage streams per connections, plus any unary requests.
|
||||
const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub struct Listener {
|
||||
@@ -163,94 +140,6 @@ pub fn spawn(
|
||||
Listener { cancel, task }
|
||||
}
|
||||
|
||||
/// Spawns a gRPC server for the page service.
|
||||
///
|
||||
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
|
||||
/// need to reimplement the TCP+TLS accept loop ourselves.
|
||||
pub fn spawn_grpc(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: std::net::TcpListener,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
) -> anyhow::Result<CancellableTask> {
|
||||
let cancel = CancellationToken::new();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.perf_span_dispatch(perf_trace_dispatch)
|
||||
.detached_child();
|
||||
let gate = Gate::default();
|
||||
|
||||
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
|
||||
// port early during startup.
|
||||
let incoming = {
|
||||
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
|
||||
listener.set_nonblocking(true)?;
|
||||
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
|
||||
.with_nodelay(Some(GRPC_TCP_NODELAY))
|
||||
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
|
||||
};
|
||||
|
||||
// Set up the gRPC server.
|
||||
//
|
||||
// TODO: consider tuning window sizes.
|
||||
// TODO: wire up tracing.
|
||||
let mut server = tonic::transport::Server::builder()
|
||||
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
|
||||
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
|
||||
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
|
||||
|
||||
// Main page service.
|
||||
let page_service_handler = PageServerHandler::new(
|
||||
tenant_manager,
|
||||
auth.clone(),
|
||||
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
|
||||
conf.get_vectored_concurrent_io,
|
||||
ConnectionPerfSpanFields::default(),
|
||||
basebackup_cache,
|
||||
ctx,
|
||||
cancel.clone(),
|
||||
gate.enter().expect("just created"),
|
||||
);
|
||||
|
||||
let mut tenant_interceptor = TenantMetadataInterceptor;
|
||||
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
|
||||
let interceptors = move |mut req: tonic::Request<()>| {
|
||||
req = tenant_interceptor.call(req)?;
|
||||
req = auth_interceptor.call(req)?;
|
||||
Ok(req)
|
||||
};
|
||||
|
||||
let page_service =
|
||||
proto::PageServiceServer::with_interceptor(page_service_handler, interceptors);
|
||||
let server = server.add_service(page_service);
|
||||
|
||||
// Reflection service for use with e.g. grpcurl.
|
||||
let reflection_service = tonic_reflection::server::Builder::configure()
|
||||
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
|
||||
.build_v1()?;
|
||||
let server = server.add_service(reflection_service);
|
||||
|
||||
// Spawn server task.
|
||||
let task_cancel = cancel.clone();
|
||||
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
"grpc listener",
|
||||
async move {
|
||||
let result = server
|
||||
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
|
||||
.await;
|
||||
if result.is_ok() {
|
||||
// TODO: revisit shutdown logic once page service is implemented.
|
||||
gate.close().await;
|
||||
}
|
||||
result
|
||||
},
|
||||
));
|
||||
|
||||
Ok(CancellableTask { task, cancel })
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub async fn stop_accepting(self) -> Connections {
|
||||
self.cancel.cancel();
|
||||
@@ -370,7 +259,7 @@ type ConnectionHandlerResult = anyhow::Result<()>;
|
||||
|
||||
/// Perf root spans start at the per-request level, after shard routing.
|
||||
/// This struct carries connection-level information to the root perf span definition.
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Clone)]
|
||||
struct ConnectionPerfSpanFields {
|
||||
peer_addr: String,
|
||||
application_name: Option<String>,
|
||||
@@ -488,11 +377,6 @@ async fn page_service_conn_main(
|
||||
}
|
||||
}
|
||||
|
||||
/// Page service connection handler.
|
||||
///
|
||||
/// TODO: for gRPC, this will be shared by all requests from all connections.
|
||||
/// Decompose it into global state and per-connection/request state, and make
|
||||
/// libpq-specific options (e.g. pipelining) separate.
|
||||
struct PageServerHandler {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
@@ -769,9 +653,6 @@ struct BatchedGetPageRequest {
|
||||
timer: SmgrOpTimer,
|
||||
lsn_range: LsnRange,
|
||||
ctx: RequestContext,
|
||||
// If the request is perf enabled, this contains a context
|
||||
// with a perf span tracking the time spent waiting for the executor.
|
||||
batch_wait_ctx: Option<RequestContext>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -784,7 +665,6 @@ struct BatchedTestRequest {
|
||||
/// so that we don't keep the [`Timeline::gate`] open while the batch
|
||||
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
|
||||
#[derive(IntoStaticStr)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum BatchedFeMessage {
|
||||
Exists {
|
||||
span: Span,
|
||||
@@ -1302,22 +1182,6 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
let batch_wait_ctx = if ctx.has_perf_span() {
|
||||
Some(
|
||||
RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WAIT_EXECUTOR",
|
||||
)
|
||||
})
|
||||
.attached_child(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
BatchedFeMessage::GetPage {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
@@ -1329,7 +1193,6 @@ impl PageServerHandler {
|
||||
request_lsn: req.hdr.request_lsn
|
||||
},
|
||||
ctx,
|
||||
batch_wait_ctx,
|
||||
}],
|
||||
// The executor grabs the batch when it becomes idle.
|
||||
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
|
||||
@@ -1485,7 +1348,7 @@ impl PageServerHandler {
|
||||
let mut flush_timers = Vec::with_capacity(handler_results.len());
|
||||
for handler_result in &mut handler_results {
|
||||
let flush_timer = match handler_result {
|
||||
Ok((_response, timer, _ctx)) => Some(
|
||||
Ok((_, timer)) => Some(
|
||||
timer
|
||||
.observe_execution_end(flushing_start_time)
|
||||
.expect("we are the first caller"),
|
||||
@@ -1505,7 +1368,7 @@ impl PageServerHandler {
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
|
||||
let (response_msg, ctx) = match handler_result {
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e.err {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
@@ -1530,30 +1393,15 @@ impl PageServerHandler {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
|
||||
(
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
req: e.req,
|
||||
message: e.err.to_string(),
|
||||
}),
|
||||
None,
|
||||
)
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
req: e.req,
|
||||
message: e.err.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
|
||||
Ok((response_msg, _op_timer_already_observed)) => response_msg,
|
||||
};
|
||||
|
||||
let ctx = ctx.map(|req_ctx| {
|
||||
RequestContextBuilder::from(&req_ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"FLUSH_RESPONSE",
|
||||
)
|
||||
})
|
||||
.attached_child()
|
||||
});
|
||||
|
||||
//
|
||||
// marshal & transmit response message
|
||||
//
|
||||
@@ -1576,17 +1424,6 @@ impl PageServerHandler {
|
||||
)),
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
|
||||
let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
|
||||
futures::future::Either::Left(
|
||||
flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
|
||||
current_perf_span.clone()
|
||||
}),
|
||||
)
|
||||
} else {
|
||||
futures::future::Either::Right(flush_fut)
|
||||
};
|
||||
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
tokio::select! {
|
||||
@@ -1616,7 +1453,7 @@ impl PageServerHandler {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
|
||||
Span,
|
||||
),
|
||||
QueryError,
|
||||
@@ -1643,7 +1480,7 @@ impl PageServerHandler {
|
||||
self.handle_get_rel_exists_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1662,7 +1499,7 @@ impl PageServerHandler {
|
||||
self.handle_get_nblocks_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1709,7 +1546,7 @@ impl PageServerHandler {
|
||||
self.handle_db_size_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1728,7 +1565,7 @@ impl PageServerHandler {
|
||||
self.handle_get_slru_segment_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -2080,25 +1917,12 @@ impl PageServerHandler {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let mut batch = match batch {
|
||||
let batch = match batch {
|
||||
Ok(batch) => batch,
|
||||
Err(e) => {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
if let BatchedFeMessage::GetPage {
|
||||
pages,
|
||||
span: _,
|
||||
shard: _,
|
||||
batch_break_reason: _,
|
||||
} = &mut batch
|
||||
{
|
||||
for req in pages {
|
||||
req.batch_wait_ctx.take();
|
||||
}
|
||||
}
|
||||
|
||||
self.pagestream_handle_batched_message(
|
||||
pgb_writer,
|
||||
batch,
|
||||
@@ -2411,8 +2235,7 @@ impl PageServerHandler {
|
||||
io_concurrency: IoConcurrency,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
|
||||
{
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
timeline
|
||||
@@ -2519,7 +2342,6 @@ impl PageServerHandler {
|
||||
page,
|
||||
}),
|
||||
req.timer,
|
||||
req.ctx,
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
@@ -2564,8 +2386,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
requests: Vec<BatchedTestRequest>,
|
||||
_ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
|
||||
{
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
// real requests would do something with the timeline
|
||||
let mut results = Vec::with_capacity(requests.len());
|
||||
for _req in requests.iter() {
|
||||
@@ -2592,10 +2413,6 @@ impl PageServerHandler {
|
||||
req: req.req.clone(),
|
||||
}),
|
||||
req.timer,
|
||||
RequestContext::new(
|
||||
TaskKind::PageRequestHandler,
|
||||
DownloadBehavior::Warn,
|
||||
),
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
@@ -3300,60 +3117,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements the page service over gRPC.
|
||||
///
|
||||
/// TODO: not yet implemented, all methods return unimplemented.
|
||||
#[tonic::async_trait]
|
||||
impl proto::PageService for PageServerHandler {
|
||||
type GetBaseBackupStream = Pin<
|
||||
Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
|
||||
>;
|
||||
type GetPagesStream =
|
||||
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
|
||||
|
||||
async fn check_rel_exists(
|
||||
&self,
|
||||
_: tonic::Request<proto::CheckRelExistsRequest>,
|
||||
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_base_backup(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetBaseBackupRequest>,
|
||||
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_db_size(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetDbSizeRequest>,
|
||||
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_pages(
|
||||
&self,
|
||||
_: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
|
||||
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_rel_size(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetRelSizeRequest>,
|
||||
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_slru_segment(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetSlruSegmentRequest>,
|
||||
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTenantError> for QueryError {
|
||||
fn from(e: GetActiveTenantError) -> Self {
|
||||
match e {
|
||||
@@ -3370,104 +3133,6 @@ impl From<GetActiveTenantError> for QueryError {
|
||||
}
|
||||
}
|
||||
|
||||
/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
|
||||
/// TenantTimelineId and ShardIndex.
|
||||
///
|
||||
/// TODO: consider looking up the timeline handle here and storing it.
|
||||
#[derive(Clone)]
|
||||
struct TenantMetadataInterceptor;
|
||||
|
||||
impl tonic::service::Interceptor for TenantMetadataInterceptor {
|
||||
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
// Decode the tenant ID.
|
||||
let tenant_id = req
|
||||
.metadata()
|
||||
.get("neon-tenant-id")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
|
||||
let tenant_id = TenantId::from_str(tenant_id)
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
|
||||
|
||||
// Decode the timeline ID.
|
||||
let timeline_id = req
|
||||
.metadata()
|
||||
.get("neon-timeline-id")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
|
||||
let timeline_id = TimelineId::from_str(timeline_id)
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
|
||||
|
||||
// Decode the shard ID.
|
||||
let shard_index = req
|
||||
.metadata()
|
||||
.get("neon-shard-id")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
|
||||
let shard_index = ShardIndex::from_str(shard_index)
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
|
||||
|
||||
// Stash them in the request.
|
||||
let extensions = req.extensions_mut();
|
||||
extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
|
||||
extensions.insert(shard_index);
|
||||
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
/// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor.
|
||||
#[derive(Clone)]
|
||||
struct TenantAuthInterceptor {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
}
|
||||
|
||||
impl TenantAuthInterceptor {
|
||||
fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
|
||||
Self { auth }
|
||||
}
|
||||
}
|
||||
|
||||
impl tonic::service::Interceptor for TenantAuthInterceptor {
|
||||
fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
// Do nothing if auth is disabled.
|
||||
let Some(auth) = self.auth.as_ref() else {
|
||||
return Ok(req);
|
||||
};
|
||||
|
||||
// Fetch the tenant ID that's been set by TenantMetadataInterceptor.
|
||||
let ttid = req
|
||||
.extensions()
|
||||
.get::<TenantTimelineId>()
|
||||
.expect("TenantMetadataInterceptor must run before TenantAuthInterceptor");
|
||||
|
||||
// Fetch and decode the JWT token.
|
||||
let jwt = req
|
||||
.metadata()
|
||||
.get("authorization")
|
||||
.ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
|
||||
.strip_prefix("Bearer ")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
|
||||
.trim();
|
||||
let jwtdata: TokenData<Claims> = auth
|
||||
.decode(jwt)
|
||||
.map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
|
||||
let claims = jwtdata.claims;
|
||||
|
||||
// Check if the token is valid for this tenant.
|
||||
check_permission(&claims, Some(ttid.tenant_id))
|
||||
.map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
|
||||
|
||||
// TODO: consider stashing the claims in the request extensions, if needed.
|
||||
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum GetActiveTimelineError {
|
||||
#[error(transparent)]
|
||||
|
||||
@@ -276,10 +276,9 @@ pub enum TaskKind {
|
||||
// HTTP endpoint listener.
|
||||
HttpEndpointListener,
|
||||
|
||||
/// Task that handles a single page service connection. A PageRequestHandler
|
||||
/// task starts detached from any particular tenant or timeline, but it can
|
||||
/// be associated with one later, after receiving a command from the client.
|
||||
/// Also used for the gRPC page service API, including the main server task.
|
||||
// Task that handles a single connection. A PageRequestHandler task
|
||||
// starts detached from any particular tenant or timeline, but it can be
|
||||
// associated with one later, after receiving a command from the client.
|
||||
PageRequestHandler,
|
||||
|
||||
/// Manages the WAL receiver connection for one timeline.
|
||||
|
||||
@@ -84,7 +84,6 @@ use crate::context;
|
||||
use crate::context::RequestContextBuilder;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::{
|
||||
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
|
||||
@@ -160,7 +159,6 @@ pub struct TenantSharedResources {
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub l0_flush_global_state: L0FlushGlobalState,
|
||||
pub basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
pub feature_resolver: FeatureResolver,
|
||||
}
|
||||
|
||||
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
|
||||
@@ -382,8 +380,6 @@ pub struct TenantShard {
|
||||
pub(crate) gc_block: gc_block::GcBlock,
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
feature_resolver: FeatureResolver,
|
||||
}
|
||||
impl std::fmt::Debug for TenantShard {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
@@ -864,14 +860,6 @@ impl Debug for SetStoppingError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum FinalizeTimelineImportError {
|
||||
#[error("Import task not done yet")]
|
||||
ImportTaskStillRunning,
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
/// Arguments to [`TenantShard::create_timeline`].
|
||||
///
|
||||
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
|
||||
@@ -1158,20 +1146,10 @@ impl TenantShard {
|
||||
ctx,
|
||||
)?;
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
|
||||
if !disk_consistent_lsn.is_valid() {
|
||||
// As opposed to normal timelines which get initialised with a disk consitent LSN
|
||||
// via initdb, imported timelines start from 0. If the import task stops before
|
||||
// it advances disk consitent LSN, allow it to resume.
|
||||
let in_progress_import = import_pgdata
|
||||
.as_ref()
|
||||
.map(|import| !import.is_done())
|
||||
.unwrap_or(false);
|
||||
if !in_progress_import {
|
||||
anyhow::bail!("Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn");
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::ensure!(
|
||||
disk_consistent_lsn.is_valid(),
|
||||
"Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
|
||||
);
|
||||
assert_eq!(
|
||||
disk_consistent_lsn,
|
||||
metadata.disk_consistent_lsn(),
|
||||
@@ -1265,25 +1243,20 @@ impl TenantShard {
|
||||
}
|
||||
}
|
||||
|
||||
if disk_consistent_lsn.is_valid() {
|
||||
// Sanity check: a timeline should have some content.
|
||||
// Exception: importing timelines might not yet have any
|
||||
anyhow::ensure!(
|
||||
ancestor.is_some()
|
||||
|| timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.expect(
|
||||
"currently loading, layer manager cannot be shutdown already"
|
||||
)
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
"Timeline has no ancestor and no layer files"
|
||||
);
|
||||
}
|
||||
// Sanity check: a timeline should have some content.
|
||||
anyhow::ensure!(
|
||||
ancestor.is_some()
|
||||
|| timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.expect("currently loading, layer manager cannot be shutdown already")
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
"Timeline has no ancestor and no layer files"
|
||||
);
|
||||
|
||||
Ok(TimelineInitAndSyncResult::ReadyToActivate)
|
||||
}
|
||||
@@ -1319,7 +1292,6 @@ impl TenantShard {
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
} = resources;
|
||||
|
||||
let attach_mode = attached_conf.location.attach_mode;
|
||||
@@ -1336,7 +1308,6 @@ impl TenantShard {
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
));
|
||||
|
||||
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
|
||||
@@ -2883,13 +2854,13 @@ impl TenantShard {
|
||||
pub(crate) async fn finalize_importing_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(), FinalizeTimelineImportError> {
|
||||
) -> anyhow::Result<()> {
|
||||
let timeline = {
|
||||
let locked = self.timelines_importing.lock().unwrap();
|
||||
match locked.get(&timeline_id) {
|
||||
Some(importing_timeline) => {
|
||||
if !importing_timeline.import_task_handle.is_finished() {
|
||||
return Err(FinalizeTimelineImportError::ImportTaskStillRunning);
|
||||
return Err(anyhow::anyhow!("Import task not done yet"));
|
||||
}
|
||||
|
||||
importing_timeline.timeline.clone()
|
||||
@@ -2902,13 +2873,8 @@ impl TenantShard {
|
||||
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_import_pgdata_finalize()
|
||||
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
|
||||
timeline
|
||||
.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
|
||||
.schedule_index_upload_for_import_pgdata_finalize()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
|
||||
self.timelines_importing
|
||||
.lock()
|
||||
@@ -3169,18 +3135,11 @@ impl TenantShard {
|
||||
.or_insert_with(|| Arc::new(GcCompactionQueue::new()))
|
||||
.clone()
|
||||
};
|
||||
let gc_compaction_strategy = self
|
||||
.feature_resolver
|
||||
.evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id)
|
||||
.ok();
|
||||
let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy {
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy)
|
||||
} else {
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id)
|
||||
};
|
||||
outcome = queue
|
||||
.iteration(cancel, ctx, &self.gc_block, &timeline)
|
||||
.instrument(span)
|
||||
.instrument(
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -3512,9 +3471,8 @@ impl TenantShard {
|
||||
let mut timelines_importing = self.timelines_importing.lock().unwrap();
|
||||
timelines_importing
|
||||
.drain()
|
||||
.for_each(|(timeline_id, importing_timeline)| {
|
||||
let span = tracing::info_span!("importing_timeline_shutdown", %timeline_id);
|
||||
js.spawn(async move { importing_timeline.shutdown().instrument(span).await });
|
||||
.for_each(|(_timeline_id, importing_timeline)| {
|
||||
importing_timeline.shutdown();
|
||||
});
|
||||
}
|
||||
// test_long_timeline_create_then_tenant_delete is leaning on this message
|
||||
@@ -4289,7 +4247,6 @@ impl TenantShard {
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
feature_resolver: FeatureResolver,
|
||||
) -> TenantShard {
|
||||
assert!(!attached_conf.location.generation.is_none());
|
||||
|
||||
@@ -4394,7 +4351,6 @@ impl TenantShard {
|
||||
gc_block: Default::default(),
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5315,7 +5271,6 @@ impl TenantShard {
|
||||
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
|
||||
l0_flush_global_state: self.l0_flush_global_state.clone(),
|
||||
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
|
||||
feature_resolver: self.feature_resolver.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5918,7 +5873,6 @@ pub(crate) mod harness {
|
||||
// TODO: ideally we should run all unit tests with both configs
|
||||
L0FlushGlobalState::new(L0FlushConfig::default()),
|
||||
basebackup_requst_sender,
|
||||
FeatureResolver::new_disabled(),
|
||||
));
|
||||
|
||||
let preload = tenant
|
||||
@@ -8360,24 +8314,10 @@ mod tests {
|
||||
}
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
// Force layers to L1
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if iter % 5 == 0 {
|
||||
let scan_lsn = Lsn(lsn.0 + 1);
|
||||
info!("scanning at {}", scan_lsn);
|
||||
let (_, before_delta_file_accessed) =
|
||||
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
|
||||
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
|
||||
.await?;
|
||||
tline
|
||||
.compact(
|
||||
@@ -8386,14 +8326,13 @@ mod tests {
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceImageLayerCreation);
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let (_, after_delta_file_accessed) =
|
||||
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
|
||||
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
|
||||
.await?;
|
||||
assert!(
|
||||
after_delta_file_accessed < before_delta_file_accessed,
|
||||
@@ -8834,8 +8773,6 @@ mod tests {
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
|
||||
tline.force_set_disk_consistent_lsn(Lsn(0x40));
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
@@ -8849,7 +8786,8 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Image layers are created at repartition LSN
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
|
||||
.await
|
||||
|
||||
@@ -103,7 +103,6 @@ use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::metrics::{
|
||||
@@ -199,7 +198,6 @@ pub struct TimelineResources {
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
pub basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
pub feature_resolver: FeatureResolver,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
@@ -446,8 +444,6 @@ pub struct Timeline {
|
||||
|
||||
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
|
||||
basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
|
||||
feature_resolver: FeatureResolver,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
@@ -3076,8 +3072,6 @@ impl Timeline {
|
||||
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
|
||||
|
||||
basebackup_prepare_sender: resources.basebackup_prepare_sender,
|
||||
|
||||
feature_resolver: resources.feature_resolver,
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -4912,7 +4906,6 @@ impl Timeline {
|
||||
LastImageLayerCreationStatus::Initial,
|
||||
false, // don't yield for L0, we're flushing L0
|
||||
)
|
||||
.instrument(info_span!("create_image_layers", mode = %ImageLayerCreationMode::Initial, partition_mode = "initial", lsn = %self.initdb_lsn))
|
||||
.await?;
|
||||
debug_assert!(
|
||||
matches!(is_complete, LastImageLayerCreationStatus::Complete),
|
||||
@@ -5469,8 +5462,7 @@ impl Timeline {
|
||||
|
||||
/// Returns the image layers generated and an enum indicating whether the process is fully completed.
|
||||
/// true = we have generate all image layers, false = we preempt the process for L0 compaction.
|
||||
///
|
||||
/// `partition_mode` is only for logging purpose and is not used anywhere in this function.
|
||||
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
|
||||
async fn create_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
partitioning: &KeyPartitioning,
|
||||
|
||||
@@ -206,8 +206,8 @@ pub struct GcCompactionQueue {
|
||||
}
|
||||
|
||||
static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
|
||||
// Only allow one timeline on one pageserver to run gc compaction at a time.
|
||||
Arc::new(Semaphore::new(1))
|
||||
// Only allow two timelines on one pageserver to run gc compaction at a time.
|
||||
Arc::new(Semaphore::new(2))
|
||||
});
|
||||
|
||||
impl GcCompactionQueue {
|
||||
@@ -1278,55 +1278,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
|
||||
let l0_l1_boundary_lsn = {
|
||||
// We do the repartition on the L0-L1 boundary. All data below the boundary
|
||||
// are compacted by L0 with low read amplification, thus making the `repartition`
|
||||
// function run fast.
|
||||
let guard = self.layers.read().await;
|
||||
guard
|
||||
.all_persistent_layers()
|
||||
.iter()
|
||||
.map(|x| {
|
||||
// Use the end LSN of delta layers OR the start LSN of image layers.
|
||||
if x.is_delta {
|
||||
x.lsn_range.end
|
||||
} else {
|
||||
x.lsn_range.start
|
||||
}
|
||||
})
|
||||
.max()
|
||||
};
|
||||
|
||||
let (partition_mode, partition_lsn) = if cfg!(test)
|
||||
|| cfg!(feature = "testing")
|
||||
|| self
|
||||
.feature_resolver
|
||||
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
|
||||
.is_ok()
|
||||
{
|
||||
let last_repartition_lsn = self.partitioning.read().1;
|
||||
let lsn = match l0_l1_boundary_lsn {
|
||||
Some(boundary) => gc_cutoff
|
||||
.max(boundary)
|
||||
.max(last_repartition_lsn)
|
||||
.max(self.initdb_lsn)
|
||||
.max(self.ancestor_lsn),
|
||||
None => self.get_last_record_lsn(),
|
||||
};
|
||||
if lsn <= self.initdb_lsn || lsn <= self.ancestor_lsn {
|
||||
// Do not attempt to create image layers below the initdb or ancestor LSN -- no data below it
|
||||
("l0_l1_boundary", self.get_last_record_lsn())
|
||||
} else {
|
||||
("l0_l1_boundary", lsn)
|
||||
}
|
||||
} else {
|
||||
("latest_record", self.get_last_record_lsn())
|
||||
};
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
partition_lsn,
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
@@ -1345,19 +1301,18 @@ impl Timeline {
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let mode = if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
};
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
mode,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
@@ -1365,7 +1320,6 @@ impl Timeline {
|
||||
.clone(),
|
||||
options.flags.contains(CompactFlags::YieldForL0),
|
||||
)
|
||||
.instrument(info_span!("create_image_layers", mode = %mode, partition_mode = %partition_mode, lsn = %lsn))
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
@@ -1390,8 +1344,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
Ok(_) => {
|
||||
// This happens very frequently so we don't want to log it.
|
||||
debug!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
|
||||
@@ -25,11 +25,8 @@ pub(crate) struct ImportingTimeline {
|
||||
}
|
||||
|
||||
impl ImportingTimeline {
|
||||
pub(crate) async fn shutdown(self) {
|
||||
pub(crate) fn shutdown(self) {
|
||||
self.import_task_handle.abort();
|
||||
let _ = self.import_task_handle.await;
|
||||
|
||||
self.timeline.remote_client.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,11 +93,6 @@ pub async fn doit(
|
||||
);
|
||||
}
|
||||
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_file_changes()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
|
||||
// Communicate that shard is done.
|
||||
// Ensure at-least-once delivery of the upcall to storage controller
|
||||
// before we mark the task as done and never come here again.
|
||||
|
||||
@@ -30,7 +30,6 @@
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -101,24 +100,8 @@ async fn run_v1(
|
||||
tasks: Vec::default(),
|
||||
};
|
||||
|
||||
// Use the job size limit encoded in the progress if we are resuming an import.
|
||||
// This ensures that imports have stable plans even if the pageserver config changes.
|
||||
let import_config = {
|
||||
match &import_progress {
|
||||
Some(progress) => {
|
||||
let base = &timeline.conf.timeline_import_config;
|
||||
TimelineImportConfig {
|
||||
import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit)
|
||||
.unwrap(),
|
||||
import_job_concurrency: base.import_job_concurrency,
|
||||
import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
|
||||
}
|
||||
}
|
||||
None => timeline.conf.timeline_import_config.clone(),
|
||||
}
|
||||
};
|
||||
|
||||
let plan = planner.plan(&import_config).await?;
|
||||
let import_config = &timeline.conf.timeline_import_config;
|
||||
let plan = planner.plan(import_config).await?;
|
||||
|
||||
// Hash the plan and compare with the hash of the plan we got back from the storage controller.
|
||||
// If the two match, it means that the planning stage had the same output.
|
||||
@@ -130,20 +113,20 @@ async fn run_v1(
|
||||
let plan_hash = hasher.finish();
|
||||
|
||||
if let Some(progress) = &import_progress {
|
||||
if plan_hash != progress.import_plan_hash {
|
||||
anyhow::bail!("Import plan does not match storcon metadata");
|
||||
}
|
||||
|
||||
// Handle collisions on jobs of unequal length
|
||||
if progress.jobs != plan.jobs.len() {
|
||||
anyhow::bail!("Import plan job length does not match storcon metadata")
|
||||
}
|
||||
|
||||
if plan_hash != progress.import_plan_hash {
|
||||
anyhow::bail!("Import plan does not match storcon metadata");
|
||||
}
|
||||
}
|
||||
|
||||
pausable_failpoint!("import-timeline-pre-execute-pausable");
|
||||
|
||||
let start_from_job_idx = import_progress.map(|progress| progress.completed);
|
||||
plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx)
|
||||
plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -235,19 +218,6 @@ impl Planner {
|
||||
checkpoint_buf,
|
||||
)));
|
||||
|
||||
// Sort the tasks by the key ranges they handle.
|
||||
// The plan being generated here needs to be stable across invocations
|
||||
// of this method.
|
||||
self.tasks.sort_by_key(|task| match task {
|
||||
AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
|
||||
AnyImportTask::RelBlocks(rel_blocks) => {
|
||||
(rel_blocks.key_range.start, rel_blocks.key_range.end)
|
||||
}
|
||||
AnyImportTask::SlruBlocks(slru_blocks) => {
|
||||
(slru_blocks.key_range.start, slru_blocks.key_range.end)
|
||||
}
|
||||
});
|
||||
|
||||
// Assigns parts of key space to later parallel jobs
|
||||
let mut last_end_key = Key::MIN;
|
||||
let mut current_chunk = Vec::new();
|
||||
@@ -456,8 +426,6 @@ impl Plan {
|
||||
}));
|
||||
},
|
||||
maybe_complete_job_idx = work.next() => {
|
||||
pausable_failpoint!("import-task-complete-pausable");
|
||||
|
||||
match maybe_complete_job_idx {
|
||||
Some(Ok((job_idx, res))) => {
|
||||
assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
|
||||
@@ -470,12 +438,8 @@ impl Plan {
|
||||
jobs: jobs_in_plan,
|
||||
completed: last_completed_job_idx,
|
||||
import_plan_hash,
|
||||
job_soft_size_limit: import_config.import_job_soft_size_limit.into(),
|
||||
};
|
||||
|
||||
timeline.remote_client.schedule_index_upload_for_file_changes()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
|
||||
storcon_client.put_timeline_import_status(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
@@ -676,11 +640,7 @@ impl Hash for ImportSingleKeyTask {
|
||||
let ImportSingleKeyTask { key, buf } = self;
|
||||
|
||||
key.hash(state);
|
||||
// The key value might not have a stable binary representation.
|
||||
// For instance, the db directory uses an unstable hash-map.
|
||||
// To work around this we are a bit lax here and only hash the
|
||||
// size of the buffer which must be consistent.
|
||||
buf.len().hash(state);
|
||||
buf.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -955,7 +915,7 @@ impl ChunkProcessingJob {
|
||||
let guard = timeline.layers.read().await;
|
||||
let existing_layer = guard.try_get_from_key(&desc.key());
|
||||
if let Some(layer) = existing_layer {
|
||||
if layer.metadata().generation == timeline.generation {
|
||||
if layer.metadata().generation != timeline.generation {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Import attempted to rewrite layer file in the same generation: {}",
|
||||
layer.local_path()
|
||||
|
||||
@@ -69,6 +69,7 @@ struct NeonWALReader
|
||||
WALSegmentContext segcxt;
|
||||
WALOpenSegment seg;
|
||||
int wre_errno;
|
||||
TimeLineID local_active_tlid;
|
||||
/* Explains failure to read, static for simplicity. */
|
||||
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
|
||||
|
||||
@@ -106,7 +107,8 @@ struct NeonWALReader
|
||||
|
||||
/* palloc and initialize NeonWALReader */
|
||||
NeonWALReader *
|
||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
|
||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn,
|
||||
char *log_prefix, TimeLineID tlid)
|
||||
{
|
||||
NeonWALReader *reader;
|
||||
|
||||
@@ -118,6 +120,7 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_
|
||||
MemoryContextAllocZero(TopMemoryContext, sizeof(NeonWALReader));
|
||||
|
||||
reader->available_lsn = available_lsn;
|
||||
reader->local_active_tlid = tlid;
|
||||
reader->seg.ws_file = -1;
|
||||
reader->seg.ws_segno = 0;
|
||||
reader->seg.ws_tli = 0;
|
||||
@@ -577,6 +580,17 @@ NeonWALReaderIsRemConnEstablished(NeonWALReader *state)
|
||||
return state->rem_state == RS_ESTABLISHED;
|
||||
}
|
||||
|
||||
/*
|
||||
* Whether remote connection is established. Once this is done, until successful
|
||||
* local read or error socket is stable and user can update socket events
|
||||
* instead of readding it each time.
|
||||
*/
|
||||
TimeLineID
|
||||
NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state)
|
||||
{
|
||||
return state->local_active_tlid;
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns events user should wait on connection socket or 0 if remote
|
||||
* connection is not active.
|
||||
|
||||
@@ -19,9 +19,12 @@ typedef enum
|
||||
NEON_WALREAD_ERROR,
|
||||
} NeonWALReadResult;
|
||||
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size,
|
||||
XLogRecPtr available_lsn,
|
||||
char *log_prefix, TimeLineID tlid);
|
||||
extern void NeonWALReaderFree(NeonWALReader *state);
|
||||
extern void NeonWALReaderResetRemote(NeonWALReader *state);
|
||||
extern TimeLineID NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state);
|
||||
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
|
||||
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
|
||||
|
||||
@@ -98,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
wp = palloc0(sizeof(WalProposer));
|
||||
wp->config = config;
|
||||
wp->api = api;
|
||||
wp->localTimeLineID = config->pgTimeline;
|
||||
wp->state = WPS_COLLECTING_TERMS;
|
||||
wp->mconf.generation = INVALID_GENERATION;
|
||||
wp->mconf.members.len = 0;
|
||||
@@ -155,9 +156,8 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
int written = 0;
|
||||
|
||||
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
|
||||
"%s host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
|
||||
wp->config->safekeeper_conninfo_options, sk->host, sk->port,
|
||||
wp->config->neon_timeline, wp->config->neon_tenant);
|
||||
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
|
||||
sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
|
||||
if (written > MAXCONNINFO || written < 0)
|
||||
wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
|
||||
}
|
||||
@@ -1380,7 +1380,7 @@ ProcessPropStartPos(WalProposer *wp)
|
||||
* we must bail out, as clog and other non rel data is inconsistent.
|
||||
*/
|
||||
walprop_shared = wp->api.get_shmem_state(wp);
|
||||
if (!wp->config->syncSafekeepers)
|
||||
if (!wp->config->syncSafekeepers && !wp->config->replicaPromote)
|
||||
{
|
||||
/*
|
||||
* Basebackup LSN always points to the beginning of the record (not
|
||||
|
||||
@@ -714,9 +714,6 @@ typedef struct WalProposerConfig
|
||||
*/
|
||||
char *safekeepers_list;
|
||||
|
||||
/* libpq connection info options. */
|
||||
char *safekeeper_conninfo_options;
|
||||
|
||||
/*
|
||||
* WalProposer reconnects to offline safekeepers once in this interval.
|
||||
* Time is in milliseconds.
|
||||
@@ -742,6 +739,11 @@ typedef struct WalProposerConfig
|
||||
*/
|
||||
bool syncSafekeepers;
|
||||
|
||||
/*
|
||||
* Replica is promoted to primary
|
||||
*/
|
||||
bool replicaPromote;
|
||||
|
||||
/* Will be passed to safekeepers in greet request. */
|
||||
uint64 systemId;
|
||||
|
||||
@@ -808,6 +810,8 @@ typedef struct WalProposer
|
||||
|
||||
/* WAL has been generated up to this point */
|
||||
XLogRecPtr availableLsn;
|
||||
/* Current local TimeLineId in use */
|
||||
TimeLineID localTimeLineID;
|
||||
|
||||
/* cached GetAcknowledgedByQuorumWALPosition result */
|
||||
XLogRecPtr commitLsn;
|
||||
|
||||
@@ -64,7 +64,6 @@ char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
int safekeeper_proto_version = 3;
|
||||
char *safekeeper_conninfo_options = "";
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
@@ -74,6 +73,7 @@ static XLogRecPtr sentPtr = InvalidXLogRecPtr;
|
||||
static const walproposer_api walprop_pg;
|
||||
static volatile sig_atomic_t got_SIGUSR2 = false;
|
||||
static bool reported_sigusr2 = false;
|
||||
static bool start_as_replica = false;
|
||||
|
||||
static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr;
|
||||
static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr;
|
||||
@@ -120,11 +120,11 @@ init_walprop_config(bool syncSafekeepers)
|
||||
walprop_config.neon_timeline = neon_timeline;
|
||||
/* WalProposerCreate scribbles directly on it, so pstrdup */
|
||||
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
|
||||
walprop_config.safekeeper_conninfo_options = pstrdup(safekeeper_conninfo_options);
|
||||
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
|
||||
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
|
||||
walprop_config.wal_segment_size = wal_segment_size;
|
||||
walprop_config.syncSafekeepers = syncSafekeepers;
|
||||
walprop_config.replicaPromote = start_as_replica;
|
||||
if (!syncSafekeepers)
|
||||
walprop_config.systemId = GetSystemIdentifier();
|
||||
else
|
||||
@@ -151,6 +151,8 @@ WalProposerSync(int argc, char *argv[])
|
||||
WalProposerStart(wp);
|
||||
}
|
||||
|
||||
#define GUC_POLL_DELAY 100000L // 0.1 sec
|
||||
|
||||
/*
|
||||
* WAL proposer bgworker entry point.
|
||||
*/
|
||||
@@ -159,12 +161,18 @@ WalProposerMain(Datum main_arg)
|
||||
{
|
||||
WalProposer *wp;
|
||||
|
||||
if (*wal_acceptors_list == '\0')
|
||||
{
|
||||
elog(PANIC, "Safekeepers list is empty");
|
||||
}
|
||||
|
||||
init_walprop_config(false);
|
||||
walprop_pg_init_bgworker();
|
||||
am_walproposer = true;
|
||||
walprop_pg_load_libpqwalreceiver();
|
||||
|
||||
wp = WalProposerCreate(&walprop_config, walprop_pg);
|
||||
wp->localTimeLineID = GetWALInsertionTimeLine();
|
||||
wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp);
|
||||
|
||||
walprop_pg_init_walsender();
|
||||
@@ -205,16 +213,6 @@ nwp_register_gucs(void)
|
||||
* GUC_LIST_QUOTE */
|
||||
NULL, assign_neon_safekeepers, NULL);
|
||||
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_conninfo_options",
|
||||
"libpq keyword parameters and values to apply to safekeeper connections",
|
||||
NULL,
|
||||
&safekeeper_conninfo_options,
|
||||
"",
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.safekeeper_reconnect_timeout",
|
||||
"Walproposer reconnects to offline safekeepers once in this interval.",
|
||||
@@ -306,16 +304,15 @@ safekeepers_cmp(char *old, char *new)
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
|
||||
* the list changed.
|
||||
*/
|
||||
static void
|
||||
assign_neon_safekeepers(const char *newval, void *extra)
|
||||
{
|
||||
char *newval_copy;
|
||||
char *oldval;
|
||||
|
||||
if (newval && *newval == '\0')
|
||||
start_as_replica = true;
|
||||
|
||||
if (!am_walproposer)
|
||||
return;
|
||||
|
||||
@@ -512,10 +509,6 @@ walprop_register_bgworker(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
/* If no wal acceptors are specified, don't start the background worker. */
|
||||
if (*wal_acceptors_list == '\0')
|
||||
return;
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
@@ -526,7 +519,6 @@ walprop_register_bgworker(void)
|
||||
bgw.bgw_restart_time = 1;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
@@ -1508,7 +1500,10 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
|
||||
|
||||
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
|
||||
Assert(!sk->xlogreader);
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix);
|
||||
/* note that WalProposer shouldn't access safekeepers when active */
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size,
|
||||
sk->wp->propTermStartLsn, log_prefix,
|
||||
sk->wp->localTimeLineID);
|
||||
if (sk->xlogreader == NULL)
|
||||
wpg_log(FATAL, "failed to allocate xlog reader");
|
||||
}
|
||||
@@ -1522,7 +1517,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count,
|
||||
buf,
|
||||
startptr,
|
||||
count,
|
||||
walprop_pg_get_timeline_id());
|
||||
sk->wp->localTimeLineID);
|
||||
|
||||
if (res == NEON_WALREAD_SUCCESS)
|
||||
{
|
||||
@@ -2021,6 +2016,7 @@ walprop_pg_get_redo_start_lsn(WalProposer *wp)
|
||||
return GetRedoStartLsn();
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
walprop_pg_strong_random(WalProposer *wp, void *buf, size_t len)
|
||||
{
|
||||
|
||||
@@ -68,8 +68,7 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
|
||||
}
|
||||
|
||||
static int
|
||||
NeonWALPageRead(
|
||||
XLogReaderState *xlogreader,
|
||||
NeonWALPageRead(XLogReaderState *xlogreader,
|
||||
XLogRecPtr targetPagePtr,
|
||||
int reqLen,
|
||||
XLogRecPtr targetRecPtr,
|
||||
@@ -106,12 +105,11 @@ NeonWALPageRead(
|
||||
|
||||
for (;;)
|
||||
{
|
||||
NeonWALReadResult res = NeonWALRead(
|
||||
wal_reader,
|
||||
NeonWALReadResult res = NeonWALRead(wal_reader,
|
||||
readBuf,
|
||||
targetPagePtr,
|
||||
count,
|
||||
walprop_pg_get_timeline_id());
|
||||
NeonWALReaderLocalActiveTimeLineID(wal_reader));
|
||||
|
||||
if (res == NEON_WALREAD_SUCCESS)
|
||||
{
|
||||
@@ -202,7 +200,8 @@ NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
|
||||
{
|
||||
elog(ERROR, "unable to start walsender when basebackupLsn is 0");
|
||||
}
|
||||
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] ");
|
||||
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn,
|
||||
"[walsender] ", 1);
|
||||
}
|
||||
xlr->page_read = NeonWALPageRead;
|
||||
xlr->segment_open = NeonWALReadSegmentOpen;
|
||||
|
||||
@@ -44,7 +44,6 @@ struct GlobalTimelinesState {
|
||||
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
// this map is dropped on restart.
|
||||
tombstones: HashMap<TenantTimelineId, Instant>,
|
||||
tenant_tombstones: HashMap<TenantId, Instant>,
|
||||
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
@@ -82,25 +81,10 @@ impl GlobalTimelinesState {
|
||||
}
|
||||
}
|
||||
|
||||
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
|
||||
}
|
||||
|
||||
/// Removes all blocking tombstones for the given timeline ID.
|
||||
/// Returns `true` if there have been actual changes.
|
||||
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.remove(ttid).is_some()
|
||||
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
|
||||
}
|
||||
|
||||
fn delete(&mut self, ttid: TenantTimelineId) {
|
||||
self.timelines.remove(&ttid);
|
||||
self.tombstones.insert(ttid, Instant::now());
|
||||
}
|
||||
|
||||
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
|
||||
self.tenant_tombstones.insert(tenant_id, Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct used to manage access to the global timelines map.
|
||||
@@ -115,7 +99,6 @@ impl GlobalTimelines {
|
||||
state: Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
tombstones: HashMap::new(),
|
||||
tenant_tombstones: HashMap::new(),
|
||||
conf,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
@@ -262,7 +245,7 @@ impl GlobalTimelines {
|
||||
return Ok(timeline);
|
||||
}
|
||||
|
||||
if state.has_tombstone(&ttid) {
|
||||
if state.tombstones.contains_key(&ttid) {
|
||||
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
|
||||
@@ -312,14 +295,13 @@ impl GlobalTimelines {
|
||||
_ => {}
|
||||
}
|
||||
if check_tombstone {
|
||||
if state.has_tombstone(&ttid) {
|
||||
if state.tombstones.contains_key(&ttid) {
|
||||
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
} else {
|
||||
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
||||
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
||||
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
|
||||
if state.remove_tombstone(&ttid) {
|
||||
if state.tombstones.remove(&ttid).is_some() {
|
||||
warn!("un-deleted timeline {ttid}");
|
||||
}
|
||||
}
|
||||
@@ -500,7 +482,6 @@ impl GlobalTimelines {
|
||||
let tli_res = {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
// Do NOT check tenant tombstones here: those were set earlier
|
||||
if state.tombstones.contains_key(ttid) {
|
||||
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
||||
info!("Timeline {ttid} was already deleted");
|
||||
@@ -576,10 +557,6 @@ impl GlobalTimelines {
|
||||
action: DeleteOrExclude,
|
||||
) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
|
||||
info!("deleting all timelines for tenant {}", tenant_id);
|
||||
|
||||
// Adding a tombstone before getting the timelines to prevent new timeline additions
|
||||
self.state.lock().unwrap().add_tenant_tombstone(*tenant_id);
|
||||
|
||||
let to_delete = self.get_all_for_tenant(*tenant_id);
|
||||
|
||||
let mut err = None;
|
||||
@@ -623,9 +600,6 @@ impl GlobalTimelines {
|
||||
state
|
||||
.tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
state
|
||||
.tenant_tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -87,7 +87,6 @@ impl WalProposer {
|
||||
let config = Config {
|
||||
ttid,
|
||||
safekeepers_list: addrs,
|
||||
safekeeper_conninfo_options: String::new(),
|
||||
safekeeper_reconnect_timeout: 1000,
|
||||
safekeeper_connection_timeout: 5000,
|
||||
sync_safekeepers,
|
||||
|
||||
@@ -3823,13 +3823,6 @@ impl Service {
|
||||
.await;
|
||||
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
|
||||
let is_import = create_req.is_import();
|
||||
let read_only = matches!(
|
||||
create_req.mode,
|
||||
models::TimelineCreateRequestMode::Branch {
|
||||
read_only: true,
|
||||
..
|
||||
}
|
||||
);
|
||||
|
||||
if is_import {
|
||||
// Ensure that there is no split on-going.
|
||||
@@ -3902,13 +3895,13 @@ impl Service {
|
||||
}
|
||||
|
||||
None
|
||||
} else if safekeepers || read_only {
|
||||
} else if safekeepers {
|
||||
// Note that for imported timelines, we do not create the timeline on the safekeepers
|
||||
// straight away. Instead, we do it once the import finalized such that we know what
|
||||
// start LSN to provide for the safekeepers. This is done in
|
||||
// [`Self::finalize_timeline_import`].
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
|
||||
.await?;
|
||||
Some(res)
|
||||
@@ -3922,11 +3915,6 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%req.tenant_shard_id.tenant_id,
|
||||
shard_id=%req.tenant_shard_id.shard_slug(),
|
||||
timeline_id=%req.timeline_id,
|
||||
))]
|
||||
pub(crate) async fn handle_timeline_shard_import_progress(
|
||||
self: &Arc<Self>,
|
||||
req: TimelineImportStatusRequest,
|
||||
@@ -3976,11 +3964,6 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%req.tenant_shard_id.tenant_id,
|
||||
shard_id=%req.tenant_shard_id.shard_slug(),
|
||||
timeline_id=%req.timeline_id,
|
||||
))]
|
||||
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
|
||||
self: &Arc<Self>,
|
||||
req: PutTimelineImportStatusRequest,
|
||||
|
||||
@@ -208,7 +208,6 @@ impl Service {
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_info: &TimelineInfo,
|
||||
read_only: bool,
|
||||
) -> Result<SafekeepersInfo, ApiError> {
|
||||
let timeline_id = timeline_info.timeline_id;
|
||||
let pg_version = timeline_info.pg_version * 10000;
|
||||
@@ -221,11 +220,7 @@ impl Service {
|
||||
let start_lsn = timeline_info.last_record_lsn;
|
||||
|
||||
// Choose initial set of safekeepers respecting affinity
|
||||
let sks = if !read_only {
|
||||
self.safekeepers_for_new_timeline().await?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let sks = self.safekeepers_for_new_timeline().await?;
|
||||
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
|
||||
// Add timeline to db
|
||||
let mut timeline_persist = TimelinePersistence {
|
||||
@@ -258,16 +253,6 @@ impl Service {
|
||||
)));
|
||||
}
|
||||
}
|
||||
let ret = SafekeepersInfo {
|
||||
generation: timeline_persist.generation as u32,
|
||||
safekeepers: sks.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
if read_only {
|
||||
return Ok(ret);
|
||||
}
|
||||
|
||||
// Create the timeline on a quorum of safekeepers
|
||||
let remaining = self
|
||||
.tenant_timeline_create_safekeepers_quorum(
|
||||
@@ -331,7 +316,12 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
Ok(SafekeepersInfo {
|
||||
generation: timeline_persist.generation as u32,
|
||||
safekeepers: sks,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
|
||||
@@ -346,10 +336,8 @@ impl Service {
|
||||
return Err(TimelineImportFinalizeError::ShuttingDown);
|
||||
}
|
||||
|
||||
// This function is only used in non-read-only scenarios
|
||||
let read_only = false;
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
@@ -422,18 +410,6 @@ impl Service {
|
||||
.chain(tl.sk_set.iter())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
// The timeline has no safekeepers: we need to delete it from the db manually,
|
||||
// as no safekeeper reconciler will get to it
|
||||
if all_sks.is_empty() {
|
||||
if let Err(err) = self
|
||||
.persistence
|
||||
.delete_timeline(tenant_id, timeline_id)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Schedule reconciliations
|
||||
for &sk_id in all_sks.iter() {
|
||||
let pending_op = TimelinePendingOpPersistence {
|
||||
|
||||
@@ -404,29 +404,6 @@ class PageserverTracingConfig:
|
||||
return ("tracing", value)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverImportConfig:
|
||||
import_job_concurrency: int
|
||||
import_job_soft_size_limit: int
|
||||
import_job_checkpoint_threshold: int
|
||||
|
||||
@staticmethod
|
||||
def default() -> PageserverImportConfig:
|
||||
return PageserverImportConfig(
|
||||
import_job_concurrency=4,
|
||||
import_job_soft_size_limit=512 * 1024,
|
||||
import_job_checkpoint_threshold=4,
|
||||
)
|
||||
|
||||
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
|
||||
value = {
|
||||
"import_job_concurrency": self.import_job_concurrency,
|
||||
"import_job_soft_size_limit": self.import_job_soft_size_limit,
|
||||
"import_job_checkpoint_threshold": self.import_job_checkpoint_threshold,
|
||||
}
|
||||
return ("timeline_import_config", value)
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
"""
|
||||
Builder object to create a Neon runtime environment
|
||||
@@ -477,7 +454,6 @@ class NeonEnvBuilder:
|
||||
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
|
||||
pageserver_get_vectored_concurrent_io: str | None = None,
|
||||
pageserver_tracing_config: PageserverTracingConfig | None = None,
|
||||
pageserver_import_config: PageserverImportConfig | None = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -535,7 +511,6 @@ class NeonEnvBuilder:
|
||||
)
|
||||
|
||||
self.pageserver_tracing_config = pageserver_tracing_config
|
||||
self.pageserver_import_config = pageserver_import_config
|
||||
|
||||
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
|
||||
pageserver_default_tenant_config_compaction_algorithm
|
||||
@@ -1204,10 +1179,6 @@ class NeonEnv:
|
||||
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
|
||||
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
|
||||
self.pageserver_tracing_config = config.pageserver_tracing_config
|
||||
if config.pageserver_import_config is None:
|
||||
self.pageserver_import_config = PageserverImportConfig.default()
|
||||
else:
|
||||
self.pageserver_import_config = config.pageserver_import_config
|
||||
|
||||
# Create the neon_local's `NeonLocalInitConf`
|
||||
cfg: dict[str, Any] = {
|
||||
@@ -1253,7 +1224,6 @@ class NeonEnv:
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
grpc_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
for ps_id in range(
|
||||
self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers
|
||||
):
|
||||
@@ -1280,13 +1250,18 @@ class NeonEnv:
|
||||
else None,
|
||||
"pg_auth_type": pg_auth_type,
|
||||
"http_auth_type": http_auth_type,
|
||||
"grpc_auth_type": grpc_auth_type,
|
||||
"availability_zone": availability_zone,
|
||||
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
|
||||
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
|
||||
"no_sync": True,
|
||||
# Look for gaps in WAL received from safekeepeers
|
||||
"validate_wal_contiguity": True,
|
||||
# TODO(vlad): make these configurable through the builder
|
||||
"timeline_import_config": {
|
||||
"import_job_concurrency": 4,
|
||||
"import_job_soft_size_limit": 512 * 1024,
|
||||
"import_job_checkpoint_threshold": 4,
|
||||
},
|
||||
}
|
||||
|
||||
# Batching (https://github.com/neondatabase/neon/issues/9377):
|
||||
@@ -1348,12 +1323,6 @@ class NeonEnv:
|
||||
|
||||
ps_cfg[key] = value
|
||||
|
||||
if self.pageserver_import_config is not None:
|
||||
key, value = self.pageserver_import_config.to_config_key_value()
|
||||
|
||||
if key not in ps_cfg:
|
||||
ps_cfg[key] = value
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
ps = NeonPageserver(
|
||||
self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]
|
||||
@@ -4689,7 +4658,7 @@ class EndpointFactory:
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
) -> Endpoint:
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
@@ -4708,7 +4677,7 @@ class EndpointFactory:
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
) -> Endpoint:
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
|
||||
@@ -74,8 +74,9 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
for query in queries:
|
||||
with s_con.cursor() as secondary_cursor:
|
||||
secondary_cursor.execute(query)
|
||||
response = secondary_cursor.fetchone()
|
||||
assert response is not None
|
||||
res = secondary_cursor.fetchone()
|
||||
assert res is not None
|
||||
response = res
|
||||
assert response == responses[query]
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
@@ -164,7 +165,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
|
||||
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
res = s_cur.fetchone()
|
||||
assert res[0] == 10000
|
||||
assert res == (10000,)
|
||||
|
||||
# Clear the cache in the standby, so that when we
|
||||
# re-execute the query, it will make GetPage
|
||||
@@ -195,7 +196,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
log_replica_lag(primary, secondary)
|
||||
res = s_cur.fetchone()
|
||||
assert res[0] == 10000
|
||||
assert res == (10000,)
|
||||
|
||||
|
||||
def run_pgbench(connstr: str, pg_bin: PgBin):
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
import base64
|
||||
import concurrent.futures
|
||||
import json
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from enum import Enum, StrEnum
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from threading import Event
|
||||
|
||||
@@ -14,14 +11,7 @@ import pytest
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.fast_import import FastImport
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PageserverImportConfig,
|
||||
PgBin,
|
||||
PgProtocol,
|
||||
StorageControllerMigrationConfig,
|
||||
VanillaPostgres,
|
||||
)
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
|
||||
from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
)
|
||||
@@ -504,259 +494,6 @@ def test_import_respects_tenant_shutdown(
|
||||
wait_until(cplane_notified)
|
||||
|
||||
|
||||
@skip_in_debug_build("Validation query takes too long in debug builds")
|
||||
def test_import_chaos(
|
||||
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
|
||||
):
|
||||
"""
|
||||
Perform a timeline import while injecting chaos in the environment.
|
||||
We expect that the import completes eventually, that it passes validation and
|
||||
the resulting timeline can be written to.
|
||||
"""
|
||||
TARGET_RELBOCK_SIZE = 512 * 1024 * 1024 # 512 MiB
|
||||
ALLOWED_IMPORT_RUNTIME = 90 # seconds
|
||||
SHARD_COUNT = 4
|
||||
|
||||
neon_env_builder.num_pageservers = SHARD_COUNT
|
||||
neon_env_builder.pageserver_import_config = PageserverImportConfig(
|
||||
import_job_concurrency=1,
|
||||
import_job_soft_size_limit=64 * 1024,
|
||||
import_job_checkpoint_threshold=4,
|
||||
)
|
||||
|
||||
# Set up mock control plane HTTP server to listen for import completions
|
||||
import_completion_signaled = Event()
|
||||
# There's some Python magic at play here. A list can be updated from the
|
||||
# handler thread, but an optional cannot. Hence, use a list with one element.
|
||||
import_error = []
|
||||
|
||||
def handler(request: Request) -> Response:
|
||||
assert request.json is not None
|
||||
|
||||
body = request.json
|
||||
if "error" in body:
|
||||
if body["error"]:
|
||||
import_error.append(body["error"])
|
||||
|
||||
log.info(f"control plane /import_complete request: {request.json}")
|
||||
import_completion_signaled.set()
|
||||
return Response(json.dumps({}), status=200)
|
||||
|
||||
cplane_mgmt_api_server = make_httpserver
|
||||
cplane_mgmt_api_server.expect_request(
|
||||
"/storage/api/v1/import_complete", method="PUT"
|
||||
).respond_with_handler(handler)
|
||||
|
||||
# Plug the cplane mock in
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
# The import will specifiy a local filesystem path mocking remote storage
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
|
||||
|
||||
nrows = 0
|
||||
while True:
|
||||
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
|
||||
)
|
||||
if relblock_size >= TARGET_RELBOCK_SIZE:
|
||||
break
|
||||
addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
vanilla_pg.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
nrows += addrows
|
||||
|
||||
vanilla_pg.stop()
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
# Pause after every import task to extend the test runtime and allow
|
||||
# for more chaos injection.
|
||||
for ps in env.pageservers:
|
||||
ps.add_persistent_failpoint("import-task-complete-pausable", "sleep(5)")
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
# The shard might have moved or the pageserver hosting the shard restarted
|
||||
".*Call to node.*management API.*failed.*",
|
||||
# Migrations have their time outs set to 0
|
||||
".*Timed out after.*downloading layers.*",
|
||||
".*Failed to prepare by downloading layers.*",
|
||||
# The test may kill the storage controller or pageservers
|
||||
".*request was dropped before completing.*",
|
||||
]
|
||||
)
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.extend(
|
||||
[
|
||||
# We might re-write a layer in a different generation if the import
|
||||
# needs to redo some of the progress since not each job is checkpointed.
|
||||
".*was unlinked but was not dangling.*",
|
||||
# The test may kill the storage controller or pageservers
|
||||
".*request was dropped before completing.*",
|
||||
# Test can SIGTERM pageserver while it is downloading
|
||||
".*removing local file.*temp_download.*",
|
||||
".*Failed to flush heatmap.*",
|
||||
# Test can SIGTERM the storage controller while pageserver
|
||||
# is attempting to upcall.
|
||||
".*storage controller upcall failed.*timeline_import_status.*",
|
||||
# TODO(vlad): TenantManager::reset_tenant returns a blanked anyhow error.
|
||||
# It should return ResourceUnavailable or something that doesn't error log.
|
||||
".*activate_post_import.*InternalServerError.*tenant map is shutting down.*",
|
||||
# TODO(vlad): How can this happen?
|
||||
".*Failed to download a remote file: deserialize index part file.*",
|
||||
".*Cancelled request finished with an error.*",
|
||||
]
|
||||
)
|
||||
|
||||
importbucket_path = neon_env_builder.repo_dir / "test_import_chaos_bucket"
|
||||
mock_import_bucket(vanilla_pg, importbucket_path)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
idempotency = ImportPgdataIdemptencyKey.random()
|
||||
|
||||
env.storage_controller.tenant_create(
|
||||
tenant_id, shard_count=SHARD_COUNT, placement_policy={"Attached": 1}
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
env.storage_controller.timeline_create(
|
||||
tenant_id,
|
||||
{
|
||||
"new_timeline_id": str(timeline_id),
|
||||
"import_pgdata": {
|
||||
"idempotency_key": str(idempotency),
|
||||
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
def chaos(stop_chaos: threading.Event):
|
||||
class ChaosType(StrEnum):
|
||||
MIGRATE_SHARD = "migrate_shard"
|
||||
RESTART_IMMEDIATE = "restart_immediate"
|
||||
RESTART = "restart"
|
||||
STORCON_RESTART_IMMEDIATE = "storcon_restart_immediate"
|
||||
|
||||
while not stop_chaos.is_set():
|
||||
chaos_type = random.choices(
|
||||
population=[
|
||||
ChaosType.MIGRATE_SHARD,
|
||||
ChaosType.RESTART,
|
||||
ChaosType.RESTART_IMMEDIATE,
|
||||
ChaosType.STORCON_RESTART_IMMEDIATE,
|
||||
],
|
||||
weights=[0.25, 0.25, 0.25, 0.25],
|
||||
k=1,
|
||||
)[0]
|
||||
|
||||
try:
|
||||
if chaos_type == ChaosType.MIGRATE_SHARD:
|
||||
target_shard_number = random.randint(0, SHARD_COUNT - 1)
|
||||
target_shard = TenantShardId(tenant_id, target_shard_number, SHARD_COUNT)
|
||||
|
||||
placements = env.storage_controller.get_tenants_placement()
|
||||
log.info(f"{placements=}")
|
||||
target_ps = placements[str(target_shard)]["intent"]["attached"]
|
||||
if len(placements[str(target_shard)]["intent"]["secondary"]) == 0:
|
||||
dest_ps = None
|
||||
else:
|
||||
dest_ps = placements[str(target_shard)]["intent"]["secondary"][0]
|
||||
|
||||
if target_ps is None or dest_ps is None:
|
||||
continue
|
||||
|
||||
config = StorageControllerMigrationConfig(
|
||||
secondary_warmup_timeout="0s",
|
||||
secondary_download_request_timeout="0s",
|
||||
prewarm=False,
|
||||
)
|
||||
env.storage_controller.tenant_shard_migrate(target_shard, dest_ps, config)
|
||||
|
||||
log.info(
|
||||
f"CHAOS: Migrating shard {target_shard} from pageserver {target_ps} to {dest_ps}"
|
||||
)
|
||||
elif chaos_type == ChaosType.RESTART_IMMEDIATE:
|
||||
target_ps = random.choice(env.pageservers)
|
||||
log.info(f"CHAOS: Immediate restart of pageserver {target_ps.id}")
|
||||
target_ps.stop(immediate=True)
|
||||
target_ps.start()
|
||||
elif chaos_type == ChaosType.RESTART:
|
||||
target_ps = random.choice(env.pageservers)
|
||||
log.info(f"CHAOS: Normal restart of pageserver {target_ps.id}")
|
||||
target_ps.stop(immediate=False)
|
||||
target_ps.start()
|
||||
elif chaos_type == ChaosType.STORCON_RESTART_IMMEDIATE:
|
||||
log.info("CHAOS: Immediate restart of storage controller")
|
||||
env.storage_controller.stop(immediate=True)
|
||||
env.storage_controller.start()
|
||||
except Exception as e:
|
||||
log.warning(f"CHAOS: Error during chaos operation {chaos_type}: {e}")
|
||||
|
||||
# Sleep before next chaos event
|
||||
time.sleep(1)
|
||||
|
||||
log.info("Chaos injector stopped")
|
||||
|
||||
def wait_for_import_completion():
|
||||
start = time.time()
|
||||
done = import_completion_signaled.wait(ALLOWED_IMPORT_RUNTIME)
|
||||
if not done:
|
||||
raise TimeoutError(f"Import did not signal completion within {ALLOWED_IMPORT_RUNTIME}")
|
||||
|
||||
end = time.time()
|
||||
|
||||
log.info(f"Import completion signalled after {end - start}s {import_error=}")
|
||||
|
||||
if import_error:
|
||||
raise RuntimeError(f"Import error: {import_error}")
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
stop_chaos = threading.Event()
|
||||
|
||||
wait_for_import_completion_fut = executor.submit(wait_for_import_completion)
|
||||
chaos_fut = executor.submit(chaos, stop_chaos)
|
||||
|
||||
try:
|
||||
wait_for_import_completion_fut.result()
|
||||
except Exception as e:
|
||||
raise e
|
||||
finally:
|
||||
stop_chaos.set()
|
||||
chaos_fut.result()
|
||||
|
||||
import_branch_name = "imported"
|
||||
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
|
||||
endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
|
||||
|
||||
# Validate the imported data is legit
|
||||
assert endpoint.safe_psql_many(
|
||||
[
|
||||
"set effective_io_concurrency=32;",
|
||||
"SET statement_timeout='300s';",
|
||||
"select count(*), sum(data::bigint)::bigint from t",
|
||||
]
|
||||
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
# Validate writes
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
|
||||
workload.init()
|
||||
workload.write_rows(64)
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_fast_import_with_pageserver_ingest(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
|
||||
@@ -20,9 +20,6 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548"
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"attach_mode",
|
||||
["default_generation", "same_generation"],
|
||||
|
||||
130
test_runner/regress/test_replica_promotes.py
Normal file
130
test_runner/regress/test_replica_promotes.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""
|
||||
File with secondary->primary promotion testing.
|
||||
|
||||
This far, only contains a test that we don't break and that the data is persisted.
|
||||
"""
|
||||
|
||||
import psycopg2
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup, wait_for_last_flush_lsn
|
||||
from fixtures.pg_version import PgVersion
|
||||
from pytest import raises
|
||||
|
||||
|
||||
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
"""
|
||||
Test that a replica safely promotes, and can commit data updates which
|
||||
show up when the primary boots up after the promoted secondary endpoint
|
||||
shut down.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env: NeonEnv = neon_simple_env
|
||||
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
with primary.connect() as primary_conn:
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute(
|
||||
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
|
||||
)
|
||||
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
|
||||
primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Primary: Current LSN after workload is {primary_cur.fetchone()}")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
with secondary.connect() as secondary_conn:
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
|
||||
assert secondary_cur.fetchone() == (100,)
|
||||
|
||||
with raises(psycopg2.Error):
|
||||
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
|
||||
secondary_conn.commit()
|
||||
|
||||
secondary_conn.rollback()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100,)
|
||||
|
||||
primary.stop_and_destroy(mode="immediate")
|
||||
primary = None
|
||||
|
||||
# Reconnect to the secondary to make sure we get a read-write connection
|
||||
with secondary.connect() as promo_conn:
|
||||
promo_cur = promo_conn.cursor()
|
||||
|
||||
promo_cur.execute("SELECT * FROM pg_promote()")
|
||||
assert promo_cur.fetchone() == (True,)
|
||||
promo_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Secondary: LSN after promotion is {promo_cur.fetchone()}")
|
||||
|
||||
# Reconnect to the secondary to make sure we get a read-write connection
|
||||
with secondary.connect() as new_primary_conn:
|
||||
new_primary_cur = new_primary_conn.cursor()
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (100,)
|
||||
|
||||
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload")
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
|
||||
|
||||
new_primary_cur = new_primary_conn.cursor()
|
||||
new_primary_cur.execute("select payload from t")
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
|
||||
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (200,)
|
||||
new_primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Secondary: LSN after workload is {new_primary_cur.fetchone()}")
|
||||
|
||||
|
||||
with secondary.connect() as second_viewpoint_conn:
|
||||
new_primary_cur = second_viewpoint_conn.cursor()
|
||||
new_primary_cur.execute("select payload from t")
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
|
||||
|
||||
wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
secondary.stop_and_destroy()
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
|
||||
with primary.connect() as new_primary:
|
||||
new_primary_cur = new_primary.cursor()
|
||||
new_primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"New primary: Boot LSN is {new_primary_cur.fetchone()}")
|
||||
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (200,)
|
||||
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (300,)
|
||||
|
||||
primary.stop(mode="immediate")
|
||||
@@ -4158,12 +4158,17 @@ def test_storcon_create_delete_sk_down(
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
|
||||
with env.endpoints.create(
|
||||
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
|
||||
) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
@@ -4192,10 +4197,10 @@ def test_storcon_create_delete_sk_down(
|
||||
# ensure the safekeeper deleted the timeline
|
||||
def timeline_deleted_on_active_sks():
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
)
|
||||
env.safekeepers[2].assert_log_contains(
|
||||
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_active_sks)
|
||||
@@ -4210,7 +4215,7 @@ def test_storcon_create_delete_sk_down(
|
||||
# ensure that there is log msgs for the third safekeeper too
|
||||
def timeline_deleted_on_sk():
|
||||
env.safekeepers[1].assert_log_contains(
|
||||
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_sk)
|
||||
@@ -4244,12 +4249,17 @@ def test_storcon_few_sk(
|
||||
|
||||
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
|
||||
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
|
||||
with env.endpoints.create(
|
||||
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
|
||||
) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
@@ -10,7 +10,6 @@ from queue import Empty, Queue
|
||||
from threading import Barrier
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.common_types import Lsn, TimelineArchivalState, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -402,25 +401,8 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots
|
||||
"earlier", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_pipe
|
||||
)
|
||||
|
||||
snapshot_branchpoint_old = TimelineId.generate()
|
||||
|
||||
env.storage_controller.timeline_create(
|
||||
env.initial_tenant,
|
||||
{
|
||||
"new_timeline_id": str(snapshot_branchpoint_old),
|
||||
"ancestor_start_lsn": str(branchpoint_y),
|
||||
"ancestor_timeline_id": str(env.initial_timeline),
|
||||
"read_only": True,
|
||||
},
|
||||
)
|
||||
sk = env.safekeepers[0]
|
||||
assert sk
|
||||
with pytest.raises(requests.exceptions.HTTPError, match="Not Found"):
|
||||
sk.http_client().timeline_status(
|
||||
tenant_id=env.initial_tenant, timeline_id=snapshot_branchpoint_old
|
||||
)
|
||||
env.neon_cli.mappings_map_branch(
|
||||
"snapshot_branchpoint_old", env.initial_tenant, snapshot_branchpoint_old
|
||||
snapshot_branchpoint_old = env.create_branch(
|
||||
"snapshot_branchpoint_old", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_y
|
||||
)
|
||||
|
||||
snapshot_branchpoint = env.create_branch(
|
||||
|
||||
@@ -2012,7 +2012,10 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
ep = env.endpoints.create("main")
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
ep = env.endpoints.create("main", config_lines=config_lines)
|
||||
|
||||
# expected to fail because timeline is not created on safekeepers
|
||||
with pytest.raises(Exception, match=r".*timed out.*"):
|
||||
@@ -2040,7 +2043,10 @@ def test_explicit_timeline_creation_storcon(neon_env_builder: NeonEnvBuilder):
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
ep = env.endpoints.create("main")
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
ep = env.endpoints.create("main", config_lines=config_lines)
|
||||
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
|
||||
@@ -637,7 +637,10 @@ async def quorum_sanity_single(
|
||||
# create timeline on `members_sks`
|
||||
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks)
|
||||
|
||||
ep = env.endpoints.create(branch_name)
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
ep = env.endpoints.create(branch_name, config_lines=config_lines)
|
||||
ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
|
||||
ep.safe_psql("create table t(key int, value text)")
|
||||
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 55c0d45abe...78c1568afb
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: de7640f55d...f2cd317375
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 0bf96bd6d7...3d6896afaa
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 8be779fd3a...28b88cfedf
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.5",
|
||||
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
|
||||
"28b88cfedf9a6028cf7ea32a80ea15a9bf971803"
|
||||
],
|
||||
"v16": [
|
||||
"16.9",
|
||||
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
|
||||
"3d6896afaa89c05b693e9a29275001b801b8b479"
|
||||
],
|
||||
"v15": [
|
||||
"15.13",
|
||||
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
|
||||
"f2cd31737536218c39c66f920ff62f72c51d0d61"
|
||||
],
|
||||
"v14": [
|
||||
"14.18",
|
||||
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
|
||||
"78c1568afbbacc1604ac5e5cc2ebc9b7ed0cfde9"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -18,8 +18,6 @@ license.workspace = true
|
||||
ahash = { version = "0.8" }
|
||||
anstream = { version = "0.6" }
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
axum = { version = "0.8", features = ["ws"] }
|
||||
axum-core = { version = "0.5", default-features = false, features = ["tracing"] }
|
||||
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
|
||||
base64-647d43efb71741da = { package = "base64", version = "0.21" }
|
||||
base64ct = { version = "1", default-features = false, features = ["std"] }
|
||||
@@ -41,8 +39,10 @@ env_logger = { version = "0.11" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
form_urlencoded = { version = "1" }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
futures-core = { version = "0.3" }
|
||||
futures-executor = { version = "0.3" }
|
||||
futures-io = { version = "0.3" }
|
||||
futures-task = { version = "0.3", default-features = false, features = ["std"] }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
@@ -52,7 +52,7 @@ hex = { version = "0.4", features = ["serde"] }
|
||||
hmac = { version = "0.12", default-features = false, features = ["reset"] }
|
||||
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
|
||||
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
|
||||
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
|
||||
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] }
|
||||
indexmap = { version = "2", features = ["serde"] }
|
||||
itertools = { version = "0.12" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
@@ -72,6 +72,7 @@ num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
p256 = { version = "0.13", features = ["jwk"] }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
percent-encoding = { version = "2" }
|
||||
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -97,7 +98,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["full", "test-util"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-stream = { version = "0.1" }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
|
||||
|
||||
Reference in New Issue
Block a user