Compare commits

..

1 Commits

Author SHA1 Message Date
John Spray
f9fcb176e8 tests: extend node offline allow list 2025-04-03 13:52:03 +02:00
91 changed files with 417 additions and 1644 deletions

View File

@@ -8,7 +8,7 @@ target_tag = os.getenv("TARGET_TAG")
branch = os.getenv("BRANCH")
dev_acr = os.getenv("DEV_ACR")
prod_acr = os.getenv("PROD_ACR")
dev_aws = "12345"
dev_aws = os.getenv("DEV_AWS")
prod_aws = os.getenv("PROD_AWS")
aws_region = os.getenv("AWS_REGION")
@@ -39,18 +39,12 @@ registries = {
],
}
release_branches = ["release", "release-proxy", "release-compute"]
outputs: dict[str, dict[str, list[str]]] = {}
target_tags = (
[target_tag, "latest"]
if branch == "main"
else [target_tag, "released"]
if branch in release_branches
else [target_tag]
target_tags = [target_tag, "latest"] if branch == "main" else [target_tag]
target_stages = (
["dev", "prod"] if branch in ["release", "release-proxy", "release-compute"] else ["dev"]
)
target_stages = ["dev", "prod"] if branch in release_branches else ["dev"]
for component_name, component_images in components.items():
for stage in target_stages:

View File

@@ -2,9 +2,6 @@ import json
import os
import subprocess
RED = "\033[91m"
RESET = "\033[0m"
image_map = os.getenv("IMAGE_MAP")
if not image_map:
raise ValueError("IMAGE_MAP environment variable is not set")
@@ -32,14 +29,9 @@ while len(pending) > 0:
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
failures.append((" ".join(cmd), result.stdout, target))
failures.append((" ".join(cmd), result.stdout))
pending.append((source, target))
print(
f"{RED}[RETRY]{RESET} Push failed for {target}. Retrying... (failure count: {len(failures)})"
)
print(result.stdout)
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
failed_targets = [target for _, _, target in failures]
with open(github_output, "a") as f:
f.write(f"push_failures={json.dumps(failed_targets)}\n")
f.write("slack_notify=true\n")

View File

@@ -110,19 +110,12 @@ jobs:
IMAGE_MAP: ${{ inputs.image-map }}
- name: Notify Slack if container image pushing fails
if: steps.push.outputs.push_failures || failure()
if: steps.push.outputs.slack_notify == 'true' || failure()
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
text: >
*Container image pushing ${{
steps.push.outcome == 'failure' && 'failed completely' || 'succeeded with some retries'
}}* in
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
${{ steps.push.outputs.push_failures && format(
'*Failed targets:*\n• {0}', join(fromJson(steps.push.outputs.push_failures), '\n• ')
) || '' }}
text: |
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>

2
Cargo.lock generated
View File

@@ -4329,7 +4329,6 @@ dependencies = [
"strum",
"strum_macros",
"thiserror 1.0.69",
"tracing-utils",
"utils",
]
@@ -7604,7 +7603,6 @@ dependencies = [
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk",
"pin-project-lite",
"tokio",
"tracing",
"tracing-opentelemetry",

View File

@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.86.0
ENV RUSTC_VERSION=1.85.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1

View File

@@ -1055,6 +1055,34 @@ RUN if [ -d pg_embedding-src ]; then \
make -j $(getconf _NPROCESSORS_ONLN) install; \
fi
#########################################################################################
#
# Layer "pg_anon-build"
# compile anon extension
#
#########################################################################################
FROM build-deps AS pg_anon-src
ARG PG_VERSION
# This is an experimental extension, never got to real production.
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
WORKDIR /ext-src
RUN case "${PG_VERSION:?}" in "v17") \
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
esac && \
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C .
FROM pg-build AS pg_anon-build
COPY --from=pg_anon-src /ext-src/ /ext-src/
WORKDIR /ext-src
RUN if [ -d pg_anon-src ]; then \
cd pg_anon-src && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control; \
fi
#########################################################################################
#
# Layer "pg build with nonroot user and cargo installed"
@@ -1338,8 +1366,8 @@ ARG PG_VERSION
# Do not update without approve from proxy team
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
WORKDIR /ext-src
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.2.0.tar.gz -O pg_session_jwt.tar.gz && \
echo "5ace028e591f2e000ca10afa5b1ca62203ebff014c2907c0ec3b29c36f28a1bb pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
@@ -1649,6 +1677,7 @@ COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/

View File

@@ -33,7 +33,6 @@
import 'sql_exporter/lfc_hits.libsonnet',
import 'sql_exporter/lfc_misses.libsonnet',
import 'sql_exporter/lfc_used.libsonnet',
import 'sql_exporter/lfc_used_pages.libsonnet',
import 'sql_exporter/lfc_writes.libsonnet',
import 'sql_exporter/logical_slot_restart_lsn.libsonnet',
import 'sql_exporter/max_cluster_size.libsonnet',

View File

@@ -1,10 +0,0 @@
{
metric_name: 'lfc_used_pages',
type: 'gauge',
help: 'LFC pages used',
key_labels: null,
values: [
'lfc_used_pages',
],
query: importstr 'sql_exporter/lfc_used_pages.sql',
}

View File

@@ -1 +0,0 @@
SELECT lfc_value AS lfc_used_pages FROM neon.neon_lfc_stats WHERE lfc_key = 'file_cache_used_pages';

View File

@@ -2,6 +2,23 @@ diff --git a/expected/ut-A.out b/expected/ut-A.out
index da723b8..5328114 100644
--- a/expected/ut-A.out
+++ b/expected/ut-A.out
@@ -9,13 +9,16 @@ SET search_path TO public;
----
-- No.A-1-1-3
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
-- No.A-1-2-3
DROP EXTENSION pg_hint_plan;
-- No.A-1-1-4
CREATE SCHEMA other_schema;
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
DROP SCHEMA other_schema;
----
---- No. A-5-1 comment pattern
@@ -3175,6 +3178,7 @@ SELECT s.query, s.calls
FROM public.pg_stat_statements s
JOIN pg_catalog.pg_database d
@@ -10,6 +27,18 @@ index da723b8..5328114 100644
ORDER BY 1;
query | calls
--------------------------------------+-------
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
index d372459..6282afe 100644
--- a/expected/ut-fdw.out
+++ b/expected/ut-fdw.out
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
SET client_min_messages TO LOG;
SET pg_hint_plan.enable_hint TO on;
CREATE EXTENSION file_fdw;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
diff --git a/sql/ut-A.sql b/sql/ut-A.sql
index 7c7d58a..4fd1a07 100644
--- a/sql/ut-A.sql

View File

@@ -1,3 +1,24 @@
diff --git a/expected/ut-A.out b/expected/ut-A.out
index e7d68a1..65a056c 100644
--- a/expected/ut-A.out
+++ b/expected/ut-A.out
@@ -9,13 +9,16 @@ SET search_path TO public;
----
-- No.A-1-1-3
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
-- No.A-1-2-3
DROP EXTENSION pg_hint_plan;
-- No.A-1-1-4
CREATE SCHEMA other_schema;
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
DROP SCHEMA other_schema;
----
---- No. A-5-1 comment pattern
diff --git a/expected/ut-J.out b/expected/ut-J.out
index 2fa3c70..314e929 100644
--- a/expected/ut-J.out
@@ -139,3 +160,15 @@ index a09bd34..0ad227c 100644
error hint:
explain_filter
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
index 017fa4b..98d989b 100644
--- a/expected/ut-fdw.out
+++ b/expected/ut-fdw.out
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
SET client_min_messages TO LOG;
SET pg_hint_plan.enable_hint TO on;
CREATE EXTENSION file_fdw;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');

View File

@@ -419,7 +419,7 @@ impl ComputeNode {
.iter()
.filter_map(|val| val.parse::<usize>().ok())
.map(|val| if val > 1 { val - 1 } else { 1 })
.next_back()
.last()
.unwrap_or(3)
}
}

View File

@@ -385,6 +385,8 @@ where
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
let ssl_ca_certs = match &cli.ssl_ca_file {
Some(ssl_ca_file) => {
let buf = tokio::fs::read(ssl_ca_file).await?;
@@ -399,11 +401,9 @@ async fn main() -> anyhow::Result<()> {
}
let http_client = http_client.build()?;
let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
let mut trimmed = cli.api.to_string();
trimmed.pop();
let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref());
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
match cli.command {
Command::NodeRegister {
@@ -1056,7 +1056,7 @@ async fn main() -> anyhow::Result<()> {
const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
let mut stream = futures::stream::iter(moves)
.map(|mv| {
let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
let client = Client::new(cli.api.clone(), cli.jwt.clone());
async move {
client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(

View File

@@ -91,14 +91,14 @@ impl Server {
Ok(tls_stream) => tls_stream,
Err(err) => {
if !suppress_io_error(&err) {
info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
info!("Failed to accept TLS connection: {err:#}");
}
return;
}
};
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
if !suppress_hyper_error(&err) {
info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
info!("Failed to serve HTTPS connection: {err:#}");
}
}
}
@@ -106,7 +106,7 @@ impl Server {
// Handle HTTP connection.
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
if !suppress_hyper_error(&err) {
info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
info!("Failed to serve HTTP connection: {err:#}");
}
}
}

View File

@@ -34,7 +34,6 @@ postgres_backend.workspace = true
nix = {workspace = true, optional = true}
reqwest.workspace = true
rand.workspace = true
tracing-utils.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -134,7 +134,6 @@ pub struct ConfigToml {
pub load_previous_heatmap: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -192,54 +191,6 @@ pub enum GetVectoredConcurrentIo {
SidecarTask,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Ratio {
pub numerator: usize,
pub denominator: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct OtelExporterConfig {
pub endpoint: String,
pub protocol: OtelExporterProtocol,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum OtelExporterProtocol {
Grpc,
HttpBinary,
HttpJson,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Tracing {
pub sampling_ratio: Ratio,
pub export_config: OtelExporterConfig,
}
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
fn from(val: &OtelExporterConfig) -> Self {
tracing_utils::ExportConfig {
endpoint: Some(val.endpoint.clone()),
protocol: val.protocol.into(),
timeout: val.timeout,
}
}
}
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
fn from(val: OtelExporterProtocol) -> Self {
match val {
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
}
}
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -586,7 +537,6 @@ impl Default for ConfigToml {
validate_wal_contiguity: None,
load_previous_heatmap: None,
generate_unarchival_heatmap: None,
tracing: None,
}
}
}

View File

@@ -558,7 +558,7 @@ async fn upload_large_enough_file(
) -> usize {
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
let body = bytes::Bytes::from(vec![0u8; 1024]);
let contents = std::iter::once(header).chain(std::iter::repeat_n(body, 128));
let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
let len = contents.clone().fold(0, |acc, next| acc + next.len());

View File

@@ -71,7 +71,6 @@ pub struct PeerInfo {
pub ts: Instant,
pub pg_connstr: String,
pub http_connstr: String,
pub https_connstr: Option<String>,
}
pub type FullTransactionId = u64;
@@ -228,8 +227,6 @@ pub struct TimelineDeleteResult {
pub dir_existed: bool,
}
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
fn lsn_invalid() -> Lsn {
Lsn::INVALID
}
@@ -262,8 +259,6 @@ pub struct SkTimelineInfo {
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub http_connstr: Option<String>,
#[serde(default)]
pub https_connstr: Option<String>,
// Minimum of all active RO replicas flush LSN
#[serde(default = "lsn_invalid")]
pub standby_horizon: Lsn,

View File

@@ -14,7 +14,6 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
pin-project-lite.workspace = true
[dev-dependencies]
tracing-subscriber.workspace = true # For examples in docs

View File

@@ -31,10 +31,10 @@
//! .init();
//! }
//! ```
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod http;
pub mod perf_span;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;

View File

@@ -1,153 +0,0 @@
//! Crutch module to work around tracing infrastructure deficiencies
//!
//! We wish to collect granular request spans without impacting performance
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
//!
//! The approach taken by the pageserver crate is to use a completely different
//! span hierarchy for the performance spans. Spans are explicitly stored in
//! the request context and use a different [`tracing::Subscriber`] in order
//! to avoid expensive filtering.
//!
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implcitly,
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
//! wishes to juggle different subscribers.
//!
//! In order to work around this, this module provides a [`PerfSpan`] type which
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
//! achieves the correct routing.
//!
//! There's also a modified version of [`tracing::Instrument`] which works with
//! [`PerfSpan`].
use core::{
future::Future,
marker::Sized,
mem::ManuallyDrop,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use tracing::{Dispatch, field, span::Span};
#[derive(Debug, Clone)]
pub struct PerfSpan {
inner: ManuallyDrop<Span>,
dispatch: Dispatch,
}
#[must_use = "once a span has been entered, it should be exited"]
pub struct PerfSpanEntered<'a> {
span: &'a PerfSpan,
}
impl PerfSpan {
pub fn new(span: Span, dispatch: Dispatch) -> Self {
Self {
inner: ManuallyDrop::new(span),
dispatch,
}
}
pub fn record<Q: field::AsField + ?Sized, V: field::Value>(
&self,
field: &Q,
value: V,
) -> &Self {
self.inner.record(field, value);
self
}
pub fn enter(&self) -> PerfSpanEntered {
if let Some(ref id) = self.inner.id() {
self.dispatch.enter(id);
}
PerfSpanEntered { span: self }
}
pub fn inner(&self) -> &Span {
&self.inner
}
}
impl Drop for PerfSpan {
fn drop(&mut self) {
// Bring the desired dispatch into scope before explicitly calling
// the span destructor. This routes the span exit to the correct
// [`tracing::Subscriber`].
let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.inner) }
}
}
impl Drop for PerfSpanEntered<'_> {
fn drop(&mut self) {
assert!(self.span.inner.id().is_some());
let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
self.span.dispatch.exit(&self.span.inner.id().unwrap());
}
}
pub trait PerfInstrument: Sized {
fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
PerfInstrumented {
inner: ManuallyDrop::new(self),
span,
}
}
}
pin_project! {
#[project = PerfInstrumentedProj]
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PerfInstrumented<T> {
// `ManuallyDrop` is used here to to enter instrument `Drop` by entering
// `Span` and executing `ManuallyDrop::drop`.
#[pin]
inner: ManuallyDrop<T>,
span: PerfSpan,
}
impl<T> PinnedDrop for PerfInstrumented<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
let _enter = this.span.enter();
// SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
// different from wrapping `T` in `Option` and calling
// `Pin::set(&mut this.inner, None)`, except avoiding
// additional memory overhead.
// 2. `ManuallyDrop::drop()` is safe, because
// `PinnedDrop::drop()` is guaranteed to be called only
// once.
unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
}
}
}
impl<'a, T> PerfInstrumentedProj<'a, T> {
/// Get a mutable reference to the [`Span`] a pinned mutable reference to
/// the wrapped type.
fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
// SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
// and `inner` is valid, because `ManuallyDrop::drop` is called
// only inside `Drop` of the `Instrumented`.
let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
(self.span, inner)
}
}
impl<T: Future> Future for PerfInstrumented<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (span, inner) = self.project().span_and_inner_pin_mut();
let _enter = span.enter();
inner.poll(cx)
}
}
impl<T: Sized> PerfInstrument for T {}

View File

@@ -35,7 +35,6 @@ use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use tracing_utils::OtelGuard;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::crashsafe::syncfs;
use utils::logging::TracingErrorLayerEnablement;
@@ -119,21 +118,6 @@ fn main() -> anyhow::Result<()> {
logging::Output::Stdout,
)?;
let otel_enablement = match &conf.tracing {
Some(cfg) => tracing_utils::OtelEnablement::Enabled {
service_name: "pageserver".to_string(),
export_config: (&cfg.export_config).into(),
runtime: *COMPUTE_REQUEST_RUNTIME,
},
None => tracing_utils::OtelEnablement::Disabled,
};
let otel_guard = tracing_utils::init_performance_tracing(otel_enablement);
if otel_guard.is_some() {
info!(?conf.tracing, "starting with OTEL tracing enabled");
}
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
logging::replace_panic_hook_with_tracing_panic_hook().forget();
@@ -207,7 +191,7 @@ fn main() -> anyhow::Result<()> {
tracing::info!("Initializing page_cache...");
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
@@ -306,7 +290,6 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
otel_guard: Option<OtelGuard>,
) -> anyhow::Result<()> {
// Monotonic time for later calculating startup duration
let started_startup_at = Instant::now();
@@ -692,21 +675,13 @@ fn start_pageserver(
// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
let page_service = page_service::spawn(
conf,
tenant_manager.clone(),
pg_auth,
perf_trace_dispatch,
{
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener)
.context("create tokio listener")?
},
);
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});
// All started up! Now just sit and wait for shutdown signal.
BACKGROUND_RUNTIME.block_on(async move {

View File

@@ -215,8 +215,6 @@ pub struct PageServerConf {
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
pub generate_unarchival_heatmap: bool,
pub tracing: Option<pageserver_api::config::Tracing>,
}
/// Token for authentication to safekeepers
@@ -388,7 +386,6 @@ impl PageServerConf {
validate_wal_contiguity,
load_previous_heatmap,
generate_unarchival_heatmap,
tracing,
} = config_toml;
let mut conf = PageServerConf {
@@ -438,7 +435,6 @@ impl PageServerConf {
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
tracing,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -510,17 +506,6 @@ impl PageServerConf {
);
}
if let Some(tracing_config) = conf.tracing.as_ref() {
let ratio = &tracing_config.sampling_ratio;
ensure!(
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
format!(
"Invalid sampling ratio: {}/{}",
ratio.numerator, ratio.denominator
)
);
}
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
.map_err(anyhow::Error::msg)
.with_context(|| {

View File

@@ -100,12 +100,6 @@ use crate::{
task_mgr::TaskKind,
tenant::Timeline,
};
use futures::FutureExt;
use futures::future::BoxFuture;
use std::future::Future;
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
use tracing::{Dispatch, Span};
// The main structure of this module, see module-level comment.
pub struct RequestContext {
@@ -115,8 +109,6 @@ pub struct RequestContext {
page_content_kind: PageContentKind,
read_path_debug: bool,
scope: Scope,
perf_span: Option<PerfSpan>,
perf_span_dispatch: Option<Dispatch>,
}
#[derive(Clone)]
@@ -271,15 +263,22 @@ impl RequestContextBuilder {
page_content_kind: PageContentKind::Unknown,
read_path_debug: false,
scope: Scope::new_global(),
perf_span: None,
perf_span_dispatch: None,
},
}
}
pub fn from(original: &RequestContext) -> Self {
pub fn extend(original: &RequestContext) -> Self {
Self {
inner: original.clone(),
// This is like a Copy, but avoid implementing Copy because ordinary users of
// RequestContext should always move or ref it.
inner: RequestContext {
task_kind: original.task_kind,
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
read_path_debug: original.read_path_debug,
scope: original.scope.clone(),
},
}
}
@@ -317,74 +316,12 @@ impl RequestContextBuilder {
self
}
pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
self.inner.perf_span_dispatch = dispatch;
self
}
pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
where
Fn: FnOnce() -> Span,
{
assert!(self.inner.perf_span.is_none());
assert!(self.inner.perf_span_dispatch.is_some());
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
self
}
pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
where
Fn: FnOnce(&Span) -> Span,
{
if let Some(ref perf_span) = self.inner.perf_span {
assert!(self.inner.perf_span_dispatch.is_some());
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
let new_span =
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
}
self
}
pub fn root(self) -> RequestContext {
self.inner
}
pub fn attached_child(self) -> RequestContext {
self.inner
}
pub fn detached_child(self) -> RequestContext {
pub fn build(self) -> RequestContext {
self.inner
}
}
impl RequestContext {
/// Private clone implementation
///
/// Callers should use the [`RequestContextBuilder`] or child spaning APIs of
/// [`RequestContext`].
fn clone(&self) -> Self {
Self {
task_kind: self.task_kind,
download_behavior: self.download_behavior,
access_stats_behavior: self.access_stats_behavior,
page_content_kind: self.page_content_kind,
read_path_debug: self.read_path_debug,
scope: self.scope.clone(),
perf_span: self.perf_span.clone(),
perf_span_dispatch: self.perf_span_dispatch.clone(),
}
}
/// Create a new RequestContext that has no parent.
///
/// The function is called `new` because, once we add children
@@ -400,7 +337,7 @@ impl RequestContext {
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
RequestContextBuilder::new(task_kind)
.download_behavior(download_behavior)
.root()
.build()
}
/// Create a detached child context for a task that may outlive `self`.
@@ -421,10 +358,7 @@ impl RequestContext {
///
/// We could make new calls to this function fail if `self` is already canceled.
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
RequestContextBuilder::from(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.detached_child()
self.child_impl(task_kind, download_behavior)
}
/// Create a child of context `self` for a task that shall not outlive `self`.
@@ -448,7 +382,7 @@ impl RequestContext {
/// The method to wait for child tasks would return an error, indicating
/// that the child task was not started because the context was canceled.
pub fn attached_child(&self) -> Self {
RequestContextBuilder::from(self).attached_child()
self.child_impl(self.task_kind(), self.download_behavior())
}
/// Use this function when you should be creating a child context using
@@ -463,10 +397,17 @@ impl RequestContext {
Self::new(task_kind, download_behavior)
}
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
RequestContextBuilder::extend(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.build()
}
pub fn with_scope_timeline(&self, timeline: &Arc<Timeline>) -> Self {
RequestContextBuilder::from(self)
RequestContextBuilder::extend(self)
.scope(Scope::new_timeline(timeline))
.attached_child()
.build()
}
pub(crate) fn with_scope_page_service_pagestream(
@@ -475,9 +416,9 @@ impl RequestContext {
crate::page_service::TenantManagerTypes,
>,
) -> Self {
RequestContextBuilder::from(self)
RequestContextBuilder::extend(self)
.scope(Scope::new_page_service_pagestream(timeline_handle))
.attached_child()
.build()
}
pub fn with_scope_secondary_timeline(
@@ -485,30 +426,28 @@ impl RequestContext {
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Self {
RequestContextBuilder::from(self)
RequestContextBuilder::extend(self)
.scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id))
.attached_child()
.build()
}
pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self {
RequestContextBuilder::from(self)
RequestContextBuilder::extend(self)
.scope(Scope::new_secondary_tenant(tenant_shard_id))
.attached_child()
.build()
}
#[cfg(test)]
pub fn with_scope_unit_test(&self) -> Self {
RequestContextBuilder::from(self)
.task_kind(TaskKind::UnitTest)
RequestContextBuilder::new(TaskKind::UnitTest)
.scope(Scope::new_unit_test())
.attached_child()
.build()
}
pub fn with_scope_debug_tools(&self) -> Self {
RequestContextBuilder::from(self)
.task_kind(TaskKind::DebugTool)
RequestContextBuilder::new(TaskKind::DebugTool)
.scope(Scope::new_debug_tools())
.attached_child()
.build()
}
pub fn task_kind(&self) -> TaskKind {
@@ -565,61 +504,4 @@ impl RequestContext {
Scope::DebugTools { io_size_metrics } => io_size_metrics,
}
}
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
span.inner().follows_from(from_span.inner());
}
}
pub(crate) fn perf_span_record<
Q: tracing::field::AsField + ?Sized,
V: tracing::field::Value,
>(
&self,
field: &Q,
value: V,
) {
if let Some(span) = &self.perf_span {
span.record(field, value);
}
}
pub(crate) fn has_perf_span(&self) -> bool {
self.perf_span.is_some()
}
}
/// [`Future`] extension trait that allow for creating performance
/// spans on sampled requests
pub(crate) trait PerfInstrumentFutureExt<'a>: Future + Send {
/// Instrument this future with a new performance span when the
/// provided request context indicates the originator request
/// was sampled. Otherwise, just box the future and return it as is.
fn maybe_perf_instrument<Fn>(
self,
ctx: &RequestContext,
make_span: Fn,
) -> BoxFuture<'a, Self::Output>
where
Self: Sized + 'a,
Fn: FnOnce(&Span) -> Span,
{
match &ctx.perf_span {
Some(perf_span) => {
assert!(ctx.perf_span_dispatch.is_some());
let dispatcher = ctx.perf_span_dispatch.as_ref().unwrap();
let new_span =
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
self.instrument(new_perf_span).boxed()
}
None => self.boxed(),
}
}
}
// Implement the trait for all types that satisfy the trait bounds
impl<'a, T: Future + Send + 'a> PerfInstrumentFutureExt<'a> for T {}

View File

@@ -2697,12 +2697,11 @@ async fn getpage_at_lsn_handler_inner(
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
// Enable read path debugging
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
.download_behavior(DownloadBehavior::Download)
.scope(context::Scope::new_timeline(&timeline))
.read_path_debug(true)
.root();
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true)
.scope(context::Scope::new_timeline(&timeline)).build();
// Use last_record_lsn if no lsn is provided
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
@@ -3189,8 +3188,7 @@ async fn list_aux_files(
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
.with_scope_timeline(&timeline);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline
.list_aux_files(body.lsn, &ctx, io_concurrency)
.await?;
@@ -3434,15 +3432,14 @@ async fn put_tenant_timeline_import_wal(
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
async move {
let state = get_state(&request);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
.download_behavior(DownloadBehavior::Warn)
.scope(context::Scope::new_timeline(&timeline))
.root();
let ctx = RequestContextBuilder::extend(&ctx).scope(context::Scope::new_timeline(&timeline)).build();
let mut body = StreamReader::new(request.into_body().map(|res| {
res.map_err(|error| {

View File

@@ -55,9 +55,6 @@ pub const DEFAULT_PG_VERSION: u32 = 16;
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
// Target used for performance traces.
pub const PERF_TRACE_TARGET: &str = "P";
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;

View File

@@ -9,7 +9,6 @@ use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::{io, str};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
@@ -18,7 +17,7 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy, Tracing,
PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::models::{
@@ -37,7 +36,6 @@ use postgres_ffi::BLCKSZ;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use pq_proto::framed::ConnectionError;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use rand::Rng;
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
@@ -55,9 +53,7 @@ use utils::sync::spsc_fold;
use crate::auth::check_permission;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
TimelineMetrics,
@@ -104,7 +100,6 @@ pub fn spawn(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
pg_auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
) -> Listener {
let cancel = CancellationToken::new();
@@ -122,7 +117,6 @@ pub fn spawn(
conf,
tenant_manager,
pg_auth,
perf_trace_dispatch,
tcp_listener,
conf.pg_auth_type,
conf.page_service_pipelining.clone(),
@@ -179,7 +173,6 @@ pub async fn libpq_listener_main(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
pipelining_config: PageServicePipeliningConfig,
@@ -212,12 +205,8 @@ pub async fn libpq_listener_main(
// Connection established. Spawn a new task to handle it.
debug!("accepted connection from {}", peer_addr);
let local_auth = auth.clone();
let connection_ctx = RequestContextBuilder::from(&listener_ctx)
.task_kind(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch.clone())
.detached_child();
let connection_ctx = listener_ctx
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
connection_handler_tasks.spawn(page_service_conn_main(
conf,
tenant_manager.clone(),
@@ -618,7 +607,6 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
ctx: RequestContext,
}
#[cfg(feature = "testing")]
@@ -755,7 +743,6 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
tracing_config: Option<&Tracing>,
cancel: &CancellationToken,
ctx: &RequestContext,
protocol_version: PagestreamProtocolVersion,
@@ -915,51 +902,10 @@ impl PageServerHandler {
}
let key = rel_block_to_key(req.rel, req.blkno);
let sampled = match tracing_config {
Some(conf) => {
let ratio = &conf.sampling_ratio;
if ratio.numerator == 0 {
false
} else {
rand::thread_rng().gen_range(0..ratio.denominator) < ratio.numerator
}
}
None => false,
};
let ctx = if sampled {
RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_PAGE",
tenant_id = %tenant_id,
shard_id = field::Empty,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid,
key = %key,
)
})
.attached_child()
} else {
ctx.attached_child()
};
let res = timeline_handles
let shard = match timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.maybe_perf_instrument(&ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"SHARD_SELECTION",
)
})
.await;
let shard = match res {
.await
{
Ok(tl) => tl,
Err(e) => {
let span = mkspan!(before shard routing);
@@ -986,60 +932,26 @@ impl PageServerHandler {
}
}
};
// This ctx travels as part of the BatchedFeMessage through
// batching into the request handler.
// The request handler needs to do some per-request work
// (relsize check) before dispatching the batch as a single
// get_vectored call to the Timeline.
// This ctx will be used for the reslize check, whereas the
// get_vectored call will be a different ctx with separate
// perf span.
let ctx = ctx.with_scope_page_service_pagestream(&shard);
// Similar game for this `span`: we funnel it through so that
// request handler log messages contain the request-specific fields.
let span = mkspan!(shard.tenant_shard_id.shard_slug());
// Enrich the perf span with shard_id now that shard routing is done.
ctx.perf_span_record(
"shard_id",
tracing::field::display(shard.get_shard_identity().shard_slug()),
);
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
)
.maybe_perf_instrument(&ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"THROTTLE",
)
})
.await?;
// We're holding the Handle
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
let res = Self::wait_or_get_last_lsn(
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
&ctx,
ctx,
)
.maybe_perf_instrument(&ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"WAIT_LSN",
)
})
.await;
let effective_request_lsn = match res {
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(span, e);
@@ -1049,7 +961,7 @@ impl PageServerHandler {
span,
shard: shard.downgrade(),
effective_request_lsn,
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }],
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
}
}
#[cfg(feature = "testing")]
@@ -1602,15 +1514,12 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let cancel = self.cancel.clone();
let tracing_config = self.conf.tracing.clone();
let err = loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
tracing_config.as_ref(),
&cancel,
ctx,
protocol_version,
@@ -1744,8 +1653,6 @@ impl PageServerHandler {
// Batcher
//
let tracing_config = self.conf.tracing.clone();
let cancel_batcher = self.cancel.child_token();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
@@ -1759,7 +1666,6 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
tracing_config.as_ref(),
&cancel_batcher,
&ctx,
protocol_version,
@@ -2098,9 +2004,7 @@ impl PageServerHandler {
let results = timeline
.get_rel_page_at_lsn_batched(
requests
.iter()
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
effective_lsn,
io_concurrency,
ctx,

View File

@@ -9,7 +9,6 @@
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, ensure};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
@@ -32,7 +31,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::{BeSer, DeserializeError};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
@@ -40,7 +39,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -210,9 +209,7 @@ impl Timeline {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
pages.iter().map(|(tag, blknum)| (tag, blknum)),
effective_lsn,
io_concurrency.clone(),
ctx,
@@ -251,7 +248,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
effective_lsn: Lsn,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
@@ -265,11 +262,8 @@ impl Timeline {
let mut result = Vec::with_capacity(pages.len());
let result_slots = result.spare_capacity_mut();
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
BTreeMap::default();
let mut perf_instrument = false;
for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -280,16 +274,7 @@ impl Timeline {
}
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%effective_lsn,
)
})
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
.await
{
Ok(nblocks) => nblocks,
@@ -312,12 +297,8 @@ impl Timeline {
let key = rel_block_to_key(*tag, *blknum);
if ctx.has_perf_span() {
perf_instrument = true;
}
let key_slots = keys_slots.entry(key).or_default();
key_slots.push((response_slot_idx, ctx));
key_slots.push(response_slot_idx);
}
let keyspace = {
@@ -333,34 +314,16 @@ impl Timeline {
acc.to_keyspace()
};
let ctx = match perf_instrument {
true => RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_VECTORED",
tenant_id = %self.tenant_shard_id.tenant_id,
timeline_id = %self.timeline_id,
lsn = %effective_lsn,
shard = %self.tenant_shard_id.shard_slug(),
)
})
.attached_child(),
false => ctx.attached_child(),
};
let res = self
.get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
.await;
match res {
match self
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
.await
{
Ok(results) => {
for (key, res) in results {
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
let first_slot = key_slots.next().unwrap();
for (slot, req_ctx) in key_slots {
for slot in key_slots {
let clone = match &res {
Ok(buf) => Ok(buf.clone()),
Err(err) => Err(match err {
@@ -378,22 +341,17 @@ impl Timeline {
};
result_slots[slot].write(clone);
// There is no standardized way to express that the batched span followed from N request spans.
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
req_ctx.perf_follows_from(&ctx);
slots_filled += 1;
}
result_slots[first_slot].write(res);
first_req_ctx.perf_follows_from(&ctx);
slots_filled += 1;
}
}
Err(err) => {
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
for (slot, req_ctx) in keys_slots.values().flatten() {
for slot in keys_slots.values().flatten() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
let err = match &err {
@@ -425,7 +383,6 @@ impl Timeline {
}
};
req_ctx.perf_follows_from(&ctx);
result_slots[*slot].write(err);
}

View File

@@ -219,7 +219,8 @@ pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
// Bump this number when adding a new pageserver_runtime!
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = NonZeroUsize::new(4).unwrap();
// SAFETY: it's obviously correct
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

View File

@@ -3689,7 +3689,7 @@ impl Tenant {
}
}
}
TenantState::Active => {
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken { reason, .. } => {
@@ -4205,9 +4205,9 @@ impl Tenant {
self.cancel.child_token(),
);
let timeline_ctx = RequestContextBuilder::from(ctx)
let timeline_ctx = RequestContextBuilder::extend(ctx)
.scope(context::Scope::new_timeline(&timeline))
.detached_child();
.build();
Ok((timeline, timeline_ctx))
}

View File

@@ -53,7 +53,7 @@ impl<Value: Clone> LayerCoverage<Value> {
///
/// Complexity: O(log N)
fn add_node(&mut self, key: i128) {
let value = match self.nodes.range(..=key).next_back() {
let value = match self.nodes.range(..=key).last() {
Some((_, Some(v))) => Some(v.clone()),
Some((_, None)) => None,
None => None,

View File

@@ -58,7 +58,7 @@ use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
/// reads and ingest WAL.
/// reads and ingest WAL.
/// - `Secondary`: is only keeping a local cache warm.
///
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because

View File

@@ -130,7 +130,7 @@ impl IndexPart {
/// Version history
/// - 2: added `deleted_at`
/// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
/// is always generated from the keys of `layer_metadata`)
/// is always generated from the keys of `layer_metadata`)
/// - 4: timeline_layers is fully removed.
/// - 5: lineage was added
/// - 6: last_aux_file_policy is added.

View File

@@ -13,13 +13,13 @@ pub mod merge_iterator;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::PERF_TRACE_TARGET;
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
use bytes::Bytes;
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
@@ -34,7 +34,7 @@ use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use tracing::{Instrument, info_span, trace};
use tracing::{Instrument, trace};
use utils::lsn::Lsn;
use utils::sync::gate::GateGuard;
@@ -43,9 +43,7 @@ use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::context::{AccessStatsBehavior, RequestContext};
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
@@ -876,37 +874,13 @@ impl ReadableLayer {
) -> Result<(), GetVectoredError> {
match self {
ReadableLayer::PersistentLayer(layer) => {
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_LAYER",
layer = %layer
)
})
.attached_child();
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
.await
}
ReadableLayer::InMemoryLayer(layer) => {
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_LAYER",
layer = %layer
)
})
.attached_child();
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
.await
}
}

View File

@@ -896,9 +896,9 @@ impl DeltaLayerInner {
where
Reader: BlockReader + Clone,
{
let ctx = RequestContextBuilder::from(ctx)
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.attached_child();
.build();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
all_keys.push(entry);
true
},
&RequestContextBuilder::from(ctx)
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.attached_child(),
.build(),
)
.await?;
if let Some(last) = all_keys.last_mut() {

View File

@@ -481,9 +481,9 @@ impl ImageLayerInner {
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
let ctx = RequestContextBuilder::from(ctx)
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.attached_child();
.build();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;

View File

@@ -421,9 +421,9 @@ impl InMemoryLayer {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::from(ctx)
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.attached_child();
.build();
let inner = self.inner.read().await;

View File

@@ -3,13 +3,12 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use crate::PERF_TRACE_TARGET;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::HistoricLayerInfo;
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
use tracing::{Instrument, info_span};
use tracing::Instrument;
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -19,7 +18,7 @@ use super::delta_layer::{self};
use super::image_layer::{self};
use super::{
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
@@ -325,29 +324,16 @@ impl Layer {
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let downloaded = {
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_LAYER",
)
})
.attached_child();
let downloaded =
self.0
.get_or_maybe_download(true, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
.get_or_maybe_download(true, ctx)
.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?
};
})?;
let this = ResidentLayer {
downloaded: downloaded.clone(),
owner: self.clone(),
@@ -355,20 +341,9 @@ impl Layer {
self.record_access(ctx);
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"VISIT_LAYER",
)
})
.attached_child();
downloaded
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, &ctx)
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
.map_err(|err| match err {
GetVectoredError::Other(err) => GetVectoredError::Other(
@@ -1070,34 +1045,15 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
let ctx = if ctx.has_perf_span() {
let dl_ctx = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::LayerDownload)
.download_behavior(DownloadBehavior::Download)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason
)
})
.detached_child();
ctx.perf_follows_from(&dl_ctx);
dl_ctx
} else {
ctx.attached_child()
};
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
async move {
tracing::info!(%reason, "downloading on-demand");
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
.download_init_and_wait(timeline, permit, ctx.attached_child())
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.download_init_and_wait(timeline, permit, download_ctx)
.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
Ok(res)
}
@@ -1764,9 +1720,9 @@ impl DownloadedLayer {
);
let res = if owner.desc.is_delta {
let ctx = RequestContextBuilder::from(ctx)
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
.attached_child();
.build();
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,
owner.desc.timeline_id,
@@ -1782,9 +1738,9 @@ impl DownloadedLayer {
.await
.map(LayerKind::Delta)
} else {
let ctx = RequestContextBuilder::from(ctx)
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
.attached_child();
.build();
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,

View File

@@ -119,10 +119,6 @@ async fn smoke_test() {
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
assert!(matches!(e, EvictionError::NotFound));
let dl_ctx = RequestContextBuilder::from(ctx)
.download_behavior(DownloadBehavior::Download)
.attached_child();
// on accesses when the layer is evicted, it will automatically be downloaded.
let img_after = {
let mut data = ValuesReconstructState::new(io_concurrency.clone());
@@ -131,7 +127,7 @@ async fn smoke_test() {
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&dl_ctx,
ctx,
)
.instrument(download_span.clone())
.await
@@ -181,7 +177,7 @@ async fn smoke_test() {
// plain downloading is rarely needed
layer
.download_and_keep_resident(&dl_ctx)
.download_and_keep_resident(ctx)
.instrument(download_span)
.await
.unwrap();
@@ -649,10 +645,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let ctx = ctx.with_scope_timeline(&timeline);
// This test does downloads
let ctx = RequestContextBuilder::from(&ctx)
let ctx = RequestContextBuilder::extend(&ctx)
.download_behavior(DownloadBehavior::Download)
.attached_child();
.build();
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
@@ -735,9 +730,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
let ctx = ctx.with_scope_timeline(&timeline);
// This test does downloads
let ctx = RequestContextBuilder::from(&ctx)
let ctx = RequestContextBuilder::extend(&ctx)
.download_behavior(DownloadBehavior::Download)
.attached_child();
.build();
let layer = {
let mut layers = {

View File

@@ -23,7 +23,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, Result, anyhow, bail, ensure};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
@@ -97,9 +96,7 @@ use super::{
};
use crate::aux_file::AuxFileSizeEstimator;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
@@ -1292,22 +1289,9 @@ impl Timeline {
};
reconstruct_state.read_path = read_path;
let traversal_res: Result<(), _> = {
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_IO",
)
})
.attached_child();
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
};
let traversal_res: Result<(), _> = self
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
.await;
if let Err(err) = traversal_res {
// Wait for all the spawned IOs to complete.
// See comments on `spawn_io` inside `storage_layer` for more details.
@@ -1321,46 +1305,14 @@ impl Timeline {
let layers_visited = reconstruct_state.get_layers_visited();
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"RECONSTRUCT",
)
})
.attached_child();
let futs = FuturesUnordered::new();
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"RECONSTRUCT_KEY",
key = %key,
)
})
.attached_child();
async move {
assert_eq!(state.situation, ValueReconstructSituation::Complete);
let res = state
.collect_pending_ios()
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"WAIT_FOR_IO_COMPLETIONS",
)
})
.await;
let converted = match res {
let converted = match state.collect_pending_ios().await {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
@@ -1377,27 +1329,16 @@ impl Timeline {
"{converted:?}"
);
let walredo_deltas = converted.num_deltas();
let walredo_res = walredo_self
.reconstruct_value(key, lsn, converted)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"WALREDO",
deltas = %walredo_deltas,
)
})
.await;
(key, walredo_res)
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await;
// For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -2306,7 +2247,7 @@ impl Timeline {
.await
.expect("holding a reference to self");
}
TimelineState::Active => {
TimelineState::Active { .. } => {
return Ok(());
}
TimelineState::Broken { .. } | TimelineState::Stopping => {
@@ -3934,30 +3875,15 @@ impl Timeline {
let TimelineVisitOutcome {
completed_keyspace: completed,
image_covered_keyspace,
} = {
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_IO_TIMELINE",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
)
})
.attached_child();
Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
&self.cancel,
&ctx,
)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await?
};
} = Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
&self.cancel,
ctx,
)
.await?;
keyspace.remove_overlapping_with(&completed);
@@ -4001,24 +3927,8 @@ impl Timeline {
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_ANCESTOR",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
ancestor = %ancestor_timeline.timeline_id,
ancestor_lsn = %timeline.ancestor_lsn
)
})
.attached_child();
timeline_owned = timeline
.get_ready_ancestor_timeline(ancestor_timeline, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
.await?;
timeline = &*timeline_owned;
};
@@ -7349,9 +7259,9 @@ mod tests {
eprintln!("Downloading {layer} and re-generating heatmap");
let ctx = &RequestContextBuilder::from(ctx)
let ctx = &RequestContextBuilder::extend(ctx)
.download_behavior(crate::context::DownloadBehavior::Download)
.attached_child();
.build();
let _resident = layer
.download_and_keep_resident(ctx)

View File

@@ -26,7 +26,7 @@ use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::keyspace::{KeySpace, ShardedRange};
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
use pageserver_api::models::CompactInfoResponse;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
@@ -61,7 +61,7 @@ use crate::tenant::timeline::{
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
ResidentLayer, drop_rlock,
};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::tenant::{DeltaLayer, MaybeOffloaded, gc_block};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
@@ -123,6 +123,7 @@ impl GcCompactionQueueItem {
#[derive(Default)]
struct GcCompactionGuardItems {
notify: Option<tokio::sync::oneshot::Sender<()>>,
gc_guard: Option<gc_block::Guard>,
permit: Option<OwnedSemaphorePermit>,
}
@@ -278,7 +279,7 @@ impl GcCompactionQueue {
gc_compaction_ratio_percent: u64,
) -> bool {
const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
// Do not auto-trigger when physical size >= 150GB
return false;
}
@@ -318,12 +319,7 @@ impl GcCompactionQueue {
flags
},
sub_compaction: true,
// Only auto-trigger gc-compaction over the data keyspace due to concerns in
// https://github.com/neondatabase/neon/issues/11318.
compact_key_range: Some(CompactKeyRange {
start: Key::MIN,
end: Key::metadata_key_range().start,
}),
compact_key_range: None,
compact_lsn_range: None,
sub_compaction_max_job_size_mb: None,
},
@@ -347,45 +343,44 @@ impl GcCompactionQueue {
info!("compaction job id={} finished", id);
let mut guard = self.inner.lock().unwrap();
if let Some(items) = guard.guards.remove(&id) {
drop(items.gc_guard);
if let Some(tx) = items.notify {
let _ = tx.send(());
}
}
}
fn clear_running_job(&self) {
let mut guard = self.inner.lock().unwrap();
guard.running = None;
}
async fn handle_sub_compaction(
&self,
id: GcCompactionJobId,
options: CompactOptions,
timeline: &Arc<Timeline>,
gc_block: &GcBlock,
auto: bool,
) -> Result<(), CompactionError> {
info!(
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
);
let res = timeline
let jobs = timeline
.gc_compaction_split_jobs(
GcCompactJob::from_compact_options(options.clone()),
options.sub_compaction_max_job_size_mb,
)
.await;
let jobs = match res {
Ok(jobs) => jobs,
Err(err) => {
warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
self.notify_and_unblock(id);
return Err(err);
}
};
.await?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
self.notify_and_unblock(id);
} else {
let gc_guard = match gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
return Err(CompactionError::Other(anyhow!(
"cannot run gc-compaction because gc is blocked: {}",
e
)));
}
};
let jobs_len = jobs.len();
let mut pending_tasks = Vec::new();
// gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
@@ -420,6 +415,7 @@ impl GcCompactionQueue {
{
let mut guard = self.inner.lock().unwrap();
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
let mut tasks = Vec::new();
for task in pending_tasks {
let id = guard.next_id();
@@ -450,18 +446,7 @@ impl GcCompactionQueue {
if let Err(err) = &res {
log_compaction_error(err, None, cancel.is_cancelled());
}
match res {
Ok(res) => Ok(res),
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
Err(_) => {
// There are some cases where traditional gc might collect some layer
// files causing gc-compaction cannot read the full history of the key.
// This needs to be resolved in the long-term by improving the compaction
// process. For now, let's simply avoid such errors triggering the
// circuit breaker.
Ok(CompactionOutcome::Skipped)
}
}
res
}
async fn iteration_inner(
@@ -509,32 +494,27 @@ impl GcCompactionQueue {
info!(
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
);
self.handle_sub_compaction(id, options, timeline, auto)
self.handle_sub_compaction(id, options, timeline, gc_block, auto)
.await?;
} else {
// Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
// in this branch.
let _gc_guard = match gc_block.start().await {
let gc_guard = match gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
self.notify_and_unblock(id);
self.clear_running_job();
return Err(CompactionError::Other(anyhow!(
"cannot run gc-compaction because gc is blocked: {}",
e
)));
}
};
let res = timeline.compact_with_options(cancel, options, ctx).await;
let compaction_result = match res {
Ok(res) => res,
Err(err) => {
warn!(%err, "failed to run gc-compaction");
self.notify_and_unblock(id);
self.clear_running_job();
return Err(err);
}
};
{
let mut guard = self.inner.lock().unwrap();
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
}
let compaction_result =
timeline.compact_with_options(cancel, options, ctx).await?;
self.notify_and_unblock(id);
if compaction_result == CompactionOutcome::YieldForL0 {
yield_for_l0 = true;
}
@@ -542,25 +522,7 @@ impl GcCompactionQueue {
}
GcCompactionQueueItem::SubCompactionJob(options) => {
// TODO: error handling, clear the queue if any task fails?
let _gc_guard = match gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
self.clear_running_job();
return Err(CompactionError::Other(anyhow!(
"cannot run gc-compaction because gc is blocked: {}",
e
)));
}
};
let res = timeline.compact_with_options(cancel, options, ctx).await;
let compaction_result = match res {
Ok(res) => res,
Err(err) => {
warn!(%err, "failed to run gc-compaction subcompaction job");
self.clear_running_job();
return Err(err);
}
};
let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
if compaction_result == CompactionOutcome::YieldForL0 {
// We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
// again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
@@ -591,7 +553,10 @@ impl GcCompactionQueue {
}
}
}
self.clear_running_job();
{
let mut guard = self.inner.lock().unwrap();
guard.running = None;
}
Ok(if yield_for_l0 {
tracing::info!("give up gc-compaction: yield for L0 compaction");
CompactionOutcome::YieldForL0
@@ -1036,9 +1001,9 @@ impl Timeline {
{
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::from(ctx)
let image_ctx = RequestContextBuilder::extend(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
.attached_child();
.build();
let mut partitioning = dense_partitioning;
partitioning

View File

@@ -2,14 +2,10 @@ use std::collections::HashSet;
use std::sync::Arc;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::DetachBehavior;
use pageserver_api::models::detach_ancestor::AncestorDetached;
use pageserver_api::shard::ShardIdentity;
use pageserver_compaction::helpers::overlaps_with;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@@ -26,10 +22,7 @@ use crate::task_mgr::TaskKind;
use crate::tenant::Tenant;
use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
use crate::tenant::storage_layer::layer::local_layer_path;
use crate::tenant::storage_layer::{
AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
ValuesReconstructState,
};
use crate::tenant::storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
#[derive(Debug, thiserror::Error)]
@@ -177,92 +170,6 @@ impl Attempt {
}
}
async fn generate_tombstone_image_layer(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
tracing::info!(
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
);
let io_concurrency = IoConcurrency::spawn_from_conf(
detached.conf,
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
// Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
// not contain too many keys, otherwise this takes a lot of memory. Currently we limit it to 10k keys in the compute.
let key_range = Key::sparse_non_inherited_keyspace();
// avoid generating a "future layer" which will then be removed
let image_lsn = ancestor_lsn;
{
let layers = detached.layers.read().await;
for layer in layers.all_persistent_layers() {
if !layer.is_delta
&& layer.lsn_range.start == image_lsn
&& overlaps_with(&key_range, &layer.key_range)
{
tracing::warn!(
layer=%layer, "image layer at the detach LSN already exists, skipping removing aux files"
);
return Ok(None);
}
}
}
let data = ancestor
.get_vectored_impl(
KeySpace::single(key_range.clone()),
image_lsn,
&mut reconstruct_state,
ctx,
)
.await
.context("failed to retrieve aux keys")
.map_err(|e| Error::launder(e, Error::Prepare))?;
if !data.is_empty() {
// TODO: is it possible that we can have an image at `image_lsn`? Unlikely because image layers are only generated
// upon compaction but theoretically possible.
let mut image_layer_writer = ImageLayerWriter::new(
detached.conf,
detached.timeline_id,
detached.tenant_shard_id,
&key_range,
image_lsn,
ctx,
)
.await
.context("failed to create image layer writer")
.map_err(Error::Prepare)?;
for key in data.keys() {
image_layer_writer
.put_image(*key, Bytes::new(), ctx)
.await
.context("failed to write key")
.map_err(|e| Error::launder(e, Error::Prepare))?;
}
let (desc, path) = image_layer_writer
.finish(ctx)
.await
.context("failed to finish image layer writer for removing the metadata keys")
.map_err(|e| Error::launder(e, Error::Prepare))?;
let generated = Layer::finish_creating(detached.conf, detached, desc, &path)
.map_err(|e| Error::launder(e, Error::Prepare))?;
detached
.remote_client
.upload_layer_file(&generated, &detached.cancel)
.await
.map_err(|e| Error::launder(e, Error::Prepare))?;
tracing::info!(layer=%generated, "wrote image layer");
Ok(Some(generated))
} else {
tracing::info!("no aux keys found in ancestor");
Ok(None)
}
}
/// See [`Timeline::prepare_to_detach_from_ancestor`]
pub(super) async fn prepare(
detached: &Arc<Timeline>,
@@ -445,16 +352,10 @@ pub(super) async fn prepare(
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
let mut new_layers: Vec<Layer> =
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
if let Some(tombstone_layer) =
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, ctx).await?
{
new_layers.push(tombstone_layer.into());
}
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
{
tracing::info!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
let mut tasks = tokio::task::JoinSet::new();

View File

@@ -32,15 +32,9 @@ impl Client {
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
anyhow::bail!("import_pgdata_upcall_api is not configured")
};
let mut http_client = reqwest::Client::builder();
for cert in &conf.ssl_ca_certs {
http_client = http_client.add_root_certificate(cert.clone());
}
let http_client = http_client.build()?;
Ok(Self {
base_url: base_url.to_string(),
client: http_client,
client: reqwest::Client::new(),
cancel,
authorization_header: conf
.import_pgdata_upcall_api_token

View File

@@ -25,8 +25,8 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity(capacity: usize) -> Self {
AlignedBufferMut {
raw: RawAlignedBuffer::with_capacity(capacity),

View File

@@ -37,8 +37,8 @@ impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity(capacity: usize) -> Self {
let align = ConstAlign::<A>;
let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");

18
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -1286,20 +1286,24 @@ files = [
[[package]]
name = "h2"
version = "4.2.0"
version = "4.1.0"
description = "Pure-Python HTTP/2 protocol implementation"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "h2-4.2.0-py3-none-any.whl", hash = "sha256:479a53ad425bb29af087f3458a61d30780bc818e4ebcf01f0b536ba916462ed0"},
{file = "h2-4.2.0.tar.gz", hash = "sha256:c8a52129695e88b1a0578d8d2cc6842bbd79128ac685463b887ee278126ad01f"},
]
files = []
develop = false
[package.dependencies]
hpack = ">=4.1,<5"
hyperframe = ">=6.1,<7"
[package.source]
type = "git"
url = "https://github.com/python-hyper/h2"
reference = "HEAD"
resolved_reference = "0b98b244b5fd1fe96100ac14905417a3b70a4286"
[[package]]
name = "hpack"
version = "4.1.0"
@@ -3840,4 +3844,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"

View File

@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.3.0";
pub(crate) const EXT_VERSION: &str = "0.2.0";
pub(crate) const EXT_SCHEMA: &str = "auth";
#[derive(Clone)]

View File

@@ -43,7 +43,7 @@ websockets = "^12.0"
clickhouse-connect = "^0.7.16"
kafka-python = "^2.0.2"
jwcrypto = "^1.5.6"
h2 = "^4.2.0"
h2 = {git = "https://github.com/python-hyper/h2"}
types-jwcrypto = "^1.5.0.20240925"
pyyaml = "^6.0.2"
types-pyyaml = "^6.0.12.20240917"

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.86.0"
channel = "1.85.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -115,17 +115,13 @@ impl Client {
"{}/v1/tenant/{}/timeline/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
let resp = self.request(Method::DELETE, &uri, ()).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
let resp = self.request(Method::DELETE, &uri, ()).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
@@ -201,16 +197,6 @@ impl Client {
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.request_maybe_body(method, uri, Some(body)).await
}
/// Send the request and check that the status code is good, with an optional body.
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: Option<B>,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
@@ -222,15 +208,12 @@ impl Client {
&self,
method: Method,
uri: U,
body: Option<B>,
body: B,
) -> Result<reqwest::Response> {
let mut req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
}
if let Some(body) = body {
req = req.json(&body);
}
req.send().await.map_err(Error::ReceiveBody)
req.json(&body).send().await.map_err(Error::ReceiveBody)
}
}

View File

@@ -219,10 +219,7 @@ struct Args {
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
pub ssl_ca_file: Option<Utf8PathBuf>,
/// Flag to use https for requests to peer's safekeeper API.
#[arg(long)]
pub use_https_safekeeper_api: bool,
ssl_ca_file: Option<Utf8PathBuf>,
}
// Like PathBufValueParser, but allows empty string.
@@ -402,7 +399,6 @@ async fn main() -> anyhow::Result<()> {
ssl_cert_file: args.ssl_cert_file,
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_certs,
use_https_safekeeper_api: args.use_https_safekeeper_api,
});
// initialize sentry if SENTRY_DSN is provided

View File

@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
use hyper::{Body, Request, Response, StatusCode};
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
TimelineStatus, TimelineTermBumpRequest,
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
TimelineTermBumpRequest,
};
use safekeeper_api::{ServerInfo, membership, models};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -83,11 +83,13 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
.delete_all_for_tenant(&tenant_id, action)
.await
.map_err(ApiError::InternalServerError)?;
let response_body: TenantDeleteResult = delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteResult>>();
json_response(StatusCode::OK, response_body)
json_response(
StatusCode::OK,
delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteResult>>(),
)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -536,7 +538,6 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
https_connstr: sk_info.https_connstr,
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
availability_zone: None,

View File

@@ -121,7 +121,6 @@ pub struct SafeKeeperConf {
pub ssl_cert_file: Utf8PathBuf,
pub ssl_cert_reload_period: Duration,
pub ssl_ca_certs: Vec<Certificate>,
pub use_https_safekeeper_api: bool,
}
impl SafeKeeperConf {
@@ -171,7 +170,6 @@ impl SafeKeeperConf {
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
}
}
}

View File

@@ -94,10 +94,10 @@ impl WalReceivers {
/// Get reference to locked slot contents. Slot must exist (registered
/// earlier).
fn get_slot(
self: &Arc<WalReceivers>,
fn get_slot<'a>(
self: &'a Arc<WalReceivers>,
id: WalReceiverId,
) -> MappedMutexGuard<'_, WalReceiverState> {
) -> MappedMutexGuard<'a, WalReceiverState> {
MutexGuard::map(self.mutex.lock(), |locked| {
locked.slots[id]
.as_mut()

View File

@@ -176,7 +176,6 @@ pub struct Donor {
pub flush_lsn: Lsn,
pub pg_connstr: String,
pub http_connstr: String,
pub https_connstr: Option<String>,
}
impl From<&PeerInfo> for Donor {
@@ -187,7 +186,6 @@ impl From<&PeerInfo> for Donor {
flush_lsn: p.flush_lsn,
pg_connstr: p.pg_connstr.clone(),
http_connstr: p.http_connstr.clone(),
https_connstr: p.https_connstr.clone(),
}
}
}
@@ -238,33 +236,11 @@ async fn recover(
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// Learn donor term switch history to figure out starting point.
let mut client = reqwest::Client::builder();
for cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(cert.clone());
}
let client = client
.build()
.context("Failed to build http client for recover")?;
let url = if conf.use_https_safekeeper_api {
if let Some(https_connstr) = donor.https_connstr.as_ref() {
format!("https://{https_connstr}")
} else {
anyhow::bail!(
"cannot recover from donor {}: \
https is enabled, but https_connstr is not specified",
donor.sk_id
);
}
} else {
format!("http://{}", donor.http_connstr)
};
let client = reqwest::Client::new();
let timeline_info: TimelineStatus = client
.get(format!(
"{}/v1/tenant/{}/timeline/{}",
url, tli.ttid.tenant_id, tli.ttid.timeline_id
"http://{}/v1/tenant/{}/timeline/{}",
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
))
.send()
.await?

View File

@@ -50,7 +50,6 @@ fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> Peer
local_start_lsn: Lsn(sk_info.local_start_lsn),
pg_connstr: sk_info.safekeeper_connstr.clone(),
http_connstr: sk_info.http_connstr.clone(),
https_connstr: sk_info.https_connstr.clone(),
ts,
}
}
@@ -364,7 +363,6 @@ impl SharedState {
.to_owned()
.unwrap_or(conf.listen_pg_addr.clone()),
http_connstr: conf.listen_http_addr.to_owned(),
https_connstr: conf.listen_https_addr.to_owned(),
backup_lsn: self.sk.state().inmem.backup_lsn.0,
local_start_lsn: self.sk.state().local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),
@@ -701,7 +699,7 @@ impl Timeline {
}
/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
}

View File

@@ -116,7 +116,7 @@ fn test_many_tx() -> anyhow::Result<()> {
}
None
})
.next_back()
.last()
.unwrap();
let initdb_lsn = 21623024;

View File

@@ -184,7 +184,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
ssl_cert_file: Utf8PathBuf::from(""),
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -141,7 +141,6 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
peer_horizon_lsn: 5,
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()),
local_start_lsn: 0,
availability_zone: None,
standby_horizon: 0,

View File

@@ -45,10 +45,8 @@ message SafekeeperTimelineInfo {
uint64 standby_horizon = 14;
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// HTTP endpoint connection string.
// HTTP endpoint connection string
string http_connstr = 13;
// HTTPS endpoint connection string.
optional string https_connstr = 15;
// Availability zone of a safekeeper.
optional string availability_zone = 11;
}

View File

@@ -764,7 +764,6 @@ mod tests {
peer_horizon_lsn: 5,
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
local_start_lsn: 0,
availability_zone: None,
standby_horizon: 0,

View File

@@ -10,11 +10,13 @@ pub struct Client {
}
impl Client {
pub fn new(http_client: reqwest::Client, base_url: Url, jwt_token: Option<String>) -> Self {
pub fn new(base_url: Url, jwt_token: Option<String>) -> Self {
Self {
base_url,
jwt_token,
client: http_client,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
}
}

View File

@@ -4,7 +4,6 @@ use std::error::Error as _;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
@@ -365,28 +364,25 @@ pub(crate) struct ShardUpdate<'a> {
}
impl ComputeHook {
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
pub(super) fn new(config: Config) -> Self {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {}", jwt));
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
for cert in &config.ssl_ca_certs {
client = client.add_root_certificate(cert.clone());
}
let client = client
let client = reqwest::ClientBuilder::new()
.timeout(NOTIFY_REQUEST_TIMEOUT)
.build()
.context("Failed to build http client for compute hook")?;
.expect("Failed to construct HTTP client");
Ok(Self {
Self {
state: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
client,
})
}
}
/// For test environments: use neon_local's LocalEnv to update compute

View File

@@ -12,7 +12,6 @@ use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::id::NodeId;
use utils::logging::SecretString;
@@ -228,7 +227,6 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
Some((*node_id, status))
}
.instrument(tracing::info_span!("heartbeat_ps", %node_id))
});
}
@@ -255,7 +253,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
PageserverState::WarmingUp { .. } => {
warming_up += 1;
}
PageserverState::Offline => offline += 1,
PageserverState::Offline { .. } => offline += 1,
PageserverState::Available { .. } => {}
}
}
@@ -371,7 +369,6 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
Some((*node_id, status))
}
.instrument(tracing::info_span!("heartbeat_sk", %node_id))
});
}
@@ -394,7 +391,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
let mut offline = 0;
for state in new_state.values() {
match state {
SafekeeperState::Offline => offline += 1,
SafekeeperState::Offline { .. } => offline += 1,
SafekeeperState::Available { .. } => {}
}
}

View File

@@ -1733,9 +1733,9 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
};
if *self_addr == leader_addr {
return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
"Leader is stepped down instance".into(),
)));
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
"Leader is stepped down instance"
))));
}
}
@@ -1744,17 +1744,19 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
// Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and
// include some leeway to get the timeout for proxied requests.
const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
let client = reqwest::ClientBuilder::new()
.timeout(PROXIED_REQUEST_TIMEOUT)
.build();
let client = match client {
Ok(client) => client,
Err(err) => {
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
"Failed to build leader client for forwarding while in stepped down state: {err}"
))));
}
};
let client = state.service.get_http_client().clone();
let request: reqwest::Request = match convert_request(
req,
&client,
leader.address,
PROXIED_REQUEST_TIMEOUT,
)
.await
{
let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
Ok(r) => r,
Err(err) => {
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -1812,7 +1814,6 @@ async fn convert_request(
req: hyper::Request<Body>,
client: &reqwest::Client,
to_address: String,
timeout: Duration,
) -> Result<reqwest::Request, ApiError> {
use std::str::FromStr;
@@ -1867,7 +1868,6 @@ async fn convert_request(
.request(method, uri)
.headers(headers)
.body(body)
.timeout(timeout)
.build()
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))

View File

@@ -110,20 +110,7 @@ impl Leadership {
) -> Option<GlobalObservedState> {
tracing::info!("Sending step down request to {leader:?}");
let mut http_client = reqwest::Client::builder();
for cert in &self.config.ssl_ca_certs {
http_client = http_client.add_root_certificate(cert.clone());
}
let http_client = match http_client.build() {
Ok(http_client) => http_client,
Err(err) => {
tracing::error!("Failed to build client for leader step-down request: {err}");
return None;
}
};
let client = PeerClient::new(
http_client,
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
self.config.peer_jwt_token.clone(),
);

View File

@@ -283,8 +283,10 @@ impl Secrets {
fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
if let Some(v) = cli {
Some(v.clone())
} else if let Ok(v) = std::env::var(env_name) {
Some(v)
} else {
std::env::var(env_name).ok()
None
}
}
}

View File

@@ -59,11 +59,11 @@ impl ResponseErrorMessageExt for reqwest::Response {
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
impl PeerClient {
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
Self {
uri,
jwt,
client: http_client,
client: reqwest::Client::new(),
}
}

View File

@@ -1524,14 +1524,25 @@ impl Persistence {
/// Load pending operations from db.
pub(crate) async fn list_pending_ops(
&self,
filter_for_sk: Option<NodeId>,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
const FILTER_VAL_1: i64 = 1;
const FILTER_VAL_2: i64 = 2;
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
let timeline_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
dsl::safekeeper_timeline_pending_ops
.filter(
dsl::sk_id
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
)
.load(conn)
.await?;
Ok(from_db)
})
})

View File

@@ -686,8 +686,6 @@ impl Reconciler {
.await?,
);
pausable_failpoint!("reconciler-live-migrate-post-generation-inc");
let dest_conf = build_location_config(
&self.shard,
&self.config,
@@ -762,9 +760,7 @@ impl Reconciler {
Ok(())
}
/// Returns true if the observed state of the attached location was refreshed
/// and false otherwise.
async fn maybe_refresh_observed(&mut self) -> Result<bool, ReconcileError> {
async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
// If the attached node has uncertain state, read it from the pageserver before proceeding: this
// is important to avoid spurious generation increments.
//
@@ -774,7 +770,7 @@ impl Reconciler {
let Some(attached_node) = self.intent.attached.as_ref() else {
// Nothing to do
return Ok(false);
return Ok(());
};
if matches!(
@@ -819,7 +815,7 @@ impl Reconciler {
}
}
Ok(true)
Ok(())
}
/// Reconciling a tenant makes API calls to pageservers until the observed state
@@ -835,7 +831,7 @@ impl Reconciler {
/// state where it still requires later reconciliation.
pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
// Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
let refreshed = self.maybe_refresh_observed().await?;
self.maybe_refresh_observed().await?;
// Special case: live migration
self.maybe_live_migrate().await?;
@@ -859,14 +855,8 @@ impl Reconciler {
);
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
if refreshed {
tracing::info!(
node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
self.compute_notify().await?;
} else {
// Nothing to do
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
}
// Nothing to do
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
}
observed => {
// In all cases other than a matching observed configuration, we will

View File

@@ -101,7 +101,7 @@ impl SafekeeperClient {
pub(crate) async fn delete_tenant(
&self,
tenant_id: TenantId,
) -> Result<models::TenantDeleteResult> {
) -> Result<models::TimelineDeleteResult> {
measured_request!(
"delete_tenant",
crate::metrics::Method::Delete,

View File

@@ -1711,7 +1711,7 @@ impl Service {
))),
config: config.clone(),
persistence,
compute_hook: Arc::new(ComputeHook::new(config.clone())?),
compute_hook: Arc::new(ComputeHook::new(config.clone())),
result_tx,
heartbeater_ps,
heartbeater_sk,

View File

@@ -35,10 +35,6 @@ impl SafekeeperReconcilers {
service: &Arc<Service>,
reqs: Vec<ScheduleRequest>,
) {
tracing::info!(
"Scheduling {} pending safekeeper ops loaded from db",
reqs.len()
);
for req in reqs {
self.schedule_request(service, req);
}
@@ -78,7 +74,7 @@ pub(crate) async fn load_schedule_requests(
service: &Arc<Service>,
safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
let pending_ops = service.persistence.list_pending_ops().await?;
let pending_ops = service.persistence.list_pending_ops(None).await?;
let mut res = Vec::with_capacity(pending_ops.len());
for op_persist in pending_ops {
let node_id = NodeId(op_persist.sk_id as u64);
@@ -236,14 +232,12 @@ impl SafekeeperReconciler {
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
self.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
?timeline_id,
%node_id,
?timeline_id
))
.await;
}

View File

@@ -622,7 +622,7 @@ impl TenantShard {
.collect::<Vec<_>>();
attached_locs.sort_by_key(|i| i.1);
if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
self.intent.set_attached(scheduler, Some(*node_id));
}

View File

@@ -18,7 +18,7 @@ enum LargeObjectKind {
impl LargeObjectKind {
fn from_key(key: &str) -> Self {
let fname = key.split('/').next_back().unwrap();
let fname = key.split('/').last().unwrap();
let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
return LargeObjectKind::Other;

View File

@@ -295,8 +295,8 @@ pub struct ControllerClientConfig {
}
impl ControllerClientConfig {
pub fn build_client(self, http_client: reqwest::Client) -> control_api::Client {
control_api::Client::new(http_client, self.controller_api, Some(self.controller_jwt))
pub fn build_client(self) -> control_api::Client {
control_api::Client::new(self.controller_api, Some(self.controller_jwt))
}
}

View File

@@ -3,7 +3,7 @@ use camino::Utf8PathBuf;
use clap::{Parser, Subcommand};
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
use pageserver_api::shard::TenantShardId;
use reqwest::{Certificate, Method, Url};
use reqwest::{Method, Url};
use storage_controller_client::control_api;
use storage_scrubber::garbage::{PurgeMode, find_garbage, purge_garbage};
use storage_scrubber::pageserver_physical_gc::{GcMode, pageserver_physical_gc};
@@ -41,10 +41,6 @@ struct Cli {
/// If set to true, the scrubber will exit with error code on fatal error.
#[arg(long, default_value_t = false)]
exit_code: bool,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
}
#[derive(Subcommand, Debug)]
@@ -150,28 +146,13 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);
let ssl_ca_certs = match cli.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Certificate::from_pem_bundle(&buf)?
}
None => Vec::new(),
};
let mut http_client = reqwest::Client::builder();
for cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(cert);
}
let http_client = http_client.build()?;
let controller_client = cli.controller_api.map(|controller_api| {
ControllerClientConfig {
controller_api,
// Default to no key: this is a convenience when working in a development environment
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
}
.build_client(http_client)
.build_client()
});
match cli.command {

View File

@@ -376,28 +376,6 @@ class PageserverWalReceiverProtocol(StrEnum):
raise ValueError(f"Unknown protocol type: {proto}")
@dataclass
class PageserverTracingConfig:
sampling_ratio: tuple[int, int]
endpoint: str
protocol: str
timeout: str
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
value = {
"sampling_ratio": {
"numerator": self.sampling_ratio[0],
"denominator": self.sampling_ratio[1],
},
"export_config": {
"endpoint": self.endpoint,
"protocol": self.protocol,
"timeout": self.timeout,
},
}
return ("tracing", value)
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -447,7 +425,6 @@ class NeonEnvBuilder:
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -501,8 +478,6 @@ class NeonEnvBuilder:
pageserver_get_vectored_concurrent_io
)
self.pageserver_tracing_config = pageserver_tracing_config
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
)
@@ -1163,7 +1138,6 @@ class NeonEnv:
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
self.pageserver_tracing_config = config.pageserver_tracing_config
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1288,14 +1262,6 @@ class NeonEnv:
if key not in ps_cfg:
ps_cfg[key] = value
if self.pageserver_tracing_config is not None:
key, value = self.pageserver_tracing_config.to_config_key_value()
if key not in ps_cfg:
ps_cfg[key] = value
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
@@ -1318,7 +1284,6 @@ class NeonEnv:
"http_port": port.http,
"https_port": port.https,
"sync": config.safekeepers_enable_fsync,
"use_https_safekeeper_api": config.use_https_safekeeper_api,
}
if config.auth_enabled:
sk_cfg["auth_enabled"] = True

View File

@@ -110,7 +110,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*delaying layer flush by \\S+ for compaction backpressure.*",
".*stalling layer flushes for compaction backpressure.*",
".*layer roll waiting for flush due to compaction backpressure.*",
".*BatchSpanProcessor.*",
)
@@ -119,7 +118,7 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
# failing to connect to them.
".*Call to node.*management API.*failed.*receive body.*",
".*Call to node.*management API.*failed.*ReceiveBody.*",
".*Call to node.*management API.*failed.*Timeout.*",
".*Call to node.*management API still failed after .+ retries, giving up.*",
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
# Many tests will start up with a node offline

View File

@@ -1192,28 +1192,3 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
log.info(f"Got perf info response code: {res.status_code}")
self.verbose_error(res)
return res.json()
def ingest_aux_files(
self,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
aux_files: dict[str, bytes],
):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/ingest_aux_files",
json={
"aux_files": aux_files,
},
)
self.verbose_error(res)
return res.json()
def list_aux_files(
self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, lsn: Lsn
) -> Any:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/list_aux_files",
json={"lsn": str(lsn)},
)
self.verbose_error(res)
return res.json()

View File

@@ -10,7 +10,6 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverTracingConfig,
PgBin,
wait_for_last_flush_lsn,
)
@@ -112,15 +111,6 @@ def setup_and_run_pagebench_benchmark(
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
tracing_config = PageserverTracingConfig(
sampling_ratio=(0, 1000),
endpoint="http://localhost:4318/v1/traces",
protocol="http-binary",
timeout="10s",
)
neon_env_builder.pageserver_tracing_config = tracing_config
ratio = tracing_config.sampling_ratio[0] / tracing_config.sampling_ratio[1]
params.update(
{
"pageserver_config_override.page_cache_size": (
@@ -128,7 +118,6 @@ def setup_and_run_pagebench_benchmark(
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
"pageserver_config_override.sampling_ratio": (ratio, {"unit": ""}),
}
)

View File

@@ -7,6 +7,7 @@ import traceback
from typing import TYPE_CHECKING
import psycopg2
import psycopg2.extras
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
@@ -25,11 +26,7 @@ if TYPE_CHECKING:
# Granularity of ~0.5 sec
def measure_replication_lag(
master: psycopg2.extensions.cursor,
replica: psycopg2.extensions.cursor,
timeout_sec: int = 600,
):
def measure_replication_lag(master, replica, timeout_sec=600):
start = time.time()
master.execute("SELECT pg_current_wal_flush_lsn()")
master_lsn = Lsn(master.fetchall()[0][0])
@@ -43,7 +40,7 @@ def measure_replication_lag(
raise TimeoutError(f"Replication sync took more than {timeout_sec} sec")
def check_pgbench_still_running(pgbench: subprocess.Popen[str]):
def check_pgbench_still_running(pgbench):
rc = pgbench.poll()
if rc is not None:
raise RuntimeError(f"Pgbench terminated early with return code {rc}")
@@ -64,8 +61,6 @@ def test_ro_replica_lag(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: {}", project_id)
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
@@ -81,7 +76,6 @@ def test_ro_replica_lag(
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
log.info("Replica endpoint ID: {}", replica["endpoint"]["id"])
replica_env = master_env.copy()
replica_env["PGHOST"] = replica["endpoint"]["host"]
neon_api.wait_for_operation_to_finish(project_id)
@@ -197,8 +191,6 @@ def test_replication_start_stop(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: {}", project_id)
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]
@@ -208,15 +200,15 @@ def test_replication_start_stop(
)
replicas = []
for i in range(num_replicas):
replica = neon_api.create_endpoint(
project_id,
branch_id,
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
for _ in range(num_replicas):
replicas.append(
neon_api.create_endpoint(
project_id,
branch_id,
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
)
log.info("Replica {} endpoint ID: {}", i + 1, replica["endpoint"]["id"])
replicas.append(replica)
neon_api.wait_for_operation_to_finish(project_id)
replica_connstr = [

View File

@@ -249,7 +249,6 @@ def test_forward_compatibility(
top_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir: Path,
compute_reconfigure_listener: ComputeReconfigure,
):
"""
Test that the old binaries can read new data
@@ -258,7 +257,6 @@ def test_forward_compatibility(
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.test_may_use_compatibility_snapshot_binaries = True
try:

View File

@@ -4073,101 +4073,6 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
assert reconciles_after_restart == 0
@run_only_on_default_postgres("PG version is not interesting here")
@pytest.mark.parametrize("restart_storcon", [True, False])
def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool):
"""
Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
- restart_storcon: tests whether the pending ops are persisted.
if we don't restart, we test that we don't require it to come from the db.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start()
env.safekeepers[0].stop()
# Wait for heartbeater to pick up that the safekeeper is gone
# This isn't really neccessary
def logged_offline():
env.storage_controller.assert_log_contains(
"Heartbeat round complete for 3 safekeepers, 1 offline"
)
wait_until(logged_offline)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id)
env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
env.storage_controller.allowed_errors.extend(
[
".*Call to safekeeper.* management API still failed after.*",
".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
]
)
if restart_storcon:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
env.storage_controller.assert_log_contains("writing pending op for sk id 1")
env.safekeepers[0].start()
# ensure that we applied the operation also for the safekeeper we just brought down
def logged_contains_on_sk():
env.safekeepers[0].assert_log_contains(
f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
)
wait_until(logged_contains_on_sk)
env.safekeepers[1].stop()
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
# ensure the safekeeper deleted the timeline
def timeline_deleted_on_active_sks():
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
)
env.safekeepers[2].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
)
wait_until(timeline_deleted_on_active_sks)
if restart_storcon:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
env.safekeepers[1].start()
# ensure that there is log msgs for the third safekeeper too
def timeline_deleted_on_sk():
env.safekeepers[1].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
)
wait_until(timeline_deleted_on_sk)
@pytest.mark.parametrize("wrong_az", [True, False])
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
"""
@@ -4271,121 +4176,3 @@ def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder,
)
else:
assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == []
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_storage_controller_migrate_with_pageserver_restart(
neon_env_builder: NeonEnvBuilder, make_httpserver
):
"""
Test that live migrations which fail right after incrementing the generation
due to the destination going offline eventually send a compute notification
after the destination re-attaches.
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.storage_controller_config = {
# Disable transitions to offline
"max_offline": "600s",
"use_local_compute_notifications": False,
}
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
notifications = []
def notify(request: Request):
log.info(f"Received notify-attach: {request}")
notifications.append(request.json)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(notify)
env = neon_env_builder.init_start()
env.storage_controller.allowed_errors.extend(
[
".*Call to node.*management API failed.*",
".*Call to node.*management API still failed.*",
".*Reconcile error.*",
".*request.*PUT.*migrate.*",
]
)
env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
env.storage_controller.reconcile_until_idle()
initial_desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0]
log.info(f"{initial_desc=}")
primary = env.get_pageserver(initial_desc["node_attached"])
secondary = env.get_pageserver(initial_desc["node_secondary"][0])
# Pause the migration after incrementing the generation in the database
env.storage_controller.configure_failpoints(
("reconciler-live-migrate-post-generation-inc", "pause")
)
tenant_shard_id = TenantShardId(env.initial_tenant, 0, 0)
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
migrate_fut = executor.submit(
env.storage_controller.tenant_shard_migrate,
tenant_shard_id,
secondary.id,
config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
)
def has_hit_migration_failpoint():
expr = "at failpoint reconciler-live-migrate-post-generation-inc"
log.info(expr)
assert env.storage_controller.log_contains(expr)
wait_until(has_hit_migration_failpoint)
secondary.stop()
# Eventually migration completes
env.storage_controller.configure_failpoints(
("reconciler-live-migrate-post-generation-inc", "off")
)
try:
migrate_fut.result()
except StorageControllerApiException as err:
log.info(f"Migration failed: {err}")
except:
env.storage_controller.configure_failpoints(
("reconciler-live-migrate-post-generation-inc", "off")
)
raise
def process_migration_result():
dump = env.storage_controller.tenant_shard_dump()
observed = dump[0]["observed"]["locations"]
log.info(f"{observed=} primary={primary.id} secondary={secondary.id}")
assert observed[str(primary.id)]["conf"]["mode"] == "AttachedStale"
assert observed[str(secondary.id)]["conf"] is None
wait_until(process_migration_result)
# Start and wait for re-attach to be processed
secondary.start()
env.storage_controller.poll_node_status(
secondary.id,
desired_availability=PageserverAvailability.ACTIVE,
desired_scheduling_policy=None,
max_attempts=10,
backoff=1,
)
env.storage_controller.reconcile_until_idle()
assert notifications[-1] == {
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(secondary.id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}

View File

@@ -776,7 +776,6 @@ def test_lsn_lease_storcon(neon_env_builder: NeonEnvBuilder):
env.initial_tenant, env.initial_timeline, last_flush_lsn
)
env.storage_controller.tenant_shard_split(env.initial_tenant, 8)
env.storage_controller.reconcile_until_idle(timeout_secs=120)
# TODO: do we preserve LSN leases across shard splits?
env.storage_controller.pageserver_api().timeline_lsn_lease(
env.initial_tenant, env.initial_timeline, last_flush_lsn

View File

@@ -1768,87 +1768,6 @@ def test_pageserver_compaction_detach_ancestor_smoke(neon_env_builder: NeonEnvBu
workload_child.validate(env.pageserver.id)
def test_timeline_detach_with_aux_files_with_detach_v1(
neon_env_builder: NeonEnvBuilder,
):
"""
Validate that "branches do not inherit their parent" is invariant over detach_ancestor.
Branches hide parent branch aux files etc by stopping lookup of non-inherited keyspace at the parent-child boundary.
We had a bug where detach_ancestor running on a child branch would copy aux files key range from child to parent,
thereby making parent aux files reappear.
"""
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
lsn0 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_parent_1', 'pgoutput')"
)
lsn1 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_parent_2', 'pgoutput')"
)
lsn2 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn0).keys()) == set(
[]
)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn1).keys()) == set(
["pg_replslot/test_slot_parent_1/state"]
)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
)
# Restore at LSN1
branch_timeline_id = env.create_branch("restore", env.initial_tenant, "main", lsn1)
endpoint2 = env.endpoints.create_start("restore", tenant_id=env.initial_tenant)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
# Add a new slot file to the restore branch (This won't happen in reality because cplane immediately detaches the branch on restore,
# but we want to ensure that aux files on the detached branch are NOT inherited during ancestor detach. We could change the behavior
# in the future.
# TL;DR we should NEVER automatically detach a branch as a background optimization for those tenants that already used the restore
# feature before branch detach was introduced because it will clean up the aux files and stop logical replication.
endpoint2.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
)
lsn3 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
)
print("lsn0=", lsn0)
print("lsn1=", lsn1)
print("lsn2=", lsn2)
print("lsn3=", lsn3)
# Detach the restore branch so that main doesn't have any child branches.
all_reparented = http.detach_ancestor(
env.initial_tenant, branch_timeline_id, detach_behavior="v1"
)
assert all_reparented == set([])
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
), "main branch unaffected"
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
# TODO:
# - branch near existing L1 boundary, image layers?
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.4",
"7ec41bf6cd92a4af751272145fdd590270c491da"
"22533c63fc42cdc1dbe138650ba1eca10a70c5d7"
],
"v16": [
"16.8",
"26c7d3f6de6f361c8923bb80d7563853b4a04958"
"473f68210d52ff8508f71c15b0c77c01296f4ace"
],
"v15": [
"15.12",
"4ac24a747cd897119ce9b20547b3b04eba2cacbd"
"6cea02e23caa950d5f06932491a91b6af8f54360"
],
"v14": [
"14.17",
"bce3e48d8a72e70e72dfee1b7421fecd0f1b00ac"
"35bc1b0cba55680e3b37abce4e67a46bb15f3315"
]
}