Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-21 20:32:56 +00:00

Compare commits: proxy-kick ... tmp-repro
12 Commits
| SHA1 |
|---|
| 9a0aad582e |
| d175570387 |
| e0ac28d2fa |
| 93f3f4ab5f |
| f6e2e0042d |
| b917270c67 |
| b067378d0d |
| 768c8d9972 |
| f1d960d2c2 |
| cd17802b1f |
| 10a5d36af8 |
| a7ab53c80c |
@@ -19,7 +19,7 @@ inputs:
run_in_parallel:
description: 'Whether to run tests in parallel'
required: false
default: 'true'
default: 'false'
save_perf_report:
description: 'Whether to upload the performance report, if true PERF_TEST_RESULT_CONNSTR env variable should be set'
required: false
@@ -171,7 +171,7 @@ runs:
--junitxml=$TEST_OUTPUT/junit.xml \
--alluredir=$TEST_OUTPUT/allure/results \
--tb=short \
--verbose \
--verbose -k "test_forward or test_create_snapsh" -x \
-rA $TEST_SELECTION $EXTRA_PARAMS

if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
18 .github/ansible/deploy.yaml vendored
@@ -91,6 +91,15 @@
tags:
- pageserver

# used in `pageserver.service` template
- name: learn current availability_zone
shell:
cmd: "curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone"
register: ec2_availability_zone

- set_fact:
ec2_availability_zone={{ ec2_availability_zone.stdout }}

- name: upload systemd service definition
ansible.builtin.template:
src: systemd/pageserver.service
@@ -153,6 +162,15 @@
tags:
- safekeeper

# used in `safekeeper.service` template
- name: learn current availability_zone
shell:
cmd: "curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone"
register: ec2_availability_zone

- set_fact:
ec2_availability_zone={{ ec2_availability_zone.stdout }}

# in the future safekeepers should discover pageservers by themselves
# but currently use the first pageserver that was discovered
- name: set first pageserver var for safekeepers
2 .github/ansible/prod.eu-central-1.hosts.yaml vendored
@@ -27,6 +27,8 @@ storage:
ansible_host: i-0cd8d316ecbb715be
pageserver-1.eu-central-1.aws.neon.tech:
ansible_host: i-090044ed3d383fef0
pageserver-2.eu-central-1.aws.neon.tech:
ansible_host: i-033584edf3f4b6742

safekeepers:
hosts:
2 .github/ansible/systemd/pageserver.service vendored
@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_PAGESERVER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -D /storage/pageserver/data
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -c "availability_zone='{{ ec2_availability_zone }}'" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
2 .github/ansible/systemd/safekeeper.service vendored
@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_SAFEKEEPER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}'
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}' --availability-zone={{ ec2_availability_zone }}
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
1 .github/workflows/build_and_test.yml vendored
@@ -5,6 +5,7 @@ on:
branches:
- main
- release
- tmp-repro
pull_request:

defaults:
@@ -2,26 +2,69 @@

ARG SRC_IMAGE
ARG VM_INFORMANT_VERSION=v0.1.14
# on libcgroup update, make sure to check bootstrap.sh for changes
ARG LIBCGROUP_VERSION=v2.0.3

# Pull VM informant and set up inittab
# Pull VM informant, to copy from later
FROM neondatabase/vm-informant:$VM_INFORMANT_VERSION as informant

# Build cgroup-tools
#
# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-informant
# requires cgroup v2, so we'll build cgroup-tools ourselves.
FROM debian:bullseye-slim as libcgroup-builder
ARG LIBCGROUP_VERSION

RUN set -exu \
&& apt update \
&& apt install --no-install-recommends -y \
git \
ca-certificates \
automake \
cmake \
make \
gcc \
byacc \
flex \
libtool \
libpam0g-dev \
&& git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
&& INSTALL_DIR="/libcgroup-install" \
&& mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
&& cd libcgroup \
# extracted from bootstrap.sh, with modified flags:
&& (test -d m4 || mkdir m4) \
&& autoreconf -fi \
&& rm -rf autom4te.cache \
&& CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
# actually build the thing...
&& make install

# Combine, starting from non-VM compute node image.
FROM $SRC_IMAGE as base

# Temporarily set user back to root so we can run adduser, set inittab
USER root
RUN adduser vm-informant --disabled-password --no-create-home

RUN set -e \
&& rm -f /etc/inittab \
&& touch /etc/inittab

RUN set -e \
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
&& CONNSTR="dbname=neondb user=cloud_admin sslmode=disable" \
&& ARGS="--auto-restart --pgconnstr=\"$CONNSTR\"" \
&& ARGS="--auto-restart --cgroup=neon-postgres --pgconnstr=\"$CONNSTR\"" \
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant $ARGS'" >> /etc/inittab

# Combine, starting from non-VM compute node image.
FROM $SRC_IMAGE as base

# Temporarily set user back to root so we can run adduser
USER root
RUN adduser vm-informant --disabled-password --no-create-home
USER postgres

COPY --from=informant /etc/inittab /etc/inittab
ADD vm-cgconfig.conf /etc/cgconfig.conf
COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant

COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/

ENTRYPOINT ["/usr/sbin/cgexec", "-g", "*:neon-postgres", "/usr/local/bin/compute_ctl"]
@@ -2,7 +2,8 @@
[pageserver]
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
auth_type = 'Trust'
pg_auth_type = 'Trust'
http_auth_type = 'Trust'

[[safekeepers]]
id = 1
@@ -3,7 +3,8 @@
[pageserver]
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
auth_type = 'Trust'
pg_auth_type = 'Trust'
http_auth_type = 'Trust'

[[safekeepers]]
id = 1
@@ -53,14 +53,15 @@ listen_addr = '{DEFAULT_BROKER_ADDR}'
id = {DEFAULT_PAGESERVER_ID}
listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
auth_type = '{pageserver_auth_type}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'

[[safekeepers]]
id = {DEFAULT_SAFEKEEPER_ID}
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
"#,
pageserver_auth_type = AuthType::Trust,
trust_auth = AuthType::Trust,
)
}

@@ -627,7 +628,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {

let node = cplane.nodes.get(&(tenant_id, node_name.to_string()));

let auth_token = if matches!(env.pageserver.auth_type, AuthType::NeonJWT) {
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);

Some(env.generate_auth_token(&claims)?)
@@ -97,7 +97,7 @@ impl ComputeControlPlane {
});

node.create_pgdata()?;
node.setup_pg_conf(self.env.pageserver.auth_type)?;
node.setup_pg_conf(self.env.pageserver.pg_auth_type)?;

self.nodes
.insert((tenant_id, node.name.clone()), Arc::clone(&node));
@@ -110,12 +110,14 @@ impl NeonBroker {
pub struct PageServerConf {
// node id
pub id: NodeId,

// Pageserver connection settings
pub listen_pg_addr: String,
pub listen_http_addr: String,

// used to determine which auth type is used
pub auth_type: AuthType,
// auth type used for the PG and HTTP ports
pub pg_auth_type: AuthType,
pub http_auth_type: AuthType,

// jwt auth token used for communication with pageserver
pub auth_token: String,
@@ -127,7 +129,8 @@ impl Default for PageServerConf {
id: NodeId(0),
listen_pg_addr: String::new(),
listen_http_addr: String::new(),
auth_type: AuthType::Trust,
pg_auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
auth_token: String::new(),
}
}
@@ -82,7 +82,7 @@ impl PageServerNode {
let (host, port) = parse_host_port(&env.pageserver.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let password = if env.pageserver.auth_type == AuthType::NeonJWT {
let password = if env.pageserver.pg_auth_type == AuthType::NeonJWT {
Some(env.pageserver.auth_token.clone())
} else {
None
@@ -106,25 +106,32 @@ impl PageServerNode {
self.env.pg_distrib_dir_raw().display()
);

let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let http_auth_type_param =
format!("http_auth_type='{}'", self.env.pageserver.http_auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
);

let pg_auth_type_param = format!("pg_auth_type='{}'", self.env.pageserver.pg_auth_type);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);

let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());

let mut overrides = vec![
id,
pg_distrib_dir_param,
authg_type_param,
http_auth_type_param,
pg_auth_type_param,
listen_http_addr_param,
listen_pg_addr_param,
broker_endpoint_param,
];

if self.env.pageserver.auth_type != AuthType::Trust {
if self.env.pageserver.http_auth_type != AuthType::Trust
|| self.env.pageserver.pg_auth_type != AuthType::Trust
{
overrides.push("auth_validation_public_key_path='auth_public_key.pem'".to_owned());
}
overrides
@@ -247,7 +254,10 @@ impl PageServerNode {
}

fn pageserver_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
Ok(if self.env.pageserver.auth_type != AuthType::Trust {
// FIXME: why is this tied to pageserver's auth type? Whether or not the safekeeper
// needs a token, and how to generate that token, seems independent to whether
// the pageserver requires a token in incoming requests.
Ok(if self.env.pageserver.http_auth_type != AuthType::Trust {
// Generate a token to connect from the pageserver to a safekeeper
let token = self
.env
@@ -283,7 +293,7 @@ impl PageServerNode {

fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
let mut builder = self.http_client.request(method, url);
if self.env.pageserver.auth_type == AuthType::NeonJWT {
if self.env.pageserver.http_auth_type == AuthType::NeonJWT {
builder = builder.bearer_auth(&self.env.pageserver.auth_token)
}
builder
@@ -115,6 +115,10 @@ impl SafekeeperNode {
let datadir = self.datadir_path();

let id_string = id.to_string();
// TODO: add availability_zone to the config.
// Right now we just specify any value here and use it to check metrics in tests.
let availability_zone = format!("sk-{}", id_string);

let mut args = vec![
"-D",
datadir.to_str().with_context(|| {
@@ -126,6 +130,8 @@ impl SafekeeperNode {
&listen_pg,
"--listen-http",
&listen_http,
"--availability-zone",
&availability_zone,
];
if !self.conf.sync {
args.push("--no-sync");
@@ -137,10 +137,12 @@ Each compute should present a token valid for the timeline's tenant.
Pageserver also has HTTP API: some parts are per-tenant,
some parts are server-wide, these are different scopes.

The `auth_type` configuration variable in Pageserver's config may have
either of three values:
Authentication can be enabled separately for the HTTP mgmt API, and
for the libpq connections from compute. The `http_auth_type` and
`pg_auth_type` configuration variables in Pageserver's config may
have one of these values:

* `Trust` removes all authentication. The outdated `MD5` value does likewise
* `Trust` removes all authentication.
* `NeonJWT` enables JWT validation.
Tokens are validated using the public key which lies in a PEM file
specified in the `auth_validation_public_key_path` config.
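As a minimal sketch of the resulting configuration (the key names and the `Trust`/`NeonJWT` values are taken from this diff; the particular combination shown is an illustrative assumption, not from the commits), a pageserver config could now contain:

# hypothetical pageserver.toml fragment: the two settings replace the old auth_type
http_auth_type = 'NeonJWT'    # JWT required on the HTTP mgmt API
pg_auth_type = 'Trust'        # libpq connections from compute left unauthenticated
# needed when either setting is 'NeonJWT'; the code defaults to auth_public_key.pem in the workdir
auth_validation_public_key_path = 'auth_public_key.pem'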
@@ -63,9 +63,9 @@ impl<S> Framed<S> {
&self.stream
}

/// Extract the underlying stream.
pub fn into_inner(self) -> S {
self.stream
/// Deconstruct into the underlying stream and read buffer.
pub fn into_inner(self) -> (S, BytesMut) {
(self.stream, self.read_buf)
}

/// Return new Framed with stream type transformed by async f, for TLS
@@ -6,6 +6,7 @@ use std::ops::{Add, AddAssign};
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::info;

use crate::seqwait::MonotonicCounter;

@@ -239,6 +240,7 @@ impl MonotonicCounter<Lsn> for RecordLsn {
let new_prev = self.last;
self.last = lsn;
self.prev = new_prev;
info!("advanced record lsn to {}/{}", self.last, self.prev);
}
fn cnt_value(&self) -> Lsn {
self.last
@@ -270,15 +270,31 @@ fn start_pageserver(
WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;

// Initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => {
// unwrap is ok because check is performed when creating config, so path is set and file exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
Some(JwtAuth::from_key_path(key_path)?.into())
}
};
info!("Using auth: {:#?}", conf.auth_type);
let http_auth;
let pg_auth;
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
// unwrap is ok because check is performed when creating config, so path is set and file exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
info!(
"Loading public key for verifying JWT tokens from {:#?}",
key_path
);
let auth: Arc<JwtAuth> = Arc::new(JwtAuth::from_key_path(key_path)?);

http_auth = match &conf.http_auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => Some(auth.clone()),
};
pg_auth = match &conf.pg_auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => Some(auth),
};
} else {
http_auth = None;
pg_auth = None;
}
info!("Using auth for http API: {:#?}", conf.http_auth_type);
info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);

match var("NEON_AUTH_TOKEN") {
Ok(v) => {
@@ -308,7 +324,7 @@ fn start_pageserver(
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();

let router = http::make_router(conf, launch_ts, auth.clone(), remote_storage)?
let router = http::make_router(conf, launch_ts, http_auth, remote_storage)?
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
@@ -382,9 +398,9 @@ fn start_pageserver(
async move {
page_service::libpq_listener_main(
conf,
auth,
pg_auth,
pageserver_listener,
conf.auth_type,
conf.pg_auth_type,
libpq_ctx,
)
.await
@@ -118,6 +118,9 @@ pub struct PageServerConf {
/// Example (default): 127.0.0.1:9898
pub listen_http_addr: String,

/// Current availability zone. Used for traffic metrics.
pub availability_zone: Option<String>,

// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration,
// How long to wait for WAL redo to complete.
@@ -138,9 +141,15 @@ pub struct PageServerConf {

pub pg_distrib_dir: PathBuf,

pub auth_type: AuthType,

// Authentication
/// authentication method for the HTTP mgmt API
pub http_auth_type: AuthType,
/// authentication method for libpq connections from compute
pub pg_auth_type: AuthType,
/// Path to a file containing public key for verifying JWT tokens.
/// Used for both mgmt and compute auth, if enabled.
pub auth_validation_public_key_path: Option<PathBuf>,

pub remote_storage_config: Option<RemoteStorageConfig>,

pub default_tenant_conf: TenantConf,
@@ -196,6 +205,8 @@ struct PageServerConfigBuilder {

listen_http_addr: BuilderValue<String>,

availability_zone: BuilderValue<Option<String>>,

wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>,

@@ -208,7 +219,8 @@ struct PageServerConfigBuilder {

pg_distrib_dir: BuilderValue<PathBuf>,

auth_type: BuilderValue<AuthType>,
http_auth_type: BuilderValue<AuthType>,
pg_auth_type: BuilderValue<AuthType>,

//
auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
@@ -240,6 +252,7 @@ impl Default for PageServerConfigBuilder {
Self {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
availability_zone: Set(None),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -251,7 +264,8 @@ impl Default for PageServerConfigBuilder {
pg_distrib_dir: Set(env::current_dir()
.expect("cannot access current directory")
.join("pg_install")),
auth_type: Set(AuthType::Trust),
http_auth_type: Set(AuthType::Trust),
pg_auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None),
id: NotSet,
@@ -295,6 +309,10 @@ impl PageServerConfigBuilder {
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
}

pub fn availability_zone(&mut self, availability_zone: Option<String>) {
self.availability_zone = BuilderValue::Set(availability_zone)
}

pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
}
@@ -323,8 +341,12 @@ impl PageServerConfigBuilder {
self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
}

pub fn auth_type(&mut self, auth_type: AuthType) {
self.auth_type = BuilderValue::Set(auth_type)
pub fn http_auth_type(&mut self, auth_type: AuthType) {
self.http_auth_type = BuilderValue::Set(auth_type)
}

pub fn pg_auth_type(&mut self, auth_type: AuthType) {
self.pg_auth_type = BuilderValue::Set(auth_type)
}

pub fn auth_validation_public_key_path(
@@ -402,6 +424,9 @@ impl PageServerConfigBuilder {
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow!("missing listen_http_addr"))?,
availability_zone: self
.availability_zone
.ok_or(anyhow!("missing availability_zone"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow!("missing wait_lsn_timeout"))?,
@@ -419,7 +444,10 @@ impl PageServerConfigBuilder {
pg_distrib_dir: self
.pg_distrib_dir
.ok_or(anyhow!("missing pg_distrib_dir"))?,
auth_type: self.auth_type.ok_or(anyhow!("missing auth_type"))?,
http_auth_type: self
.http_auth_type
.ok_or(anyhow!("missing http_auth_type"))?,
pg_auth_type: self.pg_auth_type.ok_or(anyhow!("missing pg_auth_type"))?,
auth_validation_public_key_path: self
.auth_validation_public_key_path
.ok_or(anyhow!("missing auth_validation_public_key_path"))?,
@@ -599,6 +627,7 @@ impl PageServerConf {
match key {
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -612,7 +641,8 @@ impl PageServerConf {
"auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
PathBuf::from(parse_toml_string(key, item)?),
)),
"auth_type" => builder.auth_type(parse_toml_from_str(key, item)?),
"http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
"pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
"remote_storage" => {
builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
}
@@ -647,7 +677,7 @@ impl PageServerConf {

let mut conf = builder.build().context("invalid config")?;

if conf.auth_type == AuthType::NeonJWT {
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
let auth_validation_public_key_path = conf
.auth_validation_public_key_path
.get_or_insert_with(|| workdir.join("auth_public_key.pem"));
@@ -763,10 +793,12 @@ impl PageServerConf {
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
availability_zone: None,
superuser: "cloud_admin".to_string(),
workdir: repo_dir,
pg_distrib_dir,
auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
pg_auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,
default_tenant_conf: TenantConf::default(),
@@ -944,6 +976,7 @@ log_format = 'json'
id: NodeId(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
availability_zone: None,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -951,7 +984,8 @@ log_format = 'json'
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
pg_auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,
default_tenant_conf: TenantConf::default(),
@@ -1001,6 +1035,7 @@ log_format = 'json'
id: NodeId(10),
listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
availability_zone: None,
wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111),
superuser: "zzzz".to_string(),
@@ -1008,7 +1043,8 @@ log_format = 'json'
max_file_descriptors: 333,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
pg_auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,
default_tenant_conf: TenantConf::default(),
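For reference, a minimal sketch of how the new key could appear in a pageserver config file (the key name comes from this diff; the zone string is a made-up example — in the Ansible deployment above the real value is read from EC2 instance metadata):

# hypothetical pageserver.toml fragment
availability_zone = 'eu-central-1a'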
@@ -21,7 +21,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::TenantMapInsertError;
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{PageReconstructError, Timeline};
@@ -82,38 +82,52 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
get_state(request).conf
}

/// Check that the requester is authorized to operate on given tenant
fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
check_permission_with(request, |claims| {
crate::auth::check_permission(claims, tenant_id)
})
}

fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
match err {
PageReconstructError::Other(err) => ApiError::InternalServerError(err),
PageReconstructError::NeedsDownload(_, _) => {
// This shouldn't happen, because we use a RequestContext that requests to
// download any missing layer files on-demand.
ApiError::InternalServerError(anyhow::anyhow!("need to download remote layer file"))
}
PageReconstructError::Cancelled => {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}
PageReconstructError::WalRedo(err) => {
ApiError::InternalServerError(anyhow::Error::new(err))
impl From<PageReconstructError> for ApiError {
fn from(pre: PageReconstructError) -> ApiError {
match pre {
PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
PageReconstructError::NeedsDownload(_, _) => {
// This shouldn't happen, because we use a RequestContext that requests to
// download any missing layer files on-demand.
ApiError::InternalServerError(anyhow::anyhow!("need to download remote layer file"))
}
PageReconstructError::Cancelled => {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}
}
}
}

fn apierror_from_tenant_map_insert_error(e: TenantMapInsertError) -> ApiError {
match e {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::InternalServerError(anyhow::Error::new(e))
impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::InternalServerError(anyhow::Error::new(tmie))
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
}

impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
}
TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
}
}

@@ -217,9 +231,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);

let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -249,9 +261,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
let timelines = tenant.list_timelines();

let mut response_data = Vec::with_capacity(timelines.len());
@@ -267,7 +277,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,

response_data.push(timeline_info);
}
Ok(response_data)
Ok::<Vec<TimelineInfo>, ApiError>(response_data)
}
.instrument(info_span!("timeline_list", tenant = %tenant_id))
.await?;
@@ -286,9 +296,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;

let timeline = tenant
.get_timeline(timeline_id, false)
@@ -324,10 +332,7 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &ctx)
.await
.map_err(apierror_from_prerror)?;
let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;

let result = match result {
LsnForTimestamp::Present(lsn) => format!("{lsn}"),
@@ -352,8 +357,7 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
if let Some(remote_storage) = &state.remote_storage {
mgr::attach_tenant(state.conf, tenant_id, remote_storage.clone(), &ctx)
.instrument(info_span!("tenant_attach", tenant = %tenant_id))
.await
.map_err(apierror_from_tenant_map_insert_error)?;
.await?;
} else {
return Err(ApiError::BadRequest(anyhow!(
"attach_tenant is not possible because pageserver was configured without remote storage"
@@ -372,11 +376,7 @@ async fn timeline_delete_handler(request: Request<Body>) -> Result<Response<Body

mgr::delete_timeline(tenant_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", tenant = %tenant_id, timeline = %timeline_id))
.await
// FIXME: Errors from `delete_timeline` can occur for a number of reasons, incuding both
// user and internal errors. Replace this with better handling once the error type permits
// it.
.map_err(ApiError::InternalServerError)?;
.await?;

json_response(StatusCode::OK, ())
}
@@ -389,10 +389,7 @@ async fn tenant_detach_handler(request: Request<Body>) -> Result<Response<Body>,
let conf = state.conf;
mgr::detach_tenant(conf, tenant_id)
.instrument(info_span!("tenant_detach", tenant = %tenant_id))
.await
// FIXME: Errors from `detach_tenant` can be caused by both both user and internal errors.
// Replace this with better handling once the error type permits it.
.map_err(ApiError::InternalServerError)?;
.await?;

json_response(StatusCode::OK, ())
}
@@ -406,8 +403,7 @@ async fn tenant_load_handler(request: Request<Body>) -> Result<Response<Body>, A
let state = get_state(&request);
mgr::load_tenant(state.conf, tenant_id, state.remote_storage.clone(), &ctx)
.instrument(info_span!("load", tenant = %tenant_id))
.await
.map_err(apierror_from_tenant_map_insert_error)?;
.await?;

json_response(StatusCode::ACCEPTED, ())
}
@@ -420,10 +416,7 @@ async fn tenant_ignore_handler(request: Request<Body>) -> Result<Response<Body>,
let conf = state.conf;
mgr::ignore_tenant(conf, tenant_id)
.instrument(info_span!("ignore_tenant", tenant = %tenant_id))
.await
// FIXME: Errors from `ignore_tenant` can be caused by both both user and internal errors.
// Replace this with better handling once the error type permits it.
.map_err(ApiError::InternalServerError)?;
.await?;

json_response(StatusCode::OK, ())
}
@@ -497,9 +490,7 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
let headers = request.headers();

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::InternalServerError)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;

// this can be long operation
let inputs = tenant
@@ -762,8 +753,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
&ctx,
)
.instrument(info_span!("tenant_create", tenant = ?target_tenant_id))
.await
.map_err(apierror_from_tenant_map_insert_error)?;
.await?;

// We created the tenant. Existing API semantics are that the tenant
// is Active when this function returns.
@@ -787,9 +777,7 @@ async fn get_tenant_config_handler(request: Request<Body>) -> Result<Response<Bo
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;

let tenant = mgr::get_tenant(tenant_id, false)
.await
.map_err(ApiError::NotFound)?;
let tenant = mgr::get_tenant(tenant_id, false).await?;

let response = HashMap::from([
(
@@ -884,10 +872,7 @@ async fn update_tenant_config_handler(
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
.await
// FIXME: `update_tenant_config` can fail because of both user and internal errors.
// Replace this `map_err` with better error handling once the type permits it
.map_err(ApiError::InternalServerError)?;
.await?;

json_response(StatusCode::OK, ())
}
@@ -1024,9 +1009,7 @@ async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)

@@ -1107,7 +1107,10 @@ async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => tenant,
Err(e) => return Err(GetActiveTenantError::Other(e.into())),
};
let wait_time = Duration::from_secs(30);
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
|
||||
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update(&mut self, other: &TenantConfOpt) {
|
||||
if let Some(checkpoint_distance) = other.checkpoint_distance {
|
||||
self.checkpoint_distance = Some(checkpoint_distance);
|
||||
}
|
||||
if let Some(checkpoint_timeout) = other.checkpoint_timeout {
|
||||
self.checkpoint_timeout = Some(checkpoint_timeout);
|
||||
}
|
||||
if let Some(compaction_target_size) = other.compaction_target_size {
|
||||
self.compaction_target_size = Some(compaction_target_size);
|
||||
}
|
||||
if let Some(compaction_period) = other.compaction_period {
|
||||
self.compaction_period = Some(compaction_period);
|
||||
}
|
||||
if let Some(compaction_threshold) = other.compaction_threshold {
|
||||
self.compaction_threshold = Some(compaction_threshold);
|
||||
}
|
||||
if let Some(gc_horizon) = other.gc_horizon {
|
||||
self.gc_horizon = Some(gc_horizon);
|
||||
}
|
||||
if let Some(gc_period) = other.gc_period {
|
||||
self.gc_period = Some(gc_period);
|
||||
}
|
||||
if let Some(image_creation_threshold) = other.image_creation_threshold {
|
||||
self.image_creation_threshold = Some(image_creation_threshold);
|
||||
}
|
||||
if let Some(pitr_interval) = other.pitr_interval {
|
||||
self.pitr_interval = Some(pitr_interval);
|
||||
}
|
||||
if let Some(walreceiver_connect_timeout) = other.walreceiver_connect_timeout {
|
||||
self.walreceiver_connect_timeout = Some(walreceiver_connect_timeout);
|
||||
}
|
||||
if let Some(lagging_wal_timeout) = other.lagging_wal_timeout {
|
||||
self.lagging_wal_timeout = Some(lagging_wal_timeout);
|
||||
}
|
||||
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
|
||||
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(trace_read_requests) = other.trace_read_requests {
|
||||
self.trace_read_requests = Some(trace_read_requests);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TenantConf {
|
||||
|
||||
@@ -289,7 +289,7 @@ pub async fn set_new_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), TenantStateError> {
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let tenant = get_tenant(tenant_id, true).await?;
|
||||
|
||||
@@ -306,16 +306,20 @@ pub async fn set_new_tenant_config(
|
||||
|
||||
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
|
||||
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
|
||||
pub async fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
|
||||
pub async fn get_tenant(
|
||||
tenant_id: TenantId,
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, TenantStateError> {
|
||||
let m = TENANTS.read().await;
|
||||
let tenant = m
|
||||
.get(&tenant_id)
|
||||
.with_context(|| format!("Tenant {tenant_id} not found in the local state"))?;
|
||||
.ok_or(TenantStateError::NotFound(tenant_id))?;
|
||||
if active_only && !tenant.is_active() {
|
||||
anyhow::bail!(
|
||||
tracing::warn!(
|
||||
"Tenant {tenant_id} is not active. Current state: {:?}",
|
||||
tenant.current_state()
|
||||
)
|
||||
);
|
||||
Err(TenantStateError::NotActive(tenant_id))
|
||||
} else {
|
||||
Ok(Arc::clone(tenant))
|
||||
}
|
||||
@@ -325,21 +329,28 @@ pub async fn delete_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
match get_tenant(tenant_id, true).await {
|
||||
Ok(tenant) => {
|
||||
tenant.delete_timeline(timeline_id, ctx).await?;
|
||||
}
|
||||
Err(e) => anyhow::bail!("Cannot access tenant {tenant_id} in local tenant state: {e:?}"),
|
||||
}
|
||||
|
||||
) -> Result<(), TenantStateError> {
|
||||
let tenant = get_tenant(tenant_id, true).await?;
|
||||
tenant.delete_timeline(timeline_id, ctx).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TenantStateError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is stopping")]
|
||||
IsStopping(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub async fn detach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), TenantStateError> {
|
||||
remove_tenant_from_memory(tenant_id, async {
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id);
|
||||
fs::remove_dir_all(&local_tenant_directory)
|
||||
@@ -379,7 +390,7 @@ pub async fn load_tenant(
|
||||
pub async fn ignore_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), TenantStateError> {
|
||||
remove_tenant_from_memory(tenant_id, async {
|
||||
let ignore_mark_file = conf.tenant_ignore_mark_file_path(tenant_id);
|
||||
fs::File::create(&ignore_mark_file)
|
||||
@@ -489,7 +500,7 @@ where
|
||||
async fn remove_tenant_from_memory<V, F>(
|
||||
tenant_id: TenantId,
|
||||
tenant_cleanup: F,
|
||||
) -> anyhow::Result<V>
|
||||
) -> Result<V, TenantStateError>
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<V>>,
|
||||
{
|
||||
@@ -505,11 +516,9 @@ where
|
||||
| TenantState::Loading
|
||||
| TenantState::Broken
|
||||
| TenantState::Active => tenant.set_stopping(),
|
||||
TenantState::Stopping => {
|
||||
anyhow::bail!("Tenant {tenant_id} is stopping already")
|
||||
}
|
||||
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
|
||||
},
|
||||
None => anyhow::bail!("Tenant not found for id {tenant_id}"),
|
||||
None => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -532,10 +541,15 @@ where
|
||||
Err(e) => {
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => tenant.set_broken(&e.to_string()),
|
||||
None => warn!("Tenant {tenant_id} got removed from memory"),
|
||||
Some(tenant) => {
|
||||
tenant.set_broken(&e.to_string());
|
||||
}
|
||||
None => {
|
||||
warn!("Tenant {tenant_id} got removed from memory");
|
||||
return Err(TenantStateError::NotFound(tenant_id));
|
||||
}
|
||||
}
|
||||
Err(e)
|
||||
Err(TenantStateError::Other(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -555,7 +569,7 @@ pub async fn immediate_gc(
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("Tenant {tenant_id} not found"))
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
@@ -605,7 +619,7 @@ pub async fn immediate_compact(
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("Tenant {tenant_id} not found"))
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let timeline = tenant
|
||||
|
||||
@@ -1246,6 +1246,11 @@ impl Timeline {
|
||||
|
||||
state,
|
||||
};
|
||||
info!(
|
||||
"initialized lsrlsn to {}/{}",
|
||||
disk_consistent_lsn,
|
||||
metadata.prev_record_lsn().unwrap_or(Lsn(0))
|
||||
);
|
||||
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
||||
result
|
||||
.metrics
|
||||
@@ -1335,6 +1340,7 @@ impl Timeline {
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
|
||||
self.conf.availability_zone.clone(),
|
||||
background_ctx,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ pub fn spawn_connection_manager_task(
|
||||
lagging_wal_timeout: Duration,
|
||||
max_lsn_wal_lag: NonZeroU64,
|
||||
auth_token: Option<Arc<String>>,
|
||||
availability_zone: Option<String>,
|
||||
ctx: RequestContext,
|
||||
) {
|
||||
let mut broker_client = get_broker_client().clone();
|
||||
@@ -67,6 +68,7 @@ pub fn spawn_connection_manager_task(
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
auth_token,
|
||||
availability_zone,
|
||||
);
|
||||
loop {
|
||||
select! {
|
||||
@@ -334,6 +336,7 @@ struct WalreceiverState {
|
||||
/// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
|
||||
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
|
||||
auth_token: Option<Arc<String>>,
|
||||
availability_zone: Option<String>,
|
||||
}
|
||||
|
||||
/// Current connection data.
|
||||
@@ -381,6 +384,7 @@ impl WalreceiverState {
|
||||
lagging_wal_timeout: Duration,
|
||||
max_lsn_wal_lag: NonZeroU64,
|
||||
auth_token: Option<Arc<String>>,
|
||||
availability_zone: Option<String>,
|
||||
) -> Self {
|
||||
let id = TenantTimelineId {
|
||||
tenant_id: timeline.tenant_id,
|
||||
@@ -396,6 +400,7 @@ impl WalreceiverState {
|
||||
wal_stream_candidates: HashMap::new(),
|
||||
wal_connection_retries: HashMap::new(),
|
||||
auth_token,
|
||||
availability_zone,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -740,6 +745,7 @@ impl WalreceiverState {
|
||||
None => None,
|
||||
Some(x) => Some(x),
|
||||
},
|
||||
self.availability_zone.as_deref(),
|
||||
) {
|
||||
Ok(connstr) => Some((*sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
@@ -824,17 +830,24 @@ fn wal_stream_connection_config(
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
Ok(PgConnectionConfig::new_host_port(host, port)
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned())))
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
Ok(connstr)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1273,6 +1286,7 @@ mod tests {
|
||||
wal_stream_candidates: HashMap::new(),
|
||||
wal_connection_retries: HashMap::new(),
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
19 poetry.lock generated
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.4.0 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.4.1 and should not be changed by hand.

[[package]]
name = "aiohttp"
@@ -1932,6 +1932,21 @@ pytest = [
{version = ">=6.2.4", markers = "python_version >= \"3.10\""},
]

[[package]]
name = "pytest-repeat"
version = "0.9.1"
description = "pytest plugin for repeating tests"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
{file = "pytest-repeat-0.9.1.tar.gz", hash = "sha256:5cd3289745ab3156d43eb9c8e7f7d00a926f3ae5c9cf425bec649b2fe15bad5b"},
{file = "pytest_repeat-0.9.1-py2.py3-none-any.whl", hash = "sha256:4474a7d9e9137f6d8cc8ae297f8c4168d33c56dd740aa78cfffe562557e6b96e"},
]

[package.dependencies]
pytest = ">=3.6"

[[package]]
name = "pytest-timeout"
version = "2.1.0"
@@ -2597,4 +2612,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "2515a9320c2960076012fbc036fb33c4f6a23515c8d143785931dc18c6722d91"
content-hash = "ce00f0cf8735e850b1cfb29d8ddf8f66fab2395c589fe7e16becc774edcc7845"
@@ -16,7 +16,7 @@ use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCou
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;

@@ -209,9 +209,18 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
if let Some(tls) = tls.take() {
// Upgrade raw stream into a secure TLS-backed stream.
// NOTE: We've consumed `tls`; this fact will be used later.
stream = PqStream::new(
stream.into_inner().upgrade(tls.to_server_config()).await?,
);

let (raw, read_buf) = stream.into_inner();
// TODO: Normally, the client doesn't send any data before
// the server says the TLS handshake is ok and read_buf is empty.
// However, you could imagine pipelining of postgres
// SSLRequest + TLS ClientHello in one hunk similar to
// pipelining in our node js driver. We should probably
// support that by chaining read_buf with the stream.
if !read_buf.is_empty() {
bail!("data is sent before server replied with EncryptionResponse");
}
stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?);
}
}
_ => bail!(ERR_PROTO_VIOLATION),
@@ -443,11 +452,17 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
value: mut node_info,
} = auth_result;

let node = connect_to_compute(&mut node_info, params, &extra, &creds)
let mut node = connect_to_compute(&mut node_info, params, &extra, &creds)
.or_else(|e| stream.throw_error(e))
.await?;

prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
proxy_pass(stream.into_inner(), node.stream, &node_info.aux).await
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm
// driver in pipeline mode sends startup, password and first query
// immediately after opening the connection.
let (stream, read_buf) = stream.into_inner();
node.stream.write_all(&read_buf).await?;
proxy_pass(stream, node.stream, &node_info.aux).await
}
}
@@ -1,5 +1,6 @@
use crate::error::UserFacingError;
use anyhow::bail;
use bytes::BytesMut;
use pin_project_lite::pin_project;
use pq_proto::framed::{ConnectionError, Framed};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};
@@ -27,8 +28,8 @@ impl<S> PqStream<S> {
}
}

/// Extract the underlying stream.
pub fn into_inner(self) -> S {
/// Extract the underlying stream and read buffer.
pub fn into_inner(self) -> (S, BytesMut) {
self.framed.into_inner()
}
@@ -34,6 +34,7 @@ types-psutil = "^5.9.5.4"
types-toml = "^0.10.8"
pytest-httpserver = "^1.0.6"
aiohttp = "3.7.4"
pytest-repeat = "^0.9.1"

[tool.poetry.group.dev.dependencies]
black = "^23.1.0"
@@ -71,6 +71,9 @@ struct Args {
/// Listen http endpoint for management and metrics in the form host:port.
#[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
listen_http: String,
/// Availability zone of the safekeeper.
#[arg(long)]
availability_zone: Option<String>,
/// Do not wait for changes to be written safely to disk. Unsafe.
#[arg(short, long)]
no_sync: bool,
@@ -166,6 +169,7 @@ fn main() -> anyhow::Result<()> {
my_id: id,
listen_pg_addr: args.listen_pg,
listen_http_addr: args.listen_http,
availability_zone: args.availability_zone,
no_sync: args.no_sync,
broker_endpoint: args.broker_endpoint,
broker_keepalive_interval: args.broker_keepalive_interval,
@@ -9,6 +9,7 @@ use tracing::{info, info_span, Instrument};
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use postgres_backend::QueryError;
|
||||
@@ -33,6 +34,7 @@ pub struct SafekeeperPostgresHandler {
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
claims: Option<Claims>,
|
||||
io_metrics: Option<TrafficMetrics>,
|
||||
}
|
||||
|
||||
/// Parsed Postgres command.
|
||||
@@ -94,6 +96,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
                     format!("Failed to parse {value} as timeline id")
                 })?);
             }
+            Some(("availability_zone", client_az)) => {
+                if let Some(metrics) = self.io_metrics.as_ref() {
+                    metrics.set_client_az(client_az)
+                }
+            }
             _ => continue,
         }
     }
@@ -101,6 +108,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
 
         if let Some(app_name) = params.get("application_name") {
             self.appname = Some(app_name.to_owned());
+            if let Some(metrics) = self.io_metrics.as_ref() {
+                metrics.set_app_name(app_name)
+            }
         }
 
         Ok(())
@@ -187,7 +197,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
 }
 
 impl SafekeeperPostgresHandler {
-    pub fn new(conf: SafeKeeperConf, conn_id: u32) -> Self {
+    pub fn new(conf: SafeKeeperConf, conn_id: u32, io_metrics: Option<TrafficMetrics>) -> Self {
         SafekeeperPostgresHandler {
             conf,
             appname: None,
@@ -196,6 +206,7 @@ impl SafekeeperPostgresHandler {
             ttid: TenantTimelineId::empty(),
             conn_id,
             claims: None,
+            io_metrics,
         }
     }
 
@@ -52,6 +52,7 @@ pub struct SafeKeeperConf {
     pub my_id: NodeId,
     pub listen_pg_addr: String,
     pub listen_http_addr: String,
+    pub availability_zone: Option<String>,
     pub no_sync: bool,
     pub broker_endpoint: Uri,
     pub broker_keepalive_interval: Duration,
@@ -82,6 +83,7 @@ impl SafeKeeperConf {
             no_sync: false,
             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
+            availability_zone: None,
             remote_storage: None,
             my_id: NodeId(0),
             broker_endpoint: storage_broker::DEFAULT_ENDPOINT
@@ -1,15 +1,19 @@
 //! Global safekeeper mertics and per-timeline safekeeper metrics.
 
-use std::time::{Instant, SystemTime};
+use std::{
+    sync::{Arc, RwLock},
+    time::{Instant, SystemTime},
+};
 
 use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
 use anyhow::Result;
 use metrics::{
-    core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
+    core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
     proto::MetricFamily,
     register_int_counter_vec, Gauge, IntCounterVec, IntGaugeVec,
 };
 use once_cell::sync::Lazy;
 
 use postgres_ffi::XLogSegNo;
 use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -63,13 +67,132 @@ pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
 });
 pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
-        "safekeeper_pg_io_bytes",
+        "safekeeper_pg_io_bytes_total",
         "Bytes read from or written to any PostgreSQL connection",
-        &["direction"]
+        &["client_az", "sk_az", "app_name", "dir", "same_az"]
     )
     .expect("Failed to register safekeeper_pg_io_bytes gauge")
 });
 
+pub const LABEL_UNKNOWN: &str = "unknown";
+
+/// Labels for traffic metrics.
+#[derive(Clone)]
+struct ConnectionLabels {
+    /// Availability zone of the connection origin.
+    client_az: String,
+    /// Availability zone of the current safekeeper.
+    sk_az: String,
+    /// Client application name.
+    app_name: String,
+}
+
+impl ConnectionLabels {
+    fn new() -> Self {
+        Self {
+            client_az: LABEL_UNKNOWN.to_string(),
+            sk_az: LABEL_UNKNOWN.to_string(),
+            app_name: LABEL_UNKNOWN.to_string(),
+        }
+    }
+
+    fn build_metrics(
+        &self,
+    ) -> (
+        GenericCounter<metrics::core::AtomicU64>,
+        GenericCounter<metrics::core::AtomicU64>,
+    ) {
+        let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
+            (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
+            (client_az, sk_az) => {
+                if client_az == sk_az {
+                    "true"
+                } else {
+                    "false"
+                }
+            }
+        };
+
+        let read = PG_IO_BYTES.with_label_values(&[
+            &self.client_az,
+            &self.sk_az,
+            &self.app_name,
+            "read",
+            same_az,
+        ]);
+        let write = PG_IO_BYTES.with_label_values(&[
+            &self.client_az,
+            &self.sk_az,
+            &self.app_name,
+            "write",
+            same_az,
+        ]);
+        (read, write)
+    }
+}
+
+struct TrafficMetricsState {
+    /// Labels for traffic metrics.
+    labels: ConnectionLabels,
+    /// Total bytes read from this connection.
+    read: GenericCounter<metrics::core::AtomicU64>,
+    /// Total bytes written to this connection.
+    write: GenericCounter<metrics::core::AtomicU64>,
+}
+
+/// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
+#[derive(Clone)]
+pub struct TrafficMetrics {
+    state: Arc<RwLock<TrafficMetricsState>>,
+}
+
+impl Default for TrafficMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl TrafficMetrics {
+    pub fn new() -> Self {
+        let labels = ConnectionLabels::new();
+        let (read, write) = labels.build_metrics();
+        let state = TrafficMetricsState {
+            labels,
+            read,
+            write,
+        };
+        Self {
+            state: Arc::new(RwLock::new(state)),
+        }
+    }
+
+    pub fn set_client_az(&self, value: &str) {
+        let mut state = self.state.write().unwrap();
+        state.labels.client_az = value.to_string();
+        (state.read, state.write) = state.labels.build_metrics();
+    }
+
+    pub fn set_sk_az(&self, value: &str) {
+        let mut state = self.state.write().unwrap();
+        state.labels.sk_az = value.to_string();
+        (state.read, state.write) = state.labels.build_metrics();
+    }
+
+    pub fn set_app_name(&self, value: &str) {
+        let mut state = self.state.write().unwrap();
+        state.labels.app_name = value.to_string();
+        (state.read, state.write) = state.labels.build_metrics();
+    }
+
+    pub fn observe_read(&self, cnt: usize) {
+        self.state.read().unwrap().read.inc_by(cnt as u64)
+    }
+
+    pub fn observe_write(&self, cnt: usize) {
+        self.state.read().unwrap().write.inc_by(cnt as u64)
+    }
+}
+
 /// Metrics for WalStorage in a single timeline.
 #[derive(Clone, Default)]
 pub struct WalStorageMetrics {
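For readers less familiar with the prometheus label model, the key trick in the hunk above is that a labeled counter is just a handle fetched from the vector with `with_label_values`, so when a label such as `client_az` only becomes known later (from startup parameters), the handles are simply rebuilt and subsequent increments land in the fully-labeled series. A rough, self-contained sketch of that idea using the `prometheus` crate directly; the metric name, AZ values, and `bind` helper here are made up for illustration, not Neon's code:

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounter, IntCounterVec};

// Hypothetical counter with the same label scheme as the hunk above.
static IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "example_pg_io_bytes_total",
        "Bytes read from or written to a connection",
        &["client_az", "sk_az", "app_name", "dir", "same_az"]
    )
    .expect("failed to register example counter")
});

fn bind(client_az: &str, sk_az: &str, app_name: &str, dir: &str) -> IntCounter {
    // `same_az` is derived from the other two labels, never reported directly.
    let same_az = if client_az == "unknown" || sk_az == "unknown" {
        "unknown"
    } else if client_az == sk_az {
        "true"
    } else {
        "false"
    };
    IO_BYTES.with_label_values(&[client_az, sk_az, app_name, dir, same_az])
}

fn main() {
    // Labels arrive piecemeal (sk_az at accept time, client_az/app_name from
    // startup parameters), so the bound counter is rebuilt on every change.
    let mut read = bind("unknown", "eu-central-1a", "unknown", "read");
    read.inc_by(128); // counted under same_az="unknown"

    read = bind("eu-central-1a", "eu-central-1a", "pageserver", "read");
    read.inc_by(8192); // now counted under same_az="true"
}
```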
@@ -9,8 +9,9 @@ use tokio::net::TcpStream;
 use tracing::*;
 use utils::measured_stream::MeasuredStream;
 
+use crate::handler::SafekeeperPostgresHandler;
+use crate::metrics::TrafficMetrics;
 use crate::SafeKeeperConf;
-use crate::{handler::SafekeeperPostgresHandler, metrics::PG_IO_BYTES};
 use postgres_backend::{AuthType, PostgresBackend};
 
 /// Accept incoming TCP connections and spawn them into a background thread.
@@ -68,20 +69,21 @@ fn handle_socket(
         .build()?;
     let local = tokio::task::LocalSet::new();
 
-    let read_metrics = PG_IO_BYTES.with_label_values(&["read"]);
-    let write_metrics = PG_IO_BYTES.with_label_values(&["write"]);
-
     socket.set_nodelay(true)?;
     let peer_addr = socket.peer_addr()?;
 
-    // TODO: measure cross-az traffic
+    let traffic_metrics = TrafficMetrics::new();
+    if let Some(current_az) = conf.availability_zone.as_deref() {
+        traffic_metrics.set_sk_az(current_az);
+    }
+
     let socket = MeasuredStream::new(
         socket,
         |cnt| {
-            read_metrics.inc_by(cnt as u64);
+            traffic_metrics.observe_read(cnt);
         },
         |cnt| {
-            write_metrics.inc_by(cnt as u64);
+            traffic_metrics.observe_write(cnt);
         },
     );
 
@@ -89,7 +91,8 @@ fn handle_socket(
         None => AuthType::Trust,
        Some(_) => AuthType::NeonJWT,
    };
-    let mut conn_handler = SafekeeperPostgresHandler::new(conf, conn_id);
+    let mut conn_handler =
+        SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     // libpq protocol between safekeeper and walproposer / pageserver
     // We don't use shutdown.
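The `MeasuredStream::new(socket, on_read, on_write)` shape used above deserves a small illustration: the wrapper simply forwards I/O to the inner stream and reports each transfer size to per-direction callbacks, which is what keeps the traffic counters out of the protocol code. A simplified synchronous sketch of that pattern, assuming only the standard library; the real `MeasuredStream` in `utils` is async and is not reproduced here:

```rust
use std::io::{self, Read, Write};

/// Rough sketch of a byte-counting wrapper: every successful read/write
/// reports its size to a caller-supplied callback.
struct Counted<S, R, W>
where
    R: FnMut(usize),
    W: FnMut(usize),
{
    inner: S,
    on_read: R,
    on_write: W,
}

impl<S: Read, R: FnMut(usize), W: FnMut(usize)> Read for Counted<S, R, W> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        (self.on_read)(n);
        Ok(n)
    }
}

impl<S: Write, R: FnMut(usize), W: FnMut(usize)> Write for Counted<S, R, W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        (self.on_write)(n);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
```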
@@ -77,7 +77,7 @@ DEFAULT_BRANCH_NAME: str = "main"
 DEFAULT_PG_VERSION_DEFAULT: str = "14"
 
 BASE_PORT: int = 15000
-WORKER_PORT_NUM: int = 1000
+WORKER_PORT_NUM: int = 10000
 
 
 def pytest_configure(config: Config):
@@ -924,7 +924,8 @@ class NeonEnv:
             pg=self.port_distributor.get_port(),
             http=self.port_distributor.get_port(),
         )
-        pageserver_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
+        http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
+        pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
 
         toml += textwrap.dedent(
             f"""
@@ -932,7 +933,8 @@ class NeonEnv:
             id=1
             listen_pg_addr = 'localhost:{pageserver_port.pg}'
             listen_http_addr = 'localhost:{pageserver_port.http}'
-            auth_type = '{pageserver_auth_type}'
+            pg_auth_type = '{pg_auth_type}'
+            http_auth_type = '{http_auth_type}'
             """
         )
 
@@ -2084,8 +2086,8 @@ class NeonPageserver(PgProtocol):
             # https://github.com/neondatabase/neon/issues/2442
             ".*could not remove ephemeral file.*No such file or directory.*",
             # FIXME: These need investigation
-            ".*gc_loop.*Failed to get a tenant .* Tenant .* not found in the local state.*",
-            ".*compaction_loop.*Failed to get a tenant .* Tenant .* not found in the local state.*",
+            ".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*",
+            ".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*",
             ".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
             ".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
             ".*Removing intermediate uninit mark file.*",
@@ -149,7 +149,7 @@ def get_scale_for_db(size_mb: int) -> int:
 
 
 ATTACHMENT_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html)"
+    r"config|postgresql\.conf|regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html)"
 )
 
 
test_runner/pg_clients/README.md (new file, 10 lines)
@@ -0,0 +1,10 @@
+# pg_clients
+
+To run a single test locally:
+
+```bash
+export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
+
+# will filter only tests with "serverless" in the name
+./scripts/pytest -m remote_cluster -k serverless
+```
@@ -22,6 +22,7 @@ from fixtures.utils import subprocess_capture
         ),
         "swift/PostgresNIOExample",
         "typescript/postgresql-client",
+        "typescript/serverless-driver",
     ],
 )
 def test_pg_clients(test_output_dir: Path, remote_pg: RemotePostgres, client: str):
@@ -0,0 +1 @@
+node_modules/

test_runner/pg_clients/typescript/serverless-driver/.gitignore (vendored, new file, 1 line)
@@ -0,0 +1 @@
+node_modules/
@@ -0,0 +1,7 @@
+FROM node:18
+WORKDIR /source
+
+COPY . .
+RUN npm clean-install
+
+CMD ["/source/index.js"]
test_runner/pg_clients/typescript/serverless-driver/index.js (executable, new file, 20 lines)
@@ -0,0 +1,20 @@
+#! /usr/bin/env node
+
+import { Client } from '@neondatabase/serverless'
+
+(async () => {
+    const client = new Client({
+        host: process.env.NEON_HOST,
+        database: process.env.NEON_DATABASE,
+        user: process.env.NEON_USER,
+        password: process.env.NEON_PASSWORD,
+    });
+    client.connect();
+    const result = await client.query({
+        text: 'select 1',
+        rowMode: 'array',
+    });
+    const rows = result.rows;
+    await client.end();
+    console.log(rows[0][0]);
+})()
test_runner/pg_clients/typescript/serverless-driver/package-lock.json (generated, new file, 51 lines)
@@ -0,0 +1,51 @@
+{
+  "name": "serverless-driver",
+  "lockfileVersion": 2,
+  "requires": true,
+  "packages": {
+    "": {
+      "dependencies": {
+        "@neondatabase/serverless": "^0.2.8",
+        "ws": "^8.13.0"
+      }
+    },
+    "node_modules/@neondatabase/serverless": {
+      "version": "0.2.8",
+      "resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.2.8.tgz",
+      "integrity": "sha512-+yWjIOJsFnrtt2xvtLVEzWM2lfvemawk/DBg4mD2cZOF/IC6Jn4wEctZyk60TscZMSxfozNkPoxmZvBmNuQ0vA=="
+    },
+    "node_modules/ws": {
+      "version": "8.13.0",
+      "resolved": "https://registry.npmjs.org/ws/-/ws-8.13.0.tgz",
+      "integrity": "sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==",
+      "engines": {
+        "node": ">=10.0.0"
+      },
+      "peerDependencies": {
+        "bufferutil": "^4.0.1",
+        "utf-8-validate": ">=5.0.2"
+      },
+      "peerDependenciesMeta": {
+        "bufferutil": {
+          "optional": true
+        },
+        "utf-8-validate": {
+          "optional": true
+        }
+      }
+    }
+  },
+  "dependencies": {
+    "@neondatabase/serverless": {
+      "version": "0.2.8",
+      "resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.2.8.tgz",
+      "integrity": "sha512-+yWjIOJsFnrtt2xvtLVEzWM2lfvemawk/DBg4mD2cZOF/IC6Jn4wEctZyk60TscZMSxfozNkPoxmZvBmNuQ0vA=="
+    },
+    "ws": {
+      "version": "8.13.0",
+      "resolved": "https://registry.npmjs.org/ws/-/ws-8.13.0.tgz",
+      "integrity": "sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==",
+      "requires": {}
+    }
+  }
+}
@@ -0,0 +1,7 @@
+{
+    "type": "module",
+    "dependencies": {
+        "@neondatabase/serverless": "^0.2.8",
+        "ws": "^8.13.0"
+    }
+}
@@ -136,6 +136,7 @@ def test_backward_compatibility(
 
 @pytest.mark.xdist_group("compatibility")
 @pytest.mark.order(after="test_create_snapshot")
+@pytest.mark.repeat(1000)
 def test_forward_compatibility(
     test_output_dir: Path,
     port_distributor: PortDistributor,
@@ -246,6 +247,13 @@ def prepare_snapshot(
     if get_neon_version(neon_binpath) == "49da498f651b9f3a53b56c7c0697636d880ddfe0":
         pageserver_config["broker_endpoints"] = etcd_broker_endpoints  # old etcd version
 
+    # Older pageserver versions had just one `auth_type` setting. Now there
+    # are separate settings for pg and http ports. We don't use authentication
+    # in compatibility tests so just remove authentication related settings.
+    pageserver_config.pop("auth_type", None)
+    pageserver_config.pop("pg_auth_type", None)
+    pageserver_config.pop("http_auth_type", None)
+
     if pg_distrib_dir:
         pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir)
 
@@ -17,6 +17,12 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
     n_restarts = 10
     scale = 10
 
+    # the background task may complete the init task delay after finding an
+    # active tenant, but shutdown starts right before Tenant::gc_iteration
+    env.pageserver.allowed_errors.append(
+        r".*Gc failed, retrying in \S+: Cannot run GC iteration on inactive tenant"
+    )
+
     def run_pgbench(pg: Postgres):
         connstr = pg.connstr()
         log.info(f"Start a pgbench workload on pg {connstr}")
@@ -225,7 +225,7 @@ def test_tenant_reattach_while_busy(
 
     # Attempts to connect from compute to pageserver while the tenant is
     # temporarily detached produces these errors in the pageserver log.
-    env.pageserver.allowed_errors.append(".*Tenant .* not found in the local state.*")
+    env.pageserver.allowed_errors.append(".*Tenant .* not found.*")
     env.pageserver.allowed_errors.append(
         ".*Tenant .* will not become active\\. Current state: Stopping.*"
     )
@@ -257,18 +257,18 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
     env = neon_env_builder.init_start()
     pageserver_http = env.pageserver.http_client()
 
-    env.pageserver.allowed_errors.append(".*NotFound: Tenant .* not found")
+    env.pageserver.allowed_errors.append(".*NotFound: Tenant .*")
 
     # first check for non existing tenant
     tenant_id = TenantId.generate()
     with pytest.raises(
         expected_exception=PageserverApiException,
-        match=f"Tenant not found for id {tenant_id}",
+        match=f"NotFound: tenant {tenant_id}",
     ):
         pageserver_http.tenant_detach(tenant_id)
 
     # the error will be printed to the log too
-    env.pageserver.allowed_errors.append(".*Tenant not found for id.*")
+    env.pageserver.allowed_errors.append(".*NotFound: tenant *")
 
     # create new nenant
     tenant_id, timeline_id = env.neon_cli.create_tenant()
@@ -294,7 +294,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
 
     # the error will be printed to the log too
     env.pageserver.allowed_errors.append(".*gc target timeline does not exist.*")
-    # Timelines get stopped during detach, ignore the gc calls that error, whitnessing that
+    # Timelines get stopped during detach, ignore the gc calls that error, witnessing that
     env.pageserver.allowed_errors.append(".*InternalServerError\\(timeline is Stopping.*")
 
     # Detach while running manual GC.
@@ -320,7 +320,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
     assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
 
     with pytest.raises(
-        expected_exception=PageserverApiException, match=f"Tenant {tenant_id} not found"
+        expected_exception=PageserverApiException, match=f"NotFound: tenant {tenant_id}"
     ):
         pageserver_http.timeline_gc(tenant_id, timeline_id, 0)
 
@@ -3,6 +3,7 @@ import shutil
 import time
 from contextlib import closing
 from datetime import datetime
+from itertools import chain
 from pathlib import Path
 from typing import List
 
@@ -87,6 +88,7 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
 
 def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
     neon_env_builder.num_safekeepers = 3
+    neon_env_builder.pageserver_config_override = "availability_zone='test_ps_az'"
 
     env = neon_env_builder.init_start()
     tenant_1, _ = env.neon_cli.create_tenant()
@@ -122,6 +124,17 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
     ps_metrics = all_metrics[0]
     sk_metrics = all_metrics[1:]
 
+    # Find all metrics among all safekeepers, accepts the same arguments as query_all()
+    def query_all_safekeepers(name, filter):
+        return list(
+            chain.from_iterable(
+                map(
+                    lambda sk: sk.query_all(name, filter),
+                    sk_metrics,
+                )
+            )
+        )
+
     ttids = [
         {"tenant_id": str(tenant_1), "timeline_id": str(timeline_1)},
         {"tenant_id": str(tenant_2), "timeline_id": str(timeline_2)},
@@ -162,6 +175,40 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
             f"process_start_time_seconds (UTC): {datetime.fromtimestamp(metrics.query_one('process_start_time_seconds').value)}"
         )
 
+    for io_direction in ["read", "write"]:
+        # Querying all metrics for number of bytes read/written by pageserver in another AZ
+        io_metrics = query_all_safekeepers(
+            "safekeeper_pg_io_bytes_total",
+            {
+                "app_name": "pageserver",
+                "client_az": "test_ps_az",
+                "dir": io_direction,
+                "same_az": "false",
+            },
+        )
+        total_bytes = sum(int(metric.value) for metric in io_metrics)
+        log.info(f"Pageserver {io_direction} bytes from another AZ: {total_bytes}")
+        # We expect some bytes to be read/written, to make sure metrics are working
+        assert total_bytes > 0
+
+    # Test (a subset of) safekeeper global metrics
+    for sk_m in sk_metrics:
+        # Test that every safekeeper has read some bytes
+        assert any(
+            map(
+                lambda x: x.value > 0,
+                sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "read"}),
+            )
+        ), f"{sk_m.name} has not read bytes"
+
+        # Test that every safekeeper has written some bytes
+        assert any(
+            map(
+                lambda x: x.value > 0,
+                sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "write"}),
+            )
+        ), f"{sk_m.name} has not written bytes"
+
     # Test (a subset of) pageserver global metrics
     for metric in PAGESERVER_GLOBAL_METRICS:
         ps_samples = ps_metrics.query_all(metric, {})
@@ -10,7 +10,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
     env.pageserver.allowed_errors.append(".*Timeline .* was not found.*")
     env.pageserver.allowed_errors.append(".*timeline not found.*")
     env.pageserver.allowed_errors.append(".*Cannot delete timeline which has child timelines.*")
-    env.pageserver.allowed_errors.append(".*Tenant .* not found in the local state.*")
+    env.pageserver.allowed_errors.append(".*NotFound: tenant .*")
 
     ps_http = env.pageserver.http_client()
 
@@ -24,7 +24,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
     invalid_tenant_id = TenantId.generate()
     with pytest.raises(
         PageserverApiException,
-        match=f"Tenant {invalid_tenant_id} not found in the local state",
+        match=f"NotFound: tenant {invalid_tenant_id}",
     ):
         ps_http.timeline_delete(tenant_id=invalid_tenant_id, timeline_id=invalid_timeline_id)
 
@@ -23,7 +23,7 @@ def assert_child_processes(pageserver_pid, wal_redo_present=False, defunct_prese
 def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
     env = neon_env_builder.init_start()
     # We intentionally test for a non-existent tenant.
-    env.pageserver.allowed_errors.append(".*Tenant not found.*")
+    env.pageserver.allowed_errors.append(".*NotFound: tenant.*")
     pageserver_http = env.pageserver.http_client()
 
     pagserver_pid = int((env.repo_dir / "pageserver.pid").read_text())
@@ -34,7 +34,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
     tenant_id = TenantId.generate()
     with pytest.raises(
         expected_exception=PageserverApiException,
-        match=f"Tenant not found for id {tenant_id}",
+        match=f"NotFound: tenant {tenant_id}",
     ):
         pageserver_http.tenant_detach(tenant_id)
 
vm-cgconfig.conf (new file, 12 lines)
@@ -0,0 +1,12 @@
+# Configuration for cgroups in VM compute nodes
+group neon-postgres {
+    perm {
+        admin {
+            uid = vm-informant;
+        }
+        task {
+            gid = users;
+        }
+    }
+    memory {}
+}