Compare commits

6 Commits

Author SHA1 Message Date
Christian Schwarz
1251ef6dd4 initial pageserver SLO sloth rules
The generated files were deployed manually in
https://github.com/neondatabase/cloud/pull/5812
2023-07-12 16:03:28 +02:00
bojanserafimov
618d36ee6d compute_ctl: log a structured event on successful start (#4679) 2023-07-10 15:34:26 -04:00
Alexander Bayandin
33c2d94ba6 Fix git-env version for PRs (#4641)
## Problem

Binaries created from PRs (both in docker images and for tests) have
wrong git-env versions: they point to phantom merge commits.

## Summary of changes
- Prefer the GIT_VERSION env variable even if git information is accessible
- Use `${{ github.event.pull_request.head.sha || github.sha }}` instead
of `${{ github.sha }}` for `GIT_VERSION` in workflows

So the builds will still happen from this phantom commit, but we will
report the PR commit.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-07-10 20:01:01 +01:00
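The precedence described in this commit message can be sketched as a plain function. This is a minimal runtime illustration, not the macro from the diff (which does the same selection at compile time); the names `project_version`, `compiled_version`, `env_version`, and `git_describe` are assumptions made for the example.

```rust
/// Choose the reported version string.
///
/// `env_version` stands in for the build-time `GIT_VERSION` value and
/// `git_describe` for what the git lookup returned; both names are illustrative.
fn project_version(env_version: Option<&str>, git_describe: &str) -> String {
    match env_version {
        // The environment variable wins even when git information is available.
        Some(sha) => format!("git-env:{sha}"),
        // Otherwise fall back to what git reports for the checkout.
        None => format!("git:{git_describe}"),
    }
}

/// Read `GIT_VERSION` at compile time; `option_env!` yields `None` when it is unset.
fn compiled_version(git_describe: &str) -> String {
    project_version(option_env!("GIT_VERSION"), git_describe)
}
```

Calling `project_version(Some(sha), ...)` always produces a `git-env:` prefix, so a log reader can tell whether the reported commit came from the environment or from the checkout.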
Alex Chi Z
08bfe1c826 remove LayerDescriptor and use LayerObject for tests (#4637)
## Problem

part of https://github.com/neondatabase/neon/pull/4340

## Summary of changes

Remove LayerDescriptor and remove `todo!`. At the same time, this PR
adds an `AsLayerDesc` trait for all persistent layers and changes
`LayerFileManager` to take a generic type parameter. For tests, we now use
`LayerObject`, which is a wrapper around `PersistentLayerDesc`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-10 19:40:37 +03:00
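The shape of this refactoring can be sketched with standard-library types only. The `AsLayerDesc` trait and the `LayerObject` newtype follow the diff; the `PersistentLayerDesc` fields and the `LayerFileManager` methods shown here (`insert`, `get`) are simplified assumptions for illustration, not the pageserver's actual API.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for the real descriptor; only a file name is kept here.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct PersistentLayerDesc {
    filename: String,
}

/// Anything that can hand out a reference to its descriptor.
trait AsLayerDesc {
    fn layer_desc(&self) -> &PersistentLayerDesc;
}

/// Test-only wrapper, mirroring the `LayerObject` newtype from the commit.
struct LayerObject(PersistentLayerDesc);

impl AsLayerDesc for LayerObject {
    fn layer_desc(&self) -> &PersistentLayerDesc {
        &self.0
    }
}

/// Hypothetical manager, generic over the stored layer type.
struct LayerFileManager<T: AsLayerDesc> {
    layers: HashMap<PersistentLayerDesc, Arc<T>>,
}

impl<T: AsLayerDesc> LayerFileManager<T> {
    fn new() -> Self {
        Self { layers: HashMap::new() }
    }

    /// Index the layer by a clone of its descriptor.
    fn insert(&mut self, layer: Arc<T>) {
        self.layers.insert(layer.layer_desc().clone(), layer);
    }

    /// Look a layer up by descriptor.
    fn get(&self, desc: &PersistentLayerDesc) -> Option<Arc<T>> {
        self.layers.get(desc).cloned()
    }
}
```

Because the manager only requires `AsLayerDesc`, a test can store a lightweight wrapper instead of implementing every method of a full layer trait, which is presumably why the `todo!` stubs could be dropped.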
Christian Schwarz
65ff256bb8 page_service: add peer_addr span field, and set tenant_id / timeline_id fields earlier (#4638)
Before this PR, during shutdown, we'd find naked logs like this one for every active page service connection:

```
2023-07-05T14:13:50.791992Z  INFO shutdown request received in run_message_loop
```

This PR
1. adds a peer_addr span field to distinguish the connections in logs
2. sets the tenant_id / timeline_id fields earlier

It would be nice to have `tenant_id` and `timeline_id` directly on
the `page_service_conn_main` span (empty, initially), then set
them at the top of `process_query`.
The problem is that the debug asserts for `tenant_id` and
`timeline_id` presence in the tracing span don't support
detecting empty values [1].
So, I'm a bit hesitant about over-using `Span::record`.

[1] https://github.com/neondatabase/neon/issues/4676
2023-07-10 15:23:40 +02:00
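The concern about empty span fields can be illustrated with a toy model. This is not the `tracing` API: `SpanFields` and its methods are hypothetical names, and the model only assumes that a field can be declared without a value and that recording an undeclared field is ignored.

```rust
use std::collections::BTreeMap;

/// Toy model of a span's field set: each declared field is either empty or recorded.
struct SpanFields {
    fields: BTreeMap<&'static str, Option<String>>,
}

impl SpanFields {
    /// Declare fields up front with no value, like `fields(tenant_id, timeline_id)`.
    fn declare(names: &[&'static str]) -> Self {
        Self {
            fields: names.iter().map(|name| (*name, None)).collect(),
        }
    }

    /// Record a value; returns false for undeclared fields, which are ignored.
    fn record(&mut self, name: &str, value: impl ToString) -> bool {
        match self.fields.get_mut(name) {
            Some(slot) => {
                *slot = Some(value.to_string());
                true
            }
            None => false,
        }
    }

    /// Presence check only: true even if nothing was recorded yet.
    fn has_field(&self, name: &str) -> bool {
        self.fields.contains_key(name)
    }

    /// Stricter check: the field exists and holds a value.
    fn has_value(&self, name: &str) -> bool {
        matches!(self.fields.get(name), Some(Some(_)))
    }
}
```

An assertion built on `has_field` passes for a declared-but-empty `tenant_id`, which is the gap the message refers to; only a check like `has_value` would catch a handler that forgot to record the field.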
Alex Chi Z
5177c1e4b1 pagectl: separate xy margin for draw timeline (#4669)
We were computing the margin from the LSN range, which causes problems for
layer maps with large overlapping LSN ranges. Now we compute the x and y
margins separately to avoid this issue.

## Summary of changes

before:

<img width="1651" alt="image"
src="https://github.com/neondatabase/neon/assets/4198311/3bfb50cb-960b-4d8f-9bbe-a55c89d82a28">

we have a lot of rectangles of negative width, and they disappear in the
layer map.

after:

<img width="1320" alt="image"
src="https://github.com/neondatabase/neon/assets/4198311/550f0f96-849f-4bdc-a852-b977499f04f4">


Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-10 09:22:06 -04:00
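A small numeric sketch shows why a margin derived from the LSN range can produce rectangles of negative width. The width formula follows the diff; the helper names (`rect_width`, `shared_margin`, `X_MARGIN`) and the sample values are assumptions chosen for illustration.

```rust
/// Width of a drawn layer rectangle after subtracting a margin on both sides.
fn rect_width(key_diff: f32, stretch: f32, margin: f32) -> f32 {
    key_diff - stretch * 2.0 * margin
}

/// Old behaviour: one margin derived from the LSN range, applied to both axes.
fn shared_margin(lsn_diff: f32) -> f32 {
    0.05 * lsn_diff
}

/// New behaviour: the horizontal margin is a constant.
const X_MARGIN: f32 = 0.05;
```

With a key range of 10 and an LSN range of 1000, the shared margin is 50, so the width becomes 10 - 100 and the rectangle disappears; with the constant x margin the width stays just under 10.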
31 changed files with 526 additions and 369 deletions

View File

@@ -155,7 +155,7 @@ jobs:
build_type: [ debug, release ]
env:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.sha }}
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
steps:
- name: Fix git ownership
@@ -614,7 +614,7 @@ jobs:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
@@ -658,7 +658,7 @@ jobs:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--dockerfile Dockerfile.compute-tools
@@ -715,7 +715,7 @@ jobs:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg PG_VERSION=${{ matrix.version }}
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
@@ -742,7 +742,7 @@ jobs:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true \
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache \
--context . \
--build-arg GIT_VERSION=${{ github.sha }} \
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} \
--build-arg PG_VERSION=${{ matrix.version }} \
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}} \
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com \

3
.gitmodules vendored
View File

@@ -6,3 +6,6 @@
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon
[submodule "pageserver/slo/sloth.git"]
path = pageserver/slo/sloth.git
url = https://github.com/neondatabase/sloth.git

2
Cargo.lock generated
View File

@@ -4666,7 +4666,6 @@ dependencies = [
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"pin-project-lite",
"reqwest",
"tokio",
"tracing",
@@ -4818,6 +4817,7 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"const_format",
"criterion",
"futures",
"heapless",

View File

@@ -549,6 +549,13 @@ impl ComputeNode {
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
);
// Log metrics so that we can search for slow operations in logs
let metrics = {
let state = self.state.lock().unwrap();
state.metrics.clone()
};
info!(?metrics, "compute start finished");
Ok(pg)
}

View File

@@ -16,4 +16,3 @@ tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
workspace_hack.workspace = true
pin-project-lite = "0.2"

View File

@@ -1,72 +0,0 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use tracing::{instrument::Instrumented, Level};
pub trait InstrumentCancel: Sized {
type Inner;
fn with_cancel_info(self) -> CancelLog<Self::Inner> {
self.with_cancel_log(Level::INFO)
}
fn with_cancel_log(self, level: Level) -> CancelLog<Self::Inner>;
}
impl<T: Sized> InstrumentCancel for Instrumented<T> {
type Inner = T;
fn with_cancel_log(self, level: Level) -> CancelLog<T> {
CancelLog {
inner: self,
cancel_level: Some(level),
}
}
}
pin_project! {
/// A [`Future`] that has been instrumented with a `tracing` [`Span`]. It will emit a log on cancellation
///
/// This type is returned by the [`Instrument`] extension trait. See that
/// trait's documentation for details.
///
/// [`Future`]: std::future::Future
/// [`Span`]: crate::Span
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CancelLog<T> {
#[pin]
inner: Instrumented<T>,
cancel_level: Option<Level>,
}
impl<T> PinnedDrop for CancelLog<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if let Some(level) = this.cancel_level.take() {
let _enter = this.inner.span().enter();
match level {
Level::TRACE => tracing::event!(Level::TRACE, "task was cancelled"),
Level::DEBUG => tracing::event!(Level::DEBUG, "task was cancelled"),
Level::INFO => tracing::event!(Level::INFO, "task was cancelled"),
Level::WARN => tracing::event!(Level::WARN, "task was cancelled"),
Level::ERROR => tracing::event!(Level::ERROR, "task was cancelled"),
}
}
}
}
}
impl<T: Future> Future for CancelLog<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.inner.poll(cx);
if res.is_ready() {
*this.cancel_level = None;
}
res
}
}

View File

@@ -41,7 +41,6 @@ use opentelemetry_otlp::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_
pub use tracing_opentelemetry::OpenTelemetryLayer;
pub mod http;
pub mod instrument;
/// Set up OpenTelemetry exporter, using configuration from environment variables.
///

View File

@@ -40,6 +40,8 @@ pq_proto.workspace = true
metrics.workspace = true
workspace_hack.workspace = true
const_format.workspace = true
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true

View File

@@ -109,10 +109,16 @@ pub use failpoint_macro_helpers::failpoint_sleep_helper;
/// * building in docker (either in CI or locally)
///
/// One thing to note is that .git is not available in docker (and it is bad to include it there).
/// So everything besides docker build is covered by git_version crate, and docker uses a `GIT_VERSION` argument to get the value required.
/// It takes variable from build process env and puts it to the rustc env. And then we can retrieve it here by using env! macro.
/// Git version received from environment variable used as a fallback in git_version invocation.
/// And to avoid running buildscript every recompilation, we use rerun-if-env-changed option.
/// When building locally, the `git_version` crate is used to query .git. When building on CI and in docker,
/// we don't build the actual PR branch commits, but always a "phantom" would-be merge commit to
/// the target branch -- the actual PR commit from which we build is supplied as the GIT_VERSION
/// environment variable.
///
/// We ended up with this compromise between phantom would-be merge commits vs. pull request branch
/// heads due to old logs becoming more reliable (github could gc the phantom merge commit
/// anytime) in #4641.
///
/// To avoid running buildscript every recompilation, we use rerun-if-env-changed option.
/// So the build script will be run only when GIT_VERSION envvar has changed.
///
/// Why not to use buildscript to get git commit sha directly without procmacro from different crate?
@@ -132,17 +138,28 @@ pub use failpoint_macro_helpers::failpoint_sleep_helper;
#[macro_export]
macro_rules! project_git_version {
($const_identifier:ident) => {
const $const_identifier: &str = git_version::git_version!(
prefix = "git:",
fallback = concat!(
"git-env:",
env!("GIT_VERSION", "Missing GIT_VERSION envvar")
),
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
);
// this should try GIT_VERSION first, and only then git_version::git_version!
const $const_identifier: &::core::primitive::str = {
const __COMMIT_FROM_GIT: &::core::primitive::str = git_version::git_version! {
prefix = "",
fallback = "unknown",
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
};
const __ARG: &[&::core::primitive::str; 2] = &match ::core::option_env!("GIT_VERSION") {
::core::option::Option::Some(x) => ["git-env:", x],
::core::option::Option::None => ["git:", __COMMIT_FROM_GIT],
};
$crate::__const_format::concatcp!(__ARG[0], __ARG[1])
};
};
}
/// Re-export for `project_git_version` macro
#[doc(hidden)]
pub use const_format as __const_format;
/// Same as `assert!`, but evaluated during compilation and gets optimized out in runtime.
#[macro_export]
macro_rules! const_assert {

View File

@@ -1,8 +1,8 @@
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::storage_layer::PersistentLayerDesc;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
@@ -28,13 +28,13 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
for fname in filenames {
let fname = fname.unwrap();
let fname = LayerFileName::from_str(&fname).unwrap();
let layer = LayerDescriptor::from(fname);
let layer = PersistentLayerDesc::from(fname);
let lsn_range = layer.get_lsn_range();
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(layer.layer_desc().clone());
updates.insert_historic(layer);
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -210,15 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
for i in 0..100_000 {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
let layer = PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
false,
0,
));
updates.insert_historic(layer.layer_desc().clone());
);
updates.insert_historic(layer);
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -117,7 +117,8 @@ pub fn main() -> Result<()> {
let mut lsn_diff = (lsn_end - lsn_start) as f32;
let mut fill = Fill::None;
let mut margin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
let mut ymargin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
let xmargin = 0.05; // Fixed horizontal margin, independent of the LSN range
let mut lsn_offset = 0.0;
// Fill in and thicken rectangle if it's an
@@ -128,7 +129,7 @@ pub fn main() -> Result<()> {
num_images += 1;
lsn_diff = 0.3;
lsn_offset = -lsn_diff / 2.0;
margin = 0.05;
ymargin = 0.05;
fill = Fill::Color(rgb(0, 0, 0));
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
@@ -137,10 +138,10 @@ pub fn main() -> Result<()> {
println!(
" {}",
rectangle(
key_start as f32 + stretch * margin,
stretch * (lsn_max as f32 - (lsn_end as f32 - margin - lsn_offset)),
key_diff as f32 - stretch * 2.0 * margin,
stretch * (lsn_diff - 2.0 * margin)
key_start as f32 + stretch * xmargin,
stretch * (lsn_max as f32 - (lsn_end as f32 - ymargin - lsn_offset)),
key_diff as f32 - stretch * 2.0 * xmargin,
stretch * (lsn_diff - 2.0 * ymargin)
)
.fill(fill)
.stroke(Stroke::Color(rgb(0, 0, 0), 0.1))

1
pageserver/slo/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
*.sloth-output.yaml

20
pageserver/slo/Makefile Normal file
View File

@@ -0,0 +1,20 @@
BINARY := sloth.git/bin/sloth-linux-amd64
.PHONY: all
all: pageserver-slo.dev.sloth-output.yaml pageserver-slo.prod.sloth-output.yaml
%.sloth-output.yaml: %.sloth.yaml $(BINARY)
$(BINARY) generate \
--disable-promExpr-validation \
--input $< \
--out $@
CLOUD_GIT_CHECKOUT := /nonexistent
.PHONY: sync-to-cloud-git
sync-to-cloud-git: pageserver-slo.dev.sloth-output.yaml pageserver-slo.prod.sloth-output.yaml
cp pageserver-slo.dev.sloth-output.yaml \
$(CLOUD_GIT_CHECKOUT)/ops/infra/workloads/values/dev-eu-central-1-alpha/neon-vm/pageserver-slo.dev.sloth-output.yaml
cp pageserver-slo.prod.sloth-output.yaml \
$(CLOUD_GIT_CHECKOUT)/ops/infra/workloads/values/prod-eu-central-1-gamma/neon-vm/pageserver-slo.prod.sloth-output.yaml

83
pageserver/slo/README.md Normal file
View File

@@ -0,0 +1,83 @@
## Install Sloth
https://sloth.dev/introduction/install/
```sh
wget https://github.com/slok/sloth/releases/download/v0.11.0/sloth-linux-amd64
chmod +x ./sloth-linux-amd64
```
## Background on Sloth
https://sloth.dev/introduction/
https://sloth.dev/introduction/architecture/
## Generate Prometheus Rules From Sloth Spec
```
./sloth-linux-amd64 generate --input ./spec.sloth.yml --out generated.prometheus.rules.yml
```
## Background reading:
SRE workbook chapter on "Implementing SLOs", section "Calculating the SLIs"
https://sre.google/workbook/implementing-slos/
Citation:
```
Availability
sum(rate(http_requests_total{host="api", status!~"5.."}[7d]))
/
sum(rate(http_requests_total{host="api"}[7d]))
Latency
histogram_quantile(0.9, rate(http_request_duration_seconds_bucket[7d]))
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[7d]))
```
## Sloth Rule Syntax
It's under-documented.
Best to go to the Go types:
https://pkg.go.dev/github.com/slok/sloth@v0.6.0/pkg/prometheus/api/v1#section-readme
For pageserver latency SLOs, we want the "Raw SLI" type of SLI, not the one that is based on events.
Search for `error_ratio_query`; example: https://sloth.dev/examples/default/raw-sli/
Use VictoriaMetrics `histogram_share` to compute the error ratio.
It's the inverse of histogram_quantile.
https://docs.victoriametrics.com/MetricsQL.html#histogram_share
`share_le_over_time` also seems useful
https://docs.victoriametrics.com/MetricsQL.html#share_le_over_time
https://stackoverflow.com/questions/72559302/is-it-possible-to-calculate-ranks-of-metrics?rq=1
The problem with the VictoriaMetrics-only functions is that sloth has an internal validation pass:
https://github.com/slok/sloth/issues/510
Option to skip the check:
https://github.com/slok/sloth/pull/511
=>
```
git submodule update --init
pushd sloth.git
make build
popd
sloth.git/bin/sloth-linux-amd64 generate \
--disable-promExpr-validation \
--input ./spec.sloth.yml \
--out generated.prometheus.rules.yml
```
## Notes On How To Scale The Process To Multiple Teams / Automate Sloth In Neon
* SLO directory discovery: https://sloth.dev/usage/cli/
* allows using directories instead of individual files as input
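
As a rough illustration of the error-ratio idea from the "Sloth Rule Syntax" notes, the sketch below computes the share of observations at or below a latency threshold from cumulative histogram buckets, and treats the remainder as errors. It assumes an "error" means a request slower than the threshold and that the threshold matches a bucket boundary; the real `histogram_share` interpolates inside buckets, which this sketch does not. `Bucket`, `share_le`, and `error_ratio` are hypothetical names.

```rust
/// One cumulative histogram bucket: `count` observations had a value <= `le`.
struct Bucket {
    le: f64,
    count: u64,
}

/// Share of observations at or below `threshold` (no interpolation inside buckets).
fn share_le(buckets: &[Bucket], threshold: f64) -> Option<f64> {
    // Cumulative counts never decrease, so the largest one is the total.
    let total = buckets.iter().map(|b| b.count).max()?;
    if total == 0 {
        return None;
    }
    let good = buckets
        .iter()
        .filter(|b| b.le <= threshold)
        .map(|b| b.count)
        .max()
        .unwrap_or(0);
    Some(good as f64 / total as f64)
}

/// Fraction of observations slower than `threshold`.
fn error_ratio(buckets: &[Bucket], threshold: f64) -> Option<f64> {
    share_le(buckets, threshold).map(|share| 1.0 - share)
}
```

For buckets where 999 of 1000 observations fall at or below 200 ms, the error ratio is about 0.001, which sits exactly on a 99.9% objective.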

View File

@@ -0,0 +1,4 @@
version: "prometheus/v1"
service: "pageserver"
slos: []

View File

@@ -0,0 +1,63 @@
version: "prometheus/v1"
service: "pageserver"
#labels:
# owner: "pageserver-team"
# repo: "myorg/myservice"
# tier: "2"
slos:
# # We allow failing (5xx and 429) 1 request every 1000 requests (99.9%).
# - name: "requests-availability"
# objective: 99.9
# description: "Common SLO based on availability for HTTP request responses."
# sli:
# events:
# error_query: sum(rate(http_request_duration_seconds_count{job="myservice",code=~"(5..|429)"}[{{.window}}]))
# total_query: sum(rate(http_request_duration_seconds_count{job="myservice"}[{{.window}}]))
# alerting:
# name: MyServiceHighErrorRate
# labels:
# category: "availability"
# annotations:
# # Overwrite default Sloth SLO alert summary on ticket and page alerts.
# summary: "High error rate on 'myservice' requests responses"
# page_alert:
# labels:
# severity: pageteam
# routing_key: myteam
# ticket_alert:
# labels:
# severity: "slack"
# slack_channel: "#alerts-myteam"
- name: "basebackup_ms-latency"
description: "basebackup_ms latency should be less than 0.2s for 99.9% of requests"
objective: 99.9
#labels:
sli:
raw:
# VictoriaMetrics-only (fails due to sloth validation pass)
error_ratio_query: |
histogram_share(200, sum by (le) (compute_basebackup_ms_bucket[5m]))
# error_ratio_query: |
# https://docs.victoriametrics.com/MetricsQL.html#share_le_over_time
#
# Prometheus has a native one, not supported by VictoriaMetrics validation pass
#error_ratio_query: |
# histogram_fraction(0, 200, compute_basebackup_ms_bucket[{{.window}}])
# error_ratio_query: |
# histogram_quantile(0.999, sum by (le) (rate(compute_basebackup_ms_bucket[{{.window}}])))
alerting:
name: "basebackup_ms-latency"
labels:
category: "latency"
annotations: {}
page_alert:
labels:
severity: "pageteam"
routing_key: "myteam"
ticket_alert:
labels:
severity: "slack"
slack_channel: "#alerts-myteam"

View File

@@ -33,6 +33,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
use utils::{
@@ -51,6 +52,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
@@ -238,6 +240,7 @@ pub async fn libpq_listener_main(
Ok(())
}
#[instrument(skip_all, fields(peer_addr))]
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
@@ -260,6 +263,7 @@ async fn page_service_conn_main(
.context("could not set TCP_NODELAY")?;
let peer_addr = socket.peer_addr().context("get peer address")?;
tracing::Span::current().record("peer_addr", field::display(peer_addr));
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
// - long enough for most valid compute connections
@@ -362,7 +366,7 @@ impl PageServerHandler {
}
}
#[instrument(skip(self, pgb, ctx))]
#[instrument(skip_all)]
async fn handle_pagerequests<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
@@ -373,6 +377,8 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id();
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
@@ -473,7 +479,7 @@ impl PageServerHandler {
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
#[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
async fn handle_import_basebackup<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
@@ -487,6 +493,8 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
@@ -531,7 +539,7 @@ impl PageServerHandler {
Ok(())
}
#[instrument(skip(self, pgb, ctx))]
#[instrument(skip_all, fields(%start_lsn, %end_lsn))]
async fn handle_import_wal<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
@@ -544,6 +552,7 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
@@ -738,7 +747,7 @@ impl PageServerHandler {
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
async fn handle_basebackup_request<IO>(
&mut self,
pgb: &mut PostgresBackend<IO>,
@@ -752,6 +761,8 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id();
let started = std::time::Instant::now();
// check that the timeline exists
@@ -862,6 +873,7 @@ where
Ok(())
}
#[instrument(skip_all, fields(tenant_id, timeline_id))]
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
@@ -883,6 +895,10 @@ where
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
@@ -902,6 +918,10 @@ where
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
let lsn = if params.len() >= 3 {
@@ -948,6 +968,10 @@ where
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
@@ -979,6 +1003,10 @@ where
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
// The caller is responsible for providing correct lsn and prev_lsn.
let lsn = if params.len() > 2 {
Some(
@@ -1033,6 +1061,10 @@ where
let pg_version = u32::from_str(params[4])
.with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
match self
@@ -1077,6 +1109,10 @@ where
let end_lsn = Lsn::from_str(params[3])
.with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
match self
@@ -1108,6 +1144,8 @@ where
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
tracing::Span::current().record("tenant_id", field::display(tenant_id));
self.check_permission(Some(tenant_id))?;
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;

View File

@@ -651,19 +651,35 @@ impl LayerMap {
#[cfg(test)]
mod tests {
use super::LayerMap;
use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
use crate::tenant::storage_layer::LayerFileName;
use std::str::FromStr;
use std::sync::Arc;
mod l0_delta_layers_updated {
use crate::tenant::{
storage_layer::{PersistentLayer, PersistentLayerDesc},
storage_layer::{AsLayerDesc, PersistentLayerDesc},
timeline::LayerFileManager,
};
use super::*;
struct LayerObject(PersistentLayerDesc);
impl AsLayerDesc for LayerObject {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.0
}
}
impl LayerObject {
fn new(desc: PersistentLayerDesc) -> Self {
LayerObject(desc)
}
}
type TestLayerFileManager = LayerFileManager<LayerObject>;
#[test]
fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates
@@ -700,18 +716,18 @@ mod tests {
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
let layer = LayerFileName::from_str(layer).unwrap();
let layer = LayerDescriptor::from(layer);
let layer = PersistentLayerDesc::from(layer);
// same skeleton construction; see scenario below
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
let not_found = Arc::new(LayerObject::new(layer.clone()));
let new_version = Arc::new(LayerObject::new(layer));
// after the immutable storage state refactor, the replace operation
// will not use layer map any more. We keep it here for consistency in test cases
// and can remove it in the future.
let _map = LayerMap::default();
let mut mapping = LayerFileManager::new();
let mut mapping = TestLayerFileManager::new();
mapping
.replace_and_verify(not_found, new_version)
@@ -720,10 +736,10 @@ mod tests {
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
let name = LayerFileName::from_str(layer_name).unwrap();
let skeleton = LayerDescriptor::from(name);
let skeleton = PersistentLayerDesc::from(name);
let remote = Arc::new(skeleton.clone());
let downloaded = Arc::new(skeleton);
let remote = Arc::new(LayerObject::new(skeleton.clone()));
let downloaded = Arc::new(LayerObject::new(skeleton));
let mut map = LayerMap::default();
let mut mapping = LayerFileManager::new();

View File

@@ -376,6 +376,12 @@ pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i
/// Returned by [`Layer::key_iter`]
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send>;
/// Get a layer descriptor from a layer.
pub trait AsLayerDesc {
/// Get the layer descriptor.
fn layer_desc(&self) -> &PersistentLayerDesc;
}
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
@@ -389,10 +395,8 @@ pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
pub trait PersistentLayer: Layer {
/// Get the layer descriptor.
fn layer_desc(&self) -> &PersistentLayerDesc;
pub trait PersistentLayer: Layer + AsLayerDesc {
/// Identify the tenant this layer belongs to
fn get_tenant_id(&self) -> TenantId {
self.layer_desc().tenant_id
}
@@ -458,119 +462,32 @@ pub fn downcast_remote_layer(
pub mod tests {
use super::*;
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
base: PersistentLayerDesc,
}
impl From<PersistentLayerDesc> for LayerDescriptor {
fn from(base: PersistentLayerDesc) -> Self {
Self { base }
}
}
impl Layer for LayerDescriptor {
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_key_range(&self) -> Range<Key> {
self.layer_desc().key_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_lsn_range(&self) -> Range<Lsn> {
self.layer_desc().lsn_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for LayerDescriptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl PersistentLayer for LayerDescriptor {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.base
}
fn local_path(&self) -> Option<PathBuf> {
unimplemented!()
}
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
unimplemented!()
}
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
unimplemented!()
}
fn delete_resident_layer_file(&self) -> Result<()> {
unimplemented!()
}
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
unimplemented!()
}
fn access_stats(&self) -> &LayerAccessStats {
unimplemented!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
impl From<DeltaFileName> for PersistentLayerDesc {
fn from(value: DeltaFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn_range,
233,
),
}
PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn_range,
233,
)
}
}
impl From<ImageFileName> for LayerDescriptor {
impl From<ImageFileName> for PersistentLayerDesc {
fn from(value: ImageFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_img(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn,
false,
233,
),
}
PersistentLayerDesc::new_img(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn,
false,
233,
)
}
}
impl From<LayerFileName> for LayerDescriptor {
impl From<LayerFileName> for PersistentLayerDesc {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),

View File

@@ -56,8 +56,8 @@ use utils::{
};
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
PathOrConf, PersistentLayerDesc,
AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerIter,
LayerKeyIter, PathOrConf, PersistentLayerDesc,
};
///
@@ -403,11 +403,13 @@ impl std::fmt::Display for DeltaLayer {
}
}
impl PersistentLayer for DeltaLayer {
impl AsLayerDesc for DeltaLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl PersistentLayer for DeltaLayer {
fn local_path(&self) -> Option<PathBuf> {
Some(self.path())
}


@@ -53,7 +53,9 @@ use utils::{
};
use super::filename::ImageFileName;
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, PersistentLayerDesc};
use super::{
AsLayerDesc, Layer, LayerAccessStatsReset, LayerIter, PathOrConf, PersistentLayerDesc,
};
///
/// Header stored in the beginning of the file
@@ -241,11 +243,13 @@ impl std::fmt::Display for ImageLayer {
}
}
impl PersistentLayer for ImageLayer {
impl AsLayerDesc for ImageLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl PersistentLayer for ImageLayer {
fn local_path(&self) -> Option<PathBuf> {
Some(self.path())
}


@@ -20,8 +20,8 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName};
use super::{
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter,
LayerKeyIter, LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -115,11 +115,13 @@ impl std::fmt::Display for RemoteLayer {
}
}
impl PersistentLayer for RemoteLayer {
impl AsLayerDesc for RemoteLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl PersistentLayer for RemoteLayer {
fn local_path(&self) -> Option<PathBuf> {
None
}


@@ -90,7 +90,8 @@ use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey,
AsLayerDesc, DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc,
PersistentLayerKey,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -124,10 +125,12 @@ impl PartialOrd for Hole {
}
}
pub struct LayerFileManager(HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>);
pub struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
HashMap<PersistentLayerKey, Arc<T>>,
);
impl LayerFileManager {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.0
@@ -137,7 +140,7 @@ impl LayerFileManager {
.clone()
}
pub(crate) fn insert(&mut self, layer: Arc<dyn PersistentLayer>) {
pub(crate) fn insert(&mut self, layer: Arc<T>) {
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
if present.is_some() && cfg!(debug_assertions) {
panic!("overwriting a layer: {:?}", layer.layer_desc())
@@ -148,7 +151,7 @@ impl LayerFileManager {
Self(HashMap::new())
}
pub(crate) fn remove(&mut self, layer: Arc<dyn PersistentLayer>) {
pub(crate) fn remove(&mut self, layer: Arc<T>) {
let present = self.0.remove(&layer.layer_desc().key());
if present.is_none() && cfg!(debug_assertions) {
panic!(
@@ -158,11 +161,7 @@ impl LayerFileManager {
}
}
pub(crate) fn replace_and_verify(
&mut self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
let key = expected.layer_desc().key();
let other = new.layer_desc().key();
@@ -209,7 +208,6 @@ fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
drop(rlock)
}
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
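
The hunk above makes `LayerFileManager` generic over any `T: AsLayerDesc + ?Sized`, defaulting to `dyn PersistentLayer`, so tests can store a thin descriptor wrapper instead of a full layer. A minimal sketch of that pattern follows. `Desc`, `AsDesc`, `FullLayer`, `Manager` and `TestLayer` are hypothetical stand-ins, not the pageserver types, and the map is keyed by a plain string rather than `PersistentLayerKey`.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for PersistentLayerDesc; the real type has more fields.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Desc {
    key: String,
}

// Plays the role of `AsLayerDesc`: anything that can hand out its descriptor.
trait AsDesc {
    fn desc(&self) -> &Desc;
}

// Plays the role of `PersistentLayer`: the full trait builds on the descriptor trait.
#[allow(dead_code)]
trait FullLayer: AsDesc {
    fn local_path(&self) -> Option<String>;
}

// Generic over the stored type. The default keeps `Manager` (no parameters)
// meaning "a map of trait objects", so existing callers need no changes.
struct Manager<T: AsDesc + ?Sized = dyn FullLayer>(HashMap<String, Arc<T>>);

impl<T: AsDesc + ?Sized> Manager<T> {
    fn new() -> Self {
        Self(HashMap::new())
    }

    // Returns true if an existing entry with the same key was overwritten.
    fn insert(&mut self, layer: Arc<T>) -> bool {
        self.0.insert(layer.desc().key.clone(), layer).is_some()
    }

    fn get(&self, desc: &Desc) -> Option<Arc<T>> {
        self.0.get(&desc.key).cloned()
    }
}

// Test-only layer: a bare wrapper around the descriptor, nothing else to implement.
struct TestLayer(Desc);

impl AsDesc for TestLayer {
    fn desc(&self) -> &Desc {
        &self.0
    }
}

fn main() {
    let mut manager: Manager<TestLayer> = Manager::new();
    let desc = Desc { key: "layer-a".to_string() };
    let overwritten = manager.insert(Arc::new(TestLayer(desc.clone())));
    println!("overwritten: {overwritten}, found: {}", manager.get(&desc).is_some());
}
```

Because only the descriptor trait is required by the manager, a test type does not have to stub out the remaining layer methods with `unimplemented!()`.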


@@ -9,7 +9,6 @@ use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::future;
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info, info_span, Instrument};
use tracing_utils::instrument::InstrumentCancel;
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
@@ -51,14 +50,23 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<()> {
async move {
info!("serving a new console management API connection");
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
}
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
}
.instrument(span)
.with_cancel_info(),
.instrument(span),
);
}
}
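
The `scopeguard::guard` / `ScopeGuard::into_inner` pair in this hunk logs a message only when the task is dropped before it finishes. The same idea can be sketched with the standard library alone. `OnDrop` and `serve` below are hypothetical names for illustration, not part of the proxy or of the `scopeguard` crate, and an early `return` stands in for the future being dropped on cancellation.

```rust
use std::cell::Cell;

// Runs the closure on drop unless the guard was disarmed first.
struct OnDrop<F: FnMut()> {
    armed: bool,
    f: F,
}

impl<F: FnMut()> OnDrop<F> {
    fn new(f: F) -> Self {
        Self { armed: true, f }
    }

    // Consumes the guard; it is dropped at the end of this call with `armed == false`.
    fn disarm(mut self) {
        self.armed = false;
    }
}

impl<F: FnMut()> Drop for OnDrop<F> {
    fn drop(&mut self) {
        if self.armed {
            (self.f)();
        }
    }
}

// `finish == false` simulates the task being cancelled part-way through.
fn serve(finish: bool, cancelled: &Cell<bool>) {
    let guard = OnDrop::new(|| cancelled.set(true));
    if !finish {
        return; // guard is dropped while still armed, so the closure runs
    }
    // Normal completion: we can no longer be dropped early, so disarm.
    guard.disarm();
}

fn main() {
    let cancelled = Cell::new(false);
    serve(true, &cancelled);
    println!("after normal completion: cancelled = {}", cancelled.get());
    serve(false, &cancelled);
    println!("after early exit: cancelled = {}", cancelled.get());
}
```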


@@ -1,7 +1,6 @@
use parking_lot::Mutex;
use pq_proto::StartupMessageParams;
use std::fmt;
use std::ops::ControlFlow;
use std::{collections::HashMap, sync::Arc};
use tokio::time;
@@ -10,7 +9,8 @@ use crate::{auth, console};
use super::sql_over_http::MAX_RESPONSE_SIZE;
use crate::proxy::{invalidate_cache, retry_after, try_wake, NUM_RETRIES_WAKE_COMPUTE};
use crate::proxy::try_wake;
use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE};
use tracing::error;
use tracing::info;
@@ -184,10 +184,11 @@ impl GlobalConnPool {
}
}
//
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare a few structures
// that this code expects.
#[tracing::instrument(skip_all)]
//
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
@@ -219,72 +220,54 @@ async fn connect_to_compute(
let node_info = &mut creds.wake_compute(&extra).await?.expect("msg");
// This code is a copy of `connect_to_compute` from `src/proxy.rs` with
// the difference that it uses `tokio_postgres` for the connection.
let mut num_retries = 0;
let mut wait_duration = time::Duration::ZERO;
let mut should_wake_with_error = None;
let mut should_wake = true;
loop {
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
// try wake the compute node if we have determined it's sensible to do so
if let Some(err) = should_wake_with_error.take() {
match try_wake(node_info, &extra, &creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err),
// there was an error communicating with the control plane
Err(e) => return Err(e.into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
wait_duration = retry_after(num_retries);
should_wake_with_error = Some(err);
num_retries += 1;
info!(num_retries, "retrying wake compute");
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(()))) => {}
}
}
match connect_to_compute_once(node_info, conn_info).await {
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if !can_retry_error(&e, num_retries) {
Err(e) if num_retries == NUM_RETRIES_WAKE_COMPUTE => {
if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
error!(error = ?e, "could not connect to compute node");
if should_wake {
match try_wake(node_info, &extra, &creds).await {
Ok(Some(x)) => should_wake = x,
Ok(None) => return Err(e.into()),
Err(e) => return Err(e.into()),
}
}
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
} else {
return Err(e.into());
}
wait_duration = retry_after(num_retries);
// after the first connect failure,
// we should invalidate the cache and wake up a new compute node
if num_retries == 0 {
invalidate_cache(node_info);
should_wake_with_error = Some(e.into());
}
}
other => return Ok(other?),
}
num_retries += 1;
info!(num_retries, "retrying connect");
info!(retries_left = num_retries, "retrying connect");
}
}
fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool {
fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option<time::Duration> {
use tokio_postgres::error::SqlState;
match err.code() {
// retry all errors at least once
_ if num_retries == 0 => true,
// keep retrying connection errors
// retry all errors at least once immediately
_ if num_retries == 0 => Some(time::Duration::ZERO),
// keep retrying connection errors, backing off from a 100ms base
Some(
&SqlState::CONNECTION_FAILURE
| &SqlState::CONNECTION_EXCEPTION
| &SqlState::CONNECTION_DOES_NOT_EXIST
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
) if num_retries < NUM_RETRIES_WAKE_COMPUTE => true,
) => {
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
}
// otherwise, don't retry
_ => false,
_ => None,
}
}


@@ -20,7 +20,7 @@ use hyper::StatusCode;
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::{ops::ControlFlow, sync::Arc};
use std::sync::Arc;
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
time,
@@ -32,7 +32,7 @@ use utils::measured_stream::MeasuredStream;
/// Number of times we should retry the `/proxy_wake_compute` http request.
/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -335,37 +335,11 @@ async fn connect_to_compute(
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<PostgresConnection, compute::ConnectionError> {
let mut num_retries = 0;
let mut wait_duration = time::Duration::ZERO;
let mut should_wake_with_error = None;
let mut should_wake = true;
loop {
// Apply startup params to the (possibly, cached) compute node info.
node_info.config.set_startup_params(params);
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
// try wake the compute node if we have determined it's sensible to do so
if let Some(err) = should_wake_with_error.take() {
match try_wake(node_info, extra, creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err),
// there was an error communicating with the control plane
Err(e) => return Err(io_error(e).into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
wait_duration = retry_after(num_retries);
should_wake_with_error = Some(err);
num_retries += 1;
info!(num_retries, "retrying wake compute");
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(()))) => {}
}
}
// Set a shorter timeout for the initial connection attempt.
//
// In case we try to connect to an outdated address that is no longer valid, the
@@ -386,25 +360,30 @@ async fn connect_to_compute(
};
match connect_to_compute_once(node_info, timeout).await {
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if !can_retry_error(&e, num_retries) {
Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => {
if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
error!(error = ?e, "could not connect to compute node");
if should_wake {
match try_wake(node_info, extra, creds).await {
Ok(Some(x)) => {
should_wake = x;
}
Ok(None) => return Err(e),
Err(e) => return Err(io_error(e).into()),
}
}
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
} else {
return Err(e);
}
wait_duration = retry_after(num_retries);
// after the first connect failure,
// we should invalidate the cache and wake up a new compute node
if num_retries == 0 {
invalidate_cache(node_info);
should_wake_with_error = Some(e);
}
}
other => return other,
}
num_retries += 1;
info!(num_retries, "retrying connect");
info!(retries_left = num_retries, "retrying connect");
}
}
@@ -416,51 +395,41 @@ pub async fn try_wake(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<Option<ControlFlow<()>>, WakeComputeError> {
) -> Result<Option<bool>, WakeComputeError> {
info!("compute node's state has likely changed; requesting a wake-up");
invalidate_cache(node_info);
match creds.wake_compute(extra).await {
// retry wake if the compute was in an invalid state
Err(WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
})) => Ok(Some(ControlFlow::Continue(()))),
})) => Ok(Some(true)),
// Update `node_info` and try again.
Ok(Some(mut new)) => {
new.config.reuse_password(&node_info.config);
*node_info = new;
Ok(Some(ControlFlow::Break(())))
Ok(Some(false))
}
Err(e) => Err(e),
Ok(None) => Ok(None),
}
}
fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool {
fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option<time::Duration> {
use std::io::ErrorKind;
match err {
// retry all errors at least once
_ if num_retries == 0 => true,
// keep retrying connection errors
compute::ConnectionError::CouldNotConnect(io_err)
if num_retries < NUM_RETRIES_WAKE_COMPUTE =>
{
matches!(
io_err.kind(),
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable
)
}
// retry all errors at least once immediately
_ if num_retries == 0 => Some(time::Duration::ZERO),
// keep retrying connection errors, backing off from a 100ms base
compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() {
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => {
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
}
_ => None,
},
// otherwise, don't retry
_ => false,
}
}
pub fn retry_after(num_retries: u32) -> time::Duration {
match num_retries {
0 => time::Duration::ZERO,
_ => {
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
}
_ => None,
}
}


@@ -1,4 +1,6 @@
//! A group of high-level tests for connection establishing logic and auth.
use std::io;
use super::*;
use crate::{auth, sasl, scram};
use async_trait::async_trait;
@@ -297,9 +299,14 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
#[test]
fn connect_compute_total_wait() {
let err = compute::ConnectionError::CouldNotConnect(io::Error::new(
io::ErrorKind::ConnectionRefused,
"conn refused",
));
let mut total_wait = tokio::time::Duration::ZERO;
for num_retries in 0..10 {
total_wait += retry_after(num_retries);
total_wait += retry_connect_in(&err, num_retries).unwrap();
}
assert!(total_wait < tokio::time::Duration::from_secs(12));
assert!(total_wait > tokio::time::Duration::from_secs(10));
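
The bounds asserted in this test follow from the backoff formula in the diff: no wait before the first retry, then `BASE_RETRY_WAIT_DURATION * 1.5^n`. A standalone sketch of that arithmetic, using `std::time::Duration` in place of `tokio::time::Duration` and hypothetical helper names `backoff` and `total_wait`:

```rust
use std::time::Duration;

// Same base as BASE_RETRY_WAIT_DURATION in the diff.
const BASE: Duration = Duration::from_millis(100);

// Wait before retry number `n`: zero for the first retry, then BASE * (3/2)^n.
// Integer multiply-then-divide avoids floating point, as in the diff.
fn backoff(n: u32) -> Duration {
    if n == 0 {
        return Duration::ZERO;
    }
    BASE * 3_u32.pow(n) / 2_u32.pow(n)
}

// Sum of the waits over retries 0..retries.
fn total_wait(retries: u32) -> Duration {
    (0..retries).map(backoff).sum()
}

fn main() {
    for n in 0..10 {
        println!("retry {n}: wait {:?}", backoff(n));
    }
    println!("total over 10 retries: {:?}", total_wait(10));
}
```

The waits for retries 1 through 9 form a geometric series, 100ms * (1.5 + 1.5^2 + ... + 1.5^9), which comes to roughly 11.2 seconds and so falls inside the 10 to 12 second window the test checks.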


@@ -3109,3 +3109,18 @@ def last_flush_lsn_upload(
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
return last_flush_lsn
def parse_project_git_version_output(s: str) -> str:
"""
Parses the git commit hash out of the --version output supported at least by neon_local.
The information is generated by utils::project_git_version!
"""
import re
res = re.search(r"git(-env)?:([0-9a-fA-F]{8,40})(-\S+)?", s)
if res and (commit := res.group(2)):
return commit
raise ValueError(f"unable to parse --version output: '{s}'")


@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
PortDistributor,
parse_project_git_version_output,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
@@ -352,7 +353,7 @@ def prepare_snapshot(
# get git SHA of neon binary
def get_neon_version(neon_binpath: Path):
out = subprocess.check_output([neon_binpath / "neon_local", "--version"]).decode("utf-8")
return out.split("git:", 1)[1].rstrip()
return parse_project_git_version_output(out)
def check_neon_works(


@@ -1,12 +1,18 @@
import os
import subprocess
from pathlib import Path
from typing import cast
import pytest
import requests
from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME,
NeonEnv,
NeonEnvBuilder,
parse_project_git_version_output,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.types import TenantId, TimelineId
@@ -131,3 +137,66 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
# Default stop
res = env.neon_cli.raw_cli(["stop"])
res.check_returncode()
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
@pytest.mark.skipif(
os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works"
)
def test_parse_project_git_version_output_positive():
commit = "b6f77b5816cf1dba12a3bc8747941182ce220846"
positive = [
# most likely when developing locally
f"Neon CLI git:{commit}-modified",
# when developing locally
f"Neon CLI git:{commit}",
# this is not produced in practice, but the impl supports it
f"Neon CLI git-env:{commit}-modified",
# most likely from CI or docker build
f"Neon CLI git-env:{commit}",
]
for example in positive:
assert parse_project_git_version_output(example) == commit
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
@pytest.mark.skipif(
os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works"
)
def test_parse_project_git_version_output_local_docker():
"""
Makes sure the tests don't accept the default version in Dockerfile one gets without providing
a commit lookalike in --build-arg GIT_VERSION=XXX
"""
input = "Neon CLI git-env:local"
with pytest.raises(ValueError) as e:
parse_project_git_version_output(input)
assert input in str(e)
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
@pytest.mark.skipif(
os.environ.get("BUILD_TYPE") == "debug", reason="cli api sanity, either build works"
)
def test_binaries_version_parses(neon_binpath: Path):
"""
Ensures that we can parse the actual outputs of --version from a set of binaries.
The list is not meant to be exhaustive, and compute_ctl has a different way for example.
"""
binaries = [
"neon_local",
"pageserver",
"safekeeper",
"proxy",
"pg_sni_router",
"storage_broker",
]
for bin in binaries:
out = subprocess.check_output([neon_binpath / bin, "--version"]).decode("utf-8")
parse_project_git_version_output(out)