mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
Compare commits
6 Commits
instrument
...
problame/p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1251ef6dd4 | ||
|
|
618d36ee6d | ||
|
|
33c2d94ba6 | ||
|
|
08bfe1c826 | ||
|
|
65ff256bb8 | ||
|
|
5177c1e4b1 |
10
.github/workflows/build_and_test.yml
vendored
10
.github/workflows/build_and_test.yml
vendored
@@ -155,7 +155,7 @@ jobs:
|
||||
build_type: [ debug, release ]
|
||||
env:
|
||||
BUILD_TYPE: ${{ matrix.build_type }}
|
||||
GIT_VERSION: ${{ github.sha }}
|
||||
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
|
||||
steps:
|
||||
- name: Fix git ownership
|
||||
@@ -614,7 +614,7 @@ jobs:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
|
||||
@@ -658,7 +658,7 @@ jobs:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
--dockerfile Dockerfile.compute-tools
|
||||
@@ -715,7 +715,7 @@ jobs:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
--build-arg PG_VERSION=${{ matrix.version }}
|
||||
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
@@ -742,7 +742,7 @@ jobs:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true \
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache \
|
||||
--context . \
|
||||
--build-arg GIT_VERSION=${{ github.sha }} \
|
||||
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} \
|
||||
--build-arg PG_VERSION=${{ matrix.version }} \
|
||||
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}} \
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com \
|
||||
|
||||
3
.gitmodules
vendored
3
.gitmodules
vendored
@@ -6,3 +6,6 @@
|
||||
path = vendor/postgres-v15
|
||||
url = https://github.com/neondatabase/postgres.git
|
||||
branch = REL_15_STABLE_neon
|
||||
[submodule "pageserver/slo/sloth.git"]
|
||||
path = pageserver/slo/sloth.git
|
||||
url = https://github.com/neondatabase/sloth.git
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4666,7 +4666,6 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"pin-project-lite",
|
||||
"reqwest",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -4818,6 +4817,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"const_format",
|
||||
"criterion",
|
||||
"futures",
|
||||
"heapless",
|
||||
|
||||
@@ -549,6 +549,13 @@ impl ComputeNode {
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
|
||||
);
|
||||
|
||||
// Log metrics so that we can search for slow operations in logs
|
||||
let metrics = {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.metrics.clone()
|
||||
};
|
||||
info!(?metrics, "compute start finished");
|
||||
|
||||
Ok(pg)
|
||||
}
|
||||
|
||||
|
||||
@@ -16,4 +16,3 @@ tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
pin-project-lite = "0.2"
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use tracing::{instrument::Instrumented, Level};
|
||||
|
||||
pub trait InstrumentCancel: Sized {
|
||||
type Inner;
|
||||
fn with_cancel_info(self) -> CancelLog<Self::Inner> {
|
||||
self.with_cancel_log(Level::INFO)
|
||||
}
|
||||
fn with_cancel_log(self, level: Level) -> CancelLog<Self::Inner>;
|
||||
}
|
||||
|
||||
impl<T: Sized> InstrumentCancel for Instrumented<T> {
|
||||
type Inner = T;
|
||||
fn with_cancel_log(self, level: Level) -> CancelLog<T> {
|
||||
CancelLog {
|
||||
inner: self,
|
||||
cancel_level: Some(level),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// A [`Future`] that has been instrumented with a `tracing` [`Span`]. It will emit a log on cancellation
|
||||
///
|
||||
/// This type is returned by the [`Instrument`] extension trait. See that
|
||||
/// trait's documentation for details.
|
||||
///
|
||||
/// [`Future`]: std::future::Future
|
||||
/// [`Span`]: crate::Span
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct CancelLog<T> {
|
||||
#[pin]
|
||||
inner: Instrumented<T>,
|
||||
cancel_level: Option<Level>,
|
||||
}
|
||||
|
||||
impl<T> PinnedDrop for CancelLog<T> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let this = this.project();
|
||||
if let Some(level) = this.cancel_level.take() {
|
||||
let _enter = this.inner.span().enter();
|
||||
match level {
|
||||
Level::TRACE => tracing::event!(Level::TRACE, "task was cancelled"),
|
||||
Level::DEBUG => tracing::event!(Level::DEBUG, "task was cancelled"),
|
||||
Level::INFO => tracing::event!(Level::INFO, "task was cancelled"),
|
||||
Level::WARN => tracing::event!(Level::WARN, "task was cancelled"),
|
||||
Level::ERROR => tracing::event!(Level::ERROR, "task was cancelled"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> Future for CancelLog<T> {
|
||||
type Output = T::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let res = this.inner.poll(cx);
|
||||
if res.is_ready() {
|
||||
*this.cancel_level = None;
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
@@ -41,7 +41,6 @@ use opentelemetry_otlp::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_
|
||||
pub use tracing_opentelemetry::OpenTelemetryLayer;
|
||||
|
||||
pub mod http;
|
||||
pub mod instrument;
|
||||
|
||||
/// Set up OpenTelemetry exporter, using configuration from environment variables.
|
||||
///
|
||||
|
||||
@@ -40,6 +40,8 @@ pq_proto.workspace = true
|
||||
metrics.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
const_format.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
|
||||
@@ -109,10 +109,16 @@ pub use failpoint_macro_helpers::failpoint_sleep_helper;
|
||||
/// * building in docker (either in CI or locally)
|
||||
///
|
||||
/// One thing to note is that .git is not available in docker (and it is bad to include it there).
|
||||
/// So everything becides docker build is covered by git_version crate, and docker uses a `GIT_VERSION` argument to get the value required.
|
||||
/// It takes variable from build process env and puts it to the rustc env. And then we can retrieve it here by using env! macro.
|
||||
/// Git version received from environment variable used as a fallback in git_version invocation.
|
||||
/// And to avoid running buildscript every recompilation, we use rerun-if-env-changed option.
|
||||
/// When building locally, the `git_version` is used to query .git. When building on CI and docker,
|
||||
/// we don't build the actual PR branch commits, but always a "phantom" would be merge commit to
|
||||
/// the target branch -- the actual PR commit from which we build from is supplied as GIT_VERSION
|
||||
/// environment variable.
|
||||
///
|
||||
/// We ended up with this compromise between phantom would be merge commits vs. pull request branch
|
||||
/// heads due to old logs becoming more reliable (github could gc the phantom merge commit
|
||||
/// anytime) in #4641.
|
||||
///
|
||||
/// To avoid running buildscript every recompilation, we use rerun-if-env-changed option.
|
||||
/// So the build script will be run only when GIT_VERSION envvar has changed.
|
||||
///
|
||||
/// Why not to use buildscript to get git commit sha directly without procmacro from different crate?
|
||||
@@ -132,17 +138,28 @@ pub use failpoint_macro_helpers::failpoint_sleep_helper;
|
||||
#[macro_export]
|
||||
macro_rules! project_git_version {
|
||||
($const_identifier:ident) => {
|
||||
const $const_identifier: &str = git_version::git_version!(
|
||||
prefix = "git:",
|
||||
fallback = concat!(
|
||||
"git-env:",
|
||||
env!("GIT_VERSION", "Missing GIT_VERSION envvar")
|
||||
),
|
||||
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
|
||||
);
|
||||
// this should try GIT_VERSION first only then git_version::git_version!
|
||||
const $const_identifier: &::core::primitive::str = {
|
||||
const __COMMIT_FROM_GIT: &::core::primitive::str = git_version::git_version! {
|
||||
prefix = "",
|
||||
fallback = "unknown",
|
||||
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
|
||||
};
|
||||
|
||||
const __ARG: &[&::core::primitive::str; 2] = &match ::core::option_env!("GIT_VERSION") {
|
||||
::core::option::Option::Some(x) => ["git-env:", x],
|
||||
::core::option::Option::None => ["git:", __COMMIT_FROM_GIT],
|
||||
};
|
||||
|
||||
$crate::__const_format::concatcp!(__ARG[0], __ARG[1])
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
/// Re-export for `project_git_version` macro
|
||||
#[doc(hidden)]
|
||||
pub use const_format as __const_format;
|
||||
|
||||
/// Same as `assert!`, but evaluated during compilation and gets optimized out in runtime.
|
||||
#[macro_export]
|
||||
macro_rules! const_assert {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use pageserver::keyspace::{KeyPartitioning, KeySpace};
|
||||
use pageserver::repository::Key;
|
||||
use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
|
||||
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
|
||||
use pageserver::tenant::storage_layer::LayerFileName;
|
||||
use pageserver::tenant::storage_layer::PersistentLayerDesc;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use std::cmp::{max, min};
|
||||
use std::fs::File;
|
||||
@@ -28,13 +28,13 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
|
||||
for fname in filenames {
|
||||
let fname = fname.unwrap();
|
||||
let fname = LayerFileName::from_str(&fname).unwrap();
|
||||
let layer = LayerDescriptor::from(fname);
|
||||
let layer = PersistentLayerDesc::from(fname);
|
||||
|
||||
let lsn_range = layer.get_lsn_range();
|
||||
min_lsn = min(min_lsn, lsn_range.start);
|
||||
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
updates.insert_historic(layer.layer_desc().clone());
|
||||
updates.insert_historic(layer);
|
||||
}
|
||||
|
||||
println!("min: {min_lsn}, max: {max_lsn}");
|
||||
@@ -210,15 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
for i in 0..100_000 {
|
||||
let i32 = (i as u32) % 100;
|
||||
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
|
||||
let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
|
||||
let layer = PersistentLayerDesc::new_img(
|
||||
TenantId::generate(),
|
||||
TimelineId::generate(),
|
||||
zero.add(10 * i32)..zero.add(10 * i32 + 1),
|
||||
Lsn(i),
|
||||
false,
|
||||
0,
|
||||
));
|
||||
updates.insert_historic(layer.layer_desc().clone());
|
||||
);
|
||||
updates.insert_historic(layer);
|
||||
}
|
||||
updates.flush();
|
||||
println!("Finished layer map init in {:?}", now.elapsed());
|
||||
|
||||
@@ -117,7 +117,8 @@ pub fn main() -> Result<()> {
|
||||
|
||||
let mut lsn_diff = (lsn_end - lsn_start) as f32;
|
||||
let mut fill = Fill::None;
|
||||
let mut margin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
|
||||
let mut ymargin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
|
||||
let xmargin = 0.05; // Height-dependent margin to disambiguate overlapping deltas
|
||||
let mut lsn_offset = 0.0;
|
||||
|
||||
// Fill in and thicken rectangle if it's an
|
||||
@@ -128,7 +129,7 @@ pub fn main() -> Result<()> {
|
||||
num_images += 1;
|
||||
lsn_diff = 0.3;
|
||||
lsn_offset = -lsn_diff / 2.0;
|
||||
margin = 0.05;
|
||||
ymargin = 0.05;
|
||||
fill = Fill::Color(rgb(0, 0, 0));
|
||||
}
|
||||
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
|
||||
@@ -137,10 +138,10 @@ pub fn main() -> Result<()> {
|
||||
println!(
|
||||
" {}",
|
||||
rectangle(
|
||||
key_start as f32 + stretch * margin,
|
||||
stretch * (lsn_max as f32 - (lsn_end as f32 - margin - lsn_offset)),
|
||||
key_diff as f32 - stretch * 2.0 * margin,
|
||||
stretch * (lsn_diff - 2.0 * margin)
|
||||
key_start as f32 + stretch * xmargin,
|
||||
stretch * (lsn_max as f32 - (lsn_end as f32 - ymargin - lsn_offset)),
|
||||
key_diff as f32 - stretch * 2.0 * xmargin,
|
||||
stretch * (lsn_diff - 2.0 * ymargin)
|
||||
)
|
||||
.fill(fill)
|
||||
.stroke(Stroke::Color(rgb(0, 0, 0), 0.1))
|
||||
|
||||
1
pageserver/slo/.gitignore
vendored
Normal file
1
pageserver/slo/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
*.sloth-output.yaml
|
||||
20
pageserver/slo/Makefile
Normal file
20
pageserver/slo/Makefile
Normal file
@@ -0,0 +1,20 @@
|
||||
|
||||
BINARY := sloth.git/bin/sloth-linux-amd64
|
||||
|
||||
.PHONY: all
|
||||
all: pageserver-slo.dev.sloth-output.yaml pageserver-slo.prod.sloth-output.yaml
|
||||
|
||||
%.sloth-output.yaml: %.sloth.yaml $(BINARY)
|
||||
$(BINARY) generate \
|
||||
--disable-promExpr-validation \
|
||||
--input $< \
|
||||
--out $@
|
||||
|
||||
CLOUD_GIT_CHECKOUT := /nonexistent
|
||||
|
||||
.PHONY: sync-to-cloud-git
|
||||
sync-to-cloud-git: pageserver-slo.dev.sloth-output.yaml pageserver-slo.prod.sloth-output.yaml
|
||||
cp pageserver-slo.dev.sloth-output.yaml \
|
||||
$(CLOUD_GIT_CHECKOUT)/ops/infra/workloads/values/dev-eu-central-1-alpha/neon-vm/pageserver-slo.dev.sloth-output.yaml
|
||||
cp pageserver-slo.prod.sloth-output.yaml \
|
||||
$(CLOUD_GIT_CHECKOUT)/ops/infra/workloads/values/prod-eu-central-1-gamma/neon-vm/pageserver-slo.prod.sloth-output.yaml
|
||||
83
pageserver/slo/README.md
Normal file
83
pageserver/slo/README.md
Normal file
@@ -0,0 +1,83 @@
|
||||
|
||||
## Install Sloth
|
||||
|
||||
https://sloth.dev/introduction/install/
|
||||
|
||||
```sh
|
||||
wget https://github.com/slok/sloth/releases/download/v0.11.0/sloth-linux-amd64
|
||||
chmod +x ./sloth-linux-amd64
|
||||
```
|
||||
|
||||
## Background on Sloth
|
||||
|
||||
https://sloth.dev/introduction/
|
||||
https://sloth.dev/introduction/architecture/
|
||||
|
||||
|
||||
## Generate Prometheus Rules From Sloth Spec
|
||||
|
||||
```
|
||||
./sloth-linux-amd64 generate --input ./spec.sloth.yml --out generated.prometheus.rules.yml
|
||||
```
|
||||
|
||||
## Background reading:
|
||||
|
||||
SRE workbook chapter on "Implementing SLOs", section "Calculating the SLIs"
|
||||
|
||||
https://sre.google/workbook/implementing-slos/
|
||||
|
||||
Citation:
|
||||
|
||||
```
|
||||
Availability
|
||||
|
||||
sum(rate(http_requests_total{host="api", status!~"5.."}[7d]))
|
||||
/
|
||||
sum(rate(http_requests_total{host="api"}[7d])
|
||||
|
||||
Latency
|
||||
|
||||
histogram_quantile(0.9, rate(http_request_duration_seconds_bucket[7d]))
|
||||
|
||||
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[7d]))
|
||||
```
|
||||
|
||||
## Sloth Rule Syntax
|
||||
|
||||
It's under-documented.
|
||||
|
||||
Best to go to the Go types:
|
||||
https://pkg.go.dev/github.com/slok/sloth@v0.6.0/pkg/prometheus/api/v1#section-readme
|
||||
|
||||
For latency SLOs, pageserver, we want the "Raw SLI" type SLI, not the one that is based on events.
|
||||
Seach for `error_ratio_query` ; example: https://sloth.dev/examples/default/raw-sli/
|
||||
|
||||
Use victoriametrics `histogram_share` to compute the error ratio.
|
||||
It's the inverese of histogram_quantile.
|
||||
https://docs.victoriametrics.com/MetricsQL.html#histogram_share
|
||||
|
||||
`share_le_over_time` seems also useful
|
||||
https://docs.victoriametrics.com/MetricsQL.html#share_le_over_time
|
||||
|
||||
https://stackoverflow.com/questions/72559302/is-it-possible-to-calculate-ranks-of-metrics?rq=1
|
||||
|
||||
Problem with the VictoriaMetrics-only functions is that sloth has an internal validation pass:
|
||||
https://github.com/slok/sloth/issues/510
|
||||
Option to skip the check:
|
||||
https://github.com/slok/sloth/pull/511
|
||||
=>
|
||||
```
|
||||
git submodule update --init
|
||||
pushd sloth.git
|
||||
make build
|
||||
popd
|
||||
sloth.git/bin/sloth-linux-amd64 generate \
|
||||
--disable-promExpr-validation \
|
||||
--input ./spec.sloth.yml \
|
||||
--out generated.prometheus.rules.yml
|
||||
```
|
||||
|
||||
## Notes On How To Scale The Process To Multiple Teams / Automate Sloth In Neon
|
||||
|
||||
* SLO directory discovery: https://sloth.dev/usage/cli/
|
||||
* allows using directories instead of indivudal files as input
|
||||
4
pageserver/slo/pageserver-slo.dev.sloth.yaml
Normal file
4
pageserver/slo/pageserver-slo.dev.sloth.yaml
Normal file
@@ -0,0 +1,4 @@
|
||||
version: "prometheus/v1"
|
||||
service: "pageserver"
|
||||
slos: []
|
||||
|
||||
63
pageserver/slo/pageserver-slo.prod.sloth.yaml
Normal file
63
pageserver/slo/pageserver-slo.prod.sloth.yaml
Normal file
@@ -0,0 +1,63 @@
|
||||
version: "prometheus/v1"
|
||||
service: "pageserver"
|
||||
#labels:
|
||||
# owner: "pageserver-team"
|
||||
# repo: "myorg/myservice"
|
||||
# tier: "2"
|
||||
slos:
|
||||
# # We allow failing (5xx and 429) 1 request every 1000 requests (99.9%).
|
||||
# - name: "requests-availability"
|
||||
# objective: 99.9
|
||||
# description: "Common SLO based on availability for HTTP request responses."
|
||||
# sli:
|
||||
# events:
|
||||
# error_query: sum(rate(http_request_duration_seconds_count{job="myservice",code=~"(5..|429)"}[{{.window}}]))
|
||||
# total_query: sum(rate(http_request_duration_seconds_count{job="myservice"}[{{.window}}]))
|
||||
# alerting:
|
||||
# name: MyServiceHighErrorRate
|
||||
# labels:
|
||||
# category: "availability"
|
||||
# annotations:
|
||||
# # Overwrite default Sloth SLO alert summmary on ticket and page alerts.
|
||||
# summary: "High error rate on 'myservice' requests responses"
|
||||
# page_alert:
|
||||
# labels:
|
||||
# severity: pageteam
|
||||
# routing_key: myteam
|
||||
# ticket_alert:
|
||||
# labels:
|
||||
# severity: "slack"
|
||||
# slack_channel: "#alerts-myteam"
|
||||
|
||||
- name: "basebackup_ms-latency"
|
||||
description: "basebackup_ms latency should be less than 0.2s for 99.9% of requests"
|
||||
objective: 99.9
|
||||
#labels:
|
||||
sli:
|
||||
raw:
|
||||
# VictoriaMetrics-only (fails due to sloth validation pass)
|
||||
error_ratio_query: |
|
||||
histogram_share(200, sum by (le) (compute_basebackup_ms_bucket[5m]))
|
||||
# error_ratio_query: |
|
||||
# https://docs.victoriametrics.com/MetricsQL.html#share_le_over_time
|
||||
#
|
||||
# Prometheus has a native one, not supported by VictoriaMetrics validation pass
|
||||
#error_ratio_query: |
|
||||
# histogram_fraction(0, 200, compute_basebackup_ms_bucket[{{.window}}])
|
||||
# error_ratio_query: |
|
||||
# histogram_quantile(0.999, sum by (le) (rate(compute_basebackup_ms_bucket[{{.window}}])))
|
||||
alerting:
|
||||
name: "basebackup_ms-latency"
|
||||
labels:
|
||||
category: "latency"
|
||||
annotations: {}
|
||||
page_alert:
|
||||
labels:
|
||||
severity: "pageteam"
|
||||
routing_key: "myteam"
|
||||
ticket_alert:
|
||||
labels:
|
||||
severity: "slack"
|
||||
slack_channel: "#alerts-myteam"
|
||||
|
||||
|
||||
1
pageserver/slo/sloth.git
Submodule
1
pageserver/slo/sloth.git
Submodule
Submodule pageserver/slo/sloth.git added at fcbbc5e34f
@@ -33,6 +33,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tracing::field;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::{
|
||||
@@ -51,6 +52,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::GetTenantError;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
@@ -238,6 +240,7 @@ pub async fn libpq_listener_main(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(peer_addr))]
|
||||
async fn page_service_conn_main(
|
||||
conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
@@ -260,6 +263,7 @@ async fn page_service_conn_main(
|
||||
.context("could not set TCP_NODELAY")?;
|
||||
|
||||
let peer_addr = socket.peer_addr().context("get peer address")?;
|
||||
tracing::Span::current().record("peer_addr", field::display(peer_addr));
|
||||
|
||||
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
|
||||
// - long enough for most valid compute connections
|
||||
@@ -362,7 +366,7 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_pagerequests<IO>(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -373,6 +377,8 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// NOTE: pagerequests handler exits when connection is closed,
|
||||
// so there is no need to reset the association
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
@@ -473,7 +479,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
|
||||
async fn handle_import_basebackup<IO>(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -487,6 +493,8 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
@@ -531,7 +539,7 @@ impl PageServerHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all, fields(%start_lsn, %end_lsn))]
|
||||
async fn handle_import_wal<IO>(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -544,6 +552,7 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
@@ -738,7 +747,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
|
||||
async fn handle_basebackup_request<IO>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -752,6 +761,8 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// check that the timeline exists
|
||||
@@ -862,6 +873,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id, timeline_id))]
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -883,6 +895,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
|
||||
@@ -902,6 +918,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let lsn = if params.len() >= 3 {
|
||||
@@ -948,6 +968,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
|
||||
@@ -979,6 +1003,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
// The caller is responsible for providing correct lsn and prev_lsn.
|
||||
let lsn = if params.len() > 2 {
|
||||
Some(
|
||||
@@ -1033,6 +1061,10 @@ where
|
||||
let pg_version = u32::from_str(params[4])
|
||||
.with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
match self
|
||||
@@ -1077,6 +1109,10 @@ where
|
||||
let end_lsn = Lsn::from_str(params[3])
|
||||
.with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
match self
|
||||
@@ -1108,6 +1144,8 @@ where
|
||||
let tenant_id = TenantId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
|
||||
tracing::Span::current().record("tenant_id", field::display(tenant_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
|
||||
@@ -651,19 +651,35 @@ impl LayerMap {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::LayerMap;
|
||||
use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
mod l0_delta_layers_updated {
|
||||
|
||||
use crate::tenant::{
|
||||
storage_layer::{PersistentLayer, PersistentLayerDesc},
|
||||
storage_layer::{AsLayerDesc, PersistentLayerDesc},
|
||||
timeline::LayerFileManager,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
struct LayerObject(PersistentLayerDesc);
|
||||
|
||||
impl AsLayerDesc for LayerObject {
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerObject {
|
||||
fn new(desc: PersistentLayerDesc) -> Self {
|
||||
LayerObject(desc)
|
||||
}
|
||||
}
|
||||
|
||||
type TestLayerFileManager = LayerFileManager<LayerObject>;
|
||||
|
||||
#[test]
|
||||
fn for_full_range_delta() {
|
||||
// l0_delta_layers are used by compaction, and should observe all buffered updates
|
||||
@@ -700,18 +716,18 @@ mod tests {
|
||||
|
||||
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
|
||||
let layer = LayerFileName::from_str(layer).unwrap();
|
||||
let layer = LayerDescriptor::from(layer);
|
||||
let layer = PersistentLayerDesc::from(layer);
|
||||
|
||||
// same skeletan construction; see scenario below
|
||||
let not_found = Arc::new(layer.clone());
|
||||
let new_version = Arc::new(layer);
|
||||
let not_found = Arc::new(LayerObject::new(layer.clone()));
|
||||
let new_version = Arc::new(LayerObject::new(layer));
|
||||
|
||||
// after the immutable storage state refactor, the replace operation
|
||||
// will not use layer map any more. We keep it here for consistency in test cases
|
||||
// and can remove it in the future.
|
||||
let _map = LayerMap::default();
|
||||
|
||||
let mut mapping = LayerFileManager::new();
|
||||
let mut mapping = TestLayerFileManager::new();
|
||||
|
||||
mapping
|
||||
.replace_and_verify(not_found, new_version)
|
||||
@@ -720,10 +736,10 @@ mod tests {
|
||||
|
||||
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
|
||||
let name = LayerFileName::from_str(layer_name).unwrap();
|
||||
let skeleton = LayerDescriptor::from(name);
|
||||
let skeleton = PersistentLayerDesc::from(name);
|
||||
|
||||
let remote = Arc::new(skeleton.clone());
|
||||
let downloaded = Arc::new(skeleton);
|
||||
let remote = Arc::new(LayerObject::new(skeleton.clone()));
|
||||
let downloaded = Arc::new(LayerObject::new(skeleton));
|
||||
|
||||
let mut map = LayerMap::default();
|
||||
let mut mapping = LayerFileManager::new();
|
||||
|
||||
@@ -376,6 +376,12 @@ pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i
|
||||
/// Returned by [`Layer::key_iter`]
|
||||
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send>;
|
||||
|
||||
/// Get a layer descriptor from a layer.
|
||||
pub trait AsLayerDesc {
|
||||
/// Get the layer descriptor.
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc;
|
||||
}
|
||||
|
||||
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
|
||||
/// range of LSNs.
|
||||
///
|
||||
@@ -389,10 +395,8 @@ pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send
|
||||
/// A delta layer contains all modifications within a range of LSNs and keys.
|
||||
/// An image layer is a snapshot of all the data in a key-range, at a single
|
||||
/// LSN.
|
||||
pub trait PersistentLayer: Layer {
|
||||
/// Get the layer descriptor.
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc;
|
||||
|
||||
pub trait PersistentLayer: Layer + AsLayerDesc {
|
||||
/// Identify the tenant this layer belongs to
|
||||
fn get_tenant_id(&self) -> TenantId {
|
||||
self.layer_desc().tenant_id
|
||||
}
|
||||
@@ -458,119 +462,32 @@ pub fn downcast_remote_layer(
|
||||
pub mod tests {
|
||||
use super::*;
|
||||
|
||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||
///
|
||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||
/// LayerDescriptor.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LayerDescriptor {
|
||||
base: PersistentLayerDesc,
|
||||
}
|
||||
|
||||
impl From<PersistentLayerDesc> for LayerDescriptor {
|
||||
fn from(base: PersistentLayerDesc) -> Self {
|
||||
Self { base }
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for LayerDescriptor {
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_data: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
todo!("This method shouldn't be part of the Layer trait")
|
||||
}
|
||||
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.layer_desc().key_range.clone()
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn get_lsn_range(&self) -> Range<Lsn> {
|
||||
self.layer_desc().lsn_range.clone()
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.layer_desc().is_incremental
|
||||
}
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
impl std::fmt::Display for LayerDescriptor {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.layer_desc().short_id())
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for LayerDescriptor {
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc {
|
||||
&self.base
|
||||
}
|
||||
|
||||
fn local_path(&self) -> Option<PathBuf> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn delete_resident_layer_file(&self) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DeltaFileName> for LayerDescriptor {
|
||||
impl From<DeltaFileName> for PersistentLayerDesc {
|
||||
fn from(value: DeltaFileName) -> Self {
|
||||
LayerDescriptor {
|
||||
base: PersistentLayerDesc::new_delta(
|
||||
TenantId::from_array([0; 16]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn_range,
|
||||
233,
|
||||
),
|
||||
}
|
||||
PersistentLayerDesc::new_delta(
|
||||
TenantId::from_array([0; 16]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn_range,
|
||||
233,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ImageFileName> for LayerDescriptor {
|
||||
impl From<ImageFileName> for PersistentLayerDesc {
|
||||
fn from(value: ImageFileName) -> Self {
|
||||
LayerDescriptor {
|
||||
base: PersistentLayerDesc::new_img(
|
||||
TenantId::from_array([0; 16]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn,
|
||||
false,
|
||||
233,
|
||||
),
|
||||
}
|
||||
PersistentLayerDesc::new_img(
|
||||
TenantId::from_array([0; 16]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn,
|
||||
false,
|
||||
233,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LayerFileName> for LayerDescriptor {
|
||||
impl From<LayerFileName> for PersistentLayerDesc {
|
||||
fn from(value: LayerFileName) -> Self {
|
||||
match value {
|
||||
LayerFileName::Delta(d) => Self::from(d),
|
||||
|
||||
@@ -56,8 +56,8 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
||||
PathOrConf, PersistentLayerDesc,
|
||||
AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerIter,
|
||||
LayerKeyIter, PathOrConf, PersistentLayerDesc,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -403,11 +403,13 @@ impl std::fmt::Display for DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for DeltaLayer {
|
||||
impl AsLayerDesc for DeltaLayer {
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc {
|
||||
&self.desc
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for DeltaLayer {
|
||||
fn local_path(&self) -> Option<PathBuf> {
|
||||
Some(self.path())
|
||||
}
|
||||
|
||||
@@ -53,7 +53,9 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::filename::ImageFileName;
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, PersistentLayerDesc};
|
||||
use super::{
|
||||
AsLayerDesc, Layer, LayerAccessStatsReset, LayerIter, PathOrConf, PersistentLayerDesc,
|
||||
};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -241,11 +243,13 @@ impl std::fmt::Display for ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for ImageLayer {
|
||||
impl AsLayerDesc for ImageLayer {
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc {
|
||||
&self.desc
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for ImageLayer {
|
||||
fn local_path(&self) -> Option<PathBuf> {
|
||||
Some(self.path())
|
||||
}
|
||||
|
||||
@@ -20,8 +20,8 @@ use utils::{
|
||||
|
||||
use super::filename::{DeltaFileName, ImageFileName};
|
||||
use super::{
|
||||
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
||||
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
|
||||
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter,
|
||||
LayerKeyIter, LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
|
||||
};
|
||||
|
||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||
@@ -115,11 +115,13 @@ impl std::fmt::Display for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for RemoteLayer {
|
||||
impl AsLayerDesc for RemoteLayer {
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc {
|
||||
&self.desc
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for RemoteLayer {
|
||||
fn local_path(&self) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
@@ -90,7 +90,8 @@ use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{
|
||||
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey,
|
||||
AsLayerDesc, DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc,
|
||||
PersistentLayerKey,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -124,10 +125,12 @@ impl PartialOrd for Hole {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LayerFileManager(HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>);
|
||||
pub struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
|
||||
HashMap<PersistentLayerKey, Arc<T>>,
|
||||
);
|
||||
|
||||
impl LayerFileManager {
|
||||
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
|
||||
impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
|
||||
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
|
||||
// The assumption for the `expect()` is that all code maintains the following invariant:
|
||||
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
|
||||
self.0
|
||||
@@ -137,7 +140,7 @@ impl LayerFileManager {
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn insert(&mut self, layer: Arc<dyn PersistentLayer>) {
|
||||
pub(crate) fn insert(&mut self, layer: Arc<T>) {
|
||||
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
|
||||
if present.is_some() && cfg!(debug_assertions) {
|
||||
panic!("overwriting a layer: {:?}", layer.layer_desc())
|
||||
@@ -148,7 +151,7 @@ impl LayerFileManager {
|
||||
Self(HashMap::new())
|
||||
}
|
||||
|
||||
pub(crate) fn remove(&mut self, layer: Arc<dyn PersistentLayer>) {
|
||||
pub(crate) fn remove(&mut self, layer: Arc<T>) {
|
||||
let present = self.0.remove(&layer.layer_desc().key());
|
||||
if present.is_none() && cfg!(debug_assertions) {
|
||||
panic!(
|
||||
@@ -158,11 +161,7 @@ impl LayerFileManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replace_and_verify(
|
||||
&mut self,
|
||||
expected: Arc<dyn PersistentLayer>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
) -> Result<()> {
|
||||
pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
|
||||
let key = expected.layer_desc().key();
|
||||
let other = new.layer_desc().key();
|
||||
|
||||
@@ -209,7 +208,6 @@ fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
|
||||
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
|
||||
drop(rlock)
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
|
||||
@@ -9,7 +9,6 @@ use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
|
||||
use std::future;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tracing::{error, info, info_span, Instrument};
|
||||
use tracing_utils::instrument::InstrumentCancel;
|
||||
|
||||
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
|
||||
|
||||
@@ -51,14 +50,23 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<()> {
|
||||
async move {
|
||||
info!("serving a new console management API connection");
|
||||
|
||||
// these might be long running connections, have a separate logging for cancelling
|
||||
// on shutdown and other ways of stopping.
|
||||
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
|
||||
let _e = span.entered();
|
||||
info!("console management API task cancelled");
|
||||
});
|
||||
|
||||
if let Err(e) = handle_connection(socket).await {
|
||||
error!("serving failed with an error: {e}");
|
||||
} else {
|
||||
info!("serving completed");
|
||||
}
|
||||
|
||||
// we can no longer get dropped
|
||||
scopeguard::ScopeGuard::into_inner(cancelled);
|
||||
}
|
||||
.instrument(span)
|
||||
.with_cancel_info(),
|
||||
.instrument(span),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use parking_lot::Mutex;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use std::fmt;
|
||||
use std::ops::ControlFlow;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::time;
|
||||
|
||||
@@ -10,7 +9,8 @@ use crate::{auth, console};
|
||||
|
||||
use super::sql_over_http::MAX_RESPONSE_SIZE;
|
||||
|
||||
use crate::proxy::{invalidate_cache, retry_after, try_wake, NUM_RETRIES_WAKE_COMPUTE};
|
||||
use crate::proxy::try_wake;
|
||||
use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE};
|
||||
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
@@ -184,10 +184,11 @@ impl GlobalConnPool {
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Wake up the destination if needed. Code here is a bit involved because
|
||||
// we reuse the code from the usual proxy and we need to prepare few structures
|
||||
// that this code expects.
|
||||
#[tracing::instrument(skip_all)]
|
||||
//
|
||||
async fn connect_to_compute(
|
||||
config: &config::ProxyConfig,
|
||||
conn_info: &ConnInfo,
|
||||
@@ -219,72 +220,54 @@ async fn connect_to_compute(
|
||||
|
||||
let node_info = &mut creds.wake_compute(&extra).await?.expect("msg");
|
||||
|
||||
// This code is a copy of `connect_to_compute` from `src/proxy.rs` with
|
||||
// the difference that it uses `tokio_postgres` for the connection.
|
||||
let mut num_retries = 0;
|
||||
let mut wait_duration = time::Duration::ZERO;
|
||||
let mut should_wake_with_error = None;
|
||||
let mut should_wake = true;
|
||||
loop {
|
||||
if !wait_duration.is_zero() {
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
|
||||
// try wake the compute node if we have determined it's sensible to do so
|
||||
if let Some(err) = should_wake_with_error.take() {
|
||||
match try_wake(node_info, &extra, &creds).await {
|
||||
// we can't wake up the compute node
|
||||
Ok(None) => return Err(err),
|
||||
// there was an error communicating with the control plane
|
||||
Err(e) => return Err(e.into()),
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(Some(ControlFlow::Continue(()))) => {
|
||||
wait_duration = retry_after(num_retries);
|
||||
should_wake_with_error = Some(err);
|
||||
|
||||
num_retries += 1;
|
||||
info!(num_retries, "retrying wake compute");
|
||||
continue;
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
Ok(Some(ControlFlow::Break(()))) => {}
|
||||
}
|
||||
}
|
||||
|
||||
match connect_to_compute_once(node_info, conn_info).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !can_retry_error(&e, num_retries) {
|
||||
Err(e) if num_retries == NUM_RETRIES_WAKE_COMPUTE => {
|
||||
if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if should_wake {
|
||||
match try_wake(node_info, &extra, &creds).await {
|
||||
Ok(Some(x)) => should_wake = x,
|
||||
Ok(None) => return Err(e.into()),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
if !wait_duration.is_zero() {
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
wait_duration = retry_after(num_retries);
|
||||
|
||||
// after the first connect failure,
|
||||
// we should invalidate the cache and wake up a new compute node
|
||||
if num_retries == 0 {
|
||||
invalidate_cache(node_info);
|
||||
should_wake_with_error = Some(e.into());
|
||||
}
|
||||
}
|
||||
other => return Ok(other?),
|
||||
}
|
||||
|
||||
num_retries += 1;
|
||||
info!(num_retries, "retrying connect");
|
||||
info!(retries_left = num_retries, "retrying connect");
|
||||
}
|
||||
}
|
||||
|
||||
fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool {
|
||||
fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option<time::Duration> {
|
||||
use tokio_postgres::error::SqlState;
|
||||
match err.code() {
|
||||
// retry all errors at least once
|
||||
_ if num_retries == 0 => true,
|
||||
// keep retrying connection errors
|
||||
// retry all errors at least once immediately
|
||||
_ if num_retries == 0 => Some(time::Duration::ZERO),
|
||||
// keep retrying connection errors every 100ms
|
||||
Some(
|
||||
&SqlState::CONNECTION_FAILURE
|
||||
| &SqlState::CONNECTION_EXCEPTION
|
||||
| &SqlState::CONNECTION_DOES_NOT_EXIST
|
||||
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
||||
) if num_retries < NUM_RETRIES_WAKE_COMPUTE => true,
|
||||
) => {
|
||||
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
|
||||
Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
|
||||
}
|
||||
// otherwise, don't retry
|
||||
_ => false,
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ use hyper::StatusCode;
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
|
||||
use std::{ops::ControlFlow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
|
||||
time,
|
||||
@@ -32,7 +32,7 @@ use utils::measured_stream::MeasuredStream;
|
||||
/// Number of times we should retry the `/proxy_wake_compute` http request.
|
||||
/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
|
||||
pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;
|
||||
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
|
||||
pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
|
||||
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
const ERR_PROTO_VIOLATION: &str = "protocol violation";
|
||||
@@ -335,37 +335,11 @@ async fn connect_to_compute(
|
||||
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
|
||||
) -> Result<PostgresConnection, compute::ConnectionError> {
|
||||
let mut num_retries = 0;
|
||||
let mut wait_duration = time::Duration::ZERO;
|
||||
let mut should_wake_with_error = None;
|
||||
let mut should_wake = true;
|
||||
loop {
|
||||
// Apply startup params to the (possibly, cached) compute node info.
|
||||
node_info.config.set_startup_params(params);
|
||||
|
||||
if !wait_duration.is_zero() {
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
|
||||
// try wake the compute node if we have determined it's sensible to do so
|
||||
if let Some(err) = should_wake_with_error.take() {
|
||||
match try_wake(node_info, extra, creds).await {
|
||||
// we can't wake up the compute node
|
||||
Ok(None) => return Err(err),
|
||||
// there was an error communicating with the control plane
|
||||
Err(e) => return Err(io_error(e).into()),
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(Some(ControlFlow::Continue(()))) => {
|
||||
wait_duration = retry_after(num_retries);
|
||||
should_wake_with_error = Some(err);
|
||||
|
||||
num_retries += 1;
|
||||
info!(num_retries, "retrying wake compute");
|
||||
continue;
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
Ok(Some(ControlFlow::Break(()))) => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Set a shorter timeout for the initial connection attempt.
|
||||
//
|
||||
// In case we try to connect to an outdated address that is no longer valid, the
|
||||
@@ -386,25 +360,30 @@ async fn connect_to_compute(
|
||||
};
|
||||
|
||||
match connect_to_compute_once(node_info, timeout).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !can_retry_error(&e, num_retries) {
|
||||
Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => {
|
||||
if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if should_wake {
|
||||
match try_wake(node_info, extra, creds).await {
|
||||
Ok(Some(x)) => {
|
||||
should_wake = x;
|
||||
}
|
||||
Ok(None) => return Err(e),
|
||||
Err(e) => return Err(io_error(e).into()),
|
||||
}
|
||||
}
|
||||
if !wait_duration.is_zero() {
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
wait_duration = retry_after(num_retries);
|
||||
|
||||
// after the first connect failure,
|
||||
// we should invalidate the cache and wake up a new compute node
|
||||
if num_retries == 0 {
|
||||
invalidate_cache(node_info);
|
||||
should_wake_with_error = Some(e);
|
||||
}
|
||||
}
|
||||
other => return other,
|
||||
}
|
||||
|
||||
num_retries += 1;
|
||||
info!(num_retries, "retrying connect");
|
||||
info!(retries_left = num_retries, "retrying connect");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -416,51 +395,41 @@ pub async fn try_wake(
|
||||
node_info: &mut console::CachedNodeInfo,
|
||||
extra: &console::ConsoleReqExtra<'_>,
|
||||
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
|
||||
) -> Result<Option<ControlFlow<()>>, WakeComputeError> {
|
||||
) -> Result<Option<bool>, WakeComputeError> {
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
invalidate_cache(node_info);
|
||||
match creds.wake_compute(extra).await {
|
||||
// retry wake if the compute was in an invalid state
|
||||
Err(WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
})) => Ok(Some(ControlFlow::Continue(()))),
|
||||
})) => Ok(Some(true)),
|
||||
// Update `node_info` and try again.
|
||||
Ok(Some(mut new)) => {
|
||||
new.config.reuse_password(&node_info.config);
|
||||
*node_info = new;
|
||||
Ok(Some(ControlFlow::Break(())))
|
||||
Ok(Some(false))
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
Ok(None) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool {
|
||||
fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option<time::Duration> {
|
||||
use std::io::ErrorKind;
|
||||
match err {
|
||||
// retry all errors at least once
|
||||
_ if num_retries == 0 => true,
|
||||
// keep retrying connection errors
|
||||
compute::ConnectionError::CouldNotConnect(io_err)
|
||||
if num_retries < NUM_RETRIES_WAKE_COMPUTE =>
|
||||
{
|
||||
matches!(
|
||||
io_err.kind(),
|
||||
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable
|
||||
)
|
||||
}
|
||||
// retry all errors at least once immediately
|
||||
_ if num_retries == 0 => Some(time::Duration::ZERO),
|
||||
// keep retrying connection errors every 100ms
|
||||
compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() {
|
||||
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => {
|
||||
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
|
||||
Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
// otherwise, don't retry
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn retry_after(num_retries: u32) -> time::Duration {
|
||||
match num_retries {
|
||||
0 => time::Duration::ZERO,
|
||||
_ => {
|
||||
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
|
||||
BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
//! A group of high-level tests for connection establishing logic and auth.
|
||||
use std::io;
|
||||
|
||||
use super::*;
|
||||
use crate::{auth, sasl, scram};
|
||||
use async_trait::async_trait;
|
||||
@@ -297,9 +299,14 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
|
||||
|
||||
#[test]
|
||||
fn connect_compute_total_wait() {
|
||||
let err = compute::ConnectionError::CouldNotConnect(io::Error::new(
|
||||
io::ErrorKind::ConnectionRefused,
|
||||
"conn refused",
|
||||
));
|
||||
|
||||
let mut total_wait = tokio::time::Duration::ZERO;
|
||||
for num_retries in 0..10 {
|
||||
total_wait += retry_after(num_retries);
|
||||
total_wait += retry_connect_in(&err, num_retries).unwrap();
|
||||
}
|
||||
assert!(total_wait < tokio::time::Duration::from_secs(12));
|
||||
assert!(total_wait > tokio::time::Duration::from_secs(10));
|
||||
|
||||
@@ -3109,3 +3109,18 @@ def last_flush_lsn_upload(
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
return last_flush_lsn
|
||||
|
||||
|
||||
def parse_project_git_version_output(s: str) -> str:
|
||||
"""
|
||||
Parses the git commit hash out of the --version output supported at least by neon_local.
|
||||
|
||||
The information is generated by utils::project_git_version!
|
||||
"""
|
||||
import re
|
||||
|
||||
res = re.search(r"git(-env)?:([0-9a-fA-F]{8,40})(-\S+)?", s)
|
||||
if res and (commit := res.group(2)):
|
||||
return commit
|
||||
|
||||
raise ValueError(f"unable to parse --version output: '{s}'")
|
||||
|
||||
@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PortDistributor,
|
||||
parse_project_git_version_output,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
@@ -352,7 +353,7 @@ def prepare_snapshot(
|
||||
# get git SHA of neon binary
|
||||
def get_neon_version(neon_binpath: Path):
|
||||
out = subprocess.check_output([neon_binpath / "neon_local", "--version"]).decode("utf-8")
|
||||
return out.split("git:", 1)[1].rstrip()
|
||||
return parse_project_git_version_output(out)
|
||||
|
||||
|
||||
def check_neon_works(
|
||||
|
||||
@@ -1,12 +1,18 @@
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_BRANCH_NAME,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
parse_project_git_version_output,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pg_version import PgVersion, skip_on_postgres
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
@@ -131,3 +137,66 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
|
||||
# Default stop
|
||||
res = env.neon_cli.raw_cli(["stop"])
|
||||
res.check_returncode()
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
|
||||
@pytest.mark.skipif(
|
||||
os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works"
|
||||
)
|
||||
def test_parse_project_git_version_output_positive():
|
||||
commit = "b6f77b5816cf1dba12a3bc8747941182ce220846"
|
||||
|
||||
positive = [
|
||||
# most likely when developing locally
|
||||
f"Neon CLI git:{commit}-modified",
|
||||
# when developing locally
|
||||
f"Neon CLI git:{commit}",
|
||||
# this is not produced in practice, but the impl supports it
|
||||
f"Neon CLI git-env:{commit}-modified",
|
||||
# most likely from CI or docker build
|
||||
f"Neon CLI git-env:{commit}",
|
||||
]
|
||||
|
||||
for example in positive:
|
||||
assert parse_project_git_version_output(example) == commit
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
|
||||
@pytest.mark.skipif(
|
||||
os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works"
|
||||
)
|
||||
def test_parse_project_git_version_output_local_docker():
|
||||
"""
|
||||
Makes sure the tests don't accept the default version in Dockerfile one gets without providing
|
||||
a commit lookalike in --build-arg GIT_VERSION=XXX
|
||||
"""
|
||||
input = "Neon CLI git-env:local"
|
||||
|
||||
with pytest.raises(ValueError) as e:
|
||||
parse_project_git_version_output(input)
|
||||
|
||||
assert input in str(e)
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
|
||||
@pytest.mark.skipif(
|
||||
os.environ.get("BUILD_TYPE") == "debug", reason="cli api sanity, either build works"
|
||||
)
|
||||
def test_binaries_version_parses(neon_binpath: Path):
|
||||
"""
|
||||
Ensures that we can parse the actual outputs of --version from a set of binaries.
|
||||
|
||||
The list is not meant to be exhaustive, and compute_ctl has a different way for example.
|
||||
"""
|
||||
|
||||
binaries = [
|
||||
"neon_local",
|
||||
"pageserver",
|
||||
"safekeeper",
|
||||
"proxy",
|
||||
"pg_sni_router",
|
||||
"storage_broker",
|
||||
]
|
||||
for bin in binaries:
|
||||
out = subprocess.check_output([neon_binpath / bin, "--version"]).decode("utf-8")
|
||||
parse_project_git_version_output(out)
|
||||
|
||||
Reference in New Issue
Block a user