mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Compare commits
25 Commits
amasterov/
...
erik/grpc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2d2d29bf38 | ||
|
|
e8ebb8e433 | ||
|
|
232591e457 | ||
|
|
8daf272561 | ||
|
|
67ddf1de28 | ||
|
|
541fcd8d2f | ||
|
|
e77961c1c6 | ||
|
|
cdfa06caad | ||
|
|
f0bb93a9c9 | ||
|
|
30adf8e2bd | ||
|
|
5d538a9503 | ||
|
|
f3976e5c60 | ||
|
|
9657fbc194 | ||
|
|
dd501554c9 | ||
|
|
fe1513ca57 | ||
|
|
3e86008e66 | ||
|
|
23fc611461 | ||
|
|
dc953de85d | ||
|
|
841517ee37 | ||
|
|
1369d73dcd | ||
|
|
7cd0defaf0 | ||
|
|
a082f9814a | ||
|
|
ec991877f4 | ||
|
|
abc6c84262 | ||
|
|
6768a71c86 |
40
Cargo.lock
generated
40
Cargo.lock
generated
@@ -4236,6 +4236,7 @@ name = "pagebench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"camino",
|
||||
"clap",
|
||||
"futures",
|
||||
@@ -4244,12 +4245,15 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_page_api",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
@@ -4321,6 +4325,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_compaction",
|
||||
"pageserver_page_api",
|
||||
"pem",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
@@ -4329,6 +4334,7 @@ dependencies = [
|
||||
"postgres_connection",
|
||||
"postgres_ffi",
|
||||
"postgres_initdb",
|
||||
"posthog_client_lite",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"procfs",
|
||||
@@ -4363,6 +4369,8 @@ dependencies = [
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tonic 0.13.1",
|
||||
"tonic-reflection",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"twox-hash",
|
||||
@@ -4455,9 +4463,15 @@ dependencies = [
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"prost 0.13.5",
|
||||
"smallvec",
|
||||
"thiserror 1.0.69",
|
||||
"tonic 0.13.1",
|
||||
"tonic-build",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -4898,11 +4912,16 @@ name = "posthog_client_lite"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -7520,8 +7539,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"h2 0.4.4",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
@@ -7532,6 +7553,7 @@ dependencies = [
|
||||
"pin-project",
|
||||
"prost 0.13.5",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tokio-stream",
|
||||
@@ -7555,6 +7577,19 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-reflection"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.13.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
@@ -8526,6 +8561,8 @@ dependencies = [
|
||||
"ahash",
|
||||
"anstream",
|
||||
"anyhow",
|
||||
"axum",
|
||||
"axum-core",
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.7",
|
||||
"base64ct",
|
||||
@@ -8548,10 +8585,8 @@ dependencies = [
|
||||
"fail",
|
||||
"form_urlencoded",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
"generic-array",
|
||||
"getrandom 0.2.11",
|
||||
@@ -8581,7 +8616,6 @@ dependencies = [
|
||||
"once_cell",
|
||||
"p256 0.13.2",
|
||||
"parquet",
|
||||
"percent-encoding",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"prost 0.13.5",
|
||||
|
||||
@@ -199,7 +199,8 @@ tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "tls-ring", "tls-native-roots"] }
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
|
||||
tonic-reflection = { version = "0.13.1", features = ["server"] }
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
|
||||
@@ -246,6 +247,7 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
|
||||
http-utils = { version = "0.1", path = "./libs/http-utils/" }
|
||||
metrics = { version = "0.1", path = "./libs/metrics/" }
|
||||
@@ -258,19 +260,19 @@ postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
|
||||
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
postgres_initdb = { path = "./libs/postgres_initdb" }
|
||||
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
|
||||
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
|
||||
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
|
||||
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
|
||||
safekeeper_client = { path = "./safekeeper/client" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
|
||||
storage_controller_client = { path = "./storage_controller/client" }
|
||||
tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
|
||||
tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
|
||||
utils = { version = "0.1", path = "./libs/utils/" }
|
||||
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
|
||||
walproposer = { version = "0.1", path = "./libs/walproposer/" }
|
||||
wal_decoder = { version = "0.1", path = "./libs/wal_decoder" }
|
||||
walproposer = { version = "0.1", path = "./libs/walproposer/" }
|
||||
|
||||
## Common library dependency
|
||||
workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
|
||||
@@ -155,7 +155,7 @@ RUN set -e \
|
||||
|
||||
# Keep the version the same as in compute/compute-node.Dockerfile and
|
||||
# test_runner/regress/test_compute_metrics.py.
|
||||
ENV SQL_EXPORTER_VERSION=0.17.0
|
||||
ENV SQL_EXPORTER_VERSION=0.17.3
|
||||
RUN curl -fsSL \
|
||||
"https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
|
||||
--output sql_exporter.tar.gz \
|
||||
|
||||
@@ -1784,17 +1784,17 @@ ARG TARGETARCH
|
||||
RUN if [ "$TARGETARCH" = "amd64" ]; then\
|
||||
postgres_exporter_sha256='59aa4a7bb0f7d361f5e05732f5ed8c03cc08f78449cef5856eadec33a627694b';\
|
||||
pgbouncer_exporter_sha256='c9f7cf8dcff44f0472057e9bf52613d93f3ffbc381ad7547a959daa63c5e84ac';\
|
||||
sql_exporter_sha256='38e439732bbf6e28ca4a94d7bc3686d3fa1abdb0050773d5617a9efdb9e64d08';\
|
||||
sql_exporter_sha256='9a41127a493e8bfebfe692bf78c7ed2872a58a3f961ee534d1b0da9ae584aaab';\
|
||||
else\
|
||||
postgres_exporter_sha256='d1dedea97f56c6d965837bfd1fbb3e35a3b4a4556f8cccee8bd513d8ee086124';\
|
||||
pgbouncer_exporter_sha256='217c4afd7e6492ae904055bc14fe603552cf9bac458c063407e991d68c519da3';\
|
||||
sql_exporter_sha256='11918b00be6e2c3a67564adfdb2414fdcbb15a5db76ea17d1d1a944237a893c6';\
|
||||
sql_exporter_sha256='530e6afc77c043497ed965532c4c9dfa873bc2a4f0b3047fad367715c0081d6a';\
|
||||
fi\
|
||||
&& curl -sL https://github.com/prometheus-community/postgres_exporter/releases/download/v0.17.1/postgres_exporter-0.17.1.linux-${TARGETARCH}.tar.gz\
|
||||
| tar xzf - --strip-components=1 -C.\
|
||||
&& curl -sL https://github.com/prometheus-community/pgbouncer_exporter/releases/download/v0.10.2/pgbouncer_exporter-0.10.2.linux-${TARGETARCH}.tar.gz\
|
||||
| tar xzf - --strip-components=1 -C.\
|
||||
&& curl -sL https://github.com/burningalchemist/sql_exporter/releases/download/0.17.0/sql_exporter-0.17.0.linux-${TARGETARCH}.tar.gz\
|
||||
&& curl -sL https://github.com/burningalchemist/sql_exporter/releases/download/0.17.3/sql_exporter-0.17.3.linux-${TARGETARCH}.tar.gz\
|
||||
| tar xzf - --strip-components=1 -C.\
|
||||
&& echo "${postgres_exporter_sha256} postgres_exporter" | sha256sum -c -\
|
||||
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
|
||||
@@ -1847,7 +1847,7 @@ COPY docker-compose/ext-src/ /ext-src/
|
||||
COPY --from=pg-build /postgres /postgres
|
||||
#COPY --from=postgis-src /ext-src/ /ext-src/
|
||||
COPY --from=plv8-src /ext-src/ /ext-src/
|
||||
#COPY --from=h3-pg-src /ext-src/ /ext-src/
|
||||
COPY --from=h3-pg-src /ext-src/h3-pg-src /ext-src/h3-pg-src
|
||||
COPY --from=postgresql-unit-src /ext-src/ /ext-src/
|
||||
COPY --from=pgvector-src /ext-src/ /ext-src/
|
||||
COPY --from=pgjwt-src /ext-src/ /ext-src/
|
||||
|
||||
@@ -136,6 +136,10 @@ struct Cli {
|
||||
requires = "compute-id"
|
||||
)]
|
||||
pub control_plane_uri: Option<String>,
|
||||
|
||||
/// Interval in seconds for collecting installed extensions statistics
|
||||
#[arg(long, default_value = "3600")]
|
||||
pub installed_extensions_collection_interval: u64,
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
@@ -179,6 +183,7 @@ fn main() -> Result<()> {
|
||||
cgroup: cli.cgroup,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor_addr: cli.vm_monitor_addr,
|
||||
installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
|
||||
},
|
||||
config,
|
||||
)?;
|
||||
|
||||
@@ -97,6 +97,9 @@ pub struct ComputeNodeParams {
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
|
||||
/// Interval for installed extensions collection
|
||||
pub installed_extensions_collection_interval: u64,
|
||||
}
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
@@ -695,25 +698,18 @@ impl ComputeNode {
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
let log_directory_path = log_directory_path.to_string_lossy().to_string();
|
||||
|
||||
// Add project_id,endpoint_id tag to identify the logs.
|
||||
// Add project_id,endpoint_id to identify the logs.
|
||||
//
|
||||
// These ids are passed from cplane,
|
||||
// for backwards compatibility (old computes that don't have them),
|
||||
// we set them to None.
|
||||
// TODO: Clean up this code when all computes have them.
|
||||
let tag: Option<String> = match (
|
||||
pspec.spec.project_id.as_deref(),
|
||||
pspec.spec.endpoint_id.as_deref(),
|
||||
) {
|
||||
(Some(project_id), Some(endpoint_id)) => {
|
||||
Some(format!("{project_id}/{endpoint_id}"))
|
||||
}
|
||||
(Some(project_id), None) => Some(format!("{project_id}/None")),
|
||||
(None, Some(endpoint_id)) => Some(format!("None,{endpoint_id}")),
|
||||
(None, None) => None,
|
||||
};
|
||||
let endpoint_id = pspec.spec.endpoint_id.as_deref().unwrap_or("");
|
||||
let project_id = pspec.spec.project_id.as_deref().unwrap_or("");
|
||||
|
||||
configure_audit_rsyslog(log_directory_path.clone(), tag, &remote_endpoint)?;
|
||||
configure_audit_rsyslog(
|
||||
log_directory_path.clone(),
|
||||
endpoint_id,
|
||||
project_id,
|
||||
&remote_endpoint,
|
||||
)?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
@@ -749,17 +745,7 @@ impl ComputeNode {
|
||||
|
||||
let conf = self.get_tokio_conn_conf(None);
|
||||
tokio::task::spawn(async {
|
||||
let res = get_installed_extensions(conf).await;
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
info!(
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&extensions)
|
||||
.expect("failed to serialize extensions list")
|
||||
);
|
||||
}
|
||||
Err(err) => error!("could not get installed extensions: {err:?}"),
|
||||
}
|
||||
let _ = installed_extensions(conf).await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -789,6 +775,9 @@ impl ComputeNode {
|
||||
// Log metrics so that we can search for slow operations in logs
|
||||
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
|
||||
|
||||
// Spawn the extension stats background task
|
||||
self.spawn_extension_stats_task();
|
||||
|
||||
if pspec.spec.prewarm_lfc_on_startup {
|
||||
self.prewarm_lfc();
|
||||
}
|
||||
@@ -2199,6 +2188,41 @@ LIMIT 100",
|
||||
info!("Pageserver config changed");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_extension_stats_task(&self) {
|
||||
let conf = self.tokio_conn_conf.clone();
|
||||
let installed_extensions_collection_interval =
|
||||
self.params.installed_extensions_collection_interval;
|
||||
tokio::spawn(async move {
|
||||
// An initial sleep is added to ensure that two collections don't happen at the same time.
|
||||
// The first collection happens during compute startup.
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(
|
||||
installed_extensions_collection_interval,
|
||||
))
|
||||
.await;
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
|
||||
installed_extensions_collection_interval,
|
||||
));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = installed_extensions(conf.clone()).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
|
||||
let res = get_installed_extensions(conf).await;
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
info!(
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&extensions).expect("failed to serialize extensions list")
|
||||
);
|
||||
}
|
||||
Err(err) => error!("could not get installed extensions: {err:?}"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn forward_termination_signal() {
|
||||
|
||||
@@ -2,10 +2,24 @@
|
||||
module(load="imfile")
|
||||
|
||||
# Input configuration for log files in the specified directory
|
||||
# Replace {log_directory} with the directory containing the log files
|
||||
input(type="imfile" File="{log_directory}/*.log" Tag="{tag}" Severity="info" Facility="local0")
|
||||
# The messages can be multiline. The start of the message is a timestamp
|
||||
# in "%Y-%m-%d %H:%M:%S.%3N GMT" (so timezone hardcoded).
|
||||
# Replace log_directory with the directory containing the log files
|
||||
input(type="imfile" File="{log_directory}/*.log"
|
||||
Tag="pgaudit_log" Severity="info" Facility="local5"
|
||||
startmsg.regex="^[[:digit:]]{{4}}-[[:digit:]]{{2}}-[[:digit:]]{{2}} [[:digit:]]{{2}}:[[:digit:]]{{2}}:[[:digit:]]{{2}}.[[:digit:]]{{3}} GMT,")
|
||||
|
||||
# the directory to store rsyslog state files
|
||||
global(workDirectory="/var/log/rsyslog")
|
||||
|
||||
# Forward logs to remote syslog server
|
||||
*.* @@{remote_endpoint}
|
||||
# Construct json, endpoint_id and project_id as additional metadata
|
||||
set $.json_log!endpoint_id = "{endpoint_id}";
|
||||
set $.json_log!project_id = "{project_id}";
|
||||
set $.json_log!msg = $msg;
|
||||
|
||||
# Template suitable for rfc5424 syslog format
|
||||
template(name="PgAuditLog" type="string"
|
||||
string="<%PRI%>1 %TIMESTAMP:::date-rfc3339% %HOSTNAME% - - - - %$.json_log%")
|
||||
|
||||
# Forward to remote syslog receiver (@@<hostname>:<port>;format
|
||||
local5.info @@{remote_endpoint};PgAuditLog
|
||||
|
||||
@@ -84,13 +84,15 @@ fn restart_rsyslog() -> Result<()> {
|
||||
|
||||
pub fn configure_audit_rsyslog(
|
||||
log_directory: String,
|
||||
tag: Option<String>,
|
||||
endpoint_id: &str,
|
||||
project_id: &str,
|
||||
remote_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
tag = tag.unwrap_or("".to_string()),
|
||||
endpoint_id = endpoint_id,
|
||||
project_id = project_id,
|
||||
remote_endpoint = remote_endpoint
|
||||
);
|
||||
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
[pageserver]
|
||||
listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
listen_grpc_addr = '127.0.0.1:51051'
|
||||
pg_auth_type = 'Trust'
|
||||
http_auth_type = 'Trust'
|
||||
grpc_auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
id = 1
|
||||
|
||||
@@ -4,8 +4,10 @@
|
||||
id=1
|
||||
listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
listen_grpc_addr = '127.0.0.1:51051'
|
||||
pg_auth_type = 'Trust'
|
||||
http_auth_type = 'Trust'
|
||||
grpc_auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
id = 1
|
||||
|
||||
@@ -32,6 +32,7 @@ use control_plane::storage_controller::{
|
||||
};
|
||||
use nix::fcntl::{Flock, FlockArg};
|
||||
use pageserver_api::config::{
|
||||
DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
};
|
||||
@@ -1007,13 +1008,16 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
|
||||
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
|
||||
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
|
||||
let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
|
||||
NeonLocalInitPageserverConf {
|
||||
id: pageserver_id,
|
||||
listen_pg_addr: format!("127.0.0.1:{pg_port}"),
|
||||
listen_http_addr: format!("127.0.0.1:{http_port}"),
|
||||
listen_https_addr: None,
|
||||
listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
grpc_auth_type: AuthType::Trust,
|
||||
other: Default::default(),
|
||||
// Typical developer machines use disks with slow fsync, and we don't care
|
||||
// about data integrity: disable disk syncs.
|
||||
@@ -1275,6 +1279,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
|
||||
mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
|
||||
ancestor_timeline_id,
|
||||
ancestor_start_lsn: start_lsn,
|
||||
read_only: false,
|
||||
pg_version: None,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -278,8 +278,10 @@ pub struct PageServerConf {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
pub no_sync: bool,
|
||||
}
|
||||
|
||||
@@ -290,8 +292,10 @@ impl Default for PageServerConf {
|
||||
listen_pg_addr: String::new(),
|
||||
listen_http_addr: String::new(),
|
||||
listen_https_addr: None,
|
||||
listen_grpc_addr: None,
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
grpc_auth_type: AuthType::Trust,
|
||||
no_sync: false,
|
||||
}
|
||||
}
|
||||
@@ -306,8 +310,10 @@ pub struct NeonLocalInitPageserverConf {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
pub no_sync: bool,
|
||||
#[serde(flatten)]
|
||||
@@ -321,8 +327,10 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
other: _,
|
||||
} = conf;
|
||||
@@ -331,7 +339,9 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_pg_addr: listen_pg_addr.clone(),
|
||||
listen_http_addr: listen_http_addr.clone(),
|
||||
listen_https_addr: listen_https_addr.clone(),
|
||||
listen_grpc_addr: listen_grpc_addr.clone(),
|
||||
pg_auth_type: *pg_auth_type,
|
||||
grpc_auth_type: *grpc_auth_type,
|
||||
http_auth_type: *http_auth_type,
|
||||
no_sync: *no_sync,
|
||||
}
|
||||
@@ -707,8 +717,10 @@ impl LocalEnv {
|
||||
listen_pg_addr: String,
|
||||
listen_http_addr: String,
|
||||
listen_https_addr: Option<String>,
|
||||
listen_grpc_addr: Option<String>,
|
||||
pg_auth_type: AuthType,
|
||||
http_auth_type: AuthType,
|
||||
grpc_auth_type: AuthType,
|
||||
#[serde(default)]
|
||||
no_sync: bool,
|
||||
}
|
||||
@@ -732,8 +744,10 @@ impl LocalEnv {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
} = config_toml;
|
||||
let IdentityTomlSubset {
|
||||
@@ -750,8 +764,10 @@ impl LocalEnv {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
};
|
||||
pageservers.push(conf);
|
||||
|
||||
@@ -129,7 +129,9 @@ impl PageServerNode {
|
||||
));
|
||||
}
|
||||
|
||||
if conf.http_auth_type != AuthType::Trust || conf.pg_auth_type != AuthType::Trust {
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
|
||||
.contains(&AuthType::NeonJWT)
|
||||
{
|
||||
// Keys are generated in the toplevel repo dir, pageservers' workdirs
|
||||
// are one level below that, so refer to keys with ../
|
||||
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
|
||||
|
||||
@@ -20,7 +20,7 @@ first_path="$(ldconfig --verbose 2>/dev/null \
|
||||
| grep --invert-match ^$'\t' \
|
||||
| cut --delimiter=: --fields=1 \
|
||||
| head --lines=1)"
|
||||
test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat.
|
||||
test "$first_path" == '/usr/local/lib'
|
||||
|
||||
echo "Waiting pageserver become ready."
|
||||
while ! nc -z pageserver 6400; do
|
||||
|
||||
16
docker-compose/ext-src/h3-pg-src/neon-test.sh
Executable file
16
docker-compose/ext-src/h3-pg-src/neon-test.sh
Executable file
@@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env bash
|
||||
set -ex
|
||||
cd "$(dirname "${0}")"
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
dropdb --if-exists contrib_regression
|
||||
createdb contrib_regression
|
||||
cd h3_postgis/test
|
||||
psql -d contrib_regression -c "CREATE EXTENSION postgis" -c "CREATE EXTENSION postgis_raster" -c "CREATE EXTENSION h3" -c "CREATE EXTENSION h3_postgis"
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
${PG_REGRESS} --use-existing --dbname contrib_regression ${TESTS}
|
||||
cd ../../h3/test
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
dropdb --if-exists contrib_regression
|
||||
createdb contrib_regression
|
||||
psql -d contrib_regression -c "CREATE EXTENSION h3"
|
||||
${PG_REGRESS} --use-existing --dbname contrib_regression ${TESTS}
|
||||
7
docker-compose/ext-src/h3-pg-src/test-upgrade.sh
Executable file
7
docker-compose/ext-src/h3-pg-src/test-upgrade.sh
Executable file
@@ -0,0 +1,7 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
cd h3/test
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ${TESTS}
|
||||
6
docker-compose/ext-src/online_advisor-src/neon-test.sh
Executable file
6
docker-compose/ext-src/online_advisor-src/neon-test.sh
Executable file
@@ -0,0 +1,6 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname "${0}")"
|
||||
if [ -f Makefile ]; then
|
||||
make installcheck
|
||||
fi
|
||||
9
docker-compose/ext-src/online_advisor-src/regular-test.sh
Executable file
9
docker-compose/ext-src/online_advisor-src/regular-test.sh
Executable file
@@ -0,0 +1,9 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
[ -f Makefile ] || exit 0
|
||||
dropdb --if-exist contrib_regression
|
||||
createdb contrib_regression
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
|
||||
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ${TESTS}
|
||||
@@ -82,7 +82,8 @@ EXTENSIONS='[
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"},
|
||||
{"extname": "pgtap", "extdir": "pgtap-src"},
|
||||
{"extname": "pg_repack", "extdir": "pg_repack-src"}
|
||||
{"extname": "pg_repack", "extdir": "pg_repack-src"},
|
||||
{"extname": "h3", "extdir": "h3-pg-src"}
|
||||
]'
|
||||
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
|
||||
COMPUTE_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
|
||||
|
||||
@@ -8,6 +8,8 @@ pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
|
||||
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::num::{NonZeroU64, NonZeroUsize};
|
||||
@@ -43,6 +45,21 @@ pub struct NodeMetadata {
|
||||
pub other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
/// PostHog integration config.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct PostHogConfig {
|
||||
/// PostHog project ID
|
||||
pub project_id: String,
|
||||
/// Server-side (private) API key
|
||||
pub server_api_key: String,
|
||||
/// Client-side (public) API key
|
||||
pub client_api_key: String,
|
||||
/// Private API URL
|
||||
pub private_api_url: String,
|
||||
/// Public API URL
|
||||
pub public_api_url: String,
|
||||
}
|
||||
|
||||
/// `pageserver.toml`
|
||||
///
|
||||
/// We use serde derive with `#[serde(default)]` to generate a deserializer
|
||||
@@ -104,6 +121,7 @@ pub struct ConfigToml {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
#[serde(with = "humantime_serde")]
|
||||
@@ -123,6 +141,7 @@ pub struct ConfigToml {
|
||||
pub http_auth_type: AuthType,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub pg_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub tenant_config: TenantConfigToml,
|
||||
@@ -182,6 +201,8 @@ pub struct ConfigToml {
|
||||
pub tracing: Option<Tracing>,
|
||||
pub enable_tls_page_service_api: bool,
|
||||
pub dev_mode: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
pub timeline_import_config: TimelineImportConfig,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
|
||||
@@ -588,6 +609,7 @@ impl Default for ConfigToml {
|
||||
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
listen_https_addr: (None),
|
||||
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
|
||||
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
|
||||
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
@@ -604,6 +626,7 @@ impl Default for ConfigToml {
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
http_auth_type: (AuthType::Trust),
|
||||
pg_auth_type: (AuthType::Trust),
|
||||
grpc_auth_type: (AuthType::Trust),
|
||||
auth_validation_public_key_path: (None),
|
||||
remote_storage: None,
|
||||
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT
|
||||
@@ -695,6 +718,7 @@ impl Default for ConfigToml {
|
||||
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
|
||||
},
|
||||
basebackup_cache_config: None,
|
||||
posthog_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -402,6 +402,8 @@ pub enum TimelineCreateRequestMode {
|
||||
// using a flattened enum, so, it was an accepted field, and
|
||||
// we continue to accept it by having it here.
|
||||
pg_version: Option<u32>,
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
read_only: bool,
|
||||
},
|
||||
ImportPgdata {
|
||||
import_pgdata: TimelineCreateRequestModeImportPgdata,
|
||||
@@ -1929,7 +1931,7 @@ pub enum PagestreamFeMessage {
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(strum_macros::EnumProperty)]
|
||||
#[derive(Debug, strum_macros::EnumProperty)]
|
||||
pub enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
@@ -2040,7 +2042,7 @@ pub enum PagestreamProtocolVersion {
|
||||
|
||||
pub type RequestId = u64;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamRequest {
|
||||
pub reqid: RequestId,
|
||||
pub request_lsn: Lsn,
|
||||
@@ -2059,7 +2061,7 @@ pub struct PagestreamNblocksRequest {
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
|
||||
@@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};
|
||||
// FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
|
||||
// Then we could replace the custom Ord and PartialOrd implementations below with
|
||||
// deriving them. This will require changes in walredoproc.c.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
pub spcnode: Oid,
|
||||
|
||||
@@ -6,9 +6,14 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde.workspace = true
|
||||
sha2.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
|
||||
tokio-util.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
tracing.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
59
libs/posthog_client_lite/src/background_loop.rs
Normal file
59
libs/posthog_client_lite/src/background_loop.rs
Normal file
@@ -0,0 +1,59 @@
|
||||
//! A background loop that fetches feature flags from PostHog and updates the feature store.
|
||||
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
|
||||
|
||||
/// A background loop that fetches feature flags from PostHog and updates the feature store.
|
||||
pub struct FeatureResolverBackgroundLoop {
|
||||
posthog_client: PostHogClient,
|
||||
feature_store: ArcSwap<FeatureStore>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl FeatureResolverBackgroundLoop {
|
||||
pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
|
||||
Self {
|
||||
posthog_client: PostHogClient::new(config),
|
||||
feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
|
||||
cancel: shutdown_pageserver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
|
||||
let this = self.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
handle.spawn(async move {
|
||||
tracing::info!("Starting PostHog feature resolver");
|
||||
let mut ticker = tokio::time::interval(refresh_period);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ticker.tick() => {}
|
||||
_ = cancel.cancelled() => break
|
||||
}
|
||||
let resp = match this
|
||||
.posthog_client
|
||||
.get_feature_flags_local_evaluation()
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
tracing::warn!("Cannot get feature flags: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let feature_store = FeatureStore::new_with_flags(resp.flags);
|
||||
this.feature_store.store(Arc::new(feature_store));
|
||||
}
|
||||
tracing::info!("PostHog feature resolver stopped");
|
||||
});
|
||||
}
|
||||
|
||||
pub fn feature_store(&self) -> Arc<FeatureStore> {
|
||||
self.feature_store.load_full()
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,9 @@
|
||||
//! A lite version of the PostHog client that only supports local evaluation of feature flags.
|
||||
|
||||
mod background_loop;
|
||||
|
||||
pub use background_loop::FeatureResolverBackgroundLoop;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -20,8 +24,7 @@ pub enum PostHogEvaluationError {
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct LocalEvaluationResponse {
|
||||
#[allow(dead_code)]
|
||||
flags: Vec<LocalEvaluationFlag>,
|
||||
pub flags: Vec<LocalEvaluationFlag>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -34,7 +37,7 @@ pub struct LocalEvaluationFlag {
|
||||
#[derive(Deserialize)]
|
||||
pub struct LocalEvaluationFlagFilters {
|
||||
groups: Vec<LocalEvaluationFlagFilterGroup>,
|
||||
multivariate: LocalEvaluationFlagMultivariate,
|
||||
multivariate: Option<LocalEvaluationFlagMultivariate>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -94,6 +97,12 @@ impl FeatureStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
|
||||
let mut store = Self::new();
|
||||
store.set_flags(flags);
|
||||
store
|
||||
}
|
||||
|
||||
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
|
||||
self.flags.clear();
|
||||
for flag in flags {
|
||||
@@ -245,7 +254,7 @@ impl FeatureStore {
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors
|
||||
/// Evaluate a multivariate feature flag. Returns an error if the flag is not available or if there are errors
|
||||
/// during the evaluation.
|
||||
///
|
||||
/// The parsing logic is as follows:
|
||||
@@ -263,10 +272,15 @@ impl FeatureStore {
|
||||
/// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%).
|
||||
/// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override.
|
||||
/// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_multivariate(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
user_id: &str,
|
||||
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<String, PostHogEvaluationError> {
|
||||
let hash_on_global_rollout_percentage =
|
||||
Self::consistent_hash(user_id, flag_key, "multivariate");
|
||||
@@ -276,10 +290,39 @@ impl FeatureStore {
|
||||
flag_key,
|
||||
hash_on_global_rollout_percentage,
|
||||
hash_on_group_rollout_percentage,
|
||||
&HashMap::new(),
|
||||
properties,
|
||||
)
|
||||
}
|
||||
|
||||
/// Evaluate a boolean feature flag. Returns an error if the flag is not available or if there are errors
|
||||
/// during the evaluation.
|
||||
///
|
||||
/// The parsing logic is as follows:
|
||||
///
|
||||
/// * Generate a consistent hash for the tenant-feature.
|
||||
/// * Match each filter group.
|
||||
/// - If a group is matched, it will first determine whether the user is in the range of the rollout
|
||||
/// percentage.
|
||||
/// - If the hash falls within the group's rollout percentage, return true.
|
||||
/// * Otherwise, continue with the next group until all groups are evaluated and no group is within the
|
||||
/// rollout percentage.
|
||||
/// * If there are no matching groups, return an error.
|
||||
///
|
||||
/// Returns `Ok(())` if the feature flag evaluates to true. In the future, it will return a payload.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_boolean(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
user_id: &str,
|
||||
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
let hash_on_global_rollout_percentage = Self::consistent_hash(user_id, flag_key, "boolean");
|
||||
self.evaluate_boolean_inner(flag_key, hash_on_global_rollout_percentage, properties)
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
|
||||
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
|
||||
/// and avoid duplicate computations.
|
||||
@@ -306,6 +349,11 @@ impl FeatureStore {
|
||||
flag_key
|
||||
)));
|
||||
}
|
||||
let Some(ref multivariate) = flag_config.filters.multivariate else {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"No multivariate available, should use evaluate_boolean?: {flag_key}"
|
||||
)));
|
||||
};
|
||||
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
|
||||
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
|
||||
// does not matter.
|
||||
@@ -314,7 +362,7 @@ impl FeatureStore {
|
||||
GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant),
|
||||
GroupEvaluationResult::MatchedAndEvaluate => {
|
||||
let mut percentage = 0;
|
||||
for variant in &flag_config.filters.multivariate.variants {
|
||||
for variant in &multivariate.variants {
|
||||
percentage += variant.rollout_percentage;
|
||||
if self
|
||||
.evaluate_percentage(hash_on_global_rollout_percentage, percentage)
|
||||
@@ -342,6 +390,77 @@ impl FeatureStore {
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
|
||||
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
|
||||
/// and avoid duplicate computations.
|
||||
///
|
||||
/// Use a different consistent hash for evaluating the group rollout percentage.
|
||||
/// The behavior: if the condition is set to rolling out to 10% of the users, and
|
||||
/// we set the variant A to 20% in the global config, then 2% of the total users will
|
||||
/// be evaluated to variant A.
|
||||
///
|
||||
/// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two
|
||||
/// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users
|
||||
/// will be evaluated (versus 30% if group evaluation is done independently).
|
||||
pub(crate) fn evaluate_boolean_inner(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
hash_on_global_rollout_percentage: f64,
|
||||
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
if let Some(flag_config) = self.flags.get(flag_key) {
|
||||
if !flag_config.active {
|
||||
return Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"The feature flag is not active: {}",
|
||||
flag_key
|
||||
)));
|
||||
}
|
||||
if flag_config.filters.multivariate.is_some() {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"This looks like a multivariate flag, should use evaluate_multivariate?: {flag_key}"
|
||||
)));
|
||||
};
|
||||
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
|
||||
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
|
||||
// does not matter.
|
||||
for group in &flag_config.filters.groups {
|
||||
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
|
||||
GroupEvaluationResult::MatchedAndOverride(_) => {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"Boolean flag cannot have overrides: {}",
|
||||
flag_key
|
||||
)));
|
||||
}
|
||||
GroupEvaluationResult::MatchedAndEvaluate => {
|
||||
return Ok(());
|
||||
}
|
||||
GroupEvaluationResult::Unmatched => continue,
|
||||
}
|
||||
}
|
||||
// If no group is matched, the feature is not available, and up to the caller to decide what to do.
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
} else {
|
||||
// The feature flag is not available yet
|
||||
Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"Not found in the local evaluation spec: {}",
|
||||
flag_key
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostHogClientConfig {
|
||||
/// The server API key.
|
||||
pub server_api_key: String,
|
||||
/// The client API key.
|
||||
pub client_api_key: String,
|
||||
/// The project ID.
|
||||
pub project_id: String,
|
||||
/// The private API URL.
|
||||
pub private_api_url: String,
|
||||
/// The public API URL.
|
||||
pub public_api_url: String,
|
||||
}
|
||||
|
||||
/// A lite PostHog client.
|
||||
@@ -360,37 +479,16 @@ impl FeatureStore {
|
||||
/// want to report the feature flag usage back to PostHog. The current plan is to use PostHog only as an UI to
|
||||
/// configure feature flags so it is very likely that the client API will not be used.
|
||||
pub struct PostHogClient {
|
||||
/// The server API key.
|
||||
server_api_key: String,
|
||||
/// The client API key.
|
||||
client_api_key: String,
|
||||
/// The project ID.
|
||||
project_id: String,
|
||||
/// The private API URL.
|
||||
private_api_url: String,
|
||||
/// The public API URL.
|
||||
public_api_url: String,
|
||||
/// The config.
|
||||
config: PostHogClientConfig,
|
||||
/// The HTTP client.
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl PostHogClient {
|
||||
pub fn new(
|
||||
server_api_key: String,
|
||||
client_api_key: String,
|
||||
project_id: String,
|
||||
private_api_url: String,
|
||||
public_api_url: String,
|
||||
) -> Self {
|
||||
pub fn new(config: PostHogClientConfig) -> Self {
|
||||
let client = reqwest::Client::new();
|
||||
Self {
|
||||
server_api_key,
|
||||
client_api_key,
|
||||
project_id,
|
||||
private_api_url,
|
||||
public_api_url,
|
||||
client,
|
||||
}
|
||||
Self { config, client }
|
||||
}
|
||||
|
||||
pub fn new_with_us_region(
|
||||
@@ -398,13 +496,13 @@ impl PostHogClient {
|
||||
client_api_key: String,
|
||||
project_id: String,
|
||||
) -> Self {
|
||||
Self::new(
|
||||
Self::new(PostHogClientConfig {
|
||||
server_api_key,
|
||||
client_api_key,
|
||||
project_id,
|
||||
"https://us.posthog.com".to_string(),
|
||||
"https://us.i.posthog.com".to_string(),
|
||||
)
|
||||
private_api_url: "https://us.posthog.com".to_string(),
|
||||
public_api_url: "https://us.i.posthog.com".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch the feature flag specs from the server.
|
||||
@@ -422,12 +520,12 @@ impl PostHogClient {
|
||||
// with bearer token of self.server_api_key
|
||||
let url = format!(
|
||||
"{}/api/projects/{}/feature_flags/local_evaluation",
|
||||
self.private_api_url, self.project_id
|
||||
self.config.private_api_url, self.config.project_id
|
||||
);
|
||||
let response = self
|
||||
.client
|
||||
.get(url)
|
||||
.bearer_auth(&self.server_api_key)
|
||||
.bearer_auth(&self.config.server_api_key)
|
||||
.send()
|
||||
.await?;
|
||||
let body = response.text().await?;
|
||||
@@ -446,11 +544,11 @@ impl PostHogClient {
|
||||
) -> anyhow::Result<()> {
|
||||
// PUBLIC_URL/capture/
|
||||
// with bearer token of self.client_api_key
|
||||
let url = format!("{}/capture/", self.public_api_url);
|
||||
let url = format!("{}/capture/", self.config.public_api_url);
|
||||
self.client
|
||||
.post(url)
|
||||
.body(serde_json::to_string(&json!({
|
||||
"api_key": self.client_api_key,
|
||||
"api_key": self.config.client_api_key,
|
||||
"distinct_id": distinct_id,
|
||||
"event": event,
|
||||
"properties": properties,
|
||||
@@ -467,95 +565,162 @@ mod tests {
|
||||
|
||||
fn data() -> &'static str {
|
||||
r#"{
|
||||
"flags": [
|
||||
{
|
||||
"id": 132794,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "gc-compaction",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": "enabled-stage-2",
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 50
|
||||
},
|
||||
{
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 80
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": {
|
||||
"variants": [
|
||||
{
|
||||
"key": "disabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 90
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-1",
|
||||
"name": "",
|
||||
"rollout_percentage": 10
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-2",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-3",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 6
|
||||
}
|
||||
"flags": [
|
||||
{
|
||||
"id": 141807,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "image-compaction-boundary",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": null,
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
}
|
||||
],
|
||||
"group_type_mapping": {},
|
||||
"cohorts": {}
|
||||
}"#
|
||||
"rollout_percentage": 40
|
||||
},
|
||||
{
|
||||
"variant": null,
|
||||
"properties": [],
|
||||
"rollout_percentage": 10
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": null
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 1
|
||||
},
|
||||
{
|
||||
"id": 135586,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "boolean-flag",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": null,
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 47
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": null
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 1
|
||||
},
|
||||
{
|
||||
"id": 132794,
|
||||
"team_id": 152860,
|
||||
"name": "",
|
||||
"key": "gc-compaction",
|
||||
"filters": {
|
||||
"groups": [
|
||||
{
|
||||
"variant": "enabled-stage-2",
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 50
|
||||
},
|
||||
{
|
||||
"properties": [
|
||||
{
|
||||
"key": "plan_type",
|
||||
"type": "person",
|
||||
"value": [
|
||||
"free"
|
||||
],
|
||||
"operator": "exact"
|
||||
},
|
||||
{
|
||||
"key": "pageserver_remote_size",
|
||||
"type": "person",
|
||||
"value": "10000000",
|
||||
"operator": "lt"
|
||||
}
|
||||
],
|
||||
"rollout_percentage": 80
|
||||
}
|
||||
],
|
||||
"payloads": {},
|
||||
"multivariate": {
|
||||
"variants": [
|
||||
{
|
||||
"key": "disabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 90
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-1",
|
||||
"name": "",
|
||||
"rollout_percentage": 10
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-2",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled-stage-3",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
},
|
||||
{
|
||||
"key": "enabled",
|
||||
"name": "",
|
||||
"rollout_percentage": 0
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"deleted": false,
|
||||
"active": true,
|
||||
"ensure_experience_continuity": false,
|
||||
"has_encrypted_payloads": false,
|
||||
"version": 7
|
||||
}
|
||||
],
|
||||
"group_type_mapping": {},
|
||||
"cohorts": {}
|
||||
}"#
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -631,4 +796,125 @@ mod tests {
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn evaluate_boolean_1() {
|
||||
// The `boolean-flag` feature flag only has one group that matches on the free user.
|
||||
|
||||
let mut store = FeatureStore::new();
|
||||
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
|
||||
store.set_flags(response.flags);
|
||||
|
||||
// This lacks the required properties and cannot be evaluated.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NotAvailable(_))
|
||||
),);
|
||||
|
||||
let properties_unmatched = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("paid".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// This does not match any group so there will be an error.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties_unmatched);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
|
||||
let properties = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("free".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 0.10, &properties);
|
||||
assert!(variant.is_ok());
|
||||
|
||||
// It matches the group conditions but not the group rollout percentage.
|
||||
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn evaluate_boolean_2() {
|
||||
// The `image-compaction-boundary` feature flag has one group that matches on the free user and a group that matches on all users.
|
||||
|
||||
let mut store = FeatureStore::new();
|
||||
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
|
||||
store.set_flags(response.flags);
|
||||
|
||||
// This lacks the required properties and cannot be evaluated.
|
||||
let variant =
|
||||
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &HashMap::new());
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NotAvailable(_))
|
||||
),);
|
||||
|
||||
let properties_unmatched = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("paid".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// This does not match the filtered group but the all user group.
|
||||
let variant =
|
||||
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties_unmatched);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
let variant =
|
||||
store.evaluate_boolean_inner("image-compaction-boundary", 0.05, &properties_unmatched);
|
||||
assert!(variant.is_ok());
|
||||
|
||||
let properties = HashMap::from([
|
||||
(
|
||||
"plan_type".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String("free".to_string()),
|
||||
),
|
||||
(
|
||||
"pageserver_remote_size".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(1000.0),
|
||||
),
|
||||
]);
|
||||
|
||||
// It matches the first group as 0.30 <= 0.40 and the properties are matched. Then it gets evaluated to the variant override.
|
||||
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.30, &properties);
|
||||
assert!(variant.is_ok());
|
||||
|
||||
// It matches the group conditions but not the group rollout percentage.
|
||||
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties);
|
||||
assert!(matches!(
|
||||
variant,
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
),);
|
||||
|
||||
// It matches the second "all" group conditions.
|
||||
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.09, &properties);
|
||||
assert!(variant.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum BindError {
|
||||
Conversion(Box<dyn Error + marker::Sync + Send>),
|
||||
Serialization(io::Error),
|
||||
@@ -288,6 +289,12 @@ pub fn sync(buf: &mut BytesMut) {
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn flush(buf: &mut BytesMut) {
|
||||
buf.put_u8(b'H');
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn terminate(buf: &mut BytesMut) {
|
||||
buf.put_u8(b'X');
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
#[doc(inline)]
|
||||
pub use postgres_protocol2::Oid;
|
||||
@@ -27,41 +26,6 @@ macro_rules! accepts {
|
||||
)
|
||||
}
|
||||
|
||||
/// Generates an implementation of `ToSql::to_sql_checked`.
|
||||
///
|
||||
/// All `ToSql` implementations should use this macro.
|
||||
macro_rules! to_sql_checked {
|
||||
() => {
|
||||
fn to_sql_checked(
|
||||
&self,
|
||||
ty: &$crate::Type,
|
||||
out: &mut $crate::private::BytesMut,
|
||||
) -> ::std::result::Result<
|
||||
$crate::IsNull,
|
||||
Box<dyn ::std::error::Error + ::std::marker::Sync + ::std::marker::Send>,
|
||||
> {
|
||||
$crate::__to_sql_checked(self, ty, out)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// WARNING: this function is not considered part of this crate's public API.
|
||||
// It is subject to change at any time.
|
||||
#[doc(hidden)]
|
||||
pub fn __to_sql_checked<T>(
|
||||
v: &T,
|
||||
ty: &Type,
|
||||
out: &mut BytesMut,
|
||||
) -> Result<IsNull, Box<dyn Error + Sync + Send>>
|
||||
where
|
||||
T: ToSql,
|
||||
{
|
||||
if !T::accepts(ty) {
|
||||
return Err(Box::new(WrongType::new::<T>(ty.clone())));
|
||||
}
|
||||
v.to_sql(ty, out)
|
||||
}
|
||||
|
||||
// mod pg_lsn;
|
||||
#[doc(hidden)]
|
||||
pub mod private;
|
||||
@@ -142,7 +106,7 @@ pub enum Kind {
|
||||
/// An array type along with the type of its elements.
|
||||
Array(Type),
|
||||
/// A range type along with the type of its elements.
|
||||
Range(Type),
|
||||
Range(Oid),
|
||||
/// A multirange type along with the type of its elements.
|
||||
Multirange(Type),
|
||||
/// A domain type along with its underlying type.
|
||||
@@ -377,43 +341,6 @@ pub enum IsNull {
|
||||
No,
|
||||
}
|
||||
|
||||
/// A trait for types that can be converted into Postgres values.
|
||||
pub trait ToSql: fmt::Debug {
|
||||
/// Converts the value of `self` into the binary format of the specified
|
||||
/// Postgres `Type`, appending it to `out`.
|
||||
///
|
||||
/// The caller of this method is responsible for ensuring that this type
|
||||
/// is compatible with the Postgres `Type`.
|
||||
///
|
||||
/// The return value indicates if this value should be represented as
|
||||
/// `NULL`. If this is the case, implementations **must not** write
|
||||
/// anything to `out`.
|
||||
fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
/// Determines if a value of this type can be converted to the specified
|
||||
/// Postgres `Type`.
|
||||
fn accepts(ty: &Type) -> bool
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
/// An adaptor method used internally by Rust-Postgres.
|
||||
///
|
||||
/// *All* implementations of this method should be generated by the
|
||||
/// `to_sql_checked!()` macro.
|
||||
fn to_sql_checked(
|
||||
&self,
|
||||
ty: &Type,
|
||||
out: &mut BytesMut,
|
||||
) -> Result<IsNull, Box<dyn Error + Sync + Send>>;
|
||||
|
||||
/// Specify the encode format
|
||||
fn encode_format(&self, _ty: &Type) -> Format {
|
||||
Format::Binary
|
||||
}
|
||||
}
|
||||
|
||||
/// Supported Postgres message format types
|
||||
///
|
||||
/// Using Text format in a message assumes a Postgres `SERVER_ENCODING` of `UTF8`
|
||||
@@ -424,52 +351,3 @@ pub enum Format {
|
||||
/// Compact, typed binary format
|
||||
Binary,
|
||||
}
|
||||
|
||||
impl ToSql for &str {
|
||||
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
|
||||
match *ty {
|
||||
ref ty if ty.name() == "ltree" => types::ltree_to_sql(self, w),
|
||||
ref ty if ty.name() == "lquery" => types::lquery_to_sql(self, w),
|
||||
ref ty if ty.name() == "ltxtquery" => types::ltxtquery_to_sql(self, w),
|
||||
_ => types::text_to_sql(self, w),
|
||||
}
|
||||
Ok(IsNull::No)
|
||||
}
|
||||
|
||||
fn accepts(ty: &Type) -> bool {
|
||||
match *ty {
|
||||
Type::VARCHAR | Type::TEXT | Type::BPCHAR | Type::NAME | Type::UNKNOWN => true,
|
||||
ref ty
|
||||
if (ty.name() == "citext"
|
||||
|| ty.name() == "ltree"
|
||||
|| ty.name() == "lquery"
|
||||
|| ty.name() == "ltxtquery") =>
|
||||
{
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
to_sql_checked!();
|
||||
}
|
||||
|
||||
macro_rules! simple_to {
|
||||
($t:ty, $f:ident, $($expected:ident),+) => {
|
||||
impl ToSql for $t {
|
||||
fn to_sql(&self,
|
||||
_: &Type,
|
||||
w: &mut BytesMut)
|
||||
-> Result<IsNull, Box<dyn Error + Sync + Send>> {
|
||||
types::$f(*self, w);
|
||||
Ok(IsNull::No)
|
||||
}
|
||||
|
||||
accepts!($($expected),+);
|
||||
|
||||
to_sql_checked!();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
simple_to!(u32, oid_to_sql, OID);
|
||||
|
||||
@@ -393,7 +393,7 @@ impl Inner {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn oid(&self) -> Oid {
|
||||
pub const fn const_oid(&self) -> Oid {
|
||||
match *self {
|
||||
Inner::Bool => 16,
|
||||
Inner::Bytea => 17,
|
||||
@@ -580,7 +580,14 @@ impl Inner {
|
||||
Inner::TstzmultiRangeArray => 6153,
|
||||
Inner::DatemultiRangeArray => 6155,
|
||||
Inner::Int8multiRangeArray => 6157,
|
||||
Inner::Other(_) => u32::MAX,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn oid(&self) -> Oid {
|
||||
match *self {
|
||||
Inner::Other(ref u) => u.oid,
|
||||
_ => self.const_oid(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -727,17 +734,17 @@ impl Inner {
|
||||
Inner::JsonbArray => &Kind::Array(Type(Inner::Jsonb)),
|
||||
Inner::AnyRange => &Kind::Pseudo,
|
||||
Inner::EventTrigger => &Kind::Pseudo,
|
||||
Inner::Int4Range => &Kind::Range(Type(Inner::Int4)),
|
||||
Inner::Int4Range => &const { Kind::Range(Inner::Int4.const_oid()) },
|
||||
Inner::Int4RangeArray => &Kind::Array(Type(Inner::Int4Range)),
|
||||
Inner::NumRange => &Kind::Range(Type(Inner::Numeric)),
|
||||
Inner::NumRange => &const { Kind::Range(Inner::Numeric.const_oid()) },
|
||||
Inner::NumRangeArray => &Kind::Array(Type(Inner::NumRange)),
|
||||
Inner::TsRange => &Kind::Range(Type(Inner::Timestamp)),
|
||||
Inner::TsRange => &const { Kind::Range(Inner::Timestamp.const_oid()) },
|
||||
Inner::TsRangeArray => &Kind::Array(Type(Inner::TsRange)),
|
||||
Inner::TstzRange => &Kind::Range(Type(Inner::Timestamptz)),
|
||||
Inner::TstzRange => &const { Kind::Range(Inner::Timestamptz.const_oid()) },
|
||||
Inner::TstzRangeArray => &Kind::Array(Type(Inner::TstzRange)),
|
||||
Inner::DateRange => &Kind::Range(Type(Inner::Date)),
|
||||
Inner::DateRange => &const { Kind::Range(Inner::Date.const_oid()) },
|
||||
Inner::DateRangeArray => &Kind::Array(Type(Inner::DateRange)),
|
||||
Inner::Int8Range => &Kind::Range(Type(Inner::Int8)),
|
||||
Inner::Int8Range => &const { Kind::Range(Inner::Int8.const_oid()) },
|
||||
Inner::Int8RangeArray => &Kind::Array(Type(Inner::Int8Range)),
|
||||
Inner::Jsonpath => &Kind::Simple,
|
||||
Inner::JsonpathArray => &Kind::Array(Type(Inner::Jsonpath)),
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use futures_util::{TryStreamExt, future, ready};
|
||||
use parking_lot::Mutex;
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use postgres_protocol2::message::frontend;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -16,29 +14,52 @@ use tokio::sync::mpsc;
|
||||
|
||||
use crate::codec::{BackendMessages, FrontendMessage};
|
||||
use crate::config::{Host, SslMode};
|
||||
use crate::connection::{Request, RequestMessages};
|
||||
use crate::query::RowStream;
|
||||
use crate::simple_query::SimpleQueryStream;
|
||||
use crate::types::{Oid, Type};
|
||||
use crate::{
|
||||
CancelToken, Error, ReadyForQueryStatus, SimpleQueryMessage, Statement, Transaction,
|
||||
TransactionBuilder, query, simple_query,
|
||||
CancelToken, Error, ReadyForQueryStatus, SimpleQueryMessage, Transaction, TransactionBuilder,
|
||||
query, simple_query,
|
||||
};
|
||||
|
||||
pub struct Responses {
|
||||
/// new messages from conn
|
||||
receiver: mpsc::Receiver<BackendMessages>,
|
||||
/// current batch of messages
|
||||
cur: BackendMessages,
|
||||
/// number of total queries sent.
|
||||
waiting: usize,
|
||||
/// number of ReadyForQuery messages received.
|
||||
received: usize,
|
||||
}
|
||||
|
||||
impl Responses {
|
||||
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
|
||||
loop {
|
||||
match self.cur.next().map_err(Error::parse)? {
|
||||
Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
|
||||
Some(message) => return Poll::Ready(Ok(message)),
|
||||
None => {}
|
||||
// get the next saved message
|
||||
if let Some(message) = self.cur.next().map_err(Error::parse)? {
|
||||
let received = self.received;
|
||||
|
||||
// increase the query head if this is the last message.
|
||||
if let Message::ReadyForQuery(_) = message {
|
||||
self.received += 1;
|
||||
}
|
||||
|
||||
// check if the client has skipped this query.
|
||||
if received + 1 < self.waiting {
|
||||
// grab the next message.
|
||||
continue;
|
||||
}
|
||||
|
||||
// convenience: turn the error messaage into a proper error.
|
||||
let res = match message {
|
||||
Message::ErrorResponse(body) => Err(Error::db(body)),
|
||||
message => Ok(message),
|
||||
};
|
||||
return Poll::Ready(res);
|
||||
}
|
||||
|
||||
// get the next batch of messages.
|
||||
match ready!(self.receiver.poll_recv(cx)) {
|
||||
Some(messages) => self.cur = messages,
|
||||
None => return Poll::Ready(Err(Error::closed())),
|
||||
@@ -55,44 +76,87 @@ impl Responses {
|
||||
/// (corresponding to the queries in the [crate::prepare] module).
|
||||
#[derive(Default)]
|
||||
pub(crate) struct CachedTypeInfo {
|
||||
/// A statement for basic information for a type from its
|
||||
/// OID. Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_QUERY) (or its
|
||||
/// fallback).
|
||||
pub(crate) typeinfo: Option<Statement>,
|
||||
|
||||
/// Cache of types already looked up.
|
||||
pub(crate) types: HashMap<Oid, Type>,
|
||||
}
|
||||
|
||||
pub struct InnerClient {
|
||||
sender: mpsc::UnboundedSender<Request>,
|
||||
sender: mpsc::UnboundedSender<FrontendMessage>,
|
||||
responses: Responses,
|
||||
|
||||
/// A buffer to use when writing out postgres commands.
|
||||
buffer: Mutex<BytesMut>,
|
||||
buffer: BytesMut,
|
||||
}
|
||||
|
||||
impl InnerClient {
|
||||
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
|
||||
let (sender, receiver) = mpsc::channel(1);
|
||||
let request = Request { messages, sender };
|
||||
self.sender.send(request).map_err(|_| Error::closed())?;
|
||||
|
||||
Ok(Responses {
|
||||
receiver,
|
||||
cur: BackendMessages::empty(),
|
||||
})
|
||||
pub fn start(&mut self) -> Result<PartialQuery, Error> {
|
||||
self.responses.waiting += 1;
|
||||
Ok(PartialQuery(Some(self)))
|
||||
}
|
||||
|
||||
/// Call the given function with a buffer to be used when writing out
|
||||
/// postgres commands.
|
||||
pub fn with_buf<F, R>(&self, f: F) -> R
|
||||
// pub fn send_with_sync<F>(&mut self, f: F) -> Result<&mut Responses, Error>
|
||||
// where
|
||||
// F: FnOnce(&mut BytesMut) -> Result<(), Error>,
|
||||
// {
|
||||
// self.start()?.send_with_sync(f)
|
||||
// }
|
||||
|
||||
pub fn send_simple_query(&mut self, query: &str) -> Result<&mut Responses, Error> {
|
||||
self.responses.waiting += 1;
|
||||
|
||||
self.buffer.clear();
|
||||
// simple queries do not need sync.
|
||||
frontend::query(query, &mut self.buffer).map_err(Error::encode)?;
|
||||
let buf = self.buffer.split().freeze();
|
||||
self.send_message(FrontendMessage::Raw(buf))
|
||||
}
|
||||
|
||||
fn send_message(&mut self, messages: FrontendMessage) -> Result<&mut Responses, Error> {
|
||||
self.sender.send(messages).map_err(|_| Error::closed())?;
|
||||
Ok(&mut self.responses)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PartialQuery<'a>(Option<&'a mut InnerClient>);
|
||||
|
||||
impl Drop for PartialQuery<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(client) = self.0.take() {
|
||||
client.buffer.clear();
|
||||
frontend::sync(&mut client.buffer);
|
||||
let buf = client.buffer.split().freeze();
|
||||
let _ = client.send_message(FrontendMessage::Raw(buf));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> PartialQuery<'a> {
|
||||
pub fn send_with_flush<F>(&mut self, f: F) -> Result<&mut Responses, Error>
|
||||
where
|
||||
F: FnOnce(&mut BytesMut) -> R,
|
||||
F: FnOnce(&mut BytesMut) -> Result<(), Error>,
|
||||
{
|
||||
let mut buffer = self.buffer.lock();
|
||||
let r = f(&mut buffer);
|
||||
buffer.clear();
|
||||
r
|
||||
let client = self.0.as_deref_mut().unwrap();
|
||||
|
||||
client.buffer.clear();
|
||||
f(&mut client.buffer)?;
|
||||
frontend::flush(&mut client.buffer);
|
||||
let buf = client.buffer.split().freeze();
|
||||
client.send_message(FrontendMessage::Raw(buf))
|
||||
}
|
||||
|
||||
pub fn send_with_sync<F>(mut self, f: F) -> Result<&'a mut Responses, Error>
|
||||
where
|
||||
F: FnOnce(&mut BytesMut) -> Result<(), Error>,
|
||||
{
|
||||
let client = self.0.as_deref_mut().unwrap();
|
||||
|
||||
client.buffer.clear();
|
||||
f(&mut client.buffer)?;
|
||||
frontend::sync(&mut client.buffer);
|
||||
let buf = client.buffer.split().freeze();
|
||||
let _ = client.send_message(FrontendMessage::Raw(buf));
|
||||
|
||||
Ok(&mut self.0.take().unwrap().responses)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +173,7 @@ pub struct SocketConfig {
|
||||
/// The client is one half of what is returned when a connection is established. Users interact with the database
|
||||
/// through this client object.
|
||||
pub struct Client {
|
||||
inner: Arc<InnerClient>,
|
||||
inner: InnerClient,
|
||||
cached_typeinfo: CachedTypeInfo,
|
||||
|
||||
socket_config: SocketConfig,
|
||||
@@ -120,17 +184,24 @@ pub struct Client {
|
||||
|
||||
impl Client {
|
||||
pub(crate) fn new(
|
||||
sender: mpsc::UnboundedSender<Request>,
|
||||
sender: mpsc::UnboundedSender<FrontendMessage>,
|
||||
receiver: mpsc::Receiver<BackendMessages>,
|
||||
socket_config: SocketConfig,
|
||||
ssl_mode: SslMode,
|
||||
process_id: i32,
|
||||
secret_key: i32,
|
||||
) -> Client {
|
||||
Client {
|
||||
inner: Arc::new(InnerClient {
|
||||
inner: InnerClient {
|
||||
sender,
|
||||
responses: Responses {
|
||||
receiver,
|
||||
cur: BackendMessages::empty(),
|
||||
waiting: 0,
|
||||
received: 0,
|
||||
},
|
||||
buffer: Default::default(),
|
||||
}),
|
||||
},
|
||||
cached_typeinfo: Default::default(),
|
||||
|
||||
socket_config,
|
||||
@@ -145,19 +216,29 @@ impl Client {
|
||||
self.process_id
|
||||
}
|
||||
|
||||
pub(crate) fn inner(&self) -> &Arc<InnerClient> {
|
||||
&self.inner
|
||||
pub(crate) fn inner_mut(&mut self) -> &mut InnerClient {
|
||||
&mut self.inner
|
||||
}
|
||||
|
||||
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
|
||||
/// to save a roundtrip
|
||||
pub async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
pub async fn query_raw_txt<S, I>(
|
||||
&mut self,
|
||||
statement: &str,
|
||||
params: I,
|
||||
) -> Result<RowStream, Error>
|
||||
where
|
||||
S: AsRef<str>,
|
||||
I: IntoIterator<Item = Option<S>>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
query::query_txt(&self.inner, statement, params).await
|
||||
query::query_txt(
|
||||
&mut self.inner,
|
||||
&mut self.cached_typeinfo,
|
||||
statement,
|
||||
params,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
|
||||
@@ -173,12 +254,15 @@ impl Client {
|
||||
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
|
||||
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
|
||||
/// them to this method!
|
||||
pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
|
||||
pub async fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
|
||||
self.simple_query_raw(query).await?.try_collect().await
|
||||
}
|
||||
|
||||
pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
|
||||
simple_query::simple_query(self.inner(), query).await
|
||||
pub(crate) async fn simple_query_raw(
|
||||
&mut self,
|
||||
query: &str,
|
||||
) -> Result<SimpleQueryStream, Error> {
|
||||
simple_query::simple_query(self.inner_mut(), query).await
|
||||
}
|
||||
|
||||
/// Executes a sequence of SQL statements using the simple query protocol.
|
||||
@@ -191,15 +275,11 @@ impl Client {
|
||||
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
|
||||
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
|
||||
/// them to this method!
|
||||
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
|
||||
simple_query::batch_execute(self.inner(), query).await
|
||||
pub async fn batch_execute(&mut self, query: &str) -> Result<ReadyForQueryStatus, Error> {
|
||||
simple_query::batch_execute(self.inner_mut(), query).await
|
||||
}
|
||||
|
||||
pub async fn discard_all(&mut self) -> Result<ReadyForQueryStatus, Error> {
|
||||
// clear the prepared statements that are about to be nuked from the postgres session
|
||||
|
||||
self.cached_typeinfo.typeinfo = None;
|
||||
|
||||
self.batch_execute("discard all").await
|
||||
}
|
||||
|
||||
@@ -208,7 +288,7 @@ impl Client {
|
||||
/// The transaction will roll back by default - use the `commit` method to commit it.
|
||||
pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
|
||||
struct RollbackIfNotDone<'me> {
|
||||
client: &'me Client,
|
||||
client: &'me mut Client,
|
||||
done: bool,
|
||||
}
|
||||
|
||||
@@ -218,14 +298,7 @@ impl Client {
|
||||
return;
|
||||
}
|
||||
|
||||
let buf = self.client.inner().with_buf(|buf| {
|
||||
frontend::query("ROLLBACK", buf).unwrap();
|
||||
buf.split().freeze()
|
||||
});
|
||||
let _ = self
|
||||
.client
|
||||
.inner()
|
||||
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
|
||||
let _ = self.client.inner.send_simple_query("ROLLBACK");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,7 +312,7 @@ impl Client {
|
||||
client: self,
|
||||
done: false,
|
||||
};
|
||||
self.batch_execute("BEGIN").await?;
|
||||
cleaner.client.batch_execute("BEGIN").await?;
|
||||
cleaner.done = true;
|
||||
}
|
||||
|
||||
@@ -265,11 +338,6 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
/// Query for type information
|
||||
pub(crate) async fn get_type_inner(&mut self, oid: Oid) -> Result<Type, Error> {
|
||||
crate::prepare::get_type(&self.inner, &mut self.cached_typeinfo, oid).await
|
||||
}
|
||||
|
||||
/// Determines if the connection to the server has already closed.
|
||||
///
|
||||
/// In that case, all future queries will fail.
|
||||
|
||||
@@ -1,21 +1,16 @@
|
||||
use std::io;
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use postgres_protocol2::message::backend;
|
||||
use postgres_protocol2::message::frontend::CopyData;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
pub enum FrontendMessage {
|
||||
Raw(Bytes),
|
||||
CopyData(CopyData<Box<dyn Buf + Send>>),
|
||||
}
|
||||
|
||||
pub enum BackendMessage {
|
||||
Normal {
|
||||
messages: BackendMessages,
|
||||
request_complete: bool,
|
||||
},
|
||||
Normal { messages: BackendMessages },
|
||||
Async(backend::Message),
|
||||
}
|
||||
|
||||
@@ -44,7 +39,6 @@ impl Encoder<FrontendMessage> for PostgresCodec {
|
||||
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
|
||||
match item {
|
||||
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
|
||||
FrontendMessage::CopyData(data) => data.write(dst),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -57,7 +51,6 @@ impl Decoder for PostgresCodec {
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
|
||||
let mut idx = 0;
|
||||
let mut request_complete = false;
|
||||
|
||||
while let Some(header) = backend::Header::parse(&src[idx..])? {
|
||||
let len = header.len() as usize + 1;
|
||||
@@ -82,7 +75,6 @@ impl Decoder for PostgresCodec {
|
||||
idx += len;
|
||||
|
||||
if header.tag() == backend::READY_FOR_QUERY_TAG {
|
||||
request_complete = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -92,7 +84,6 @@ impl Decoder for PostgresCodec {
|
||||
} else {
|
||||
Ok(Some(BackendMessage::Normal {
|
||||
messages: BackendMessages(src.split_to(idx)),
|
||||
request_complete,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,9 +59,11 @@ where
|
||||
connect_timeout: config.connect_timeout,
|
||||
};
|
||||
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
let (client_tx, conn_rx) = mpsc::unbounded_channel();
|
||||
let (conn_tx, client_rx) = mpsc::channel(4);
|
||||
let client = Client::new(
|
||||
sender,
|
||||
client_tx,
|
||||
client_rx,
|
||||
socket_config,
|
||||
config.ssl_mode,
|
||||
process_id,
|
||||
@@ -74,7 +76,7 @@ where
|
||||
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
|
||||
.collect();
|
||||
|
||||
let connection = Connection::new(stream, delayed, parameters, receiver);
|
||||
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
|
||||
|
||||
Ok((client, connection))
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use futures_util::{Sink, Stream, ready};
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use postgres_protocol2::message::frontend;
|
||||
@@ -19,30 +18,12 @@ use crate::error::DbError;
|
||||
use crate::maybe_tls_stream::MaybeTlsStream;
|
||||
use crate::{AsyncMessage, Error, Notification};
|
||||
|
||||
pub enum RequestMessages {
|
||||
Single(FrontendMessage),
|
||||
}
|
||||
|
||||
pub struct Request {
|
||||
pub messages: RequestMessages,
|
||||
pub sender: mpsc::Sender<BackendMessages>,
|
||||
}
|
||||
|
||||
pub struct Response {
|
||||
sender: PollSender<BackendMessages>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug)]
|
||||
enum State {
|
||||
Active,
|
||||
Closing,
|
||||
}
|
||||
|
||||
enum WriteReady {
|
||||
Terminating,
|
||||
WaitingOnRead,
|
||||
}
|
||||
|
||||
/// A connection to a PostgreSQL database.
|
||||
///
|
||||
/// This is one half of what is returned when a new connection is established. It performs the actual IO with the
|
||||
@@ -56,9 +37,11 @@ pub struct Connection<S, T> {
|
||||
pub stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
|
||||
/// HACK: we need this in the Neon Proxy to forward params.
|
||||
pub parameters: HashMap<String, String>,
|
||||
receiver: mpsc::UnboundedReceiver<Request>,
|
||||
|
||||
sender: PollSender<BackendMessages>,
|
||||
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
|
||||
|
||||
pending_responses: VecDeque<BackendMessage>,
|
||||
responses: VecDeque<Response>,
|
||||
state: State,
|
||||
}
|
||||
|
||||
@@ -71,14 +54,15 @@ where
|
||||
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
|
||||
pending_responses: VecDeque<BackendMessage>,
|
||||
parameters: HashMap<String, String>,
|
||||
receiver: mpsc::UnboundedReceiver<Request>,
|
||||
sender: mpsc::Sender<BackendMessages>,
|
||||
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
|
||||
) -> Connection<S, T> {
|
||||
Connection {
|
||||
stream,
|
||||
parameters,
|
||||
sender: PollSender::new(sender),
|
||||
receiver,
|
||||
pending_responses,
|
||||
responses: VecDeque::new(),
|
||||
state: State::Active,
|
||||
}
|
||||
}
|
||||
@@ -110,7 +94,7 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
let (mut messages, request_complete) = match message {
|
||||
let messages = match message {
|
||||
BackendMessage::Async(Message::NoticeResponse(body)) => {
|
||||
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
|
||||
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
|
||||
@@ -131,41 +115,19 @@ where
|
||||
continue;
|
||||
}
|
||||
BackendMessage::Async(_) => unreachable!(),
|
||||
BackendMessage::Normal {
|
||||
messages,
|
||||
request_complete,
|
||||
} => (messages, request_complete),
|
||||
BackendMessage::Normal { messages } => messages,
|
||||
};
|
||||
|
||||
let mut response = match self.responses.pop_front() {
|
||||
Some(response) => response,
|
||||
None => match messages.next().map_err(Error::parse)? {
|
||||
Some(Message::ErrorResponse(error)) => {
|
||||
return Poll::Ready(Err(Error::db(error)));
|
||||
}
|
||||
_ => return Poll::Ready(Err(Error::unexpected_message())),
|
||||
},
|
||||
};
|
||||
|
||||
match response.sender.poll_reserve(cx) {
|
||||
match self.sender.poll_reserve(cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
let _ = response.sender.send_item(messages);
|
||||
if !request_complete {
|
||||
self.responses.push_front(response);
|
||||
}
|
||||
let _ = self.sender.send_item(messages);
|
||||
}
|
||||
Poll::Ready(Err(_)) => {
|
||||
// we need to keep paging through the rest of the messages even if the receiver's hung up
|
||||
if !request_complete {
|
||||
self.responses.push_front(response);
|
||||
}
|
||||
return Poll::Ready(Err(Error::closed()));
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.responses.push_front(response);
|
||||
self.pending_responses.push_back(BackendMessage::Normal {
|
||||
messages,
|
||||
request_complete,
|
||||
});
|
||||
self.pending_responses
|
||||
.push_back(BackendMessage::Normal { messages });
|
||||
trace!("poll_read: waiting on sender");
|
||||
return Poll::Pending;
|
||||
}
|
||||
@@ -174,7 +136,7 @@ where
|
||||
}
|
||||
|
||||
/// Fetch the next client request and enqueue the response sender.
|
||||
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<RequestMessages>> {
|
||||
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<FrontendMessage>> {
|
||||
if self.receiver.is_closed() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
@@ -182,10 +144,7 @@ where
|
||||
match self.receiver.poll_recv(cx) {
|
||||
Poll::Ready(Some(request)) => {
|
||||
trace!("polled new request");
|
||||
self.responses.push_back(Response {
|
||||
sender: PollSender::new(request.sender),
|
||||
});
|
||||
Poll::Ready(Some(request.messages))
|
||||
Poll::Ready(Some(request))
|
||||
}
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
@@ -194,7 +153,7 @@ where
|
||||
|
||||
/// Process client requests and write them to the postgres connection, flushing if necessary.
|
||||
/// client -> postgres
|
||||
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<WriteReady, Error>> {
|
||||
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
||||
loop {
|
||||
if Pin::new(&mut self.stream)
|
||||
.poll_ready(cx)
|
||||
@@ -209,14 +168,14 @@ where
|
||||
|
||||
match self.poll_request(cx) {
|
||||
// send the message to postgres
|
||||
Poll::Ready(Some(RequestMessages::Single(request))) => {
|
||||
Poll::Ready(Some(request)) => {
|
||||
Pin::new(&mut self.stream)
|
||||
.start_send(request)
|
||||
.map_err(Error::io)?;
|
||||
}
|
||||
// No more messages from the client, and no more responses to wait for.
|
||||
// Send a terminate message to postgres
|
||||
Poll::Ready(None) if self.responses.is_empty() => {
|
||||
Poll::Ready(None) => {
|
||||
trace!("poll_write: at eof, terminating");
|
||||
let mut request = BytesMut::new();
|
||||
frontend::terminate(&mut request);
|
||||
@@ -228,16 +187,7 @@ where
|
||||
|
||||
trace!("poll_write: sent eof, closing");
|
||||
trace!("poll_write: done");
|
||||
return Poll::Ready(Ok(WriteReady::Terminating));
|
||||
}
|
||||
// No more messages from the client, but there are still some responses to wait for.
|
||||
Poll::Ready(None) => {
|
||||
trace!(
|
||||
"poll_write: at eof, pending responses {}",
|
||||
self.responses.len()
|
||||
);
|
||||
ready!(self.poll_flush(cx))?;
|
||||
return Poll::Ready(Ok(WriteReady::WaitingOnRead));
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
// Still waiting for a message from the client.
|
||||
Poll::Pending => {
|
||||
@@ -298,7 +248,7 @@ where
|
||||
// if the state is still active, try read from and write to postgres.
|
||||
let message = self.poll_read(cx)?;
|
||||
let closing = self.poll_write(cx)?;
|
||||
if let Poll::Ready(WriteReady::Terminating) = closing {
|
||||
if let Poll::Ready(()) = closing {
|
||||
self.state = State::Closing;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
#![allow(async_fn_in_trait)]
|
||||
|
||||
use postgres_protocol2::Oid;
|
||||
|
||||
use crate::query::RowStream;
|
||||
use crate::types::Type;
|
||||
use crate::{Client, Error, Transaction};
|
||||
|
||||
mod private {
|
||||
@@ -15,20 +12,17 @@ mod private {
|
||||
/// This trait is "sealed", and cannot be implemented outside of this crate.
|
||||
pub trait GenericClient: private::Sealed {
|
||||
/// Like `Client::query_raw_txt`.
|
||||
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
where
|
||||
S: AsRef<str> + Sync + Send,
|
||||
I: IntoIterator<Item = Option<S>> + Sync + Send,
|
||||
I::IntoIter: ExactSizeIterator + Sync + Send;
|
||||
|
||||
/// Query for type information
|
||||
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error>;
|
||||
}
|
||||
|
||||
impl private::Sealed for Client {}
|
||||
|
||||
impl GenericClient for Client {
|
||||
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
where
|
||||
S: AsRef<str> + Sync + Send,
|
||||
I: IntoIterator<Item = Option<S>> + Sync + Send,
|
||||
@@ -36,17 +30,12 @@ impl GenericClient for Client {
|
||||
{
|
||||
self.query_raw_txt(statement, params).await
|
||||
}
|
||||
|
||||
/// Query for type information
|
||||
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error> {
|
||||
self.get_type_inner(oid).await
|
||||
}
|
||||
}
|
||||
|
||||
impl private::Sealed for Transaction<'_> {}
|
||||
|
||||
impl GenericClient for Transaction<'_> {
|
||||
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
where
|
||||
S: AsRef<str> + Sync + Send,
|
||||
I: IntoIterator<Item = Option<S>> + Sync + Send,
|
||||
@@ -54,9 +43,4 @@ impl GenericClient for Transaction<'_> {
|
||||
{
|
||||
self.query_raw_txt(statement, params).await
|
||||
}
|
||||
|
||||
/// Query for type information
|
||||
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error> {
|
||||
self.client_mut().get_type(oid).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ pub use crate::statement::{Column, Statement};
|
||||
pub use crate::tls::NoTls;
|
||||
pub use crate::transaction::Transaction;
|
||||
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
|
||||
use crate::types::ToSql;
|
||||
|
||||
/// After executing a query, the connection will be in one of these states
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
@@ -120,9 +119,3 @@ pub enum SimpleQueryMessage {
|
||||
/// The number of rows modified or selected is returned.
|
||||
CommandComplete(u64),
|
||||
}
|
||||
|
||||
fn slice_iter<'a>(
|
||||
s: &'a [&'a (dyn ToSql + Sync)],
|
||||
) -> impl ExactSizeIterator<Item = &'a (dyn ToSql + Sync)> + 'a {
|
||||
s.iter().map(|s| *s as _)
|
||||
}
|
||||
|
||||
@@ -1,19 +1,14 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use futures_util::{TryStreamExt, pin_mut};
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use postgres_protocol2::IsNull;
|
||||
use postgres_protocol2::message::backend::{Message, RowDescriptionBody};
|
||||
use postgres_protocol2::message::frontend;
|
||||
use tracing::debug;
|
||||
use postgres_protocol2::types::oid_to_sql;
|
||||
use postgres_types2::Format;
|
||||
|
||||
use crate::client::{CachedTypeInfo, InnerClient};
|
||||
use crate::codec::FrontendMessage;
|
||||
use crate::connection::RequestMessages;
|
||||
use crate::client::{CachedTypeInfo, PartialQuery, Responses};
|
||||
use crate::types::{Kind, Oid, Type};
|
||||
use crate::{Column, Error, Statement, query, slice_iter};
|
||||
use crate::{Column, Error, Row, Statement};
|
||||
|
||||
pub(crate) const TYPEINFO_QUERY: &str = "\
|
||||
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
|
||||
@@ -23,22 +18,51 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
|
||||
WHERE t.oid = $1
|
||||
";
|
||||
|
||||
/// we need to make sure we close this prepared statement.
|
||||
struct CloseStmt<'a, 'b> {
|
||||
client: Option<&'a mut PartialQuery<'b>>,
|
||||
name: &'static str,
|
||||
}
|
||||
|
||||
impl<'a> CloseStmt<'a, '_> {
|
||||
fn close(mut self) -> Result<&'a mut Responses, Error> {
|
||||
let client = self.client.take().unwrap();
|
||||
client.send_with_flush(|buf| {
|
||||
frontend::close(b'S', self.name, buf).map_err(Error::encode)?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CloseStmt<'_, '_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(client) = self.client.take() {
|
||||
let _ = client.send_with_flush(|buf| {
|
||||
frontend::close(b'S', self.name, buf).map_err(Error::encode)?;
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn prepare_typecheck(
|
||||
client: &Arc<InnerClient>,
|
||||
client: &mut PartialQuery<'_>,
|
||||
name: &'static str,
|
||||
query: &str,
|
||||
types: &[Type],
|
||||
) -> Result<Statement, Error> {
|
||||
let buf = encode(client, name, query, types)?;
|
||||
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||||
let responses = client.send_with_flush(|buf| {
|
||||
frontend::parse(name, query, [], buf).map_err(Error::encode)?;
|
||||
frontend::describe(b'S', name, buf).map_err(Error::encode)?;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
match responses.next().await? {
|
||||
Message::ParseComplete => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
|
||||
let parameter_description = match responses.next().await? {
|
||||
Message::ParameterDescription(body) => body,
|
||||
match responses.next().await? {
|
||||
Message::ParameterDescription(_) => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
@@ -48,13 +72,6 @@ async fn prepare_typecheck(
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
let mut parameters = vec![];
|
||||
let mut it = parameter_description.parameters();
|
||||
while let Some(oid) = it.next().map_err(Error::parse)? {
|
||||
let type_ = Type::from_oid(oid).ok_or_else(Error::unexpected_message)?;
|
||||
parameters.push(type_);
|
||||
}
|
||||
|
||||
let mut columns = vec![];
|
||||
if let Some(row_description) = row_description {
|
||||
let mut it = row_description.fields();
|
||||
@@ -65,98 +82,168 @@ async fn prepare_typecheck(
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Statement::new(client, name, parameters, columns))
|
||||
Ok(Statement::new(name, columns))
|
||||
}
|
||||
|
||||
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
|
||||
if types.is_empty() {
|
||||
debug!("preparing query {}: {}", name, query);
|
||||
} else {
|
||||
debug!("preparing query {} with types {:?}: {}", name, types, query);
|
||||
}
|
||||
|
||||
client.with_buf(|buf| {
|
||||
frontend::parse(name, query, types.iter().map(Type::oid), buf).map_err(Error::encode)?;
|
||||
frontend::describe(b'S', name, buf).map_err(Error::encode)?;
|
||||
frontend::sync(buf);
|
||||
Ok(buf.split().freeze())
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_type(
|
||||
client: &Arc<InnerClient>,
|
||||
typecache: &mut CachedTypeInfo,
|
||||
oid: Oid,
|
||||
) -> Result<Type, Error> {
|
||||
fn try_from_cache(typecache: &CachedTypeInfo, oid: Oid) -> Option<Type> {
|
||||
if let Some(type_) = Type::from_oid(oid) {
|
||||
return Ok(type_);
|
||||
return Some(type_);
|
||||
}
|
||||
|
||||
if let Some(type_) = typecache.types.get(&oid) {
|
||||
return Ok(type_.clone());
|
||||
return Some(type_.clone());
|
||||
};
|
||||
|
||||
let stmt = typeinfo_statement(client, typecache).await?;
|
||||
None
|
||||
}
|
||||
|
||||
let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
|
||||
pin_mut!(rows);
|
||||
pub async fn parse_row_description(
|
||||
client: &mut PartialQuery<'_>,
|
||||
typecache: &mut CachedTypeInfo,
|
||||
row_description: Option<RowDescriptionBody>,
|
||||
) -> Result<Vec<Column>, Error> {
|
||||
let mut columns = vec![];
|
||||
|
||||
let row = match rows.try_next().await? {
|
||||
Some(row) => row,
|
||||
None => return Err(Error::unexpected_message()),
|
||||
if let Some(row_description) = row_description {
|
||||
let mut it = row_description.fields();
|
||||
while let Some(field) = it.next().map_err(Error::parse)? {
|
||||
let type_ = try_from_cache(typecache, field.type_oid()).unwrap_or(Type::UNKNOWN);
|
||||
let column = Column::new(field.name().to_string(), type_, field);
|
||||
columns.push(column);
|
||||
}
|
||||
}
|
||||
|
||||
let all_known = columns.iter().all(|c| c.type_ != Type::UNKNOWN);
|
||||
if all_known {
|
||||
// all known, return early.
|
||||
return Ok(columns);
|
||||
}
|
||||
|
||||
let typeinfo = "neon_proxy_typeinfo";
|
||||
|
||||
// make sure to close the typeinfo statement before exiting.
|
||||
let mut guard = CloseStmt {
|
||||
name: typeinfo,
|
||||
client: None,
|
||||
};
|
||||
let client = guard.client.insert(client);
|
||||
|
||||
// get the typeinfo statement.
|
||||
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY).await?;
|
||||
|
||||
for column in &mut columns {
|
||||
column.type_ = get_type(client, typecache, &stmt, column.type_oid()).await?;
|
||||
}
|
||||
|
||||
// cancel the close guard.
|
||||
let responses = guard.close()?;
|
||||
|
||||
match responses.next().await? {
|
||||
Message::CloseComplete => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
|
||||
Ok(columns)
|
||||
}
|
||||
|
||||
async fn get_type(
|
||||
client: &mut PartialQuery<'_>,
|
||||
typecache: &mut CachedTypeInfo,
|
||||
stmt: &Statement,
|
||||
mut oid: Oid,
|
||||
) -> Result<Type, Error> {
|
||||
let mut stack = vec![];
|
||||
let mut type_ = loop {
|
||||
if let Some(type_) = try_from_cache(typecache, oid) {
|
||||
break type_;
|
||||
}
|
||||
|
||||
let row = exec(client, stmt, oid).await?;
|
||||
if stack.len() > 8 {
|
||||
return Err(Error::unexpected_message());
|
||||
}
|
||||
|
||||
let name: String = row.try_get(0)?;
|
||||
let type_: i8 = row.try_get(1)?;
|
||||
let elem_oid: Oid = row.try_get(2)?;
|
||||
let rngsubtype: Option<Oid> = row.try_get(3)?;
|
||||
let basetype: Oid = row.try_get(4)?;
|
||||
let schema: String = row.try_get(5)?;
|
||||
let relid: Oid = row.try_get(6)?;
|
||||
|
||||
let kind = if type_ == b'e' as i8 {
|
||||
Kind::Enum
|
||||
} else if type_ == b'p' as i8 {
|
||||
Kind::Pseudo
|
||||
} else if basetype != 0 {
|
||||
Kind::Domain(basetype)
|
||||
} else if elem_oid != 0 {
|
||||
stack.push((name, oid, schema));
|
||||
oid = elem_oid;
|
||||
continue;
|
||||
} else if relid != 0 {
|
||||
Kind::Composite(relid)
|
||||
} else if let Some(rngsubtype) = rngsubtype {
|
||||
Kind::Range(rngsubtype)
|
||||
} else {
|
||||
Kind::Simple
|
||||
};
|
||||
|
||||
let type_ = Type::new(name, oid, kind, schema);
|
||||
typecache.types.insert(oid, type_.clone());
|
||||
break type_;
|
||||
};
|
||||
|
||||
let name: String = row.try_get(0)?;
|
||||
let type_: i8 = row.try_get(1)?;
|
||||
let elem_oid: Oid = row.try_get(2)?;
|
||||
let rngsubtype: Option<Oid> = row.try_get(3)?;
|
||||
let basetype: Oid = row.try_get(4)?;
|
||||
let schema: String = row.try_get(5)?;
|
||||
let relid: Oid = row.try_get(6)?;
|
||||
|
||||
let kind = if type_ == b'e' as i8 {
|
||||
Kind::Enum
|
||||
} else if type_ == b'p' as i8 {
|
||||
Kind::Pseudo
|
||||
} else if basetype != 0 {
|
||||
Kind::Domain(basetype)
|
||||
} else if elem_oid != 0 {
|
||||
let type_ = get_type_rec(client, typecache, elem_oid).await?;
|
||||
Kind::Array(type_)
|
||||
} else if relid != 0 {
|
||||
Kind::Composite(relid)
|
||||
} else if let Some(rngsubtype) = rngsubtype {
|
||||
let type_ = get_type_rec(client, typecache, rngsubtype).await?;
|
||||
Kind::Range(type_)
|
||||
} else {
|
||||
Kind::Simple
|
||||
};
|
||||
|
||||
let type_ = Type::new(name, oid, kind, schema);
|
||||
typecache.types.insert(oid, type_.clone());
|
||||
while let Some((name, oid, schema)) = stack.pop() {
|
||||
type_ = Type::new(name, oid, Kind::Array(type_), schema);
|
||||
typecache.types.insert(oid, type_.clone());
|
||||
}
|
||||
|
||||
Ok(type_)
|
||||
}
|
||||
|
||||
fn get_type_rec<'a>(
|
||||
client: &'a Arc<InnerClient>,
|
||||
typecache: &'a mut CachedTypeInfo,
|
||||
oid: Oid,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Type, Error>> + Send + 'a>> {
|
||||
Box::pin(get_type(client, typecache, oid))
|
||||
}
|
||||
/// exec the typeinfo statement returning one row.
|
||||
async fn exec(
|
||||
client: &mut PartialQuery<'_>,
|
||||
statement: &Statement,
|
||||
param: Oid,
|
||||
) -> Result<Row, Error> {
|
||||
let responses = client.send_with_flush(|buf| {
|
||||
encode_bind(statement, param, "", buf);
|
||||
frontend::execute("", 0, buf).map_err(Error::encode)?;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
async fn typeinfo_statement(
|
||||
client: &Arc<InnerClient>,
|
||||
typecache: &mut CachedTypeInfo,
|
||||
) -> Result<Statement, Error> {
|
||||
if let Some(stmt) = &typecache.typeinfo {
|
||||
return Ok(stmt.clone());
|
||||
match responses.next().await? {
|
||||
Message::BindComplete => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
|
||||
let typeinfo = "neon_proxy_typeinfo";
|
||||
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
|
||||
let row = match responses.next().await? {
|
||||
Message::DataRow(body) => Row::new(statement.clone(), body, Format::Binary)?,
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
typecache.typeinfo = Some(stmt.clone());
|
||||
Ok(stmt)
|
||||
match responses.next().await? {
|
||||
Message::CommandComplete(_) => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
fn encode_bind(statement: &Statement, param: Oid, portal: &str, buf: &mut BytesMut) {
|
||||
frontend::bind(
|
||||
portal,
|
||||
statement.name(),
|
||||
[Format::Binary as i16],
|
||||
[param],
|
||||
|param, buf| {
|
||||
oid_to_sql(param, buf);
|
||||
Ok(IsNull::No)
|
||||
},
|
||||
[Format::Binary as i16],
|
||||
buf,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -1,76 +1,43 @@
|
||||
use std::fmt;
|
||||
use std::marker::PhantomPinned;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use bytes::BufMut;
|
||||
use futures_util::{Stream, ready};
|
||||
use pin_project_lite::pin_project;
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use postgres_protocol2::message::frontend;
|
||||
use postgres_types2::{Format, ToSql, Type};
|
||||
use tracing::debug;
|
||||
use postgres_types2::Format;
|
||||
|
||||
use crate::client::{InnerClient, Responses};
|
||||
use crate::codec::FrontendMessage;
|
||||
use crate::connection::RequestMessages;
|
||||
use crate::types::IsNull;
|
||||
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};
|
||||
use crate::client::{CachedTypeInfo, InnerClient, Responses};
|
||||
use crate::{Error, ReadyForQueryStatus, Row, Statement};
|
||||
|
||||
struct BorrowToSqlParamsDebug<'a>(&'a [&'a (dyn ToSql + Sync)]);
|
||||
|
||||
impl fmt::Debug for BorrowToSqlParamsDebug<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_list().entries(self.0.iter()).finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn query<'a, I>(
|
||||
client: &InnerClient,
|
||||
statement: Statement,
|
||||
params: I,
|
||||
) -> Result<RowStream, Error>
|
||||
where
|
||||
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
|
||||
let params = params.into_iter().collect::<Vec<_>>();
|
||||
debug!(
|
||||
"executing statement {} with parameters: {:?}",
|
||||
statement.name(),
|
||||
BorrowToSqlParamsDebug(params.as_slice()),
|
||||
);
|
||||
encode(client, &statement, params)?
|
||||
} else {
|
||||
encode(client, &statement, params)?
|
||||
};
|
||||
let responses = start(client, buf).await?;
|
||||
Ok(RowStream {
|
||||
statement,
|
||||
responses,
|
||||
command_tag: None,
|
||||
status: ReadyForQueryStatus::Unknown,
|
||||
output_format: Format::Binary,
|
||||
_p: PhantomPinned,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn query_txt<S, I>(
|
||||
client: &Arc<InnerClient>,
|
||||
pub async fn query_txt<'a, S, I>(
|
||||
client: &'a mut InnerClient,
|
||||
typecache: &mut CachedTypeInfo,
|
||||
query: &str,
|
||||
params: I,
|
||||
) -> Result<RowStream, Error>
|
||||
) -> Result<RowStream<'a>, Error>
|
||||
where
|
||||
S: AsRef<str>,
|
||||
I: IntoIterator<Item = Option<S>>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
let params = params.into_iter();
|
||||
let mut client = client.start()?;
|
||||
|
||||
let buf = client.with_buf(|buf| {
|
||||
// Flow:
|
||||
// 1. Parse the query
|
||||
// 2. Inspect the row description for OIDs
|
||||
// 3. If there's any OIDs we don't already know about, perform the typeinfo routine
|
||||
// 4. Execute the query
|
||||
// 5. Sync.
|
||||
//
|
||||
// The typeinfo routine:
|
||||
// 1. Parse the typeinfo query
|
||||
// 2. Execute the query on each OID
|
||||
// 3. If the result does not match an OID we know, repeat 2.
|
||||
|
||||
// parse the query and get type info
|
||||
let responses = client.send_with_flush(|buf| {
|
||||
frontend::parse(
|
||||
"", // unnamed prepared statement
|
||||
query, // query to parse
|
||||
@@ -79,7 +46,30 @@ where
|
||||
)
|
||||
.map_err(Error::encode)?;
|
||||
frontend::describe(b'S', "", buf).map_err(Error::encode)?;
|
||||
// Bind, pass params as text, retrieve as binary
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
match responses.next().await? {
|
||||
Message::ParseComplete => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
|
||||
match responses.next().await? {
|
||||
Message::ParameterDescription(_) => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
let row_description = match responses.next().await? {
|
||||
Message::RowDescription(body) => Some(body),
|
||||
Message::NoData => None,
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
let columns =
|
||||
crate::prepare::parse_row_description(&mut client, typecache, row_description).await?;
|
||||
|
||||
let responses = client.send_with_sync(|buf| {
|
||||
// Bind, pass params as text, retrieve as text
|
||||
match frontend::bind(
|
||||
"", // empty string selects the unnamed portal
|
||||
"", // unnamed prepared statement
|
||||
@@ -102,173 +92,55 @@ where
|
||||
|
||||
// Execute
|
||||
frontend::execute("", 0, buf).map_err(Error::encode)?;
|
||||
// Sync
|
||||
frontend::sync(buf);
|
||||
|
||||
Ok(buf.split().freeze())
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// now read the responses
|
||||
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||||
|
||||
match responses.next().await? {
|
||||
Message::ParseComplete => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
|
||||
let parameter_description = match responses.next().await? {
|
||||
Message::ParameterDescription(body) => body,
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
let row_description = match responses.next().await? {
|
||||
Message::RowDescription(body) => Some(body),
|
||||
Message::NoData => None,
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
};
|
||||
|
||||
match responses.next().await? {
|
||||
Message::BindComplete => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
|
||||
let mut parameters = vec![];
|
||||
let mut it = parameter_description.parameters();
|
||||
while let Some(oid) = it.next().map_err(Error::parse)? {
|
||||
let type_ = Type::from_oid(oid).unwrap_or(Type::UNKNOWN);
|
||||
parameters.push(type_);
|
||||
}
|
||||
|
||||
let mut columns = vec![];
|
||||
if let Some(row_description) = row_description {
|
||||
let mut it = row_description.fields();
|
||||
while let Some(field) = it.next().map_err(Error::parse)? {
|
||||
let type_ = Type::from_oid(field.type_oid()).unwrap_or(Type::UNKNOWN);
|
||||
let column = Column::new(field.name().to_string(), type_, field);
|
||||
columns.push(column);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(RowStream {
|
||||
statement: Statement::new_anonymous(parameters, columns),
|
||||
responses,
|
||||
statement: Statement::new("", columns),
|
||||
command_tag: None,
|
||||
status: ReadyForQueryStatus::Unknown,
|
||||
output_format: Format::Text,
|
||||
_p: PhantomPinned,
|
||||
})
|
||||
}
|
||||
|
||||
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
|
||||
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||||
|
||||
match responses.next().await? {
|
||||
Message::BindComplete => {}
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
|
||||
Ok(responses)
|
||||
/// A stream of table rows.
|
||||
pub struct RowStream<'a> {
|
||||
responses: &'a mut Responses,
|
||||
output_format: Format,
|
||||
pub statement: Statement,
|
||||
pub command_tag: Option<String>,
|
||||
pub status: ReadyForQueryStatus,
|
||||
}
|
||||
|
||||
pub fn encode<'a, I>(client: &InnerClient, statement: &Statement, params: I) -> Result<Bytes, Error>
|
||||
where
|
||||
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
client.with_buf(|buf| {
|
||||
encode_bind(statement, params, "", buf)?;
|
||||
frontend::execute("", 0, buf).map_err(Error::encode)?;
|
||||
frontend::sync(buf);
|
||||
Ok(buf.split().freeze())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn encode_bind<'a, I>(
|
||||
statement: &Statement,
|
||||
params: I,
|
||||
portal: &str,
|
||||
buf: &mut BytesMut,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
let param_types = statement.params();
|
||||
let params = params.into_iter();
|
||||
|
||||
assert!(
|
||||
param_types.len() == params.len(),
|
||||
"expected {} parameters but got {}",
|
||||
param_types.len(),
|
||||
params.len()
|
||||
);
|
||||
|
||||
let (param_formats, params): (Vec<_>, Vec<_>) = params
|
||||
.zip(param_types.iter())
|
||||
.map(|(p, ty)| (p.encode_format(ty) as i16, p))
|
||||
.unzip();
|
||||
|
||||
let params = params.into_iter();
|
||||
|
||||
let mut error_idx = 0;
|
||||
let r = frontend::bind(
|
||||
portal,
|
||||
statement.name(),
|
||||
param_formats,
|
||||
params.zip(param_types).enumerate(),
|
||||
|(idx, (param, ty)), buf| match param.to_sql_checked(ty, buf) {
|
||||
Ok(IsNull::No) => Ok(postgres_protocol2::IsNull::No),
|
||||
Ok(IsNull::Yes) => Ok(postgres_protocol2::IsNull::Yes),
|
||||
Err(e) => {
|
||||
error_idx = idx;
|
||||
Err(e)
|
||||
}
|
||||
},
|
||||
Some(1),
|
||||
buf,
|
||||
);
|
||||
match r {
|
||||
Ok(()) => Ok(()),
|
||||
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, error_idx)),
|
||||
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// A stream of table rows.
|
||||
pub struct RowStream {
|
||||
statement: Statement,
|
||||
responses: Responses,
|
||||
command_tag: Option<String>,
|
||||
output_format: Format,
|
||||
status: ReadyForQueryStatus,
|
||||
#[pin]
|
||||
_p: PhantomPinned,
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for RowStream {
|
||||
impl Stream for RowStream<'_> {
|
||||
type Item = Result<Row, Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
let this = self.get_mut();
|
||||
loop {
|
||||
match ready!(this.responses.poll_next(cx)?) {
|
||||
Message::DataRow(body) => {
|
||||
return Poll::Ready(Some(Ok(Row::new(
|
||||
this.statement.clone(),
|
||||
body,
|
||||
*this.output_format,
|
||||
this.output_format,
|
||||
)?)));
|
||||
}
|
||||
Message::EmptyQueryResponse | Message::PortalSuspended => {}
|
||||
Message::CommandComplete(body) => {
|
||||
if let Ok(tag) = body.tag() {
|
||||
*this.command_tag = Some(tag.to_string());
|
||||
this.command_tag = Some(tag.to_string());
|
||||
}
|
||||
}
|
||||
Message::ReadyForQuery(status) => {
|
||||
*this.status = status.into();
|
||||
this.status = status.into();
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
|
||||
@@ -276,24 +148,3 @@ impl Stream for RowStream {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RowStream {
|
||||
/// Returns information about the columns of data in the row.
|
||||
pub fn columns(&self) -> &[Column] {
|
||||
self.statement.columns()
|
||||
}
|
||||
|
||||
/// Returns the command tag of this query.
|
||||
///
|
||||
/// This is only available after the stream has been exhausted.
|
||||
pub fn command_tag(&self) -> Option<String> {
|
||||
self.command_tag.clone()
|
||||
}
|
||||
|
||||
/// Returns if the connection is ready for querying, with the status of the connection.
|
||||
///
|
||||
/// This might be available only after the stream has been exhausted.
|
||||
pub fn ready_status(&self) -> ReadyForQueryStatus {
|
||||
self.status
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,19 +1,14 @@
|
||||
use std::marker::PhantomPinned;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::Bytes;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use futures_util::{Stream, ready};
|
||||
use pin_project_lite::pin_project;
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use postgres_protocol2::message::frontend;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::client::{InnerClient, Responses};
|
||||
use crate::codec::FrontendMessage;
|
||||
use crate::connection::RequestMessages;
|
||||
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
|
||||
|
||||
/// Information about a column of a single query row.
|
||||
@@ -33,28 +28,28 @@ impl SimpleColumn {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
|
||||
pub async fn simple_query<'a>(
|
||||
client: &'a mut InnerClient,
|
||||
query: &str,
|
||||
) -> Result<SimpleQueryStream<'a>, Error> {
|
||||
debug!("executing simple query: {}", query);
|
||||
|
||||
let buf = encode(client, query)?;
|
||||
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||||
let responses = client.send_simple_query(query)?;
|
||||
|
||||
Ok(SimpleQueryStream {
|
||||
responses,
|
||||
columns: None,
|
||||
status: ReadyForQueryStatus::Unknown,
|
||||
_p: PhantomPinned,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn batch_execute(
|
||||
client: &InnerClient,
|
||||
client: &mut InnerClient,
|
||||
query: &str,
|
||||
) -> Result<ReadyForQueryStatus, Error> {
|
||||
debug!("executing statement batch: {}", query);
|
||||
|
||||
let buf = encode(client, query)?;
|
||||
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||||
let responses = client.send_simple_query(query)?;
|
||||
|
||||
loop {
|
||||
match responses.next().await? {
|
||||
@@ -68,25 +63,16 @@ pub async fn batch_execute(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
|
||||
client.with_buf(|buf| {
|
||||
frontend::query(query, buf).map_err(Error::encode)?;
|
||||
Ok(buf.split().freeze())
|
||||
})
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// A stream of simple query results.
|
||||
pub struct SimpleQueryStream {
|
||||
responses: Responses,
|
||||
pub struct SimpleQueryStream<'a> {
|
||||
responses: &'a mut Responses,
|
||||
columns: Option<Arc<[SimpleColumn]>>,
|
||||
status: ReadyForQueryStatus,
|
||||
#[pin]
|
||||
_p: PhantomPinned,
|
||||
}
|
||||
}
|
||||
|
||||
impl SimpleQueryStream {
|
||||
impl SimpleQueryStream<'_> {
|
||||
/// Returns if the connection is ready for querying, with the status of the connection.
|
||||
///
|
||||
/// This might be available only after the stream has been exhausted.
|
||||
@@ -95,7 +81,7 @@ impl SimpleQueryStream {
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for SimpleQueryStream {
|
||||
impl Stream for SimpleQueryStream<'_> {
|
||||
type Item = Result<SimpleQueryMessage, Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
|
||||
@@ -1,35 +1,15 @@
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::types::Type;
|
||||
use postgres_protocol2::Oid;
|
||||
use postgres_protocol2::message::backend::Field;
|
||||
use postgres_protocol2::message::frontend;
|
||||
|
||||
use crate::client::InnerClient;
|
||||
use crate::codec::FrontendMessage;
|
||||
use crate::connection::RequestMessages;
|
||||
use crate::types::Type;
|
||||
|
||||
struct StatementInner {
|
||||
client: Weak<InnerClient>,
|
||||
name: &'static str,
|
||||
params: Vec<Type>,
|
||||
columns: Vec<Column>,
|
||||
}
|
||||
|
||||
impl Drop for StatementInner {
|
||||
fn drop(&mut self) {
|
||||
if let Some(client) = self.client.upgrade() {
|
||||
let buf = client.with_buf(|buf| {
|
||||
frontend::close(b'S', self.name, buf).unwrap();
|
||||
frontend::sync(buf);
|
||||
buf.split().freeze()
|
||||
});
|
||||
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A prepared statement.
|
||||
///
|
||||
/// Prepared statements can only be used with the connection that created them.
|
||||
@@ -37,38 +17,14 @@ impl Drop for StatementInner {
|
||||
pub struct Statement(Arc<StatementInner>);
|
||||
|
||||
impl Statement {
|
||||
pub(crate) fn new(
|
||||
inner: &Arc<InnerClient>,
|
||||
name: &'static str,
|
||||
params: Vec<Type>,
|
||||
columns: Vec<Column>,
|
||||
) -> Statement {
|
||||
Statement(Arc::new(StatementInner {
|
||||
client: Arc::downgrade(inner),
|
||||
name,
|
||||
params,
|
||||
columns,
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
|
||||
Statement(Arc::new(StatementInner {
|
||||
client: Weak::new(),
|
||||
name: "<anonymous>",
|
||||
params,
|
||||
columns,
|
||||
}))
|
||||
pub(crate) fn new(name: &'static str, columns: Vec<Column>) -> Statement {
|
||||
Statement(Arc::new(StatementInner { name, columns }))
|
||||
}
|
||||
|
||||
pub(crate) fn name(&self) -> &str {
|
||||
self.0.name
|
||||
}
|
||||
|
||||
/// Returns the expected types of the statement's parameters.
|
||||
pub fn params(&self) -> &[Type] {
|
||||
&self.0.params
|
||||
}
|
||||
|
||||
/// Returns information about the columns returned when the statement is queried.
|
||||
pub fn columns(&self) -> &[Column] {
|
||||
&self.0.columns
|
||||
@@ -78,7 +34,7 @@ impl Statement {
|
||||
/// Information about a column of a query.
|
||||
pub struct Column {
|
||||
name: String,
|
||||
type_: Type,
|
||||
pub(crate) type_: Type,
|
||||
|
||||
// raw fields from RowDescription
|
||||
table_oid: Oid,
|
||||
|
||||
@@ -1,7 +1,3 @@
|
||||
use postgres_protocol2::message::frontend;
|
||||
|
||||
use crate::codec::FrontendMessage;
|
||||
use crate::connection::RequestMessages;
|
||||
use crate::query::RowStream;
|
||||
use crate::{CancelToken, Client, Error, ReadyForQueryStatus};
|
||||
|
||||
@@ -20,14 +16,7 @@ impl Drop for Transaction<'_> {
|
||||
return;
|
||||
}
|
||||
|
||||
let buf = self.client.inner().with_buf(|buf| {
|
||||
frontend::query("ROLLBACK", buf).unwrap();
|
||||
buf.split().freeze()
|
||||
});
|
||||
let _ = self
|
||||
.client
|
||||
.inner()
|
||||
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
|
||||
let _ = self.client.inner_mut().send_simple_query("ROLLBACK");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,7 +43,11 @@ impl<'a> Transaction<'a> {
|
||||
}
|
||||
|
||||
/// Like `Client::query_raw_txt`.
|
||||
pub async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
pub async fn query_raw_txt<S, I>(
|
||||
&mut self,
|
||||
statement: &str,
|
||||
params: I,
|
||||
) -> Result<RowStream, Error>
|
||||
where
|
||||
S: AsRef<str>,
|
||||
I: IntoIterator<Item = Option<S>>,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#![allow(clippy::todo)]
|
||||
|
||||
use std::ffi::CString;
|
||||
use std::str::FromStr;
|
||||
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use utils::id::TenantTimelineId;
|
||||
@@ -173,6 +174,8 @@ pub struct Config {
|
||||
pub ttid: TenantTimelineId,
|
||||
/// List of safekeepers in format `host:port`
|
||||
pub safekeepers_list: Vec<String>,
|
||||
/// libpq connection info options
|
||||
pub safekeeper_conninfo_options: String,
|
||||
/// Safekeeper reconnect timeout in milliseconds
|
||||
pub safekeeper_reconnect_timeout: i32,
|
||||
/// Safekeeper connection timeout in milliseconds
|
||||
@@ -202,6 +205,9 @@ impl Wrapper {
|
||||
.into_bytes_with_nul();
|
||||
assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
|
||||
let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
|
||||
let safekeeper_conninfo_options = CString::from_str(&config.safekeeper_conninfo_options)
|
||||
.unwrap()
|
||||
.into_raw();
|
||||
|
||||
let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
|
||||
|
||||
@@ -209,6 +215,7 @@ impl Wrapper {
|
||||
neon_tenant,
|
||||
neon_timeline,
|
||||
safekeepers_list,
|
||||
safekeeper_conninfo_options,
|
||||
safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
|
||||
safekeeper_connection_timeout: config.safekeeper_connection_timeout,
|
||||
wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
|
||||
@@ -576,6 +583,7 @@ mod tests {
|
||||
let config = crate::walproposer::Config {
|
||||
ttid,
|
||||
safekeepers_list: vec!["localhost:5000".to_string()],
|
||||
safekeeper_conninfo_options: String::new(),
|
||||
safekeeper_reconnect_timeout: 1000,
|
||||
safekeeper_connection_timeout: 10000,
|
||||
sync_safekeepers: true,
|
||||
|
||||
@@ -17,50 +17,69 @@ anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
bit_field.workspace = true
|
||||
bincode.workspace = true
|
||||
bit_field.workspace = true
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
camino.workspace = true
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
consumption_metrics.workspace = true
|
||||
crc32c.workspace = true
|
||||
either.workspace = true
|
||||
enum-map.workspace = true
|
||||
enumset = { workspace = true, features = ["serde"]}
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
hashlink.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
http-utils.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper0.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
num_cpus.workspace = true
|
||||
num_cpus.workspace = true # hack to get the number of worker threads tokio uses
|
||||
num-traits.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
|
||||
pageserver_compaction.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
pem.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_initdb.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
postgres-types.workspace = true
|
||||
postgres_initdb.workspace = true
|
||||
posthog_client_lite.workspace = true
|
||||
pprof.workspace = true
|
||||
pq_proto.workspace = true
|
||||
rand.workspace = true
|
||||
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
|
||||
regex.workspace = true
|
||||
remote_storage.workspace = true
|
||||
reqwest.workspace = true
|
||||
rpds.workspace = true
|
||||
rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
send-future.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json = { workspace = true, features = ["raw_value"] }
|
||||
serde_path_to_error.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde.workspace = true
|
||||
smallvec.workspace = true
|
||||
storage_broker.workspace = true
|
||||
strum_macros.workspace = true
|
||||
strum.workspace = true
|
||||
sysinfo.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
tenant_size_model.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
|
||||
@@ -69,34 +88,18 @@ tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = [ "serde" ] }
|
||||
tonic.workspace = true
|
||||
tonic-reflection.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
url.workspace = true
|
||||
walkdir.workspace = true
|
||||
metrics.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
|
||||
pageserver_compaction.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
remote_storage.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tenant_size_model.workspace = true
|
||||
http-utils.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
reqwest.workspace = true
|
||||
rpds.workspace = true
|
||||
enum-map.workspace = true
|
||||
enumset = { workspace = true, features = ["serde"]}
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
smallvec.workspace = true
|
||||
walkdir.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
twox-hash.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
||||
@@ -5,8 +5,14 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
bytes.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
prost.workspace = true
|
||||
smallvec.workspace = true
|
||||
thiserror.workspace = true
|
||||
tonic.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
|
||||
@@ -54,9 +54,9 @@ service PageService {
|
||||
// RPCs use regular unary requests, since they are not as frequent and
|
||||
// performance-critical, and this simplifies implementation.
|
||||
//
|
||||
// NB: a status response (e.g. errors) will terminate the stream. The stream
|
||||
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
|
||||
// Most errors are therefore sent as GetPageResponse.status instead.
|
||||
// NB: a gRPC status response (e.g. errors) will terminate the stream. The
|
||||
// stream may be shared by multiple Postgres backends, so we avoid this by
|
||||
// sending them as GetPageResponse.status_code instead.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns the size of a relation, as # of blocks.
|
||||
@@ -159,8 +159,8 @@ message GetPageRequest {
|
||||
// A GetPageRequest class. Primarily intended for observability, but may also be
|
||||
// used for prioritization in the future.
|
||||
enum GetPageClass {
|
||||
// Unknown class. For forwards compatibility: used when the client sends a
|
||||
// class that the server doesn't know about.
|
||||
// Unknown class. For backwards compatibility: used when an older client version sends a class
|
||||
// that a newer server version has removed.
|
||||
GET_PAGE_CLASS_UNKNOWN = 0;
|
||||
// A normal request. This is the default.
|
||||
GET_PAGE_CLASS_NORMAL = 1;
|
||||
@@ -180,31 +180,37 @@ message GetPageResponse {
|
||||
// The original request's ID.
|
||||
uint64 request_id = 1;
|
||||
// The response status code.
|
||||
GetPageStatus status = 2;
|
||||
GetPageStatusCode status_code = 2;
|
||||
// A string describing the status, if any.
|
||||
string reason = 3;
|
||||
// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
// The 8KB page images, in the same order as the request. Empty if status_code != OK.
|
||||
repeated bytes page_image = 4;
|
||||
}
|
||||
|
||||
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
|
||||
// want to send errors as gRPC statuses, since this would terminate the stream.
|
||||
enum GetPageStatus {
|
||||
// Unknown status. For forwards compatibility: used when the server sends a
|
||||
// status code that the client doesn't know about.
|
||||
GET_PAGE_STATUS_UNKNOWN = 0;
|
||||
// A GetPageResponse status code.
|
||||
//
|
||||
// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
|
||||
// (potentially shared by many backends), and a gRPC status response would terminate the stream so
|
||||
// we send GetPageResponse messages with these codes instead.
|
||||
enum GetPageStatusCode {
|
||||
// Unknown status. For forwards compatibility: used when an older client version receives a new
|
||||
// status code from a newer server version.
|
||||
GET_PAGE_STATUS_CODE_UNKNOWN = 0;
|
||||
// The request was successful.
|
||||
GET_PAGE_STATUS_OK = 1;
|
||||
GET_PAGE_STATUS_CODE_OK = 1;
|
||||
// The page did not exist. The tenant/timeline/shard has already been
|
||||
// validated during stream setup.
|
||||
GET_PAGE_STATUS_NOT_FOUND = 2;
|
||||
GET_PAGE_STATUS_CODE_NOT_FOUND = 2;
|
||||
// The request was invalid.
|
||||
GET_PAGE_STATUS_INVALID = 3;
|
||||
GET_PAGE_STATUS_CODE_INVALID_REQUEST = 3;
|
||||
// The request failed due to an internal server error.
|
||||
GET_PAGE_STATUS_CODE_INTERNAL_ERROR = 4;
|
||||
// The tenant is rate limited. Slow down and retry later.
|
||||
GET_PAGE_STATUS_SLOW_DOWN = 4;
|
||||
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
|
||||
// layer download. This could free up the server task to process other
|
||||
// requests while the layer download is in progress.
|
||||
GET_PAGE_STATUS_CODE_SLOW_DOWN = 5;
|
||||
// NB: shutdown errors are emitted as a gRPC Unavailable status.
|
||||
//
|
||||
// TODO: consider adding a GET_PAGE_STATUS_CODE_LAYER_DOWNLOAD in the case of a layer download.
|
||||
// This could free up the server task to process other requests while the download is in progress.
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
|
||||
|
||||
@@ -17,3 +17,7 @@ pub mod proto {
|
||||
pub use page_service_client::PageServiceClient;
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
|
||||
mod model;
|
||||
|
||||
pub use model::*;
|
||||
|
||||
596
pageserver/page_api/src/model.rs
Normal file
596
pageserver/page_api/src/model.rs
Normal file
@@ -0,0 +1,596 @@
|
||||
//! Structs representing the canonical page service API.
|
||||
//!
|
||||
//! These mirror the autogenerated Protobuf types. The differences are:
|
||||
//!
|
||||
//! - Types that are in fact required by the API are not Options. The protobuf "required"
|
||||
//! attribute is deprecated and 'prost' marks a lot of members as optional because of that.
|
||||
//! (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
|
||||
//!
|
||||
//! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
|
||||
//!
|
||||
//! - Validate protocol invariants, via try_from() and try_into().
|
||||
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::Oid;
|
||||
use smallvec::SmallVec;
|
||||
// TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid
|
||||
// pulling in all of their other crate dependencies when building the client.
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::proto;
|
||||
|
||||
/// A protocol error. Typically returned via try_from() or try_into().
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ProtocolError {
|
||||
#[error("field '{0}' has invalid value '{1}'")]
|
||||
Invalid(&'static str, String),
|
||||
#[error("required field '{0}' is missing")]
|
||||
Missing(&'static str),
|
||||
}
|
||||
|
||||
impl ProtocolError {
|
||||
/// Helper to generate a new ProtocolError::Invalid for the given field and value.
|
||||
pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
|
||||
Self::Invalid(field, format!("{value:?}"))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProtocolError> for tonic::Status {
|
||||
fn from(err: ProtocolError) -> Self {
|
||||
tonic::Status::invalid_argument(format!("{err}"))
|
||||
}
|
||||
}
|
||||
|
||||
/// The LSN a request should read at.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct ReadLsn {
|
||||
/// The request's read LSN.
|
||||
pub request_lsn: Lsn,
|
||||
/// If given, the caller guarantees that the page has not been modified since this LSN. Must be
|
||||
/// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
|
||||
/// without waiting for the request LSN to arrive. Valid for all request types.
|
||||
///
|
||||
/// It is undefined behaviour to make a request such that the page was, in fact, modified
|
||||
/// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
|
||||
/// error, or it might return the old page version or the new page version. Setting
|
||||
/// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
|
||||
/// waiting.
|
||||
pub not_modified_since_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl ReadLsn {
|
||||
/// Validates the ReadLsn.
|
||||
pub fn validate(&self) -> Result<(), ProtocolError> {
|
||||
if self.request_lsn == Lsn::INVALID {
|
||||
return Err(ProtocolError::invalid("request_lsn", self.request_lsn));
|
||||
}
|
||||
if self.not_modified_since_lsn > Some(self.request_lsn) {
|
||||
return Err(ProtocolError::invalid(
|
||||
"not_modified_since_lsn",
|
||||
self.not_modified_since_lsn,
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::ReadLsn> for ReadLsn {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
|
||||
let read_lsn = Self {
|
||||
request_lsn: Lsn(pb.request_lsn),
|
||||
not_modified_since_lsn: match pb.not_modified_since_lsn {
|
||||
0 => None,
|
||||
lsn => Some(Lsn(lsn)),
|
||||
},
|
||||
};
|
||||
read_lsn.validate()?;
|
||||
Ok(read_lsn)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ReadLsn> for proto::ReadLsn {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(read_lsn: ReadLsn) -> Result<Self, Self::Error> {
|
||||
read_lsn.validate()?;
|
||||
Ok(Self {
|
||||
request_lsn: read_lsn.request_lsn.0,
|
||||
not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// RelTag is defined in pageserver_api::reltag.
|
||||
pub type RelTag = pageserver_api::reltag::RelTag;
|
||||
|
||||
impl TryFrom<proto::RelTag> for RelTag {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
spcnode: pb.spc_oid,
|
||||
dbnode: pb.db_oid,
|
||||
relnode: pb.rel_number,
|
||||
forknum: pb
|
||||
.fork_number
|
||||
.try_into()
|
||||
.map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RelTag> for proto::RelTag {
|
||||
fn from(rel_tag: RelTag) -> Self {
|
||||
Self {
|
||||
spc_oid: rel_tag.spcnode,
|
||||
db_oid: rel_tag.dbnode,
|
||||
rel_number: rel_tag.relnode,
|
||||
fork_number: rel_tag.forknum as u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct CheckRelExistsRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type CheckRelExistsResponse = bool;
|
||||
|
||||
impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
|
||||
fn from(pb: proto::CheckRelExistsResponse) -> Self {
|
||||
pb.exists
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
|
||||
fn from(exists: CheckRelExistsResponse) -> Self {
|
||||
Self { exists }
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests a base backup at a given LSN.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetBaseBackupRequest {
|
||||
/// The LSN to fetch a base backup at.
|
||||
pub read_lsn: ReadLsn,
|
||||
/// If true, logical replication slots will not be created.
|
||||
pub replica: bool,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
replica: pb.replica,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetBaseBackupRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
replica: request.replica,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetBaseBackupResponseChunk = Bytes;
|
||||
|
||||
impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
|
||||
if pb.chunk.is_empty() {
|
||||
return Err(ProtocolError::Missing("chunk"));
|
||||
}
|
||||
Ok(pb.chunk)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(chunk: GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
|
||||
if chunk.is_empty() {
|
||||
return Err(ProtocolError::Missing("chunk"));
|
||||
}
|
||||
Ok(Self { chunk })
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetDbSizeRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub db_oid: Oid,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
db_oid: pb.db_oid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetDbSizeRequest> for proto::GetDbSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetDbSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
db_oid: request.db_oid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetDbSizeResponse = u64;
|
||||
|
||||
impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
|
||||
fn from(pb: proto::GetDbSizeResponse) -> Self {
|
||||
pb.num_bytes
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
|
||||
fn from(num_bytes: GetDbSizeResponse) -> Self {
|
||||
Self { num_bytes }
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests one or more pages.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GetPageRequest {
|
||||
/// A request ID. Will be included in the response. Should be unique for in-flight requests on
|
||||
/// the stream.
|
||||
pub request_id: RequestID,
|
||||
/// The request class.
|
||||
pub request_class: GetPageClass,
|
||||
/// The LSN to read at.
|
||||
pub read_lsn: ReadLsn,
|
||||
/// The relation to read from.
|
||||
pub rel: RelTag,
|
||||
/// Page numbers to read. Must belong to the remote shard.
|
||||
///
|
||||
/// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
|
||||
/// costs and parallelizing them. This may increase the latency of any individual request, but
|
||||
/// improves the overall latency and throughput of the batch as a whole.
|
||||
pub block_numbers: SmallVec<[u32; 1]>,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetPageRequest> for GetPageRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
|
||||
if pb.block_number.is_empty() {
|
||||
return Err(ProtocolError::Missing("block_number"));
|
||||
}
|
||||
Ok(Self {
|
||||
request_id: pb.request_id,
|
||||
request_class: pb.request_class.into(),
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
block_numbers: pb.block_number.into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetPageRequest> for proto::GetPageRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetPageRequest) -> Result<Self, Self::Error> {
|
||||
if request.block_numbers.is_empty() {
|
||||
return Err(ProtocolError::Missing("block_number"));
|
||||
}
|
||||
Ok(Self {
|
||||
request_id: request.request_id,
|
||||
request_class: request.request_class.into(),
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
rel: Some(request.rel.into()),
|
||||
block_number: request.block_numbers.into_vec(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A GetPage request ID.
|
||||
pub type RequestID = u64;
|
||||
|
||||
/// A GetPage request class.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum GetPageClass {
|
||||
/// Unknown class. For backwards compatibility: used when an older client version sends a class
|
||||
/// that a newer server version has removed.
|
||||
Unknown,
|
||||
/// A normal request. This is the default.
|
||||
Normal,
|
||||
/// A prefetch request. NB: can only be classified on pg < 18.
|
||||
Prefetch,
|
||||
/// A background request (e.g. vacuum).
|
||||
Background,
|
||||
}
|
||||
|
||||
impl From<proto::GetPageClass> for GetPageClass {
|
||||
fn from(pb: proto::GetPageClass) -> Self {
|
||||
match pb {
|
||||
proto::GetPageClass::Unknown => Self::Unknown,
|
||||
proto::GetPageClass::Normal => Self::Normal,
|
||||
proto::GetPageClass::Prefetch => Self::Prefetch,
|
||||
proto::GetPageClass::Background => Self::Background,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i32> for GetPageClass {
|
||||
fn from(class: i32) -> Self {
|
||||
proto::GetPageClass::try_from(class)
|
||||
.unwrap_or(proto::GetPageClass::Unknown)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageClass> for proto::GetPageClass {
|
||||
fn from(class: GetPageClass) -> Self {
|
||||
match class {
|
||||
GetPageClass::Unknown => Self::Unknown,
|
||||
GetPageClass::Normal => Self::Normal,
|
||||
GetPageClass::Prefetch => Self::Prefetch,
|
||||
GetPageClass::Background => Self::Background,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageClass> for i32 {
|
||||
fn from(class: GetPageClass) -> Self {
|
||||
proto::GetPageClass::from(class).into()
|
||||
}
|
||||
}
|
||||
|
||||
/// A GetPage response.
|
||||
///
|
||||
/// A batch response will contain all of the requested pages. We could eagerly emit individual pages
|
||||
/// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
|
||||
/// batch and we'll only return once the entire batch is ready, so no one can make use of the
|
||||
/// individual pages.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GetPageResponse {
|
||||
/// The original request's ID.
|
||||
pub request_id: RequestID,
|
||||
/// The response status code.
|
||||
pub status_code: GetPageStatusCode,
|
||||
/// A string describing the status, if any.
|
||||
pub reason: Option<String>,
|
||||
/// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
pub page_images: SmallVec<[Bytes; 1]>,
|
||||
}
|
||||
|
||||
impl From<proto::GetPageResponse> for GetPageResponse {
|
||||
fn from(pb: proto::GetPageResponse) -> Self {
|
||||
Self {
|
||||
request_id: pb.request_id,
|
||||
status_code: pb.status_code.into(),
|
||||
reason: Some(pb.reason).filter(|r| !r.is_empty()),
|
||||
page_images: pb.page_image.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageResponse> for proto::GetPageResponse {
|
||||
fn from(response: GetPageResponse) -> Self {
|
||||
Self {
|
||||
request_id: response.request_id,
|
||||
status_code: response.status_code.into(),
|
||||
reason: response.reason.unwrap_or_default(),
|
||||
page_image: response.page_images.into_vec(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A GetPage response status code.
|
||||
///
|
||||
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
|
||||
/// (potentially shared by many backends), and a gRPC status response would terminate the stream so
|
||||
/// we send GetPageResponse messages with these codes instead.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum GetPageStatusCode {
|
||||
/// Unknown status. For forwards compatibility: used when an older client version receives a new
|
||||
/// status code from a newer server version.
|
||||
Unknown,
|
||||
/// The request was successful.
|
||||
Ok,
|
||||
/// The page did not exist. The tenant/timeline/shard has already been validated during stream
|
||||
/// setup.
|
||||
NotFound,
|
||||
/// The request was invalid.
|
||||
InvalidRequest,
|
||||
/// The request failed due to an internal server error.
|
||||
InternalError,
|
||||
/// The tenant is rate limited. Slow down and retry later.
|
||||
SlowDown,
|
||||
}
|
||||
|
||||
impl From<proto::GetPageStatusCode> for GetPageStatusCode {
|
||||
fn from(pb: proto::GetPageStatusCode) -> Self {
|
||||
match pb {
|
||||
proto::GetPageStatusCode::Unknown => Self::Unknown,
|
||||
proto::GetPageStatusCode::Ok => Self::Ok,
|
||||
proto::GetPageStatusCode::NotFound => Self::NotFound,
|
||||
proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
|
||||
proto::GetPageStatusCode::InternalError => Self::InternalError,
|
||||
proto::GetPageStatusCode::SlowDown => Self::SlowDown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i32> for GetPageStatusCode {
|
||||
fn from(status_code: i32) -> Self {
|
||||
proto::GetPageStatusCode::try_from(status_code)
|
||||
.unwrap_or(proto::GetPageStatusCode::Unknown)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageStatusCode> for proto::GetPageStatusCode {
|
||||
fn from(status_code: GetPageStatusCode) -> Self {
|
||||
match status_code {
|
||||
GetPageStatusCode::Unknown => Self::Unknown,
|
||||
GetPageStatusCode::Ok => Self::Ok,
|
||||
GetPageStatusCode::NotFound => Self::NotFound,
|
||||
GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
|
||||
GetPageStatusCode::InternalError => Self::InternalError,
|
||||
GetPageStatusCode::SlowDown => Self::SlowDown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageStatusCode> for i32 {
|
||||
fn from(status_code: GetPageStatusCode) -> Self {
|
||||
proto::GetPageStatusCode::from(status_code).into()
|
||||
}
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
pub struct GetRelSizeRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: proto
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetRelSizeRequest> for proto::GetRelSizeRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetRelSizeRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
rel: Some(request.rel.into()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetRelSizeResponse = u32;
|
||||
|
||||
impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
|
||||
fn from(proto: proto::GetRelSizeResponse) -> Self {
|
||||
proto.num_blocks
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
|
||||
fn from(num_blocks: GetRelSizeResponse) -> Self {
|
||||
Self { num_blocks }
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
pub struct GetSlruSegmentRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub kind: SlruKind,
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
kind: u8::try_from(pb.kind)
|
||||
.ok()
|
||||
.and_then(SlruKind::from_repr)
|
||||
.ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
|
||||
segno: pb.segno,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(request: GetSlruSegmentRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: Some(request.read_lsn.try_into()?),
|
||||
kind: request.kind as u32,
|
||||
segno: request.segno,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type GetSlruSegmentResponse = Bytes;
|
||||
|
||||
impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
|
||||
if pb.segment.is_empty() {
|
||||
return Err(ProtocolError::Missing("segment"));
|
||||
}
|
||||
Ok(pb.segment)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
|
||||
// TODO: can a segment legitimately be empty?
|
||||
if segment.is_empty() {
|
||||
return Err(ProtocolError::Missing("segment"));
|
||||
}
|
||||
Ok(Self { segment })
|
||||
}
|
||||
}
|
||||
|
||||
// SlruKind is defined in pageserver_api::reltag.
|
||||
pub type SlruKind = pageserver_api::reltag::SlruKind;
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -15,14 +16,17 @@ hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace=true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
pageserver_client.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
utils = { path = "../../libs/utils/" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -7,11 +7,15 @@ use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_api::models::{
|
||||
PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api::proto;
|
||||
use rand::prelude::*;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -22,6 +26,12 @@ use utils::lsn::Lsn;
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
#[derive(clap::ValueEnum, Clone, Debug)]
|
||||
enum Protocol {
|
||||
Libpq,
|
||||
Grpc,
|
||||
}
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
@@ -35,6 +45,8 @@ pub(crate) struct Args {
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long, value_enum, default_value = "libpq")]
|
||||
protocol: Protocol,
|
||||
/// Each client sends requests at the given rate.
|
||||
///
|
||||
/// If a request takes too long and we should be issuing a new request already,
|
||||
@@ -303,7 +315,20 @@ async fn main_impl(
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
let client: Box<dyn Client> = match args.protocol {
|
||||
Protocol::Libpq => Box::new(
|
||||
LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
|
||||
Protocol::Grpc => Box::new(
|
||||
GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
};
|
||||
run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
|
||||
})
|
||||
};
|
||||
|
||||
@@ -355,23 +380,15 @@ async fn main_impl(
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
async fn client_libpq(
|
||||
async fn run_worker(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
mut client: Box<dyn Client>,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
@@ -415,12 +432,12 @@ async fn client_libpq(
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
client.send_get_page(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
client.recv_get_page().await.unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
@@ -442,3 +459,101 @@ async fn client_libpq(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A benchmark client, to allow switching out the transport protocol.
|
||||
///
|
||||
/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could
|
||||
/// return a future that resolves when the response is received, but we don't really need it.
|
||||
#[async_trait]
|
||||
trait Client: Send {
|
||||
/// Sends an asynchronous GetPage request to the pageserver.
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>;
|
||||
|
||||
/// Receives the next GetPage response from the pageserver.
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse>;
|
||||
}
|
||||
|
||||
/// A libpq-based Pageserver client.
|
||||
struct LibpqClient {
|
||||
inner: pageserver_client::page_service::PagestreamClient,
|
||||
}
|
||||
|
||||
impl LibpqClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let inner = pageserver_client::page_service::Client::new(connstring)
|
||||
.await?
|
||||
.pagestream(ttid.tenant_id, ttid.timeline_id)
|
||||
.await?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for LibpqClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
self.inner.getpage_send(req).await
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
self.inner.getpage_recv().await
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC client using the raw, no-frills gRPC client.
|
||||
struct GrpcClient {
|
||||
req_tx: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
|
||||
resp_rx: tonic::Streaming<proto::GetPageResponse>,
|
||||
}
|
||||
|
||||
impl GrpcClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?;
|
||||
|
||||
let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
|
||||
let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
|
||||
let mut req = tonic::Request::new(req_stream);
|
||||
let metadata = req.metadata_mut();
|
||||
metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?);
|
||||
metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?);
|
||||
metadata.insert("neon-shard-id", "0000".try_into()?);
|
||||
|
||||
let resp = client.get_pages(req).await?;
|
||||
let resp_stream = resp.into_inner();
|
||||
|
||||
Ok(Self {
|
||||
req_tx,
|
||||
resp_rx: resp_stream,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for GrpcClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
let req = proto::GetPageRequest {
|
||||
request_id: 0,
|
||||
request_class: proto::GetPageClass::Normal as i32,
|
||||
read_lsn: Some(proto::ReadLsn {
|
||||
request_lsn: req.hdr.request_lsn.0,
|
||||
not_modified_since_lsn: req.hdr.not_modified_since.0,
|
||||
}),
|
||||
rel: Some(req.rel.into()),
|
||||
block_number: vec![req.blkno],
|
||||
};
|
||||
self.req_tx.send(req).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let resp = self.resp_rx.message().await?.unwrap();
|
||||
anyhow::ensure!(
|
||||
resp.status_code == proto::GetPageStatusCode::Ok as i32,
|
||||
"unexpected status code: {}",
|
||||
resp.status_code
|
||||
);
|
||||
Ok(PagestreamGetPageResponse {
|
||||
page: resp.page_image[0].clone(),
|
||||
req: PagestreamGetPageRequest::default(), // dummy
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
|
||||
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
|
||||
use pageserver::deletion_queue::DeletionQueue;
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
use pageserver::feature_resolver::FeatureResolver;
|
||||
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::{
|
||||
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
|
||||
@@ -388,23 +389,30 @@ fn start_pageserver(
|
||||
// We need to release the lock file only when the process exits.
|
||||
std::mem::forget(lock_file);
|
||||
|
||||
// Bind the HTTP and libpq ports early, so that if they are in use by some other
|
||||
// process, we error out early.
|
||||
let http_addr = &conf.listen_http_addr;
|
||||
info!("Starting pageserver http handler on {http_addr}");
|
||||
let http_listener = tcp_listener::bind(http_addr)?;
|
||||
// Bind the HTTP, libpq, and gRPC ports early, to error out if they are
|
||||
// already in use.
|
||||
info!(
|
||||
"Starting pageserver http handler on {} with auth {:#?}",
|
||||
conf.listen_http_addr, conf.http_auth_type
|
||||
);
|
||||
let http_listener = tcp_listener::bind(&conf.listen_http_addr)?;
|
||||
|
||||
let https_listener = match conf.listen_https_addr.as_ref() {
|
||||
Some(https_addr) => {
|
||||
info!("Starting pageserver https handler on {https_addr}");
|
||||
info!(
|
||||
"Starting pageserver https handler on {https_addr} with auth {:#?}",
|
||||
conf.http_auth_type
|
||||
);
|
||||
Some(tcp_listener::bind(https_addr)?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let pg_addr = &conf.listen_pg_addr;
|
||||
info!("Starting pageserver pg protocol handler on {pg_addr}");
|
||||
let pageserver_listener = tcp_listener::bind(pg_addr)?;
|
||||
info!(
|
||||
"Starting pageserver pg protocol handler on {} with auth {:#?}",
|
||||
conf.listen_pg_addr, conf.pg_auth_type,
|
||||
);
|
||||
let pageserver_listener = tcp_listener::bind(&conf.listen_pg_addr)?;
|
||||
|
||||
// Enable SO_KEEPALIVE on the socket, to detect dead connections faster.
|
||||
// These are configured via net.ipv4.tcp_keepalive_* sysctls.
|
||||
@@ -413,6 +421,15 @@ fn start_pageserver(
|
||||
// support enabling keepalives while using the default OS sysctls.
|
||||
setsockopt(&pageserver_listener, sockopt::KeepAlive, &true)?;
|
||||
|
||||
let mut grpc_listener = None;
|
||||
if let Some(grpc_addr) = &conf.listen_grpc_addr {
|
||||
info!(
|
||||
"Starting pageserver gRPC handler on {grpc_addr} with auth {:#?}",
|
||||
conf.grpc_auth_type
|
||||
);
|
||||
grpc_listener = Some(tcp_listener::bind(grpc_addr).map_err(|e| anyhow!("{e}"))?);
|
||||
}
|
||||
|
||||
// Launch broker client
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
@@ -440,7 +457,8 @@ fn start_pageserver(
|
||||
// Initialize authentication for incoming connections
|
||||
let http_auth;
|
||||
let pg_auth;
|
||||
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
|
||||
let grpc_auth;
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::NeonJWT) {
|
||||
// unwrap is ok because check is performed when creating config, so path is set and exists
|
||||
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
|
||||
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
|
||||
@@ -448,20 +466,23 @@ fn start_pageserver(
|
||||
let jwt_auth = JwtAuth::from_key_path(key_path)?;
|
||||
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
|
||||
|
||||
http_auth = match &conf.http_auth_type {
|
||||
http_auth = match conf.http_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth.clone()),
|
||||
};
|
||||
pg_auth = match &conf.pg_auth_type {
|
||||
pg_auth = match conf.pg_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth.clone()),
|
||||
};
|
||||
grpc_auth = match conf.grpc_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth),
|
||||
};
|
||||
} else {
|
||||
http_auth = None;
|
||||
pg_auth = None;
|
||||
grpc_auth = None;
|
||||
}
|
||||
info!("Using auth for http API: {:#?}", conf.http_auth_type);
|
||||
info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);
|
||||
|
||||
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
|
||||
{
|
||||
@@ -502,6 +523,12 @@ fn start_pageserver(
|
||||
// Set up remote storage client
|
||||
let remote_storage = BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf))?;
|
||||
|
||||
let feature_resolver = create_feature_resolver(
|
||||
conf,
|
||||
shutdown_pageserver.clone(),
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
)?;
|
||||
|
||||
// Set up deletion queue
|
||||
let (deletion_queue, deletion_workers) = DeletionQueue::new(
|
||||
remote_storage.clone(),
|
||||
@@ -555,6 +582,7 @@ fn start_pageserver(
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
},
|
||||
order,
|
||||
shutdown_pageserver.clone(),
|
||||
@@ -779,6 +807,22 @@ fn start_pageserver(
|
||||
basebackup_cache,
|
||||
);
|
||||
|
||||
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
|
||||
// each stream/request.
|
||||
//
|
||||
// TODO: this uses a separate Tokio runtime for the page service. If we want
|
||||
// other gRPC services, they will need their own port and runtime. Is this
|
||||
// necessary?
|
||||
let mut page_service_grpc = None;
|
||||
if let Some(grpc_listener) = grpc_listener {
|
||||
page_service_grpc = Some(page_service::spawn_grpc(
|
||||
tenant_manager.clone(),
|
||||
grpc_auth,
|
||||
otel_guard.as_ref().map(|g| g.dispatch.clone()),
|
||||
grpc_listener,
|
||||
)?);
|
||||
}
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
let signal_token = CancellationToken::new();
|
||||
@@ -797,6 +841,7 @@ fn start_pageserver(
|
||||
http_endpoint_listener,
|
||||
https_endpoint_listener,
|
||||
page_service,
|
||||
page_service_grpc,
|
||||
consumption_metrics_tasks,
|
||||
disk_usage_eviction_task,
|
||||
&tenant_manager,
|
||||
@@ -810,6 +855,14 @@ fn start_pageserver(
|
||||
})
|
||||
}
|
||||
|
||||
fn create_feature_resolver(
|
||||
conf: &'static PageServerConf,
|
||||
shutdown_pageserver: CancellationToken,
|
||||
handle: &tokio::runtime::Handle,
|
||||
) -> anyhow::Result<FeatureResolver> {
|
||||
FeatureResolver::spawn(conf, shutdown_pageserver, handle)
|
||||
}
|
||||
|
||||
async fn create_remote_storage_client(
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<GenericRemoteStorage> {
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::time::Duration;
|
||||
use anyhow::{Context, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pem::Pem;
|
||||
@@ -58,11 +58,16 @@ pub struct PageServerConf {
|
||||
pub listen_http_addr: String,
|
||||
/// Example: 127.0.0.1:9899
|
||||
pub listen_https_addr: Option<String>,
|
||||
/// If set, expose a gRPC API on this address.
|
||||
/// Example: 127.0.0.1:51051
|
||||
///
|
||||
/// EXPERIMENTAL: this protocol is unstable and under active development.
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
|
||||
/// Path to a file with certificate's private key for https API.
|
||||
/// Path to a file with certificate's private key for https and gRPC API.
|
||||
/// Default: server.key
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
/// Path to a file with a X509 certificate for https API.
|
||||
/// Path to a file with a X509 certificate for https and gRPC API.
|
||||
/// Default: server.crt
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
/// Period to reload certificate and private key from files.
|
||||
@@ -100,6 +105,8 @@ pub struct PageServerConf {
|
||||
pub http_auth_type: AuthType,
|
||||
/// authentication method for libpq connections from compute
|
||||
pub pg_auth_type: AuthType,
|
||||
/// authentication method for gRPC connections from compute
|
||||
pub grpc_auth_type: AuthType,
|
||||
/// Path to a file or directory containing public key(s) for verifying JWT tokens.
|
||||
/// Used for both mgmt and compute auth, if enabled.
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
@@ -231,6 +238,9 @@ pub struct PageServerConf {
|
||||
/// This is insecure and should only be used in development environments.
|
||||
pub dev_mode: bool,
|
||||
|
||||
/// PostHog integration config.
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
|
||||
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
|
||||
|
||||
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
|
||||
@@ -355,6 +365,7 @@ impl PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
@@ -369,6 +380,7 @@ impl PageServerConf {
|
||||
pg_distrib_dir,
|
||||
http_auth_type,
|
||||
pg_auth_type,
|
||||
grpc_auth_type,
|
||||
auth_validation_public_key_path,
|
||||
remote_storage,
|
||||
broker_endpoint,
|
||||
@@ -412,6 +424,7 @@ impl PageServerConf {
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
posthog_config,
|
||||
timeline_import_config,
|
||||
basebackup_cache_config,
|
||||
} = config_toml;
|
||||
@@ -423,6 +436,7 @@ impl PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
@@ -435,6 +449,7 @@ impl PageServerConf {
|
||||
max_file_descriptors,
|
||||
http_auth_type,
|
||||
pg_auth_type,
|
||||
grpc_auth_type,
|
||||
auth_validation_public_key_path,
|
||||
remote_storage_config: remote_storage,
|
||||
broker_endpoint,
|
||||
@@ -525,13 +540,16 @@ impl PageServerConf {
|
||||
}
|
||||
None => Vec::new(),
|
||||
},
|
||||
posthog_config,
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// custom validation code that covers more than one field in isolation
|
||||
// ------------------------------------------------------------
|
||||
|
||||
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
|
||||
.contains(&AuthType::NeonJWT)
|
||||
{
|
||||
let auth_validation_public_key_path = conf
|
||||
.auth_validation_public_key_path
|
||||
.get_or_insert_with(|| workdir.join("auth_public_key.pem"));
|
||||
|
||||
94
pageserver/src/feature_resolver.rs
Normal file
94
pageserver/src/feature_resolver.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
|
||||
use posthog_client_lite::{
|
||||
FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FeatureResolver {
|
||||
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
|
||||
}
|
||||
|
||||
impl FeatureResolver {
|
||||
pub fn new_disabled() -> Self {
|
||||
Self { inner: None }
|
||||
}
|
||||
|
||||
pub fn spawn(
|
||||
conf: &PageServerConf,
|
||||
shutdown_pageserver: CancellationToken,
|
||||
handle: &tokio::runtime::Handle,
|
||||
) -> anyhow::Result<Self> {
|
||||
// DO NOT block in this function: make it return as fast as possible to avoid startup delays.
|
||||
if let Some(posthog_config) = &conf.posthog_config {
|
||||
let inner = FeatureResolverBackgroundLoop::new(
|
||||
PostHogClientConfig {
|
||||
server_api_key: posthog_config.server_api_key.clone(),
|
||||
client_api_key: posthog_config.client_api_key.clone(),
|
||||
project_id: posthog_config.project_id.clone(),
|
||||
private_api_url: posthog_config.private_api_url.clone(),
|
||||
public_api_url: posthog_config.public_api_url.clone(),
|
||||
},
|
||||
shutdown_pageserver,
|
||||
);
|
||||
let inner = Arc::new(inner);
|
||||
// TODO: make this configurable
|
||||
inner.clone().spawn(handle, Duration::from_secs(60));
|
||||
Ok(FeatureResolver { inner: Some(inner) })
|
||||
} else {
|
||||
Ok(FeatureResolver { inner: None })
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_multivariate(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<String, PostHogEvaluationError> {
|
||||
if let Some(inner) = &self.inner {
|
||||
inner.feature_store().evaluate_multivariate(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&HashMap::new(),
|
||||
)
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(
|
||||
"PostHog integration is not enabled".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a boolean feature flag. Currently, we do not support any properties.
|
||||
///
|
||||
/// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
|
||||
///
|
||||
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
|
||||
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
|
||||
/// propagated beyond where the feature flag gets resolved.
|
||||
pub fn evaluate_boolean(
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
if let Some(inner) = &self.inner {
|
||||
inner.feature_store().evaluate_boolean(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&HashMap::new(),
|
||||
)
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(
|
||||
"PostHog integration is not enabled".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -353,6 +353,33 @@ paths:
|
||||
"200":
|
||||
description: OK
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/mark_invisible:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
put:
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
is_visible:
|
||||
type: boolean
|
||||
default: false
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/location_config:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
@@ -626,6 +653,8 @@ paths:
|
||||
format: hex
|
||||
pg_version:
|
||||
type: integer
|
||||
read_only:
|
||||
type: boolean
|
||||
existing_initdb_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
@@ -370,6 +370,18 @@ impl From<crate::tenant::secondary::SecondaryTenantError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::FinalizeTimelineImportError> for ApiError {
|
||||
fn from(err: crate::tenant::FinalizeTimelineImportError) -> ApiError {
|
||||
use crate::tenant::FinalizeTimelineImportError::*;
|
||||
match err {
|
||||
ImportTaskStillRunning => {
|
||||
ApiError::ResourceUnavailable("Import task still running".into())
|
||||
}
|
||||
ShuttingDown => ApiError::ShuttingDown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to construct a TimelineInfo struct for a timeline
|
||||
async fn build_timeline_info(
|
||||
timeline: &Arc<Timeline>,
|
||||
@@ -572,6 +584,7 @@ async fn timeline_create_handler(
|
||||
TimelineCreateRequestMode::Branch {
|
||||
ancestor_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
read_only: _,
|
||||
pg_version: _,
|
||||
} => tenant::CreateTimelineParams::Branch(tenant::CreateTimelineParamsBranch {
|
||||
new_timeline_id,
|
||||
@@ -3532,10 +3545,7 @@ async fn activate_post_import_handler(
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
tenant
|
||||
.finalize_importing_timeline(timeline_id)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
tenant.finalize_importing_timeline(timeline_id).await?;
|
||||
|
||||
match tenant.get_timeline(timeline_id, false) {
|
||||
Ok(_timeline) => {
|
||||
|
||||
@@ -10,6 +10,7 @@ pub mod context;
|
||||
pub mod controller_upcall_client;
|
||||
pub mod deletion_queue;
|
||||
pub mod disk_usage_eviction_task;
|
||||
pub mod feature_resolver;
|
||||
pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub mod l0_flush;
|
||||
@@ -84,6 +85,7 @@ pub async fn shutdown_pageserver(
|
||||
http_listener: HttpEndpointListener,
|
||||
https_listener: Option<HttpsEndpointListener>,
|
||||
page_service: page_service::Listener,
|
||||
grpc_task: Option<CancellableTask>,
|
||||
consumption_metrics_worker: ConsumptionMetricsTasks,
|
||||
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
|
||||
tenant_manager: &TenantManager,
|
||||
@@ -177,6 +179,16 @@ pub async fn shutdown_pageserver(
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down the gRPC server task, including request handlers.
|
||||
if let Some(grpc_task) = grpc_task {
|
||||
timed(
|
||||
grpc_task.shutdown(),
|
||||
"shutdown gRPC PageRequestHandler",
|
||||
Duration::from_secs(3),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Shut down all the tenants. This flushes everything to disk and kills
|
||||
// the checkpoint and GC tasks.
|
||||
timed(
|
||||
|
||||
@@ -2234,8 +2234,10 @@ impl BasebackupQueryTimeOngoingRecording<'_> {
|
||||
// If you want to change categorize of a specific error, also change it in `log_query_error`.
|
||||
let metric = match res {
|
||||
Ok(_) => &self.parent.ok,
|
||||
Err(QueryError::Shutdown) => {
|
||||
// Do not observe ok/err for shutdown
|
||||
Err(QueryError::Shutdown) | Err(QueryError::Reconnect) => {
|
||||
// Do not observe ok/err for shutdown/reconnect.
|
||||
// Reconnect error might be raised when the operation is waiting for LSN and the tenant shutdown interrupts
|
||||
// the operation. A reconnect error will be issued and the client will retry.
|
||||
return;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error)))
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -274,7 +274,7 @@ impl Timeline {
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<Bytes, PageReconstructError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
//debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut slots_filled = 0;
|
||||
let page_count = pages.len();
|
||||
|
||||
@@ -276,9 +276,10 @@ pub enum TaskKind {
|
||||
// HTTP endpoint listener.
|
||||
HttpEndpointListener,
|
||||
|
||||
// Task that handles a single connection. A PageRequestHandler task
|
||||
// starts detached from any particular tenant or timeline, but it can be
|
||||
// associated with one later, after receiving a command from the client.
|
||||
/// Task that handles a single page service connection. A PageRequestHandler
|
||||
/// task starts detached from any particular tenant or timeline, but it can
|
||||
/// be associated with one later, after receiving a command from the client.
|
||||
/// Also used for the gRPC page service API, including the main server task.
|
||||
PageRequestHandler,
|
||||
|
||||
/// Manages the WAL receiver connection for one timeline.
|
||||
|
||||
@@ -84,6 +84,7 @@ use crate::context;
|
||||
use crate::context::RequestContextBuilder;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::{
|
||||
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
|
||||
@@ -159,6 +160,7 @@ pub struct TenantSharedResources {
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub l0_flush_global_state: L0FlushGlobalState,
|
||||
pub basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
pub feature_resolver: FeatureResolver,
|
||||
}
|
||||
|
||||
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
|
||||
@@ -380,6 +382,8 @@ pub struct TenantShard {
|
||||
pub(crate) gc_block: gc_block::GcBlock,
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
feature_resolver: FeatureResolver,
|
||||
}
|
||||
impl std::fmt::Debug for TenantShard {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
@@ -860,6 +864,14 @@ impl Debug for SetStoppingError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum FinalizeTimelineImportError {
|
||||
#[error("Import task not done yet")]
|
||||
ImportTaskStillRunning,
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
/// Arguments to [`TenantShard::create_timeline`].
|
||||
///
|
||||
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
|
||||
@@ -1146,10 +1158,20 @@ impl TenantShard {
|
||||
ctx,
|
||||
)?;
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
anyhow::ensure!(
|
||||
disk_consistent_lsn.is_valid(),
|
||||
"Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
|
||||
);
|
||||
|
||||
if !disk_consistent_lsn.is_valid() {
|
||||
// As opposed to normal timelines which get initialised with a disk consitent LSN
|
||||
// via initdb, imported timelines start from 0. If the import task stops before
|
||||
// it advances disk consitent LSN, allow it to resume.
|
||||
let in_progress_import = import_pgdata
|
||||
.as_ref()
|
||||
.map(|import| !import.is_done())
|
||||
.unwrap_or(false);
|
||||
if !in_progress_import {
|
||||
anyhow::bail!("Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn");
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
disk_consistent_lsn,
|
||||
metadata.disk_consistent_lsn(),
|
||||
@@ -1243,20 +1265,25 @@ impl TenantShard {
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity check: a timeline should have some content.
|
||||
anyhow::ensure!(
|
||||
ancestor.is_some()
|
||||
|| timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.expect("currently loading, layer manager cannot be shutdown already")
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
"Timeline has no ancestor and no layer files"
|
||||
);
|
||||
if disk_consistent_lsn.is_valid() {
|
||||
// Sanity check: a timeline should have some content.
|
||||
// Exception: importing timelines might not yet have any
|
||||
anyhow::ensure!(
|
||||
ancestor.is_some()
|
||||
|| timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.expect(
|
||||
"currently loading, layer manager cannot be shutdown already"
|
||||
)
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
"Timeline has no ancestor and no layer files"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(TimelineInitAndSyncResult::ReadyToActivate)
|
||||
}
|
||||
@@ -1292,6 +1319,7 @@ impl TenantShard {
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
} = resources;
|
||||
|
||||
let attach_mode = attached_conf.location.attach_mode;
|
||||
@@ -1308,6 +1336,7 @@ impl TenantShard {
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
));
|
||||
|
||||
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
|
||||
@@ -2854,13 +2883,13 @@ impl TenantShard {
|
||||
pub(crate) async fn finalize_importing_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), FinalizeTimelineImportError> {
|
||||
let timeline = {
|
||||
let locked = self.timelines_importing.lock().unwrap();
|
||||
match locked.get(&timeline_id) {
|
||||
Some(importing_timeline) => {
|
||||
if !importing_timeline.import_task_handle.is_finished() {
|
||||
return Err(anyhow::anyhow!("Import task not done yet"));
|
||||
return Err(FinalizeTimelineImportError::ImportTaskStillRunning);
|
||||
}
|
||||
|
||||
importing_timeline.timeline.clone()
|
||||
@@ -2873,8 +2902,13 @@ impl TenantShard {
|
||||
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_import_pgdata_finalize()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
.schedule_index_upload_for_import_pgdata_finalize()
|
||||
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
|
||||
timeline
|
||||
.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
|
||||
|
||||
self.timelines_importing
|
||||
.lock()
|
||||
@@ -3135,11 +3169,18 @@ impl TenantShard {
|
||||
.or_insert_with(|| Arc::new(GcCompactionQueue::new()))
|
||||
.clone()
|
||||
};
|
||||
let gc_compaction_strategy = self
|
||||
.feature_resolver
|
||||
.evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id)
|
||||
.ok();
|
||||
let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy {
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy)
|
||||
} else {
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id)
|
||||
};
|
||||
outcome = queue
|
||||
.iteration(cancel, ctx, &self.gc_block, &timeline)
|
||||
.instrument(
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id),
|
||||
)
|
||||
.instrument(span)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -3471,8 +3512,9 @@ impl TenantShard {
|
||||
let mut timelines_importing = self.timelines_importing.lock().unwrap();
|
||||
timelines_importing
|
||||
.drain()
|
||||
.for_each(|(_timeline_id, importing_timeline)| {
|
||||
importing_timeline.shutdown();
|
||||
.for_each(|(timeline_id, importing_timeline)| {
|
||||
let span = tracing::info_span!("importing_timeline_shutdown", %timeline_id);
|
||||
js.spawn(async move { importing_timeline.shutdown().instrument(span).await });
|
||||
});
|
||||
}
|
||||
// test_long_timeline_create_then_tenant_delete is leaning on this message
|
||||
@@ -4247,6 +4289,7 @@ impl TenantShard {
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
feature_resolver: FeatureResolver,
|
||||
) -> TenantShard {
|
||||
assert!(!attached_conf.location.generation.is_none());
|
||||
|
||||
@@ -4351,6 +4394,7 @@ impl TenantShard {
|
||||
gc_block: Default::default(),
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5271,6 +5315,7 @@ impl TenantShard {
|
||||
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
|
||||
l0_flush_global_state: self.l0_flush_global_state.clone(),
|
||||
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
|
||||
feature_resolver: self.feature_resolver.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5873,6 +5918,7 @@ pub(crate) mod harness {
|
||||
// TODO: ideally we should run all unit tests with both configs
|
||||
L0FlushGlobalState::new(L0FlushConfig::default()),
|
||||
basebackup_requst_sender,
|
||||
FeatureResolver::new_disabled(),
|
||||
));
|
||||
|
||||
let preload = tenant
|
||||
@@ -8314,10 +8360,24 @@ mod tests {
|
||||
}
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
// Force layers to L1
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if iter % 5 == 0 {
|
||||
let scan_lsn = Lsn(lsn.0 + 1);
|
||||
info!("scanning at {}", scan_lsn);
|
||||
let (_, before_delta_file_accessed) =
|
||||
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
|
||||
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
|
||||
.await?;
|
||||
tline
|
||||
.compact(
|
||||
@@ -8326,13 +8386,14 @@ mod tests {
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceImageLayerCreation);
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let (_, after_delta_file_accessed) =
|
||||
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
|
||||
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
|
||||
.await?;
|
||||
assert!(
|
||||
after_delta_file_accessed < before_delta_file_accessed,
|
||||
@@ -8773,6 +8834,8 @@ mod tests {
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
|
||||
tline.force_set_disk_consistent_lsn(Lsn(0x40));
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
@@ -8786,8 +8849,7 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
// Image layers are created at repartition LSN
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
|
||||
.await
|
||||
|
||||
@@ -103,6 +103,7 @@ use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::metrics::{
|
||||
@@ -198,6 +199,7 @@ pub struct TimelineResources {
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
pub basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
pub feature_resolver: FeatureResolver,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
@@ -444,6 +446,8 @@ pub struct Timeline {
|
||||
|
||||
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
|
||||
basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
|
||||
feature_resolver: FeatureResolver,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
@@ -3072,6 +3076,8 @@ impl Timeline {
|
||||
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
|
||||
|
||||
basebackup_prepare_sender: resources.basebackup_prepare_sender,
|
||||
|
||||
feature_resolver: resources.feature_resolver,
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -4906,6 +4912,7 @@ impl Timeline {
|
||||
LastImageLayerCreationStatus::Initial,
|
||||
false, // don't yield for L0, we're flushing L0
|
||||
)
|
||||
.instrument(info_span!("create_image_layers", mode = %ImageLayerCreationMode::Initial, partition_mode = "initial", lsn = %self.initdb_lsn))
|
||||
.await?;
|
||||
debug_assert!(
|
||||
matches!(is_complete, LastImageLayerCreationStatus::Complete),
|
||||
@@ -5462,7 +5469,8 @@ impl Timeline {
|
||||
|
||||
/// Returns the image layers generated and an enum indicating whether the process is fully completed.
|
||||
/// true = we have generate all image layers, false = we preempt the process for L0 compaction.
|
||||
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
|
||||
///
|
||||
/// `partition_mode` is only for logging purpose and is not used anywhere in this function.
|
||||
async fn create_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
partitioning: &KeyPartitioning,
|
||||
|
||||
@@ -1278,11 +1278,55 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
|
||||
let l0_l1_boundary_lsn = {
|
||||
// We do the repartition on the L0-L1 boundary. All data below the boundary
|
||||
// are compacted by L0 with low read amplification, thus making the `repartition`
|
||||
// function run fast.
|
||||
let guard = self.layers.read().await;
|
||||
guard
|
||||
.all_persistent_layers()
|
||||
.iter()
|
||||
.map(|x| {
|
||||
// Use the end LSN of delta layers OR the start LSN of image layers.
|
||||
if x.is_delta {
|
||||
x.lsn_range.end
|
||||
} else {
|
||||
x.lsn_range.start
|
||||
}
|
||||
})
|
||||
.max()
|
||||
};
|
||||
|
||||
let (partition_mode, partition_lsn) = if cfg!(test)
|
||||
|| cfg!(feature = "testing")
|
||||
|| self
|
||||
.feature_resolver
|
||||
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
|
||||
.is_ok()
|
||||
{
|
||||
let last_repartition_lsn = self.partitioning.read().1;
|
||||
let lsn = match l0_l1_boundary_lsn {
|
||||
Some(boundary) => gc_cutoff
|
||||
.max(boundary)
|
||||
.max(last_repartition_lsn)
|
||||
.max(self.initdb_lsn)
|
||||
.max(self.ancestor_lsn),
|
||||
None => self.get_last_record_lsn(),
|
||||
};
|
||||
if lsn <= self.initdb_lsn || lsn <= self.ancestor_lsn {
|
||||
// Do not attempt to create image layers below the initdb or ancestor LSN -- no data below it
|
||||
("l0_l1_boundary", self.get_last_record_lsn())
|
||||
} else {
|
||||
("l0_l1_boundary", lsn)
|
||||
}
|
||||
} else {
|
||||
("latest_record", self.get_last_record_lsn())
|
||||
};
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
partition_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
@@ -1301,18 +1345,19 @@ impl Timeline {
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let mode = if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
};
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
mode,
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
@@ -1320,6 +1365,7 @@ impl Timeline {
|
||||
.clone(),
|
||||
options.flags.contains(CompactFlags::YieldForL0),
|
||||
)
|
||||
.instrument(info_span!("create_image_layers", mode = %mode, partition_mode = %partition_mode, lsn = %lsn))
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
@@ -1344,7 +1390,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
Ok(_) => {
|
||||
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
// This happens very frequently so we don't want to log it.
|
||||
debug!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
|
||||
@@ -25,8 +25,11 @@ pub(crate) struct ImportingTimeline {
|
||||
}
|
||||
|
||||
impl ImportingTimeline {
|
||||
pub(crate) fn shutdown(self) {
|
||||
pub(crate) async fn shutdown(self) {
|
||||
self.import_task_handle.abort();
|
||||
let _ = self.import_task_handle.await;
|
||||
|
||||
self.timeline.remote_client.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,6 +96,11 @@ pub async fn doit(
|
||||
);
|
||||
}
|
||||
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_file_changes()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
|
||||
// Communicate that shard is done.
|
||||
// Ensure at-least-once delivery of the upcall to storage controller
|
||||
// before we mark the task as done and never come here again.
|
||||
|
||||
@@ -113,14 +113,14 @@ async fn run_v1(
|
||||
let plan_hash = hasher.finish();
|
||||
|
||||
if let Some(progress) = &import_progress {
|
||||
if plan_hash != progress.import_plan_hash {
|
||||
anyhow::bail!("Import plan does not match storcon metadata");
|
||||
}
|
||||
|
||||
// Handle collisions on jobs of unequal length
|
||||
if progress.jobs != plan.jobs.len() {
|
||||
anyhow::bail!("Import plan job length does not match storcon metadata")
|
||||
}
|
||||
|
||||
if plan_hash != progress.import_plan_hash {
|
||||
anyhow::bail!("Import plan does not match storcon metadata");
|
||||
}
|
||||
}
|
||||
|
||||
pausable_failpoint!("import-timeline-pre-execute-pausable");
|
||||
@@ -218,6 +218,19 @@ impl Planner {
|
||||
checkpoint_buf,
|
||||
)));
|
||||
|
||||
// Sort the tasks by the key ranges they handle.
|
||||
// The plan being generated here needs to be stable across invocations
|
||||
// of this method.
|
||||
self.tasks.sort_by_key(|task| match task {
|
||||
AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
|
||||
AnyImportTask::RelBlocks(rel_blocks) => {
|
||||
(rel_blocks.key_range.start, rel_blocks.key_range.end)
|
||||
}
|
||||
AnyImportTask::SlruBlocks(slru_blocks) => {
|
||||
(slru_blocks.key_range.start, slru_blocks.key_range.end)
|
||||
}
|
||||
});
|
||||
|
||||
// Assigns parts of key space to later parallel jobs
|
||||
let mut last_end_key = Key::MIN;
|
||||
let mut current_chunk = Vec::new();
|
||||
@@ -426,6 +439,8 @@ impl Plan {
|
||||
}));
|
||||
},
|
||||
maybe_complete_job_idx = work.next() => {
|
||||
pausable_failpoint!("import-task-complete-pausable");
|
||||
|
||||
match maybe_complete_job_idx {
|
||||
Some(Ok((job_idx, res))) => {
|
||||
assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
|
||||
@@ -440,6 +455,9 @@ impl Plan {
|
||||
import_plan_hash,
|
||||
};
|
||||
|
||||
timeline.remote_client.schedule_index_upload_for_file_changes()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
|
||||
storcon_client.put_timeline_import_status(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
@@ -640,7 +658,11 @@ impl Hash for ImportSingleKeyTask {
|
||||
let ImportSingleKeyTask { key, buf } = self;
|
||||
|
||||
key.hash(state);
|
||||
buf.hash(state);
|
||||
// The key value might not have a stable binary representation.
|
||||
// For instance, the db directory uses an unstable hash-map.
|
||||
// To work around this we are a bit lax here and only hash the
|
||||
// size of the buffer which must be consistent.
|
||||
buf.len().hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -915,7 +937,7 @@ impl ChunkProcessingJob {
|
||||
let guard = timeline.layers.read().await;
|
||||
let existing_layer = guard.try_get_from_key(&desc.key());
|
||||
if let Some(layer) = existing_layer {
|
||||
if layer.metadata().generation != timeline.generation {
|
||||
if layer.metadata().generation == timeline.generation {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Import attempted to rewrite layer file in the same generation: {}",
|
||||
layer.local_path()
|
||||
|
||||
@@ -155,8 +155,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
int written = 0;
|
||||
|
||||
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
|
||||
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
|
||||
sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
|
||||
"%s host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
|
||||
wp->config->safekeeper_conninfo_options, sk->host, sk->port,
|
||||
wp->config->neon_timeline, wp->config->neon_tenant);
|
||||
if (written > MAXCONNINFO || written < 0)
|
||||
wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
|
||||
}
|
||||
|
||||
@@ -714,6 +714,9 @@ typedef struct WalProposerConfig
|
||||
*/
|
||||
char *safekeepers_list;
|
||||
|
||||
/* libpq connection info options. */
|
||||
char *safekeeper_conninfo_options;
|
||||
|
||||
/*
|
||||
* WalProposer reconnects to offline safekeepers once in this interval.
|
||||
* Time is in milliseconds.
|
||||
|
||||
@@ -64,6 +64,7 @@ char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
int safekeeper_proto_version = 3;
|
||||
char *safekeeper_conninfo_options = "";
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
@@ -119,6 +120,7 @@ init_walprop_config(bool syncSafekeepers)
|
||||
walprop_config.neon_timeline = neon_timeline;
|
||||
/* WalProposerCreate scribbles directly on it, so pstrdup */
|
||||
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
|
||||
walprop_config.safekeeper_conninfo_options = pstrdup(safekeeper_conninfo_options);
|
||||
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
|
||||
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
|
||||
walprop_config.wal_segment_size = wal_segment_size;
|
||||
@@ -203,6 +205,16 @@ nwp_register_gucs(void)
|
||||
* GUC_LIST_QUOTE */
|
||||
NULL, assign_neon_safekeepers, NULL);
|
||||
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_conninfo_options",
|
||||
"libpq keyword parameters and values to apply to safekeeper connections",
|
||||
NULL,
|
||||
&safekeeper_conninfo_options,
|
||||
"",
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.safekeeper_reconnect_timeout",
|
||||
"Walproposer reconnects to offline safekeepers once in this interval.",
|
||||
|
||||
@@ -14,7 +14,9 @@ use hyper::http::{HeaderName, HeaderValue};
|
||||
use hyper::{HeaderMap, Request, Response, StatusCode, header};
|
||||
use indexmap::IndexMap;
|
||||
use postgres_client::error::{DbError, ErrorPosition, SqlState};
|
||||
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
|
||||
use postgres_client::{
|
||||
GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
|
||||
};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
@@ -1092,22 +1094,41 @@ async fn query_to_json<T: GenericClient>(
|
||||
let query_start = Instant::now();
|
||||
|
||||
let query_params = data.params;
|
||||
let mut row_stream = std::pin::pin!(
|
||||
client
|
||||
.query_raw_txt(&data.query, query_params)
|
||||
.await
|
||||
.map_err(SqlOverHttpError::Postgres)?
|
||||
);
|
||||
let mut row_stream = client
|
||||
.query_raw_txt(&data.query, query_params)
|
||||
.await
|
||||
.map_err(SqlOverHttpError::Postgres)?;
|
||||
let query_acknowledged = Instant::now();
|
||||
|
||||
let columns_len = row_stream.statement.columns().len();
|
||||
let mut fields = Vec::with_capacity(columns_len);
|
||||
let mut types = Vec::with_capacity(columns_len);
|
||||
|
||||
for c in row_stream.statement.columns() {
|
||||
fields.push(json!({
|
||||
"name": c.name().to_owned(),
|
||||
"dataTypeID": c.type_().oid(),
|
||||
"tableID": c.table_oid(),
|
||||
"columnID": c.column_id(),
|
||||
"dataTypeSize": c.type_size(),
|
||||
"dataTypeModifier": c.type_modifier(),
|
||||
"format": "text",
|
||||
}));
|
||||
|
||||
types.push(c.type_().clone());
|
||||
}
|
||||
|
||||
let raw_output = parsed_headers.raw_output;
|
||||
let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
|
||||
|
||||
// Manually drain the stream into a vector to leave row_stream hanging
|
||||
// around to get a command tag. Also check that the response is not too
|
||||
// big.
|
||||
let mut rows: Vec<postgres_client::Row> = Vec::new();
|
||||
let mut rows = Vec::new();
|
||||
while let Some(row) = row_stream.next().await {
|
||||
let row = row.map_err(SqlOverHttpError::Postgres)?;
|
||||
*current_size += row.body_len();
|
||||
rows.push(row);
|
||||
|
||||
// we don't have a streaming response support yet so this is to prevent OOM
|
||||
// from a malicious query (eg a cross join)
|
||||
if *current_size > config.max_response_size_bytes {
|
||||
@@ -1115,13 +1136,26 @@ async fn query_to_json<T: GenericClient>(
|
||||
config.max_response_size_bytes,
|
||||
));
|
||||
}
|
||||
|
||||
let row = pg_text_row_to_json(&row, &types, raw_output, array_mode)?;
|
||||
rows.push(row);
|
||||
|
||||
// assumption: parsing pg text and converting to json takes CPU time.
|
||||
// let's assume it is slightly expensive, so we should consume some cooperative budget.
|
||||
// Especially considering that `RowStream::next` might be pulling from a batch
|
||||
// of rows and never hit the tokio mpsc for a long time (although unlikely).
|
||||
tokio::task::consume_budget().await;
|
||||
}
|
||||
|
||||
let query_resp_end = Instant::now();
|
||||
let ready = row_stream.ready_status();
|
||||
let RowStream {
|
||||
command_tag,
|
||||
status: ready,
|
||||
..
|
||||
} = row_stream;
|
||||
|
||||
// grab the command tag and number of rows affected
|
||||
let command_tag = row_stream.command_tag().unwrap_or_default();
|
||||
let command_tag = command_tag.unwrap_or_default();
|
||||
let mut command_tag_split = command_tag.split(' ');
|
||||
let command_tag_name = command_tag_split.next().unwrap_or_default();
|
||||
let command_tag_count = if command_tag_name == "INSERT" {
|
||||
@@ -1142,38 +1176,6 @@ async fn query_to_json<T: GenericClient>(
|
||||
"finished executing query"
|
||||
);
|
||||
|
||||
let columns_len = row_stream.columns().len();
|
||||
let mut fields = Vec::with_capacity(columns_len);
|
||||
let mut columns = Vec::with_capacity(columns_len);
|
||||
|
||||
for c in row_stream.columns() {
|
||||
fields.push(json!({
|
||||
"name": c.name().to_owned(),
|
||||
"dataTypeID": c.type_().oid(),
|
||||
"tableID": c.table_oid(),
|
||||
"columnID": c.column_id(),
|
||||
"dataTypeSize": c.type_size(),
|
||||
"dataTypeModifier": c.type_modifier(),
|
||||
"format": "text",
|
||||
}));
|
||||
|
||||
match client.get_type(c.type_oid()).await {
|
||||
Ok(t) => columns.push(t),
|
||||
Err(err) => {
|
||||
tracing::warn!(?err, "unable to query type information");
|
||||
return Err(SqlOverHttpError::InternalPostgres(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
|
||||
|
||||
// convert rows to JSON
|
||||
let rows = rows
|
||||
.iter()
|
||||
.map(|row| pg_text_row_to_json(row, &columns, parsed_headers.raw_output, array_mode))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
// Resulting JSON format is based on the format of node-postgres result.
|
||||
let results = json!({
|
||||
"command": command_tag_name.to_string(),
|
||||
|
||||
@@ -87,6 +87,7 @@ impl WalProposer {
|
||||
let config = Config {
|
||||
ttid,
|
||||
safekeepers_list: addrs,
|
||||
safekeeper_conninfo_options: String::new(),
|
||||
safekeeper_reconnect_timeout: 1000,
|
||||
safekeeper_connection_timeout: 5000,
|
||||
sync_safekeepers,
|
||||
|
||||
@@ -3823,6 +3823,13 @@ impl Service {
|
||||
.await;
|
||||
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
|
||||
let is_import = create_req.is_import();
|
||||
let read_only = matches!(
|
||||
create_req.mode,
|
||||
models::TimelineCreateRequestMode::Branch {
|
||||
read_only: true,
|
||||
..
|
||||
}
|
||||
);
|
||||
|
||||
if is_import {
|
||||
// Ensure that there is no split on-going.
|
||||
@@ -3895,13 +3902,13 @@ impl Service {
|
||||
}
|
||||
|
||||
None
|
||||
} else if safekeepers {
|
||||
} else if safekeepers || read_only {
|
||||
// Note that for imported timelines, we do not create the timeline on the safekeepers
|
||||
// straight away. Instead, we do it once the import finalized such that we know what
|
||||
// start LSN to provide for the safekeepers. This is done in
|
||||
// [`Self::finalize_timeline_import`].
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
|
||||
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
|
||||
.await?;
|
||||
Some(res)
|
||||
@@ -3915,6 +3922,11 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%req.tenant_shard_id.tenant_id,
|
||||
shard_id=%req.tenant_shard_id.shard_slug(),
|
||||
timeline_id=%req.timeline_id,
|
||||
))]
|
||||
pub(crate) async fn handle_timeline_shard_import_progress(
|
||||
self: &Arc<Self>,
|
||||
req: TimelineImportStatusRequest,
|
||||
@@ -3964,6 +3976,11 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%req.tenant_shard_id.tenant_id,
|
||||
shard_id=%req.tenant_shard_id.shard_slug(),
|
||||
timeline_id=%req.timeline_id,
|
||||
))]
|
||||
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
|
||||
self: &Arc<Self>,
|
||||
req: PutTimelineImportStatusRequest,
|
||||
|
||||
@@ -208,6 +208,7 @@ impl Service {
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_info: &TimelineInfo,
|
||||
read_only: bool,
|
||||
) -> Result<SafekeepersInfo, ApiError> {
|
||||
let timeline_id = timeline_info.timeline_id;
|
||||
let pg_version = timeline_info.pg_version * 10000;
|
||||
@@ -220,7 +221,11 @@ impl Service {
|
||||
let start_lsn = timeline_info.last_record_lsn;
|
||||
|
||||
// Choose initial set of safekeepers respecting affinity
|
||||
let sks = self.safekeepers_for_new_timeline().await?;
|
||||
let sks = if !read_only {
|
||||
self.safekeepers_for_new_timeline().await?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
|
||||
// Add timeline to db
|
||||
let mut timeline_persist = TimelinePersistence {
|
||||
@@ -253,6 +258,16 @@ impl Service {
|
||||
)));
|
||||
}
|
||||
}
|
||||
let ret = SafekeepersInfo {
|
||||
generation: timeline_persist.generation as u32,
|
||||
safekeepers: sks.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
if read_only {
|
||||
return Ok(ret);
|
||||
}
|
||||
|
||||
// Create the timeline on a quorum of safekeepers
|
||||
let remaining = self
|
||||
.tenant_timeline_create_safekeepers_quorum(
|
||||
@@ -316,12 +331,7 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(SafekeepersInfo {
|
||||
generation: timeline_persist.generation as u32,
|
||||
safekeepers: sks,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
|
||||
@@ -336,8 +346,10 @@ impl Service {
|
||||
return Err(TimelineImportFinalizeError::ShuttingDown);
|
||||
}
|
||||
|
||||
// This function is only used in non-read-only scenarios
|
||||
let read_only = false;
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
@@ -410,6 +422,18 @@ impl Service {
|
||||
.chain(tl.sk_set.iter())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
// The timeline has no safekeepers: we need to delete it from the db manually,
|
||||
// as no safekeeper reconciler will get to it
|
||||
if all_sks.is_empty() {
|
||||
if let Err(err) = self
|
||||
.persistence
|
||||
.delete_timeline(tenant_id, timeline_id)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Schedule reconciliations
|
||||
for &sk_id in all_sks.iter() {
|
||||
let pending_op = TimelinePendingOpPersistence {
|
||||
|
||||
@@ -404,6 +404,29 @@ class PageserverTracingConfig:
|
||||
return ("tracing", value)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverImportConfig:
|
||||
import_job_concurrency: int
|
||||
import_job_soft_size_limit: int
|
||||
import_job_checkpoint_threshold: int
|
||||
|
||||
@staticmethod
|
||||
def default() -> PageserverImportConfig:
|
||||
return PageserverImportConfig(
|
||||
import_job_concurrency=4,
|
||||
import_job_soft_size_limit=512 * 1024,
|
||||
import_job_checkpoint_threshold=4,
|
||||
)
|
||||
|
||||
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
|
||||
value = {
|
||||
"import_job_concurrency": self.import_job_concurrency,
|
||||
"import_job_soft_size_limit": self.import_job_soft_size_limit,
|
||||
"import_job_checkpoint_threshold": self.import_job_checkpoint_threshold,
|
||||
}
|
||||
return ("timeline_import_config", value)
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
"""
|
||||
Builder object to create a Neon runtime environment
|
||||
@@ -454,6 +477,7 @@ class NeonEnvBuilder:
|
||||
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
|
||||
pageserver_get_vectored_concurrent_io: str | None = None,
|
||||
pageserver_tracing_config: PageserverTracingConfig | None = None,
|
||||
pageserver_import_config: PageserverImportConfig | None = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -511,6 +535,7 @@ class NeonEnvBuilder:
|
||||
)
|
||||
|
||||
self.pageserver_tracing_config = pageserver_tracing_config
|
||||
self.pageserver_import_config = pageserver_import_config
|
||||
|
||||
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
|
||||
pageserver_default_tenant_config_compaction_algorithm
|
||||
@@ -1179,6 +1204,10 @@ class NeonEnv:
|
||||
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
|
||||
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
|
||||
self.pageserver_tracing_config = config.pageserver_tracing_config
|
||||
if config.pageserver_import_config is None:
|
||||
self.pageserver_import_config = PageserverImportConfig.default()
|
||||
else:
|
||||
self.pageserver_import_config = config.pageserver_import_config
|
||||
|
||||
# Create the neon_local's `NeonLocalInitConf`
|
||||
cfg: dict[str, Any] = {
|
||||
@@ -1224,6 +1253,7 @@ class NeonEnv:
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
grpc_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
for ps_id in range(
|
||||
self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers
|
||||
):
|
||||
@@ -1250,18 +1280,13 @@ class NeonEnv:
|
||||
else None,
|
||||
"pg_auth_type": pg_auth_type,
|
||||
"http_auth_type": http_auth_type,
|
||||
"grpc_auth_type": grpc_auth_type,
|
||||
"availability_zone": availability_zone,
|
||||
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
|
||||
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
|
||||
"no_sync": True,
|
||||
# Look for gaps in WAL received from safekeepeers
|
||||
"validate_wal_contiguity": True,
|
||||
# TODO(vlad): make these configurable through the builder
|
||||
"timeline_import_config": {
|
||||
"import_job_concurrency": 4,
|
||||
"import_job_soft_size_limit": 512 * 1024,
|
||||
"import_job_checkpoint_threshold": 4,
|
||||
},
|
||||
}
|
||||
|
||||
# Batching (https://github.com/neondatabase/neon/issues/9377):
|
||||
@@ -1323,6 +1348,12 @@ class NeonEnv:
|
||||
|
||||
ps_cfg[key] = value
|
||||
|
||||
if self.pageserver_import_config is not None:
|
||||
key, value = self.pageserver_import_config.to_config_key_value()
|
||||
|
||||
if key not in ps_cfg:
|
||||
ps_cfg[key] = value
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
ps = NeonPageserver(
|
||||
self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]
|
||||
|
||||
@@ -217,11 +217,11 @@ if SQL_EXPORTER is None:
|
||||
self, logs_dir: Path, config_file: Path, collector_file: Path, port: int
|
||||
) -> None:
|
||||
# NOTE: Keep the version the same as in
|
||||
# compute/Dockerfile.compute-node and Dockerfile.build-tools.
|
||||
# compute/compute-node.Dockerfile and build-tools.Dockerfile.
|
||||
#
|
||||
# The "host" network mode allows sql_exporter to talk to the
|
||||
# endpoint which is running on the host.
|
||||
super().__init__("docker.io/burningalchemist/sql_exporter:0.17.0", network_mode="host")
|
||||
super().__init__("docker.io/burningalchemist/sql_exporter:0.17.3", network_mode="host")
|
||||
|
||||
self.__logs_dir = logs_dir
|
||||
self.__port = port
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import base64
|
||||
import concurrent.futures
|
||||
import json
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from enum import Enum
|
||||
from enum import Enum, StrEnum
|
||||
from pathlib import Path
|
||||
from threading import Event
|
||||
|
||||
@@ -11,7 +14,14 @@ import pytest
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.fast_import import FastImport
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PageserverImportConfig,
|
||||
PgBin,
|
||||
PgProtocol,
|
||||
StorageControllerMigrationConfig,
|
||||
VanillaPostgres,
|
||||
)
|
||||
from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
)
|
||||
@@ -494,6 +504,259 @@ def test_import_respects_tenant_shutdown(
|
||||
wait_until(cplane_notified)
|
||||
|
||||
|
||||
@skip_in_debug_build("Validation query takes too long in debug builds")
|
||||
def test_import_chaos(
|
||||
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
|
||||
):
|
||||
"""
|
||||
Perform a timeline import while injecting chaos in the environment.
|
||||
We expect that the import completes eventually, that it passes validation and
|
||||
the resulting timeline can be written to.
|
||||
"""
|
||||
TARGET_RELBOCK_SIZE = 512 * 1024 * 1024 # 512 MiB
|
||||
ALLOWED_IMPORT_RUNTIME = 90 # seconds
|
||||
SHARD_COUNT = 4
|
||||
|
||||
neon_env_builder.num_pageservers = SHARD_COUNT
|
||||
neon_env_builder.pageserver_import_config = PageserverImportConfig(
|
||||
import_job_concurrency=1,
|
||||
import_job_soft_size_limit=64 * 1024,
|
||||
import_job_checkpoint_threshold=4,
|
||||
)
|
||||
|
||||
# Set up mock control plane HTTP server to listen for import completions
|
||||
import_completion_signaled = Event()
|
||||
# There's some Python magic at play here. A list can be updated from the
|
||||
# handler thread, but an optional cannot. Hence, use a list with one element.
|
||||
import_error = []
|
||||
|
||||
def handler(request: Request) -> Response:
|
||||
assert request.json is not None
|
||||
|
||||
body = request.json
|
||||
if "error" in body:
|
||||
if body["error"]:
|
||||
import_error.append(body["error"])
|
||||
|
||||
log.info(f"control plane /import_complete request: {request.json}")
|
||||
import_completion_signaled.set()
|
||||
return Response(json.dumps({}), status=200)
|
||||
|
||||
cplane_mgmt_api_server = make_httpserver
|
||||
cplane_mgmt_api_server.expect_request(
|
||||
"/storage/api/v1/import_complete", method="PUT"
|
||||
).respond_with_handler(handler)
|
||||
|
||||
# Plug the cplane mock in
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
# The import will specifiy a local filesystem path mocking remote storage
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
|
||||
|
||||
nrows = 0
|
||||
while True:
|
||||
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
|
||||
)
|
||||
if relblock_size >= TARGET_RELBOCK_SIZE:
|
||||
break
|
||||
addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
vanilla_pg.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
nrows += addrows
|
||||
|
||||
vanilla_pg.stop()
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
# Pause after every import task to extend the test runtime and allow
|
||||
# for more chaos injection.
|
||||
for ps in env.pageservers:
|
||||
ps.add_persistent_failpoint("import-task-complete-pausable", "sleep(5)")
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
# The shard might have moved or the pageserver hosting the shard restarted
|
||||
".*Call to node.*management API.*failed.*",
|
||||
# Migrations have their time outs set to 0
|
||||
".*Timed out after.*downloading layers.*",
|
||||
".*Failed to prepare by downloading layers.*",
|
||||
# The test may kill the storage controller or pageservers
|
||||
".*request was dropped before completing.*",
|
||||
]
|
||||
)
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.extend(
|
||||
[
|
||||
# We might re-write a layer in a different generation if the import
|
||||
# needs to redo some of the progress since not each job is checkpointed.
|
||||
".*was unlinked but was not dangling.*",
|
||||
# The test may kill the storage controller or pageservers
|
||||
".*request was dropped before completing.*",
|
||||
# Test can SIGTERM pageserver while it is downloading
|
||||
".*removing local file.*temp_download.*",
|
||||
".*Failed to flush heatmap.*",
|
||||
# Test can SIGTERM the storage controller while pageserver
|
||||
# is attempting to upcall.
|
||||
".*storage controller upcall failed.*timeline_import_status.*",
|
||||
# TODO(vlad): TenantManager::reset_tenant returns a blanked anyhow error.
|
||||
# It should return ResourceUnavailable or something that doesn't error log.
|
||||
".*activate_post_import.*InternalServerError.*tenant map is shutting down.*",
|
||||
# TODO(vlad): How can this happen?
|
||||
".*Failed to download a remote file: deserialize index part file.*",
|
||||
".*Cancelled request finished with an error.*",
|
||||
]
|
||||
)
|
||||
|
||||
importbucket_path = neon_env_builder.repo_dir / "test_import_chaos_bucket"
|
||||
mock_import_bucket(vanilla_pg, importbucket_path)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
idempotency = ImportPgdataIdemptencyKey.random()
|
||||
|
||||
env.storage_controller.tenant_create(
|
||||
tenant_id, shard_count=SHARD_COUNT, placement_policy={"Attached": 1}
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
env.storage_controller.timeline_create(
|
||||
tenant_id,
|
||||
{
|
||||
"new_timeline_id": str(timeline_id),
|
||||
"import_pgdata": {
|
||||
"idempotency_key": str(idempotency),
|
||||
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
def chaos(stop_chaos: threading.Event):
|
||||
class ChaosType(StrEnum):
|
||||
MIGRATE_SHARD = "migrate_shard"
|
||||
RESTART_IMMEDIATE = "restart_immediate"
|
||||
RESTART = "restart"
|
||||
STORCON_RESTART_IMMEDIATE = "storcon_restart_immediate"
|
||||
|
||||
while not stop_chaos.is_set():
|
||||
chaos_type = random.choices(
|
||||
population=[
|
||||
ChaosType.MIGRATE_SHARD,
|
||||
ChaosType.RESTART,
|
||||
ChaosType.RESTART_IMMEDIATE,
|
||||
ChaosType.STORCON_RESTART_IMMEDIATE,
|
||||
],
|
||||
weights=[0.25, 0.25, 0.25, 0.25],
|
||||
k=1,
|
||||
)[0]
|
||||
|
||||
try:
|
||||
if chaos_type == ChaosType.MIGRATE_SHARD:
|
||||
target_shard_number = random.randint(0, SHARD_COUNT - 1)
|
||||
target_shard = TenantShardId(tenant_id, target_shard_number, SHARD_COUNT)
|
||||
|
||||
placements = env.storage_controller.get_tenants_placement()
|
||||
log.info(f"{placements=}")
|
||||
target_ps = placements[str(target_shard)]["intent"]["attached"]
|
||||
if len(placements[str(target_shard)]["intent"]["secondary"]) == 0:
|
||||
dest_ps = None
|
||||
else:
|
||||
dest_ps = placements[str(target_shard)]["intent"]["secondary"][0]
|
||||
|
||||
if target_ps is None or dest_ps is None:
|
||||
continue
|
||||
|
||||
config = StorageControllerMigrationConfig(
|
||||
secondary_warmup_timeout="0s",
|
||||
secondary_download_request_timeout="0s",
|
||||
prewarm=False,
|
||||
)
|
||||
env.storage_controller.tenant_shard_migrate(target_shard, dest_ps, config)
|
||||
|
||||
log.info(
|
||||
f"CHAOS: Migrating shard {target_shard} from pageserver {target_ps} to {dest_ps}"
|
||||
)
|
||||
elif chaos_type == ChaosType.RESTART_IMMEDIATE:
|
||||
target_ps = random.choice(env.pageservers)
|
||||
log.info(f"CHAOS: Immediate restart of pageserver {target_ps.id}")
|
||||
target_ps.stop(immediate=True)
|
||||
target_ps.start()
|
||||
elif chaos_type == ChaosType.RESTART:
|
||||
target_ps = random.choice(env.pageservers)
|
||||
log.info(f"CHAOS: Normal restart of pageserver {target_ps.id}")
|
||||
target_ps.stop(immediate=False)
|
||||
target_ps.start()
|
||||
elif chaos_type == ChaosType.STORCON_RESTART_IMMEDIATE:
|
||||
log.info("CHAOS: Immediate restart of storage controller")
|
||||
env.storage_controller.stop(immediate=True)
|
||||
env.storage_controller.start()
|
||||
except Exception as e:
|
||||
log.warning(f"CHAOS: Error during chaos operation {chaos_type}: {e}")
|
||||
|
||||
# Sleep before next chaos event
|
||||
time.sleep(1)
|
||||
|
||||
log.info("Chaos injector stopped")
|
||||
|
||||
def wait_for_import_completion():
|
||||
start = time.time()
|
||||
done = import_completion_signaled.wait(ALLOWED_IMPORT_RUNTIME)
|
||||
if not done:
|
||||
raise TimeoutError(f"Import did not signal completion within {ALLOWED_IMPORT_RUNTIME}")
|
||||
|
||||
end = time.time()
|
||||
|
||||
log.info(f"Import completion signalled after {end - start}s {import_error=}")
|
||||
|
||||
if import_error:
|
||||
raise RuntimeError(f"Import error: {import_error}")
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
stop_chaos = threading.Event()
|
||||
|
||||
wait_for_import_completion_fut = executor.submit(wait_for_import_completion)
|
||||
chaos_fut = executor.submit(chaos, stop_chaos)
|
||||
|
||||
try:
|
||||
wait_for_import_completion_fut.result()
|
||||
except Exception as e:
|
||||
raise e
|
||||
finally:
|
||||
stop_chaos.set()
|
||||
chaos_fut.result()
|
||||
|
||||
import_branch_name = "imported"
|
||||
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
|
||||
endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
|
||||
|
||||
# Validate the imported data is legit
|
||||
assert endpoint.safe_psql_many(
|
||||
[
|
||||
"set effective_io_concurrency=32;",
|
||||
"SET statement_timeout='300s';",
|
||||
"select count(*), sum(data::bigint)::bigint from t",
|
||||
]
|
||||
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
# Validate writes
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
|
||||
workload.init()
|
||||
workload.write_rows(64)
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_fast_import_with_pageserver_ingest(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
|
||||
@@ -20,6 +20,9 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548"
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"attach_mode",
|
||||
["default_generation", "same_generation"],
|
||||
|
||||
@@ -4158,17 +4158,12 @@ def test_storcon_create_delete_sk_down(
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
with env.endpoints.create(
|
||||
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
|
||||
) as ep:
|
||||
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
@@ -4249,17 +4244,12 @@ def test_storcon_few_sk(
|
||||
|
||||
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
with env.endpoints.create(
|
||||
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
|
||||
) as ep:
|
||||
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
@@ -10,6 +10,7 @@ from queue import Empty, Queue
|
||||
from threading import Barrier
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.common_types import Lsn, TimelineArchivalState, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -401,8 +402,25 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots
|
||||
"earlier", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_pipe
|
||||
)
|
||||
|
||||
snapshot_branchpoint_old = env.create_branch(
|
||||
"snapshot_branchpoint_old", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_y
|
||||
snapshot_branchpoint_old = TimelineId.generate()
|
||||
|
||||
env.storage_controller.timeline_create(
|
||||
env.initial_tenant,
|
||||
{
|
||||
"new_timeline_id": str(snapshot_branchpoint_old),
|
||||
"ancestor_start_lsn": str(branchpoint_y),
|
||||
"ancestor_timeline_id": str(env.initial_timeline),
|
||||
"read_only": True,
|
||||
},
|
||||
)
|
||||
sk = env.safekeepers[0]
|
||||
assert sk
|
||||
with pytest.raises(requests.exceptions.HTTPError, match="Not Found"):
|
||||
sk.http_client().timeline_status(
|
||||
tenant_id=env.initial_tenant, timeline_id=snapshot_branchpoint_old
|
||||
)
|
||||
env.neon_cli.mappings_map_branch(
|
||||
"snapshot_branchpoint_old", env.initial_tenant, snapshot_branchpoint_old
|
||||
)
|
||||
|
||||
snapshot_branchpoint = env.create_branch(
|
||||
|
||||
@@ -2012,10 +2012,7 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
ep = env.endpoints.create("main", config_lines=config_lines)
|
||||
ep = env.endpoints.create("main")
|
||||
|
||||
# expected to fail because timeline is not created on safekeepers
|
||||
with pytest.raises(Exception, match=r".*timed out.*"):
|
||||
@@ -2043,10 +2040,7 @@ def test_explicit_timeline_creation_storcon(neon_env_builder: NeonEnvBuilder):
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
ep = env.endpoints.create("main", config_lines=config_lines)
|
||||
ep = env.endpoints.create("main")
|
||||
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
|
||||
@@ -637,10 +637,7 @@ async def quorum_sanity_single(
|
||||
# create timeline on `members_sks`
|
||||
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks)
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
ep = env.endpoints.create(branch_name, config_lines=config_lines)
|
||||
ep = env.endpoints.create(branch_name)
|
||||
ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
|
||||
ep.safe_psql("create table t(key int, value text)")
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@ license.workspace = true
|
||||
ahash = { version = "0.8" }
|
||||
anstream = { version = "0.6" }
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
axum = { version = "0.8", features = ["ws"] }
|
||||
axum-core = { version = "0.5", default-features = false, features = ["tracing"] }
|
||||
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
|
||||
base64-647d43efb71741da = { package = "base64", version = "0.21" }
|
||||
base64ct = { version = "1", default-features = false, features = ["std"] }
|
||||
@@ -39,10 +41,8 @@ env_logger = { version = "0.11" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
form_urlencoded = { version = "1" }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
futures-core = { version = "0.3" }
|
||||
futures-executor = { version = "0.3" }
|
||||
futures-io = { version = "0.3" }
|
||||
futures-task = { version = "0.3", default-features = false, features = ["std"] }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
@@ -52,7 +52,7 @@ hex = { version = "0.4", features = ["serde"] }
|
||||
hmac = { version = "0.12", default-features = false, features = ["reset"] }
|
||||
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
|
||||
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
|
||||
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] }
|
||||
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
|
||||
indexmap = { version = "2", features = ["serde"] }
|
||||
itertools = { version = "0.12" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
@@ -72,7 +72,6 @@ num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
p256 = { version = "0.13", features = ["jwk"] }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
percent-encoding = { version = "2" }
|
||||
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -98,7 +97,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["full", "test-util"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-stream = { version = "0.1" }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
|
||||
|
||||
Reference in New Issue
Block a user