mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 20:40:37 +00:00
Compare commits
8 Commits
heikki/psq
...
sergey/no-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b9b66a714a | ||
|
|
dbbe032c39 | ||
|
|
cb9473928d | ||
|
|
fa20e37574 | ||
|
|
4911d7ce6f | ||
|
|
e83684b868 | ||
|
|
afbbc61036 | ||
|
|
7ba5c286b7 |
8
.github/ansible/deploy.yaml
vendored
8
.github/ansible/deploy.yaml
vendored
@@ -126,8 +126,8 @@
|
||||
shell:
|
||||
cmd: |
|
||||
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
|
||||
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/pageservers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
|
||||
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/pageservers
|
||||
curl -sfS {{ console_mgmt_base_url }}/management/api/v2/pageservers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
|
||||
curl -sfS -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/pageservers
|
||||
tags:
|
||||
- pageserver
|
||||
|
||||
@@ -205,7 +205,7 @@
|
||||
shell:
|
||||
cmd: |
|
||||
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
|
||||
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/safekeepers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
|
||||
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/safekeepers
|
||||
curl -sfS {{ console_mgmt_base_url }}/management/api/v2/safekeepers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
|
||||
curl -sfS -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/safekeepers
|
||||
tags:
|
||||
- safekeeper
|
||||
|
||||
4
.github/ansible/scripts/init_pageserver.sh
vendored
4
.github/ansible/scripts/init_pageserver.sh
vendored
@@ -27,10 +27,10 @@ cat <<EOF | tee /tmp/payload
|
||||
EOF
|
||||
|
||||
# check if pageserver already registered or not
|
||||
if ! curl -sf -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/pageservers/${INSTANCE_ID} -o /dev/null; then
|
||||
if ! curl -sf {{ console_mgmt_base_url }}/management/api/v2/pageservers/${INSTANCE_ID} -o /dev/null; then
|
||||
|
||||
# not registered, so register it now
|
||||
ID=$(curl -sf -X POST -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/pageservers -d@/tmp/payload | jq -r '.id')
|
||||
ID=$(curl -sf -X POST -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/pageservers -d@/tmp/payload | jq -r '.id')
|
||||
|
||||
# init pageserver
|
||||
sudo -u pageserver /usr/local/bin/pageserver -c "id=${ID}" -c "pg_distrib_dir='/usr/local'" --init -D /storage/pageserver/data
|
||||
|
||||
4
.github/ansible/scripts/init_safekeeper.sh
vendored
4
.github/ansible/scripts/init_safekeeper.sh
vendored
@@ -22,10 +22,10 @@ cat <<EOF | tee /tmp/payload
|
||||
EOF
|
||||
|
||||
# check if safekeeper already registered or not
|
||||
if ! curl -sf -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/safekeepers/${INSTANCE_ID} -o /dev/null; then
|
||||
if ! curl -sf {{ console_mgmt_base_url }}/management/api/v2/safekeepers/${INSTANCE_ID} -o /dev/null; then
|
||||
|
||||
# not registered, so register it now
|
||||
ID=$(curl -sf -X POST -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/safekeepers -d@/tmp/payload | jq -r '.id')
|
||||
ID=$(curl -sf -X POST -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/safekeepers -d@/tmp/payload | jq -r '.id')
|
||||
# init safekeeper
|
||||
sudo -u safekeeper /usr/local/bin/safekeeper --id ${ID} --init -D /storage/safekeeper/data
|
||||
fi
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -938,7 +938,7 @@ jobs:
|
||||
./get_binaries.sh
|
||||
|
||||
ansible-galaxy collection install sivel.toiletwater
|
||||
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
rm -f neon_install.tar.gz .neon_current_version
|
||||
|
||||
- name: Cleanup ansible folder
|
||||
|
||||
2
.github/workflows/deploy-dev.yml
vendored
2
.github/workflows/deploy-dev.yml
vendored
@@ -67,7 +67,7 @@ jobs:
|
||||
./get_binaries.sh
|
||||
|
||||
ansible-galaxy collection install sivel.toiletwater
|
||||
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
rm -f neon_install.tar.gz .neon_current_version
|
||||
|
||||
- name: Cleanup ansible folder
|
||||
|
||||
2
.github/workflows/deploy-prod.yml
vendored
2
.github/workflows/deploy-prod.yml
vendored
@@ -68,7 +68,7 @@ jobs:
|
||||
./get_binaries.sh
|
||||
|
||||
ansible-galaxy collection install sivel.toiletwater
|
||||
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
rm -f neon_install.tar.gz .neon_current_version
|
||||
|
||||
deploy-proxy-prod-new:
|
||||
|
||||
@@ -73,7 +73,7 @@ fn main() -> Result<()> {
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
let mut spec = None;
|
||||
let spec;
|
||||
let mut live_config_allowed = false;
|
||||
match spec_json {
|
||||
// First, try to get cluster spec from the cli argument
|
||||
@@ -89,9 +89,13 @@ fn main() -> Result<()> {
|
||||
} else if let Some(id) = compute_id {
|
||||
if let Some(cp_base) = control_plane_uri {
|
||||
live_config_allowed = true;
|
||||
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
|
||||
spec = Some(s);
|
||||
}
|
||||
spec = match get_spec_from_control_plane(cp_base, id) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!("cannot get response from control plane: {}", e);
|
||||
panic!("neither spec nor confirmation that compute is in the Empty state was received");
|
||||
}
|
||||
};
|
||||
} else {
|
||||
panic!("must specify both --control-plane-uri and --compute-id or none");
|
||||
}
|
||||
@@ -114,7 +118,6 @@ fn main() -> Result<()> {
|
||||
spec_set = false;
|
||||
}
|
||||
let compute_node = ComputeNode {
|
||||
start_time: Utc::now(),
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
@@ -147,6 +150,17 @@ fn main() -> Result<()> {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
let pspec = state.pspec.as_ref().expect("spec must be set");
|
||||
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
|
||||
|
||||
// Record for how long we slept waiting for the spec.
|
||||
state.metrics.wait_for_spec_ms = Utc::now()
|
||||
.signed_duration_since(state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
// Reset start time to the actual start of the configuration, so that
|
||||
// total startup time was properly measured at the end.
|
||||
state.start_time = Utc::now();
|
||||
|
||||
state.status = ComputeStatus::Init;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
|
||||
@@ -38,7 +38,6 @@ use crate::spec::*;
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
pub start_time: DateTime<Utc>,
|
||||
// Url type maintains proper escaping
|
||||
pub connstr: url::Url,
|
||||
pub pgdata: String,
|
||||
@@ -66,6 +65,7 @@ pub struct ComputeNode {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ComputeState {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub status: ComputeStatus,
|
||||
/// Timestamp of the last Postgres activity
|
||||
pub last_active: DateTime<Utc>,
|
||||
@@ -77,6 +77,7 @@ pub struct ComputeState {
|
||||
impl ComputeState {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
start_time: Utc::now(),
|
||||
status: ComputeStatus::Empty,
|
||||
last_active: Utc::now(),
|
||||
error: None,
|
||||
@@ -425,7 +426,7 @@ impl ComputeNode {
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.total_startup_ms = startup_end_time
|
||||
.signed_duration_since(self.start_time)
|
||||
.signed_duration_since(compute_state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
@@ -18,6 +18,7 @@ use tracing_utils::http::OtelName;
|
||||
|
||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
tenant: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
|
||||
@@ -152,11 +152,14 @@ components:
|
||||
type: object
|
||||
description: Compute startup metrics.
|
||||
required:
|
||||
- wait_for_spec_ms
|
||||
- sync_safekeepers_ms
|
||||
- basebackup_ms
|
||||
- config_ms
|
||||
- total_startup_ms
|
||||
properties:
|
||||
wait_for_spec_ms:
|
||||
type: integer
|
||||
sync_safekeepers_ms:
|
||||
type: integer
|
||||
basebackup_ms:
|
||||
@@ -181,6 +184,13 @@ components:
|
||||
- status
|
||||
- last_active
|
||||
properties:
|
||||
start_time:
|
||||
type: string
|
||||
description: |
|
||||
Time when compute was started. If initially compute was started in the `empty`
|
||||
state and then provided with valid spec, `start_time` will be reset to the
|
||||
moment, when spec was received.
|
||||
example: "2022-10-12T07:20:50.52Z"
|
||||
status:
|
||||
$ref: '#/components/schemas/ComputeStatus'
|
||||
last_active:
|
||||
|
||||
@@ -4,42 +4,117 @@ use std::str::FromStr;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use postgres::config::Config;
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
|
||||
use reqwest::StatusCode;
|
||||
use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
|
||||
|
||||
use crate::config;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
use crate::pg_helpers::*;
|
||||
|
||||
use compute_api::responses::ControlPlaneSpecResponse;
|
||||
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
|
||||
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
|
||||
|
||||
// Do control plane request and return response if any. In case of error it
|
||||
// returns a bool flag indicating whether it makes sense to retry the request
|
||||
// and a string with error message.
|
||||
fn do_control_plane_request(
|
||||
uri: &str,
|
||||
jwt: &str,
|
||||
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
|
||||
let resp = reqwest::blocking::Client::new()
|
||||
.get(uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()
|
||||
.map_err(|e| {
|
||||
(
|
||||
true,
|
||||
format!("could not perform spec request to control plane: {}", e),
|
||||
)
|
||||
})?;
|
||||
|
||||
match resp.status() {
|
||||
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
|
||||
Ok(spec_resp) => Ok(spec_resp),
|
||||
Err(e) => Err((
|
||||
true,
|
||||
format!("could not deserialize control plane response: {}", e),
|
||||
)),
|
||||
},
|
||||
StatusCode::SERVICE_UNAVAILABLE => {
|
||||
Err((true, "control plane is temporarily unavailable".to_string()))
|
||||
}
|
||||
StatusCode::BAD_GATEWAY => {
|
||||
// We have a problem with intermittent 502 errors now
|
||||
// https://github.com/neondatabase/cloud/issues/2353
|
||||
// It's fine to retry GET request in this case.
|
||||
Err((true, "control plane request failed with 502".to_string()))
|
||||
}
|
||||
// Another code, likely 500 or 404, means that compute is unknown to the control plane
|
||||
// or some internal failure happened. Doesn't make much sense to retry in this case.
|
||||
_ => Err((
|
||||
false,
|
||||
format!(
|
||||
"unexpected control plane response status code: {}",
|
||||
resp.status()
|
||||
),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
|
||||
/// env variable is set, it will be used for authorization.
|
||||
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
|
||||
pub fn get_spec_from_control_plane(
|
||||
base_uri: &str,
|
||||
compute_id: &str,
|
||||
) -> Result<Option<ComputeSpec>> {
|
||||
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
|
||||
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
|
||||
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
let mut attempt = 1;
|
||||
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
|
||||
|
||||
info!("getting spec from control plane: {}", cp_uri);
|
||||
|
||||
// TODO: check the response. We should distinguish cases when it's
|
||||
// - network error, then retry
|
||||
// - no spec for compute yet, then wait
|
||||
// - compute id is unknown or any other error, then bail out
|
||||
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
|
||||
.get(cp_uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()
|
||||
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
|
||||
.json()
|
||||
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
|
||||
// Do 3 attempts to get spec from the control plane using the following logic:
|
||||
// - network error -> then retry
|
||||
// - compute id is unknown or any other error -> bail out
|
||||
// - no spec for compute yet (Empty state) -> return Ok(None)
|
||||
// - got spec -> return Ok(Some(spec))
|
||||
while attempt < 4 {
|
||||
spec = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
Ok(spec_resp) => match spec_resp.status {
|
||||
ControlPlaneComputeStatus::Empty => Ok(None),
|
||||
ControlPlaneComputeStatus::Attached => {
|
||||
if let Some(spec) = spec_resp.spec {
|
||||
Ok(Some(spec))
|
||||
} else {
|
||||
bail!("compute is attached, but spec is empty")
|
||||
}
|
||||
}
|
||||
},
|
||||
Err((retry, msg)) => {
|
||||
if retry {
|
||||
Err(anyhow!(msg))
|
||||
} else {
|
||||
bail!(msg);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(spec) = resp.spec {
|
||||
Ok(spec)
|
||||
} else {
|
||||
bail!("could not get compute spec from control plane")
|
||||
if let Err(e) = &spec {
|
||||
error!("attempt {} to get spec failed with: {}", attempt, e);
|
||||
} else {
|
||||
return spec;
|
||||
}
|
||||
|
||||
attempt += 1;
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
// All attempts failed, return error.
|
||||
spec
|
||||
}
|
||||
|
||||
/// It takes cluster specification and does the following:
|
||||
|
||||
@@ -359,8 +359,8 @@ impl PageServerNode {
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.get("eviction_policy")
|
||||
.map(|x| serde_json::from_str(x))
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
|
||||
@@ -14,6 +14,7 @@ pub struct GenericAPIError {
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct ComputeStatusResponse {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub tenant: Option<String>,
|
||||
pub timeline: Option<String>,
|
||||
pub status: ComputeStatus,
|
||||
@@ -63,6 +64,7 @@ where
|
||||
/// Response of the /metrics.json API
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
pub struct ComputeMetrics {
|
||||
pub wait_for_spec_ms: u64,
|
||||
pub sync_safekeepers_ms: u64,
|
||||
pub basebackup_ms: u64,
|
||||
pub config_ms: u64,
|
||||
@@ -75,4 +77,16 @@ pub struct ComputeMetrics {
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneSpecResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
pub status: ControlPlaneComputeStatus,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ControlPlaneComputeStatus {
|
||||
// Compute is known to control-plane, but it's not
|
||||
// yet attached to any timeline / endpoint.
|
||||
Empty,
|
||||
// Compute is attached to some timeline / endpoint and
|
||||
// should be able to start with provided spec.
|
||||
Attached,
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ where
|
||||
|
||||
let log_quietly = method == Method::GET;
|
||||
async move {
|
||||
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
|
||||
if log_quietly {
|
||||
debug!("Handling request");
|
||||
} else {
|
||||
@@ -87,7 +88,11 @@ where
|
||||
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
|
||||
//
|
||||
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
|
||||
match (self.0)(request).await {
|
||||
let res = (self.0)(request).await;
|
||||
|
||||
cancellation_guard.disarm();
|
||||
|
||||
match res {
|
||||
Ok(response) => {
|
||||
let response_status = response.status();
|
||||
if log_quietly && response_status.is_success() {
|
||||
@@ -105,6 +110,38 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop guard to WARN in case the request was dropped before completion.
|
||||
struct RequestCancelled {
|
||||
warn: Option<tracing::Span>,
|
||||
}
|
||||
|
||||
impl RequestCancelled {
|
||||
/// Create the drop guard using the [`tracing::Span::current`] as the span.
|
||||
fn warn_when_dropped_without_responding() -> Self {
|
||||
RequestCancelled {
|
||||
warn: Some(tracing::Span::current()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume the drop guard without logging anything.
|
||||
fn disarm(mut self) {
|
||||
self.warn = None;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RequestCancelled {
|
||||
fn drop(&mut self) {
|
||||
if let Some(span) = self.warn.take() {
|
||||
// the span has all of the info already, but the outer `.instrument(span)` has already
|
||||
// been dropped, so we need to manually re-enter it for this message.
|
||||
//
|
||||
// this is what the instrument would do before polling so it is fine.
|
||||
let _g = span.entered();
|
||||
warn!("request was dropped before completing");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
@@ -23,25 +24,64 @@ impl LogFormat {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
let default_filter_str = "info";
|
||||
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
metrics::register_int_counter_vec!(
|
||||
"libmetrics_tracing_event_count",
|
||||
"Number of tracing events, by level",
|
||||
&["level"]
|
||||
)
|
||||
.expect("failed to define metric")
|
||||
});
|
||||
|
||||
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
|
||||
|
||||
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
|
||||
where
|
||||
S: tracing::Subscriber,
|
||||
{
|
||||
fn on_event(
|
||||
&self,
|
||||
event: &tracing::Event<'_>,
|
||||
_ctx: tracing_subscriber::layer::Context<'_, S>,
|
||||
) {
|
||||
let level = event.metadata().level();
|
||||
let level = match *level {
|
||||
tracing::Level::ERROR => "error",
|
||||
tracing::Level::WARN => "warn",
|
||||
tracing::Level::INFO => "info",
|
||||
tracing::Level::DEBUG => "debug",
|
||||
tracing::Level::TRACE => "trace",
|
||||
};
|
||||
self.0.with_label_values(&[level]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
|
||||
let rust_log_env_filter = || {
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
|
||||
};
|
||||
|
||||
let base_logger = tracing_subscriber::fmt()
|
||||
.with_env_filter(env_filter)
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
|
||||
match log_format {
|
||||
LogFormat::Json => base_logger.json().init(),
|
||||
LogFormat::Plain => base_logger.init(),
|
||||
LogFormat::Test => base_logger.with_test_writer().init(),
|
||||
}
|
||||
// NB: the order of the with() calls does not matter.
|
||||
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
||||
use tracing_subscriber::prelude::*;
|
||||
tracing_subscriber::registry()
|
||||
.with({
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
let log_layer = match log_format {
|
||||
LogFormat::Json => log_layer.json().boxed(),
|
||||
LogFormat::Plain => log_layer.boxed(),
|
||||
LogFormat::Test => log_layer.with_test_writer().boxed(),
|
||||
};
|
||||
log_layer.with_filter(rust_log_env_filter())
|
||||
})
|
||||
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
|
||||
.init();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -157,3 +197,33 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
|
||||
<Self as std::fmt::Display>::fmt(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use metrics::{core::Opts, IntCounterVec};
|
||||
|
||||
use super::TracingEventCountLayer;
|
||||
|
||||
#[test]
|
||||
fn tracing_event_count_metric() {
|
||||
let counter_vec =
|
||||
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
|
||||
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
|
||||
let layer = TracingEventCountLayer(counter_vec);
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
|
||||
tracing::trace!("foo");
|
||||
tracing::debug!("foo");
|
||||
tracing::info!("foo");
|
||||
tracing::warn!("foo");
|
||||
tracing::error!("foo");
|
||||
});
|
||||
|
||||
assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -520,6 +520,43 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/synthetic_size:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: |
|
||||
Calculate tenant's synthetic size
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant's synthetic size
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SyntheticSizeResponse"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/size:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -948,6 +985,84 @@ components:
|
||||
latest_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
SyntheticSizeResponse:
|
||||
type: object
|
||||
required:
|
||||
- id
|
||||
- size
|
||||
- segment_sizes
|
||||
- inputs
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
format: hex
|
||||
size:
|
||||
type: integer
|
||||
segment_sizes:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SegmentSize"
|
||||
inputs:
|
||||
type: object
|
||||
properties:
|
||||
segments:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SegmentData"
|
||||
timeline_inputs:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/TimelineInput"
|
||||
|
||||
SegmentSize:
|
||||
type: object
|
||||
required:
|
||||
- method
|
||||
- accum_size
|
||||
properties:
|
||||
method:
|
||||
type: string
|
||||
accum_size:
|
||||
type: integer
|
||||
|
||||
SegmentData:
|
||||
type: object
|
||||
required:
|
||||
- segment
|
||||
properties:
|
||||
segment:
|
||||
type: object
|
||||
required:
|
||||
- lsn
|
||||
properties:
|
||||
parent:
|
||||
type: integer
|
||||
lsn:
|
||||
type: integer
|
||||
size:
|
||||
type: integer
|
||||
needed:
|
||||
type: boolean
|
||||
timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
kind:
|
||||
type: string
|
||||
|
||||
TimelineInput:
|
||||
type: object
|
||||
required:
|
||||
- timeline_id
|
||||
properties:
|
||||
ancestor_id:
|
||||
type: string
|
||||
ancestor_lsn:
|
||||
type: string
|
||||
timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
Error:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@@ -1201,6 +1201,36 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
)
|
||||
}
|
||||
|
||||
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum Level {
|
||||
Error,
|
||||
Warn,
|
||||
Info,
|
||||
Debug,
|
||||
Trace,
|
||||
}
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct Request {
|
||||
level: Level,
|
||||
message: String,
|
||||
}
|
||||
let body: Request = json_request(&mut r)
|
||||
.await
|
||||
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
|
||||
|
||||
match body.level {
|
||||
Level::Error => tracing::error!(?body.message),
|
||||
Level::Warn => tracing::warn!(?body.message),
|
||||
Level::Info => tracing::info!(?body.message),
|
||||
Level::Debug => tracing::debug!(?body.message),
|
||||
Level::Trace => tracing::trace!(?body.message),
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
@@ -1341,5 +1371,9 @@ pub fn make_router(
|
||||
testing_api!("set tenant state to broken", handle_tenant_break),
|
||||
)
|
||||
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
|
||||
.post(
|
||||
"/v1/tracing/event",
|
||||
testing_api!("emit a tracing event", post_tracing_event_handler),
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -385,6 +385,26 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_started",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation is scheduled.",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_finished",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation finishes (regardless of success/failure/shutdown).",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum RemoteOpKind {
|
||||
Upload,
|
||||
@@ -739,6 +759,8 @@ pub struct RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
|
||||
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
|
||||
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
@@ -749,6 +771,8 @@ impl RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex::new(HashMap::default()),
|
||||
calls_unfinished_gauge: Mutex::new(HashMap::default()),
|
||||
calls_started_hist: Mutex::new(HashMap::default()),
|
||||
bytes_started_counter: Mutex::new(HashMap::default()),
|
||||
bytes_finished_counter: Mutex::new(HashMap::default()),
|
||||
remote_physical_size_gauge: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
@@ -787,6 +811,7 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn calls_unfinished_gauge(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
@@ -828,32 +853,125 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_started_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_finished_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl RemoteTimelineClientMetrics {
|
||||
pub fn get_bytes_started_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
|
||||
pub fn get_bytes_finished_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteTimelineClientMetrics::call_begin`].
|
||||
#[must_use]
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard(Option<IntGauge>);
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard {
|
||||
/// Decremented on drop.
|
||||
calls_unfinished_metric: Option<IntGauge>,
|
||||
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
|
||||
bytes_finished: Option<(IntCounter, u64)>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientCallMetricGuard {
|
||||
/// Consume this guard object without decrementing the metric.
|
||||
/// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out.
|
||||
/// Consume this guard object without performing the metric updates it would do on `drop()`.
|
||||
/// The caller vouches to do the metric updates manually.
|
||||
pub fn will_decrement_manually(mut self) {
|
||||
self.0 = None; // prevent drop() from decrementing
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = &mut self;
|
||||
calls_unfinished_metric.take();
|
||||
bytes_finished.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RemoteTimelineClientCallMetricGuard {
|
||||
fn drop(&mut self) {
|
||||
if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self {
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = self;
|
||||
if let Some(guard) = calls_unfinished_metric.take() {
|
||||
guard.dec();
|
||||
}
|
||||
if let Some((bytes_finished_metric, value)) = bytes_finished {
|
||||
bytes_finished_metric.inc_by(*value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to
|
||||
/// track the byte size of this call in applicable metric(s).
|
||||
pub(crate) enum RemoteTimelineClientMetricsCallTrackSize {
|
||||
/// Do not account for this call's byte size in any metrics.
|
||||
/// The `reason` field is there to make the call sites self-documenting
|
||||
/// about why they don't need the metric.
|
||||
DontTrackSize { reason: &'static str },
|
||||
/// Track the byte size of the call in applicable metric(s).
|
||||
Bytes(u64),
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
/// Increment the metrics that track ongoing calls to the remote timeline client instance.
|
||||
/// Update the metrics that change when a call to the remote timeline client instance starts.
|
||||
///
|
||||
/// Drop the returned guard object once the operation is finished to decrement the values.
|
||||
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
|
||||
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
|
||||
/// is more suitable.
|
||||
/// Never do both.
|
||||
@@ -861,24 +979,51 @@ impl RemoteTimelineClientMetrics {
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) -> RemoteTimelineClientCallMetricGuard {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
self.calls_started_hist(file_kind, op_kind)
|
||||
.observe(unfinished_metric.get() as f64);
|
||||
unfinished_metric.inc();
|
||||
RemoteTimelineClientCallMetricGuard(Some(unfinished_metric))
|
||||
.observe(calls_unfinished_metric.get() as f64);
|
||||
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
|
||||
|
||||
let bytes_finished = match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
|
||||
// nothing to do
|
||||
None
|
||||
}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_started_counter(file_kind, op_kind).inc_by(size);
|
||||
let finished_counter = self.bytes_finished_counter(file_kind, op_kind);
|
||||
Some((finished_counter, size))
|
||||
}
|
||||
};
|
||||
RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric: Some(calls_unfinished_metric),
|
||||
bytes_finished,
|
||||
}
|
||||
}
|
||||
|
||||
/// Manually decrement the metric instead of using the guard object.
|
||||
/// Manually udpate the metrics that track completions, instead of using the guard object.
|
||||
/// Using the guard object is generally preferable.
|
||||
/// See [`call_begin`] for more context.
|
||||
pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
pub(crate) fn call_end(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) {
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
debug_assert!(
|
||||
unfinished_metric.get() > 0,
|
||||
calls_unfinished_metric.get() > 0,
|
||||
"begin and end should cancel out"
|
||||
);
|
||||
unfinished_metric.dec();
|
||||
calls_unfinished_metric.dec();
|
||||
match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_finished_counter(file_kind, op_kind).inc_by(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -891,6 +1036,8 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
remote_operation_time,
|
||||
calls_unfinished_gauge,
|
||||
calls_started_hist,
|
||||
bytes_started_counter,
|
||||
bytes_finished_counter,
|
||||
} = self;
|
||||
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
|
||||
@@ -911,6 +1058,22 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
{
|
||||
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
|
||||
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
@@ -700,6 +700,8 @@ impl PageServerHandler {
|
||||
full_backup: bool,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// check that the timeline exists
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
@@ -712,6 +714,8 @@ impl PageServerHandler {
|
||||
.context("invalid basebackup lsn")?;
|
||||
}
|
||||
|
||||
let lsn_awaited_after = started.elapsed();
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
|
||||
pgb.flush().await?;
|
||||
@@ -732,7 +736,17 @@ impl PageServerHandler {
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||
pgb.flush().await?;
|
||||
info!("basebackup complete");
|
||||
|
||||
let basebackup_after = started
|
||||
.elapsed()
|
||||
.checked_sub(lsn_awaited_after)
|
||||
.unwrap_or(Duration::ZERO);
|
||||
|
||||
info!(
|
||||
lsn_await_millis = lsn_awaited_after.as_millis(),
|
||||
basebackup_millis = basebackup_after.as_millis(),
|
||||
"basebackup complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -219,7 +219,8 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::metrics::{
|
||||
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::{
|
||||
@@ -367,9 +368,13 @@ impl RemoteTimelineClient {
|
||||
|
||||
/// Download index file
|
||||
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Index,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
|
||||
download::download_index_part(
|
||||
self.conf,
|
||||
@@ -398,9 +403,13 @@ impl RemoteTimelineClient {
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
) -> anyhow::Result<u64> {
|
||||
let downloaded_size = {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Layer,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
@@ -886,11 +895,32 @@ impl RemoteTimelineClient {
|
||||
fn calls_unfinished_metric_impl(
|
||||
&self,
|
||||
op: &UploadOp,
|
||||
) -> Option<(RemoteOpFileKind, RemoteOpKind)> {
|
||||
) -> Option<(
|
||||
RemoteOpFileKind,
|
||||
RemoteOpKind,
|
||||
RemoteTimelineClientMetricsCallTrackSize,
|
||||
)> {
|
||||
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
|
||||
let res = match op {
|
||||
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
|
||||
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
|
||||
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
|
||||
UploadOp::UploadLayer(_, m) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
|
||||
),
|
||||
UploadOp::UploadMetadata(_, _) => (
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
DontTrackSize {
|
||||
reason: "metadata uploads are tiny",
|
||||
},
|
||||
),
|
||||
UploadOp::Delete(file_kind, _) => (
|
||||
*file_kind,
|
||||
RemoteOpKind::Delete,
|
||||
DontTrackSize {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(_) => {
|
||||
// we do not account these
|
||||
return None;
|
||||
@@ -900,20 +930,20 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind);
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
|
||||
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
self.metrics.call_end(&file_kind, &op_kind);
|
||||
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
@@ -981,11 +1011,19 @@ impl RemoteTimelineClient {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
tenant::harness::{TenantHarness, TIMELINE_ID},
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
Tenant,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
|
||||
use std::{collections::HashSet, path::Path};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio::runtime::EnterGuard;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
|
||||
@@ -1034,39 +1072,80 @@ mod tests {
|
||||
assert_eq!(found, expected);
|
||||
}
|
||||
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness<'static>,
|
||||
tenant: Arc<Tenant>,
|
||||
tenant_ctx: RequestContext,
|
||||
remote_fs_dir: PathBuf,
|
||||
client: Arc<RemoteTimelineClient>,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
fn new(test_name: &str) -> anyhow::Result<Self> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = timeline.initialize(&ctx).unwrap();
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl: storage,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
runtime,
|
||||
entered_runtime,
|
||||
harness,
|
||||
tenant,
|
||||
tenant_ctx: ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test scheduling
|
||||
#[test]
|
||||
fn upload_scheduling() -> anyhow::Result<()> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let _entered = runtime.enter();
|
||||
|
||||
let harness = TenantHarness::create("upload_scheduling")?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
let _timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
// Test outline:
|
||||
//
|
||||
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
|
||||
@@ -1081,21 +1160,19 @@ mod tests {
|
||||
// Schedule another deletion. Check that it's launched immediately.
|
||||
// Schedule index upload. Check that it's queued
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
let TestSetup {
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
entered_runtime: _entered_runtime,
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
tenant_ctx: _tenant_ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
} = TestSetup::new("upload_scheduling").unwrap();
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let remote_timeline_dir =
|
||||
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
|
||||
@@ -1216,4 +1293,90 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
|
||||
// Setup
|
||||
|
||||
let TestSetup {
|
||||
runtime,
|
||||
harness,
|
||||
client,
|
||||
..
|
||||
} = TestSetup::new("metrics")?;
|
||||
|
||||
let metadata = dummy_metadata(Lsn(0x10));
|
||||
client.init_upload_queue_for_empty_remote(&metadata)?;
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let content_1 = dummy_contents("foo");
|
||||
std::fs::write(
|
||||
timeline_path.join(layer_file_name_1.file_name()),
|
||||
&content_1,
|
||||
)?;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct BytesStartedFinished {
|
||||
started: Option<usize>,
|
||||
finished: Option<usize>,
|
||||
}
|
||||
let get_bytes_started_stopped = || {
|
||||
let started = client
|
||||
.metrics
|
||||
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
let stopped = client
|
||||
.metrics
|
||||
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
BytesStartedFinished {
|
||||
started,
|
||||
finished: stopped,
|
||||
}
|
||||
};
|
||||
|
||||
// Test
|
||||
|
||||
let init = get_bytes_started_stopped();
|
||||
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)?;
|
||||
|
||||
let pre = get_bytes_started_stopped();
|
||||
|
||||
runtime.block_on(client.wait_completion())?;
|
||||
|
||||
let post = get_bytes_started_stopped();
|
||||
|
||||
// Validate
|
||||
|
||||
assert_eq!(
|
||||
init,
|
||||
BytesStartedFinished {
|
||||
started: None,
|
||||
finished: None
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
pre,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
// assert that the _finished metric is created eagerly so that subtractions work on first sample
|
||||
finished: Some(0),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
post,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
finished: Some(content_1.len())
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,8 @@ PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
|
||||
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
"pageserver_remote_physical_size",
|
||||
"pageserver_remote_timeline_client_bytes_started_total",
|
||||
"pageserver_remote_timeline_client_bytes_finished_total",
|
||||
)
|
||||
|
||||
PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
@@ -53,6 +55,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_storage_operations_seconds_global_bucket",
|
||||
"libmetrics_launch_timestamp",
|
||||
"libmetrics_build_info",
|
||||
"libmetrics_tracing_event_count_total",
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
|
||||
@@ -550,3 +550,13 @@ class PageserverHttpClient(requests.Session):
|
||||
def tenant_break(self, tenant_id: TenantId):
|
||||
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
|
||||
self.verbose_error(res)
|
||||
|
||||
def post_tracing_event(self, level: str, message: str):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tracing/event",
|
||||
json={
|
||||
"level": level,
|
||||
"message": message,
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
49
test_runner/regress/test_logging.py
Normal file
49
test_runner/regress/test_logging.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
|
||||
def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
|
||||
# self-test: make sure the event is logged (i.e., our testing endpoint works)
|
||||
log_expected = {
|
||||
"trace": False,
|
||||
"debug": False,
|
||||
"info": True,
|
||||
"warn": True,
|
||||
"error": True,
|
||||
}[level]
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
msg_id = uuid.uuid4().hex
|
||||
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
|
||||
# post the event
|
||||
ps_http.post_tracing_event(level, msg_id)
|
||||
if log_expected:
|
||||
env.pageserver.allowed_errors.append(f".*{msg_id}.*")
|
||||
|
||||
def assert_logged():
|
||||
if not log_expected:
|
||||
return
|
||||
assert env.pageserver.log_contains(f".*{msg_id}.*")
|
||||
|
||||
wait_until(10, 0.5, assert_logged)
|
||||
|
||||
# make sure it's counted
|
||||
def assert_metric_value():
|
||||
if not log_expected:
|
||||
return
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
val = val or 0.0
|
||||
log.info("libmetrics_tracing_event_count: %s", val)
|
||||
assert val > (before or 0.0)
|
||||
|
||||
wait_until(10, 1, assert_metric_value)
|
||||
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2.extras
|
||||
@@ -22,6 +23,7 @@ wait_lsn_timeout='111 s';
|
||||
checkpoint_distance = 10000
|
||||
compaction_target_size = 1048576
|
||||
evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "23 hours" }
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -44,6 +46,7 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
"checkpoint_distance": "20000",
|
||||
"gc_period": "30sec",
|
||||
"evictions_low_residence_duration_metric_threshold": "42s",
|
||||
"eviction_policy": json.dumps({"kind": "NoEviction"}),
|
||||
}
|
||||
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
|
||||
|
||||
@@ -84,6 +87,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert effective_config["image_creation_threshold"] == 3
|
||||
assert effective_config["pitr_interval"] == "7days"
|
||||
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
|
||||
assert effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "20s",
|
||||
"threshold": "23h",
|
||||
}
|
||||
|
||||
# check the configuration of the new tenant
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
@@ -121,6 +129,9 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert (
|
||||
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
|
||||
), "Should override default value"
|
||||
assert new_effective_config["eviction_policy"] == {
|
||||
"kind": "NoEviction"
|
||||
}, "Specific 'eviction_policy' config should override the default value"
|
||||
assert new_effective_config["compaction_target_size"] == 1048576
|
||||
assert new_effective_config["compaction_period"] == "20s"
|
||||
assert new_effective_config["compaction_threshold"] == 10
|
||||
@@ -135,6 +146,9 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
"compaction_period": "80sec",
|
||||
"image_creation_threshold": "2",
|
||||
"evictions_low_residence_duration_metric_threshold": "23h",
|
||||
"eviction_policy": json.dumps(
|
||||
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
|
||||
),
|
||||
}
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id=tenant,
|
||||
@@ -180,6 +194,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert (
|
||||
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
|
||||
), "Should override default value"
|
||||
assert updated_effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "1m 20s",
|
||||
"threshold": "1day 18h",
|
||||
}, "Specific 'eviction_policy' config should override the default value"
|
||||
assert updated_effective_config["compaction_target_size"] == 1048576
|
||||
assert updated_effective_config["compaction_threshold"] == 10
|
||||
assert updated_effective_config["gc_horizon"] == 67108864
|
||||
@@ -239,6 +258,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert final_effective_config["gc_period"] == "1h"
|
||||
assert final_effective_config["image_creation_threshold"] == 3
|
||||
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
|
||||
assert final_effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "20s",
|
||||
"threshold": "23h",
|
||||
}
|
||||
|
||||
# restart the pageserver and ensure that the config is still correct
|
||||
env.pageserver.stop()
|
||||
|
||||
Reference in New Issue
Block a user