mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 06:00:38 +00:00
Compare commits
1 Commits
jc/test-br
...
jcsp/stabi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9fcb176e8 |
14
.github/scripts/generate_image_maps.py
vendored
14
.github/scripts/generate_image_maps.py
vendored
@@ -8,7 +8,7 @@ target_tag = os.getenv("TARGET_TAG")
|
||||
branch = os.getenv("BRANCH")
|
||||
dev_acr = os.getenv("DEV_ACR")
|
||||
prod_acr = os.getenv("PROD_ACR")
|
||||
dev_aws = "12345"
|
||||
dev_aws = os.getenv("DEV_AWS")
|
||||
prod_aws = os.getenv("PROD_AWS")
|
||||
aws_region = os.getenv("AWS_REGION")
|
||||
|
||||
@@ -39,18 +39,12 @@ registries = {
|
||||
],
|
||||
}
|
||||
|
||||
release_branches = ["release", "release-proxy", "release-compute"]
|
||||
|
||||
outputs: dict[str, dict[str, list[str]]] = {}
|
||||
|
||||
target_tags = (
|
||||
[target_tag, "latest"]
|
||||
if branch == "main"
|
||||
else [target_tag, "released"]
|
||||
if branch in release_branches
|
||||
else [target_tag]
|
||||
target_tags = [target_tag, "latest"] if branch == "main" else [target_tag]
|
||||
target_stages = (
|
||||
["dev", "prod"] if branch in ["release", "release-proxy", "release-compute"] else ["dev"]
|
||||
)
|
||||
target_stages = ["dev", "prod"] if branch in release_branches else ["dev"]
|
||||
|
||||
for component_name, component_images in components.items():
|
||||
for stage in target_stages:
|
||||
|
||||
12
.github/scripts/push_with_image_map.py
vendored
12
.github/scripts/push_with_image_map.py
vendored
@@ -2,9 +2,6 @@ import json
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
RED = "\033[91m"
|
||||
RESET = "\033[0m"
|
||||
|
||||
image_map = os.getenv("IMAGE_MAP")
|
||||
if not image_map:
|
||||
raise ValueError("IMAGE_MAP environment variable is not set")
|
||||
@@ -32,14 +29,9 @@ while len(pending) > 0:
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
if result.returncode != 0:
|
||||
failures.append((" ".join(cmd), result.stdout, target))
|
||||
failures.append((" ".join(cmd), result.stdout))
|
||||
pending.append((source, target))
|
||||
print(
|
||||
f"{RED}[RETRY]{RESET} Push failed for {target}. Retrying... (failure count: {len(failures)})"
|
||||
)
|
||||
print(result.stdout)
|
||||
|
||||
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
|
||||
failed_targets = [target for _, _, target in failures]
|
||||
with open(github_output, "a") as f:
|
||||
f.write(f"push_failures={json.dumps(failed_targets)}\n")
|
||||
f.write("slack_notify=true\n")
|
||||
|
||||
@@ -110,19 +110,12 @@ jobs:
|
||||
IMAGE_MAP: ${{ inputs.image-map }}
|
||||
|
||||
- name: Notify Slack if container image pushing fails
|
||||
if: steps.push.outputs.push_failures || failure()
|
||||
if: steps.push.outputs.slack_notify == 'true' || failure()
|
||||
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
|
||||
text: >
|
||||
*Container image pushing ${{
|
||||
steps.push.outcome == 'failure' && 'failed completely' || 'succeeded with some retries'
|
||||
}}* in
|
||||
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
|
||||
${{ steps.push.outputs.push_failures && format(
|
||||
'*Failed targets:*\n• {0}', join(fromJson(steps.push.outputs.push_failures), '\n• ')
|
||||
) || '' }}
|
||||
text: |
|
||||
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4329,7 +4329,6 @@ dependencies = [
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 1.0.69",
|
||||
"tracing-utils",
|
||||
"utils",
|
||||
]
|
||||
|
||||
@@ -7604,7 +7603,6 @@ dependencies = [
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry_sdk",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
|
||||
@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.86.0
|
||||
ENV RUSTC_VERSION=1.85.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
|
||||
@@ -1055,6 +1055,34 @@ RUN if [ -d pg_embedding-src ]; then \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_anon-build"
|
||||
# compile anon extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_anon-src
|
||||
ARG PG_VERSION
|
||||
|
||||
# This is an experimental extension, never got to real production.
|
||||
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION:?}" in "v17") \
|
||||
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pg_anon-build
|
||||
COPY --from=pg_anon-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src
|
||||
RUN if [ -d pg_anon-src ]; then \
|
||||
cd pg_anon-src && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg build with nonroot user and cargo installed"
|
||||
@@ -1338,8 +1366,8 @@ ARG PG_VERSION
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
WORKDIR /ext-src
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.2.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "5ace028e591f2e000ca10afa5b1ca62203ebff014c2907c0ec3b29c36f28a1bb pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
|
||||
@@ -1649,6 +1677,7 @@ COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
@@ -33,7 +33,6 @@
|
||||
import 'sql_exporter/lfc_hits.libsonnet',
|
||||
import 'sql_exporter/lfc_misses.libsonnet',
|
||||
import 'sql_exporter/lfc_used.libsonnet',
|
||||
import 'sql_exporter/lfc_used_pages.libsonnet',
|
||||
import 'sql_exporter/lfc_writes.libsonnet',
|
||||
import 'sql_exporter/logical_slot_restart_lsn.libsonnet',
|
||||
import 'sql_exporter/max_cluster_size.libsonnet',
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
{
|
||||
metric_name: 'lfc_used_pages',
|
||||
type: 'gauge',
|
||||
help: 'LFC pages used',
|
||||
key_labels: null,
|
||||
values: [
|
||||
'lfc_used_pages',
|
||||
],
|
||||
query: importstr 'sql_exporter/lfc_used_pages.sql',
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
SELECT lfc_value AS lfc_used_pages FROM neon.neon_lfc_stats WHERE lfc_key = 'file_cache_used_pages';
|
||||
@@ -2,6 +2,23 @@ diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index da723b8..5328114 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
@@ -3175,6 +3178,7 @@ SELECT s.query, s.calls
|
||||
FROM public.pg_stat_statements s
|
||||
JOIN pg_catalog.pg_database d
|
||||
@@ -10,6 +27,18 @@ index da723b8..5328114 100644
|
||||
ORDER BY 1;
|
||||
query | calls
|
||||
--------------------------------------+-------
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index d372459..6282afe 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
diff --git a/sql/ut-A.sql b/sql/ut-A.sql
|
||||
index 7c7d58a..4fd1a07 100644
|
||||
--- a/sql/ut-A.sql
|
||||
|
||||
@@ -1,3 +1,24 @@
|
||||
diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index e7d68a1..65a056c 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
diff --git a/expected/ut-J.out b/expected/ut-J.out
|
||||
index 2fa3c70..314e929 100644
|
||||
--- a/expected/ut-J.out
|
||||
@@ -139,3 +160,15 @@ index a09bd34..0ad227c 100644
|
||||
error hint:
|
||||
|
||||
explain_filter
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index 017fa4b..98d989b 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
|
||||
@@ -419,7 +419,7 @@ impl ComputeNode {
|
||||
.iter()
|
||||
.filter_map(|val| val.parse::<usize>().ok())
|
||||
.map(|val| if val > 1 { val - 1 } else { 1 })
|
||||
.next_back()
|
||||
.last()
|
||||
.unwrap_or(3)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -385,6 +385,8 @@ where
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let ssl_ca_certs = match &cli.ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
@@ -399,11 +401,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let mut trimmed = cli.api.to_string();
|
||||
trimmed.pop();
|
||||
let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref());
|
||||
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
|
||||
|
||||
match cli.command {
|
||||
Command::NodeRegister {
|
||||
@@ -1056,7 +1056,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
|
||||
let mut stream = futures::stream::iter(moves)
|
||||
.map(|mv| {
|
||||
let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
let client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
async move {
|
||||
client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
|
||||
@@ -91,14 +91,14 @@ impl Server {
|
||||
Ok(tls_stream) => tls_stream,
|
||||
Err(err) => {
|
||||
if !suppress_io_error(&err) {
|
||||
info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
|
||||
info!("Failed to accept TLS connection: {err:#}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
|
||||
info!("Failed to serve HTTPS connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,7 +106,7 @@ impl Server {
|
||||
// Handle HTTP connection.
|
||||
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
|
||||
info!("Failed to serve HTTP connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,6 @@ postgres_backend.workspace = true
|
||||
nix = {workspace = true, optional = true}
|
||||
reqwest.workspace = true
|
||||
rand.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
bincode.workspace = true
|
||||
|
||||
@@ -134,7 +134,6 @@ pub struct ConfigToml {
|
||||
pub load_previous_heatmap: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generate_unarchival_heatmap: Option<bool>,
|
||||
pub tracing: Option<Tracing>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -192,54 +191,6 @@ pub enum GetVectoredConcurrentIo {
|
||||
SidecarTask,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Ratio {
|
||||
pub numerator: usize,
|
||||
pub denominator: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct OtelExporterConfig {
|
||||
pub endpoint: String,
|
||||
pub protocol: OtelExporterProtocol,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum OtelExporterProtocol {
|
||||
Grpc,
|
||||
HttpBinary,
|
||||
HttpJson,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Tracing {
|
||||
pub sampling_ratio: Ratio,
|
||||
pub export_config: OtelExporterConfig,
|
||||
}
|
||||
|
||||
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
|
||||
fn from(val: &OtelExporterConfig) -> Self {
|
||||
tracing_utils::ExportConfig {
|
||||
endpoint: Some(val.endpoint.clone()),
|
||||
protocol: val.protocol.into(),
|
||||
timeout: val.timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
|
||||
fn from(val: OtelExporterProtocol) -> Self {
|
||||
match val {
|
||||
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
|
||||
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
|
||||
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -586,7 +537,6 @@ impl Default for ConfigToml {
|
||||
validate_wal_contiguity: None,
|
||||
load_previous_heatmap: None,
|
||||
generate_unarchival_heatmap: None,
|
||||
tracing: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -558,7 +558,7 @@ async fn upload_large_enough_file(
|
||||
) -> usize {
|
||||
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
let body = bytes::Bytes::from(vec![0u8; 1024]);
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat_n(body, 128));
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
|
||||
|
||||
let len = contents.clone().fold(0, |acc, next| acc + next.len());
|
||||
|
||||
|
||||
@@ -71,7 +71,6 @@ pub struct PeerInfo {
|
||||
pub ts: Instant,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
pub type FullTransactionId = u64;
|
||||
@@ -228,8 +227,6 @@ pub struct TimelineDeleteResult {
|
||||
pub dir_existed: bool,
|
||||
}
|
||||
|
||||
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
|
||||
|
||||
fn lsn_invalid() -> Lsn {
|
||||
Lsn::INVALID
|
||||
}
|
||||
@@ -262,8 +259,6 @@ pub struct SkTimelineInfo {
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub http_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub https_connstr: Option<String>,
|
||||
// Minimum of all active RO replicas flush LSN
|
||||
#[serde(default = "lsn_invalid")]
|
||||
pub standby_horizon: Lsn,
|
||||
|
||||
@@ -14,7 +14,6 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber.workspace = true # For examples in docs
|
||||
|
||||
@@ -31,10 +31,10 @@
|
||||
//! .init();
|
||||
//! }
|
||||
//! ```
|
||||
#![deny(unsafe_code)]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
pub mod http;
|
||||
pub mod perf_span;
|
||||
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
|
||||
@@ -1,153 +0,0 @@
|
||||
//! Crutch module to work around tracing infrastructure deficiencies
|
||||
//!
|
||||
//! We wish to collect granular request spans without impacting performance
|
||||
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
|
||||
//!
|
||||
//! The approach taken by the pageserver crate is to use a completely different
|
||||
//! span hierarchy for the performance spans. Spans are explicitly stored in
|
||||
//! the request context and use a different [`tracing::Subscriber`] in order
|
||||
//! to avoid expensive filtering.
|
||||
//!
|
||||
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implcitly,
|
||||
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
|
||||
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
|
||||
//! wishes to juggle different subscribers.
|
||||
//!
|
||||
//! In order to work around this, this module provides a [`PerfSpan`] type which
|
||||
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
|
||||
//! achieves the correct routing.
|
||||
//!
|
||||
//! There's also a modified version of [`tracing::Instrument`] which works with
|
||||
//! [`PerfSpan`].
|
||||
|
||||
use core::{
|
||||
future::Future,
|
||||
marker::Sized,
|
||||
mem::ManuallyDrop,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use pin_project_lite::pin_project;
|
||||
use tracing::{Dispatch, field, span::Span};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PerfSpan {
|
||||
inner: ManuallyDrop<Span>,
|
||||
dispatch: Dispatch,
|
||||
}
|
||||
|
||||
#[must_use = "once a span has been entered, it should be exited"]
|
||||
pub struct PerfSpanEntered<'a> {
|
||||
span: &'a PerfSpan,
|
||||
}
|
||||
|
||||
impl PerfSpan {
|
||||
pub fn new(span: Span, dispatch: Dispatch) -> Self {
|
||||
Self {
|
||||
inner: ManuallyDrop::new(span),
|
||||
dispatch,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record<Q: field::AsField + ?Sized, V: field::Value>(
|
||||
&self,
|
||||
field: &Q,
|
||||
value: V,
|
||||
) -> &Self {
|
||||
self.inner.record(field, value);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn enter(&self) -> PerfSpanEntered {
|
||||
if let Some(ref id) = self.inner.id() {
|
||||
self.dispatch.enter(id);
|
||||
}
|
||||
|
||||
PerfSpanEntered { span: self }
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &Span {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpan {
|
||||
fn drop(&mut self) {
|
||||
// Bring the desired dispatch into scope before explicitly calling
|
||||
// the span destructor. This routes the span exit to the correct
|
||||
// [`tracing::Subscriber`].
|
||||
let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);
|
||||
// SAFETY: ManuallyDrop in Drop implementation
|
||||
unsafe { ManuallyDrop::drop(&mut self.inner) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpanEntered<'_> {
|
||||
fn drop(&mut self) {
|
||||
assert!(self.span.inner.id().is_some());
|
||||
|
||||
let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
|
||||
self.span.dispatch.exit(&self.span.inner.id().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PerfInstrument: Sized {
|
||||
fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
|
||||
PerfInstrumented {
|
||||
inner: ManuallyDrop::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
#[project = PerfInstrumentedProj]
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct PerfInstrumented<T> {
|
||||
// `ManuallyDrop` is used here to to enter instrument `Drop` by entering
|
||||
// `Span` and executing `ManuallyDrop::drop`.
|
||||
#[pin]
|
||||
inner: ManuallyDrop<T>,
|
||||
span: PerfSpan,
|
||||
}
|
||||
|
||||
impl<T> PinnedDrop for PerfInstrumented<T> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let this = this.project();
|
||||
let _enter = this.span.enter();
|
||||
// SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
|
||||
// different from wrapping `T` in `Option` and calling
|
||||
// `Pin::set(&mut this.inner, None)`, except avoiding
|
||||
// additional memory overhead.
|
||||
// 2. `ManuallyDrop::drop()` is safe, because
|
||||
// `PinnedDrop::drop()` is guaranteed to be called only
|
||||
// once.
|
||||
unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> PerfInstrumentedProj<'a, T> {
|
||||
/// Get a mutable reference to the [`Span`] a pinned mutable reference to
|
||||
/// the wrapped type.
|
||||
fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
|
||||
// SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
|
||||
// and `inner` is valid, because `ManuallyDrop::drop` is called
|
||||
// only inside `Drop` of the `Instrumented`.
|
||||
let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
|
||||
(self.span, inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> Future for PerfInstrumented<T> {
|
||||
type Output = T::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let (span, inner) = self.project().span_and_inner_pin_mut();
|
||||
let _enter = span.enter();
|
||||
inner.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sized> PerfInstrument for T {}
|
||||
@@ -35,7 +35,6 @@ use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use tracing_utils::OtelGuard;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::crashsafe::syncfs;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
@@ -119,21 +118,6 @@ fn main() -> anyhow::Result<()> {
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let otel_enablement = match &conf.tracing {
|
||||
Some(cfg) => tracing_utils::OtelEnablement::Enabled {
|
||||
service_name: "pageserver".to_string(),
|
||||
export_config: (&cfg.export_config).into(),
|
||||
runtime: *COMPUTE_REQUEST_RUNTIME,
|
||||
},
|
||||
None => tracing_utils::OtelEnablement::Disabled,
|
||||
};
|
||||
|
||||
let otel_guard = tracing_utils::init_performance_tracing(otel_enablement);
|
||||
|
||||
if otel_guard.is_some() {
|
||||
info!(?conf.tracing, "starting with OTEL tracing enabled");
|
||||
}
|
||||
|
||||
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
|
||||
// disarming this hook on pageserver, because we never tear down tracing.
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
@@ -207,7 +191,7 @@ fn main() -> anyhow::Result<()> {
|
||||
tracing::info!("Initializing page_cache...");
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
scenario.teardown();
|
||||
Ok(())
|
||||
@@ -306,7 +290,6 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
|
||||
fn start_pageserver(
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
conf: &'static PageServerConf,
|
||||
otel_guard: Option<OtelGuard>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Monotonic time for later calculating startup duration
|
||||
let started_startup_at = Instant::now();
|
||||
@@ -692,21 +675,13 @@ fn start_pageserver(
|
||||
|
||||
// Spawn a task to listen for libpq connections. It will spawn further tasks
|
||||
// for each connection. We created the listener earlier already.
|
||||
let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
|
||||
let page_service = page_service::spawn(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
{
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener)
|
||||
.context("create tokio listener")?
|
||||
},
|
||||
);
|
||||
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
|
||||
});
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
|
||||
@@ -215,8 +215,6 @@ pub struct PageServerConf {
|
||||
|
||||
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
|
||||
pub generate_unarchival_heatmap: bool,
|
||||
|
||||
pub tracing: Option<pageserver_api::config::Tracing>,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -388,7 +386,6 @@ impl PageServerConf {
|
||||
validate_wal_contiguity,
|
||||
load_previous_heatmap,
|
||||
generate_unarchival_heatmap,
|
||||
tracing,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -438,7 +435,6 @@ impl PageServerConf {
|
||||
wal_receiver_protocol,
|
||||
page_service_pipelining,
|
||||
get_vectored_concurrent_io,
|
||||
tracing,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
@@ -510,17 +506,6 @@ impl PageServerConf {
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(tracing_config) = conf.tracing.as_ref() {
|
||||
let ratio = &tracing_config.sampling_ratio;
|
||||
ensure!(
|
||||
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
|
||||
format!(
|
||||
"Invalid sampling ratio: {}/{}",
|
||||
ratio.numerator, ratio.denominator
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
|
||||
.map_err(anyhow::Error::msg)
|
||||
.with_context(|| {
|
||||
|
||||
@@ -100,12 +100,6 @@ use crate::{
|
||||
task_mgr::TaskKind,
|
||||
tenant::Timeline,
|
||||
};
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use std::future::Future;
|
||||
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
|
||||
|
||||
use tracing::{Dispatch, Span};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
pub struct RequestContext {
|
||||
@@ -115,8 +109,6 @@ pub struct RequestContext {
|
||||
page_content_kind: PageContentKind,
|
||||
read_path_debug: bool,
|
||||
scope: Scope,
|
||||
perf_span: Option<PerfSpan>,
|
||||
perf_span_dispatch: Option<Dispatch>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -271,15 +263,22 @@ impl RequestContextBuilder {
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
scope: Scope::new_global(),
|
||||
perf_span: None,
|
||||
perf_span_dispatch: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from(original: &RequestContext) -> Self {
|
||||
pub fn extend(original: &RequestContext) -> Self {
|
||||
Self {
|
||||
inner: original.clone(),
|
||||
// This is like a Copy, but avoid implementing Copy because ordinary users of
|
||||
// RequestContext should always move or ref it.
|
||||
inner: RequestContext {
|
||||
task_kind: original.task_kind,
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
read_path_debug: original.read_path_debug,
|
||||
scope: original.scope.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,74 +316,12 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
|
||||
self.inner.perf_span_dispatch = dispatch;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce() -> Span,
|
||||
{
|
||||
assert!(self.inner.perf_span.is_none());
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
if let Some(ref perf_span) = self.inner.perf_span {
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn attached_child(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn detached_child(self) -> RequestContext {
|
||||
pub fn build(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestContext {
|
||||
/// Private clone implementation
|
||||
///
|
||||
/// Callers should use the [`RequestContextBuilder`] or child spaning APIs of
|
||||
/// [`RequestContext`].
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
task_kind: self.task_kind,
|
||||
download_behavior: self.download_behavior,
|
||||
access_stats_behavior: self.access_stats_behavior,
|
||||
page_content_kind: self.page_content_kind,
|
||||
read_path_debug: self.read_path_debug,
|
||||
scope: self.scope.clone(),
|
||||
perf_span: self.perf_span.clone(),
|
||||
perf_span_dispatch: self.perf_span_dispatch.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new RequestContext that has no parent.
|
||||
///
|
||||
/// The function is called `new` because, once we add children
|
||||
@@ -400,7 +337,7 @@ impl RequestContext {
|
||||
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::new(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.root()
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Create a detached child context for a task that may outlive `self`.
|
||||
@@ -421,10 +358,7 @@ impl RequestContext {
|
||||
///
|
||||
/// We could make new calls to this function fail if `self` is already canceled.
|
||||
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.detached_child()
|
||||
self.child_impl(task_kind, download_behavior)
|
||||
}
|
||||
|
||||
/// Create a child of context `self` for a task that shall not outlive `self`.
|
||||
@@ -448,7 +382,7 @@ impl RequestContext {
|
||||
/// The method to wait for child tasks would return an error, indicating
|
||||
/// that the child task was not started because the context was canceled.
|
||||
pub fn attached_child(&self) -> Self {
|
||||
RequestContextBuilder::from(self).attached_child()
|
||||
self.child_impl(self.task_kind(), self.download_behavior())
|
||||
}
|
||||
|
||||
/// Use this function when you should be creating a child context using
|
||||
@@ -463,10 +397,17 @@ impl RequestContext {
|
||||
Self::new(task_kind, download_behavior)
|
||||
}
|
||||
|
||||
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn with_scope_timeline(&self, timeline: &Arc<Timeline>) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
RequestContextBuilder::extend(self)
|
||||
.scope(Scope::new_timeline(timeline))
|
||||
.attached_child()
|
||||
.build()
|
||||
}
|
||||
|
||||
pub(crate) fn with_scope_page_service_pagestream(
|
||||
@@ -475,9 +416,9 @@ impl RequestContext {
|
||||
crate::page_service::TenantManagerTypes,
|
||||
>,
|
||||
) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
RequestContextBuilder::extend(self)
|
||||
.scope(Scope::new_page_service_pagestream(timeline_handle))
|
||||
.attached_child()
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_timeline(
|
||||
@@ -485,30 +426,28 @@ impl RequestContext {
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
RequestContextBuilder::extend(self)
|
||||
.scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id))
|
||||
.attached_child()
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
RequestContextBuilder::extend(self)
|
||||
.scope(Scope::new_secondary_tenant(tenant_shard_id))
|
||||
.attached_child()
|
||||
.build()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn with_scope_unit_test(&self) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::UnitTest)
|
||||
RequestContextBuilder::new(TaskKind::UnitTest)
|
||||
.scope(Scope::new_unit_test())
|
||||
.attached_child()
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn with_scope_debug_tools(&self) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::DebugTool)
|
||||
RequestContextBuilder::new(TaskKind::DebugTool)
|
||||
.scope(Scope::new_debug_tools())
|
||||
.attached_child()
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn task_kind(&self) -> TaskKind {
|
||||
@@ -565,61 +504,4 @@ impl RequestContext {
|
||||
Scope::DebugTools { io_size_metrics } => io_size_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
|
||||
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
|
||||
span.inner().follows_from(from_span.inner());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn perf_span_record<
|
||||
Q: tracing::field::AsField + ?Sized,
|
||||
V: tracing::field::Value,
|
||||
>(
|
||||
&self,
|
||||
field: &Q,
|
||||
value: V,
|
||||
) {
|
||||
if let Some(span) = &self.perf_span {
|
||||
span.record(field, value);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_perf_span(&self) -> bool {
|
||||
self.perf_span.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// [`Future`] extension trait that allow for creating performance
|
||||
/// spans on sampled requests
|
||||
pub(crate) trait PerfInstrumentFutureExt<'a>: Future + Send {
|
||||
/// Instrument this future with a new performance span when the
|
||||
/// provided request context indicates the originator request
|
||||
/// was sampled. Otherwise, just box the future and return it as is.
|
||||
fn maybe_perf_instrument<Fn>(
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
make_span: Fn,
|
||||
) -> BoxFuture<'a, Self::Output>
|
||||
where
|
||||
Self: Sized + 'a,
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
match &ctx.perf_span {
|
||||
Some(perf_span) => {
|
||||
assert!(ctx.perf_span_dispatch.is_some());
|
||||
let dispatcher = ctx.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
|
||||
self.instrument(new_perf_span).boxed()
|
||||
}
|
||||
None => self.boxed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Implement the trait for all types that satisfy the trait bounds
|
||||
impl<'a, T: Future + Send + 'a> PerfInstrumentFutureExt<'a> for T {}
|
||||
|
||||
@@ -2697,12 +2697,11 @@ async fn getpage_at_lsn_handler_inner(
|
||||
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
// Enable read path debugging
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.read_path_debug(true)
|
||||
.root();
|
||||
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true)
|
||||
.scope(context::Scope::new_timeline(&timeline)).build();
|
||||
|
||||
// Use last_record_lsn if no lsn is provided
|
||||
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
@@ -3189,8 +3188,7 @@ async fn list_aux_files(
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
|
||||
.with_scope_timeline(&timeline);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let files = timeline
|
||||
.list_aux_files(body.lsn, &ctx, io_concurrency)
|
||||
.await?;
|
||||
@@ -3434,15 +3432,14 @@ async fn put_tenant_timeline_import_wal(
|
||||
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
|
||||
async move {
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Warn)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.root();
|
||||
let ctx = RequestContextBuilder::extend(&ctx).scope(context::Scope::new_timeline(&timeline)).build();
|
||||
|
||||
let mut body = StreamReader::new(request.into_body().map(|res| {
|
||||
res.map_err(|error| {
|
||||
|
||||
@@ -55,9 +55,6 @@ pub const DEFAULT_PG_VERSION: u32 = 16;
|
||||
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
|
||||
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
|
||||
|
||||
// Target used for performance traces.
|
||||
pub const PERF_TRACE_TARGET: &str = "P";
|
||||
|
||||
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{io, str};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
@@ -18,7 +17,7 @@ use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
PageServiceProtocolPipelinedExecutionStrategy, Tracing,
|
||||
PageServiceProtocolPipelinedExecutionStrategy,
|
||||
};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::models::{
|
||||
@@ -37,7 +36,6 @@ use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
|
||||
use rand::Rng;
|
||||
use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -55,9 +53,7 @@ use utils::sync::spsc_fold;
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
|
||||
TimelineMetrics,
|
||||
@@ -104,7 +100,6 @@ pub fn spawn(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
pg_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
tcp_listener: tokio::net::TcpListener,
|
||||
) -> Listener {
|
||||
let cancel = CancellationToken::new();
|
||||
@@ -122,7 +117,6 @@ pub fn spawn(
|
||||
conf,
|
||||
tenant_manager,
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
tcp_listener,
|
||||
conf.pg_auth_type,
|
||||
conf.page_service_pipelining.clone(),
|
||||
@@ -179,7 +173,6 @@ pub async fn libpq_listener_main(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: tokio::net::TcpListener,
|
||||
auth_type: AuthType,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
@@ -212,12 +205,8 @@ pub async fn libpq_listener_main(
|
||||
// Connection established. Spawn a new task to handle it.
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let local_auth = auth.clone();
|
||||
let connection_ctx = RequestContextBuilder::from(&listener_ctx)
|
||||
.task_kind(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.perf_span_dispatch(perf_trace_dispatch.clone())
|
||||
.detached_child();
|
||||
|
||||
let connection_ctx = listener_ctx
|
||||
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
|
||||
connection_handler_tasks.spawn(page_service_conn_main(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
@@ -618,7 +607,6 @@ impl std::fmt::Display for BatchedPageStreamError {
|
||||
struct BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest,
|
||||
timer: SmgrOpTimer,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -755,7 +743,6 @@ impl PageServerHandler {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_handles: &mut TimelineHandles,
|
||||
tracing_config: Option<&Tracing>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
@@ -915,51 +902,10 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
|
||||
let sampled = match tracing_config {
|
||||
Some(conf) => {
|
||||
let ratio = &conf.sampling_ratio;
|
||||
|
||||
if ratio.numerator == 0 {
|
||||
false
|
||||
} else {
|
||||
rand::thread_rng().gen_range(0..ratio.denominator) < ratio.numerator
|
||||
}
|
||||
}
|
||||
None => false,
|
||||
};
|
||||
|
||||
let ctx = if sampled {
|
||||
RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_PAGE",
|
||||
tenant_id = %tenant_id,
|
||||
shard_id = field::Empty,
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid,
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child()
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
let res = timeline_handles
|
||||
let shard = match timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"SHARD_SELECTION",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let shard = match res {
|
||||
.await
|
||||
{
|
||||
Ok(tl) => tl,
|
||||
Err(e) => {
|
||||
let span = mkspan!(before shard routing);
|
||||
@@ -986,60 +932,26 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// This ctx travels as part of the BatchedFeMessage through
|
||||
// batching into the request handler.
|
||||
// The request handler needs to do some per-request work
|
||||
// (relsize check) before dispatching the batch as a single
|
||||
// get_vectored call to the Timeline.
|
||||
// This ctx will be used for the reslize check, whereas the
|
||||
// get_vectored call will be a different ctx with separate
|
||||
// perf span.
|
||||
let ctx = ctx.with_scope_page_service_pagestream(&shard);
|
||||
|
||||
// Similar game for this `span`: we funnel it through so that
|
||||
// request handler log messages contain the request-specific fields.
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
// Enrich the perf span with shard_id now that shard routing is done.
|
||||
ctx.perf_span_record(
|
||||
"shard_id",
|
||||
tracing::field::display(shard.get_shard_identity().shard_slug()),
|
||||
);
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"THROTTLE",
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// We're holding the Handle
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
let res = Self::wait_or_get_last_lsn(
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
&ctx,
|
||||
ctx,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"WAIT_LSN",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let effective_request_lsn = match res {
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
.await
|
||||
{
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
return respond_error!(span, e);
|
||||
@@ -1049,7 +961,7 @@ impl PageServerHandler {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }],
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1602,15 +1514,12 @@ impl PageServerHandler {
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let cancel = self.cancel.clone();
|
||||
let tracing_config = self.conf.tracing.clone();
|
||||
|
||||
let err = loop {
|
||||
let msg = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
tracing_config.as_ref(),
|
||||
&cancel,
|
||||
ctx,
|
||||
protocol_version,
|
||||
@@ -1744,8 +1653,6 @@ impl PageServerHandler {
|
||||
// Batcher
|
||||
//
|
||||
|
||||
let tracing_config = self.conf.tracing.clone();
|
||||
|
||||
let cancel_batcher = self.cancel.child_token();
|
||||
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
|
||||
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
|
||||
@@ -1759,7 +1666,6 @@ impl PageServerHandler {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
tracing_config.as_ref(),
|
||||
&cancel_batcher,
|
||||
&ctx,
|
||||
protocol_version,
|
||||
@@ -2098,9 +2004,7 @@ impl PageServerHandler {
|
||||
|
||||
let results = timeline
|
||||
.get_rel_page_at_lsn_batched(
|
||||
requests
|
||||
.iter()
|
||||
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
|
||||
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
|
||||
effective_lsn,
|
||||
io_concurrency,
|
||||
ctx,
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, ensure};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
@@ -32,7 +31,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, info_span, trace, warn};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use utils::bin_ser::{BeSer, DeserializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
@@ -40,7 +39,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
@@ -210,9 +209,7 @@ impl Timeline {
|
||||
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self
|
||||
.get_rel_page_at_lsn_batched(
|
||||
pages
|
||||
.iter()
|
||||
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
|
||||
pages.iter().map(|(tag, blknum)| (tag, blknum)),
|
||||
effective_lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
@@ -251,7 +248,7 @@ impl Timeline {
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
|
||||
effective_lsn: Lsn,
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
@@ -265,11 +262,8 @@ impl Timeline {
|
||||
let mut result = Vec::with_capacity(pages.len());
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
||||
BTreeMap::default();
|
||||
|
||||
let mut perf_instrument = false;
|
||||
for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
|
||||
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
|
||||
if tag.relnode == 0 {
|
||||
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -280,16 +274,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let nblocks = match self
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%effective_lsn,
|
||||
)
|
||||
})
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
@@ -312,12 +297,8 @@ impl Timeline {
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
if ctx.has_perf_span() {
|
||||
perf_instrument = true;
|
||||
}
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push((response_slot_idx, ctx));
|
||||
key_slots.push(response_slot_idx);
|
||||
}
|
||||
|
||||
let keyspace = {
|
||||
@@ -333,34 +314,16 @@ impl Timeline {
|
||||
acc.to_keyspace()
|
||||
};
|
||||
|
||||
let ctx = match perf_instrument {
|
||||
true => RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_VECTORED",
|
||||
tenant_id = %self.tenant_shard_id.tenant_id,
|
||||
timeline_id = %self.timeline_id,
|
||||
lsn = %effective_lsn,
|
||||
shard = %self.tenant_shard_id.shard_slug(),
|
||||
)
|
||||
})
|
||||
.attached_child(),
|
||||
false => ctx.attached_child(),
|
||||
};
|
||||
|
||||
let res = self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await;
|
||||
|
||||
match res {
|
||||
match self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
||||
let first_slot = key_slots.next().unwrap();
|
||||
|
||||
for (slot, req_ctx) in key_slots {
|
||||
for slot in key_slots {
|
||||
let clone = match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
Err(err) => Err(match err {
|
||||
@@ -378,22 +341,17 @@ impl Timeline {
|
||||
};
|
||||
|
||||
result_slots[slot].write(clone);
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
||||
for slot in keys_slots.values().flatten() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
let err = match &err {
|
||||
@@ -425,7 +383,6 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
result_slots[*slot].write(err);
|
||||
}
|
||||
|
||||
|
||||
@@ -219,7 +219,8 @@ pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
|
||||
// Bump this number when adding a new pageserver_runtime!
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = NonZeroUsize::new(4).unwrap();
|
||||
// SAFETY: it's obviously correct
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
@@ -3689,7 +3689,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
TenantState::Active => {
|
||||
TenantState::Active { .. } => {
|
||||
return Ok(());
|
||||
}
|
||||
TenantState::Broken { reason, .. } => {
|
||||
@@ -4205,9 +4205,9 @@ impl Tenant {
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
|
||||
let timeline_ctx = RequestContextBuilder::from(ctx)
|
||||
let timeline_ctx = RequestContextBuilder::extend(ctx)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.detached_child();
|
||||
.build();
|
||||
|
||||
Ok((timeline, timeline_ctx))
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
///
|
||||
/// Complexity: O(log N)
|
||||
fn add_node(&mut self, key: i128) {
|
||||
let value = match self.nodes.range(..=key).next_back() {
|
||||
let value = match self.nodes.range(..=key).last() {
|
||||
Some((_, Some(v))) => Some(v.clone()),
|
||||
Some((_, None)) => None,
|
||||
None => None,
|
||||
|
||||
@@ -58,7 +58,7 @@ use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
/// - `Attached`: has a full Tenant object, is elegible to service
|
||||
/// reads and ingest WAL.
|
||||
/// reads and ingest WAL.
|
||||
/// - `Secondary`: is only keeping a local cache warm.
|
||||
///
|
||||
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
|
||||
|
||||
@@ -130,7 +130,7 @@ impl IndexPart {
|
||||
/// Version history
|
||||
/// - 2: added `deleted_at`
|
||||
/// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// - 4: timeline_layers is fully removed.
|
||||
/// - 5: lineage was added
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
|
||||
@@ -13,13 +13,13 @@ pub mod merge_iterator;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::future::Future;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
|
||||
use bytes::Bytes;
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
@@ -34,7 +34,7 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::{Instrument, info_span, trace};
|
||||
use tracing::{Instrument, trace};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
@@ -43,9 +43,7 @@ use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
@@ -876,37 +874,13 @@ impl ReadableLayer {
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,9 +896,9 @@ impl DeltaLayerInner {
|
||||
where
|
||||
Reader: BlockReader + Clone,
|
||||
{
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.attached_child();
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
|
||||
all_keys.push(entry);
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::from(ctx)
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.attached_child(),
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
|
||||
@@ -481,9 +481,9 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.attached_child();
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
@@ -421,9 +421,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.attached_child();
|
||||
.build();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
|
||||
@@ -3,13 +3,12 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use tracing::{Instrument, info_span};
|
||||
use tracing::Instrument;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -19,7 +18,7 @@ use super::delta_layer::{self};
|
||||
use super::image_layer::{self};
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
|
||||
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
@@ -325,29 +324,16 @@ impl Layer {
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let downloaded = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let downloaded =
|
||||
self.0
|
||||
.get_or_maybe_download(true, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
|
||||
.get_or_maybe_download(true, ctx)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
}
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?
|
||||
};
|
||||
|
||||
})?;
|
||||
let this = ResidentLayer {
|
||||
downloaded: downloaded.clone(),
|
||||
owner: self.clone(),
|
||||
@@ -355,20 +341,9 @@ impl Layer {
|
||||
|
||||
self.record_access(ctx);
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"VISIT_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
downloaded
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, &ctx)
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
GetVectoredError::Other(err) => GetVectoredError::Other(
|
||||
@@ -1070,34 +1045,15 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let ctx = if ctx.has_perf_span() {
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::LayerDownload)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason
|
||||
)
|
||||
})
|
||||
.detached_child();
|
||||
ctx.perf_follows_from(&dl_ctx);
|
||||
dl_ctx
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
|
||||
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.download_init_and_wait(timeline, permit, download_ctx)
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
Ok(res)
|
||||
}
|
||||
@@ -1764,9 +1720,9 @@ impl DownloadedLayer {
|
||||
);
|
||||
|
||||
let res = if owner.desc.is_delta {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
|
||||
.attached_child();
|
||||
.build();
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
@@ -1782,9 +1738,9 @@ impl DownloadedLayer {
|
||||
.await
|
||||
.map(LayerKind::Delta)
|
||||
} else {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
|
||||
.attached_child();
|
||||
.build();
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
|
||||
@@ -119,10 +119,6 @@ async fn smoke_test() {
|
||||
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
|
||||
assert!(matches!(e, EvictionError::NotFound));
|
||||
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.attached_child();
|
||||
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValuesReconstructState::new(io_concurrency.clone());
|
||||
@@ -131,7 +127,7 @@ async fn smoke_test() {
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
&dl_ctx,
|
||||
ctx,
|
||||
)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
@@ -181,7 +177,7 @@ async fn smoke_test() {
|
||||
|
||||
// plain downloading is rarely needed
|
||||
layer
|
||||
.download_and_keep_resident(&dl_ctx)
|
||||
.download_and_keep_resident(ctx)
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -649,10 +645,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.attached_child();
|
||||
|
||||
.build();
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
let layers = timeline.layers.read().await;
|
||||
@@ -735,9 +730,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.attached_child();
|
||||
.build();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
|
||||
@@ -23,7 +23,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, Result, anyhow, bail, ensure};
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
@@ -97,9 +96,7 @@ use super::{
|
||||
};
|
||||
use crate::aux_file::AuxFileSizeEstimator;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
@@ -1292,22 +1289,9 @@ impl Timeline {
|
||||
};
|
||||
reconstruct_state.read_path = read_path;
|
||||
|
||||
let traversal_res: Result<(), _> = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
};
|
||||
|
||||
let traversal_res: Result<(), _> = self
|
||||
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
|
||||
.await;
|
||||
if let Err(err) = traversal_res {
|
||||
// Wait for all the spawned IOs to complete.
|
||||
// See comments on `spawn_io` inside `storage_layer` for more details.
|
||||
@@ -1321,46 +1305,14 @@ impl Timeline {
|
||||
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT_KEY",
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
async move {
|
||||
assert_eq!(state.situation, ValueReconstructSituation::Complete);
|
||||
|
||||
let res = state
|
||||
.collect_pending_ios()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WAIT_FOR_IO_COMPLETIONS",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let converted = match res {
|
||||
let converted = match state.collect_pending_ios().await {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
@@ -1377,27 +1329,16 @@ impl Timeline {
|
||||
"{converted:?}"
|
||||
);
|
||||
|
||||
let walredo_deltas = converted.num_deltas();
|
||||
let walredo_res = walredo_self
|
||||
.reconstruct_value(key, lsn, converted)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WALREDO",
|
||||
deltas = %walredo_deltas,
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
(key, walredo_res)
|
||||
(
|
||||
key,
|
||||
walredo_self.reconstruct_value(key, lsn, converted).await,
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs
|
||||
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await;
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
@@ -2306,7 +2247,7 @@ impl Timeline {
|
||||
.await
|
||||
.expect("holding a reference to self");
|
||||
}
|
||||
TimelineState::Active => {
|
||||
TimelineState::Active { .. } => {
|
||||
return Ok(());
|
||||
}
|
||||
TimelineState::Broken { .. } | TimelineState::Stopping => {
|
||||
@@ -3934,30 +3875,15 @@ impl Timeline {
|
||||
let TimelineVisitOutcome {
|
||||
completed_keyspace: completed,
|
||||
image_covered_keyspace,
|
||||
} = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO_TIMELINE",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
&ctx,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?
|
||||
};
|
||||
} = Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
keyspace.remove_overlapping_with(&completed);
|
||||
|
||||
@@ -4001,24 +3927,8 @@ impl Timeline {
|
||||
|
||||
// Take the min to avoid reconstructing a page with data newer than request Lsn.
|
||||
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_ANCESTOR",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
ancestor = %ancestor_timeline.timeline_id,
|
||||
ancestor_lsn = %timeline.ancestor_lsn
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
timeline_owned = timeline
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
|
||||
.await?;
|
||||
timeline = &*timeline_owned;
|
||||
};
|
||||
@@ -7349,9 +7259,9 @@ mod tests {
|
||||
|
||||
eprintln!("Downloading {layer} and re-generating heatmap");
|
||||
|
||||
let ctx = &RequestContextBuilder::from(ctx)
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.download_behavior(crate::context::DownloadBehavior::Download)
|
||||
.attached_child();
|
||||
.build();
|
||||
|
||||
let _resident = layer
|
||||
.download_and_keep_resident(ctx)
|
||||
|
||||
@@ -26,7 +26,7 @@ use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::{KeySpace, ShardedRange};
|
||||
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
|
||||
use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
@@ -61,7 +61,7 @@ use crate::tenant::timeline::{
|
||||
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
|
||||
ResidentLayer, drop_rlock,
|
||||
};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded, gc_block};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
@@ -123,6 +123,7 @@ impl GcCompactionQueueItem {
|
||||
#[derive(Default)]
|
||||
struct GcCompactionGuardItems {
|
||||
notify: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
gc_guard: Option<gc_block::Guard>,
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
@@ -278,7 +279,7 @@ impl GcCompactionQueue {
|
||||
gc_compaction_ratio_percent: u64,
|
||||
) -> bool {
|
||||
const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
|
||||
if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
// Do not auto-trigger when physical size >= 150GB
|
||||
return false;
|
||||
}
|
||||
@@ -318,12 +319,7 @@ impl GcCompactionQueue {
|
||||
flags
|
||||
},
|
||||
sub_compaction: true,
|
||||
// Only auto-trigger gc-compaction over the data keyspace due to concerns in
|
||||
// https://github.com/neondatabase/neon/issues/11318.
|
||||
compact_key_range: Some(CompactKeyRange {
|
||||
start: Key::MIN,
|
||||
end: Key::metadata_key_range().start,
|
||||
}),
|
||||
compact_key_range: None,
|
||||
compact_lsn_range: None,
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
},
|
||||
@@ -347,45 +343,44 @@ impl GcCompactionQueue {
|
||||
info!("compaction job id={} finished", id);
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some(items) = guard.guards.remove(&id) {
|
||||
drop(items.gc_guard);
|
||||
if let Some(tx) = items.notify {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_running_job(&self) {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
|
||||
async fn handle_sub_compaction(
|
||||
&self,
|
||||
id: GcCompactionJobId,
|
||||
options: CompactOptions,
|
||||
timeline: &Arc<Timeline>,
|
||||
gc_block: &GcBlock,
|
||||
auto: bool,
|
||||
) -> Result<(), CompactionError> {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
let res = timeline
|
||||
let jobs = timeline
|
||||
.gc_compaction_split_jobs(
|
||||
GcCompactJob::from_compact_options(options.clone()),
|
||||
options.sub_compaction_max_job_size_mb,
|
||||
)
|
||||
.await;
|
||||
let jobs = match res {
|
||||
Ok(jobs) => jobs,
|
||||
Err(err) => {
|
||||
warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
|
||||
self.notify_and_unblock(id);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
.await?;
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
self.notify_and_unblock(id);
|
||||
} else {
|
||||
let gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let jobs_len = jobs.len();
|
||||
let mut pending_tasks = Vec::new();
|
||||
// gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
|
||||
@@ -420,6 +415,7 @@ impl GcCompactionQueue {
|
||||
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
let mut tasks = Vec::new();
|
||||
for task in pending_tasks {
|
||||
let id = guard.next_id();
|
||||
@@ -450,18 +446,7 @@ impl GcCompactionQueue {
|
||||
if let Err(err) = &res {
|
||||
log_compaction_error(err, None, cancel.is_cancelled());
|
||||
}
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(_) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
// process. For now, let's simply avoid such errors triggering the
|
||||
// circuit breaker.
|
||||
Ok(CompactionOutcome::Skipped)
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
async fn iteration_inner(
|
||||
@@ -509,32 +494,27 @@ impl GcCompactionQueue {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
self.handle_sub_compaction(id, options, timeline, auto)
|
||||
self.handle_sub_compaction(id, options, timeline, gc_block, auto)
|
||||
.await?;
|
||||
} else {
|
||||
// Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
|
||||
// in this branch.
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
let gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction");
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
}
|
||||
let compaction_result =
|
||||
timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
self.notify_and_unblock(id);
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
yield_for_l0 = true;
|
||||
}
|
||||
@@ -542,25 +522,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction subcompaction job");
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
// We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
|
||||
// again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
|
||||
@@ -591,7 +553,10 @@ impl GcCompactionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
self.clear_running_job();
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
Ok(if yield_for_l0 {
|
||||
tracing::info!("give up gc-compaction: yield for L0 compaction");
|
||||
CompactionOutcome::YieldForL0
|
||||
@@ -1036,9 +1001,9 @@ impl Timeline {
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.attached_child();
|
||||
.build();
|
||||
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
|
||||
@@ -2,14 +2,10 @@ use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::DetachBehavior;
|
||||
use pageserver_api::models::detach_ancestor::AncestorDetached;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
@@ -26,10 +22,7 @@ use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Tenant;
|
||||
use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
|
||||
use crate::tenant::storage_layer::layer::local_layer_path;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
use crate::tenant::storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -177,92 +170,6 @@ impl Attempt {
|
||||
}
|
||||
}
|
||||
|
||||
async fn generate_tombstone_image_layer(
|
||||
detached: &Arc<Timeline>,
|
||||
ancestor: &Arc<Timeline>,
|
||||
ancestor_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<ResidentLayer>, Error> {
|
||||
tracing::info!(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
// Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
|
||||
// not contain too many keys, otherwise this takes a lot of memory. Currently we limit it to 10k keys in the compute.
|
||||
let key_range = Key::sparse_non_inherited_keyspace();
|
||||
// avoid generating a "future layer" which will then be removed
|
||||
let image_lsn = ancestor_lsn;
|
||||
|
||||
{
|
||||
let layers = detached.layers.read().await;
|
||||
for layer in layers.all_persistent_layers() {
|
||||
if !layer.is_delta
|
||||
&& layer.lsn_range.start == image_lsn
|
||||
&& overlaps_with(&key_range, &layer.key_range)
|
||||
{
|
||||
tracing::warn!(
|
||||
layer=%layer, "image layer at the detach LSN already exists, skipping removing aux files"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let data = ancestor
|
||||
.get_vectored_impl(
|
||||
KeySpace::single(key_range.clone()),
|
||||
image_lsn,
|
||||
&mut reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to retrieve aux keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
if !data.is_empty() {
|
||||
// TODO: is it possible that we can have an image at `image_lsn`? Unlikely because image layers are only generated
|
||||
// upon compaction but theoretically possible.
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
detached.conf,
|
||||
detached.timeline_id,
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to create image layer writer")
|
||||
.map_err(Error::Prepare)?;
|
||||
for key in data.keys() {
|
||||
image_layer_writer
|
||||
.put_image(*key, Bytes::new(), ctx)
|
||||
.await
|
||||
.context("failed to write key")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
}
|
||||
let (desc, path) = image_layer_writer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.context("failed to finish image layer writer for removing the metadata keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
let generated = Layer::finish_creating(detached.conf, detached, desc, &path)
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
detached
|
||||
.remote_client
|
||||
.upload_layer_file(&generated, &detached.cancel)
|
||||
.await
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
tracing::info!(layer=%generated, "wrote image layer");
|
||||
Ok(Some(generated))
|
||||
} else {
|
||||
tracing::info!("no aux keys found in ancestor");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`Timeline::prepare_to_detach_from_ancestor`]
|
||||
pub(super) async fn prepare(
|
||||
detached: &Arc<Timeline>,
|
||||
@@ -445,16 +352,10 @@ pub(super) async fn prepare(
|
||||
|
||||
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
|
||||
let mut new_layers: Vec<Layer> =
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
|
||||
|
||||
if let Some(tombstone_layer) =
|
||||
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, ctx).await?
|
||||
{
|
||||
new_layers.push(tombstone_layer.into());
|
||||
}
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
|
||||
|
||||
{
|
||||
tracing::info!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
|
||||
|
||||
@@ -32,15 +32,9 @@ impl Client {
|
||||
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
|
||||
anyhow::bail!("import_pgdata_upcall_api is not configured")
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
Ok(Self {
|
||||
base_url: base_url.to_string(),
|
||||
client: http_client,
|
||||
client: reqwest::Client::new(),
|
||||
cancel,
|
||||
authorization_header: conf
|
||||
.import_pgdata_upcall_api_token
|
||||
|
||||
@@ -25,8 +25,8 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
AlignedBufferMut {
|
||||
raw: RawAlignedBuffer::with_capacity(capacity),
|
||||
|
||||
@@ -37,8 +37,8 @@ impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
let align = ConstAlign::<A>;
|
||||
let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");
|
||||
|
||||
18
poetry.lock
generated
18
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -1286,20 +1286,24 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "4.2.0"
|
||||
version = "4.1.0"
|
||||
description = "Pure-Python HTTP/2 protocol implementation"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "h2-4.2.0-py3-none-any.whl", hash = "sha256:479a53ad425bb29af087f3458a61d30780bc818e4ebcf01f0b536ba916462ed0"},
|
||||
{file = "h2-4.2.0.tar.gz", hash = "sha256:c8a52129695e88b1a0578d8d2cc6842bbd79128ac685463b887ee278126ad01f"},
|
||||
]
|
||||
files = []
|
||||
develop = false
|
||||
|
||||
[package.dependencies]
|
||||
hpack = ">=4.1,<5"
|
||||
hyperframe = ">=6.1,<7"
|
||||
|
||||
[package.source]
|
||||
type = "git"
|
||||
url = "https://github.com/python-hyper/h2"
|
||||
reference = "HEAD"
|
||||
resolved_reference = "0b98b244b5fd1fe96100ac14905417a3b70a4286"
|
||||
|
||||
[[package]]
|
||||
name = "hpack"
|
||||
version = "4.1.0"
|
||||
@@ -3840,4 +3844,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"
|
||||
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.0";
|
||||
pub(crate) const EXT_VERSION: &str = "0.2.0";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -43,7 +43,7 @@ websockets = "^12.0"
|
||||
clickhouse-connect = "^0.7.16"
|
||||
kafka-python = "^2.0.2"
|
||||
jwcrypto = "^1.5.6"
|
||||
h2 = "^4.2.0"
|
||||
h2 = {git = "https://github.com/python-hyper/h2"}
|
||||
types-jwcrypto = "^1.5.0.20240925"
|
||||
pyyaml = "^6.0.2"
|
||||
types-pyyaml = "^6.0.12.20240917"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.86.0"
|
||||
channel = "1.85.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -115,17 +115,13 @@ impl Client {
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
@@ -201,16 +197,6 @@ impl Client {
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
self.request_maybe_body(method, uri, Some(body)).await
|
||||
}
|
||||
|
||||
/// Send the request and check that the status code is good, with an optional body.
|
||||
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
@@ -222,15 +208,12 @@ impl Client {
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: Option<B>,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
|
||||
}
|
||||
if let Some(body) = body {
|
||||
req = req.json(&body);
|
||||
}
|
||||
req.send().await.map_err(Error::ReceiveBody)
|
||||
req.json(&body).send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,10 +219,7 @@ struct Args {
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
pub ssl_ca_file: Option<Utf8PathBuf>,
|
||||
/// Flag to use https for requests to peer's safekeeper API.
|
||||
#[arg(long)]
|
||||
pub use_https_safekeeper_api: bool,
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -402,7 +399,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
ssl_cert_file: args.ssl_cert_file,
|
||||
ssl_cert_reload_period: args.ssl_cert_reload_period,
|
||||
ssl_ca_certs,
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
});
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
|
||||
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
|
||||
TimelineStatus, TimelineTermBumpRequest,
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
|
||||
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
|
||||
TimelineTermBumpRequest,
|
||||
};
|
||||
use safekeeper_api::{ServerInfo, membership, models};
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
|
||||
@@ -83,11 +83,13 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
.delete_all_for_tenant(&tenant_id, action)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let response_body: TenantDeleteResult = delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>();
|
||||
json_response(StatusCode::OK, response_body)
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>(),
|
||||
)
|
||||
}
|
||||
|
||||
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -536,7 +538,6 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
https_connstr: sk_info.https_connstr,
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
|
||||
@@ -121,7 +121,6 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -171,7 +170,6 @@ impl SafeKeeperConf {
|
||||
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,10 +94,10 @@ impl WalReceivers {
|
||||
|
||||
/// Get reference to locked slot contents. Slot must exist (registered
|
||||
/// earlier).
|
||||
fn get_slot(
|
||||
self: &Arc<WalReceivers>,
|
||||
fn get_slot<'a>(
|
||||
self: &'a Arc<WalReceivers>,
|
||||
id: WalReceiverId,
|
||||
) -> MappedMutexGuard<'_, WalReceiverState> {
|
||||
) -> MappedMutexGuard<'a, WalReceiverState> {
|
||||
MutexGuard::map(self.mutex.lock(), |locked| {
|
||||
locked.slots[id]
|
||||
.as_mut()
|
||||
|
||||
@@ -176,7 +176,6 @@ pub struct Donor {
|
||||
pub flush_lsn: Lsn,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
impl From<&PeerInfo> for Donor {
|
||||
@@ -187,7 +186,6 @@ impl From<&PeerInfo> for Donor {
|
||||
flush_lsn: p.flush_lsn,
|
||||
pg_connstr: p.pg_connstr.clone(),
|
||||
http_connstr: p.http_connstr.clone(),
|
||||
https_connstr: p.https_connstr.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -238,33 +236,11 @@ async fn recover(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// Learn donor term switch history to figure out starting point.
|
||||
|
||||
let mut client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.context("Failed to build http client for recover")?;
|
||||
|
||||
let url = if conf.use_https_safekeeper_api {
|
||||
if let Some(https_connstr) = donor.https_connstr.as_ref() {
|
||||
format!("https://{https_connstr}")
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"cannot recover from donor {}: \
|
||||
https is enabled, but https_connstr is not specified",
|
||||
donor.sk_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
format!("http://{}", donor.http_connstr)
|
||||
};
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let timeline_info: TimelineStatus = client
|
||||
.get(format!(
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
url, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
"http://{}/v1/tenant/{}/timeline/{}",
|
||||
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
|
||||
@@ -50,7 +50,6 @@ fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> Peer
|
||||
local_start_lsn: Lsn(sk_info.local_start_lsn),
|
||||
pg_connstr: sk_info.safekeeper_connstr.clone(),
|
||||
http_connstr: sk_info.http_connstr.clone(),
|
||||
https_connstr: sk_info.https_connstr.clone(),
|
||||
ts,
|
||||
}
|
||||
}
|
||||
@@ -364,7 +363,6 @@ impl SharedState {
|
||||
.to_owned()
|
||||
.unwrap_or(conf.listen_pg_addr.clone()),
|
||||
http_connstr: conf.listen_http_addr.to_owned(),
|
||||
https_connstr: conf.listen_https_addr.to_owned(),
|
||||
backup_lsn: self.sk.state().inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state().local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
@@ -701,7 +699,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
|
||||
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
|
||||
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
|
||||
}
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ fn test_many_tx() -> anyhow::Result<()> {
|
||||
}
|
||||
None
|
||||
})
|
||||
.next_back()
|
||||
.last()
|
||||
.unwrap();
|
||||
|
||||
let initdb_lsn = 21623024;
|
||||
|
||||
@@ -184,7 +184,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
ssl_cert_file: Utf8PathBuf::from(""),
|
||||
ssl_cert_reload_period: Duration::ZERO,
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -141,7 +141,6 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -45,10 +45,8 @@ message SafekeeperTimelineInfo {
|
||||
uint64 standby_horizon = 14;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string.
|
||||
// HTTP endpoint connection string
|
||||
string http_connstr = 13;
|
||||
// HTTPS endpoint connection string.
|
||||
optional string https_connstr = 15;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
@@ -764,7 +764,6 @@ mod tests {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -10,11 +10,13 @@ pub struct Client {
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(http_client: reqwest::Client, base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
pub fn new(base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
Self {
|
||||
base_url,
|
||||
jwt_token,
|
||||
client: http_client,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ use std::error::Error as _;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
@@ -365,28 +364,25 @@ pub(crate) struct ShardUpdate<'a> {
|
||||
}
|
||||
|
||||
impl ComputeHook {
|
||||
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
|
||||
pub(super) fn new(config: Config) -> Self {
|
||||
let authorization_header = config
|
||||
.control_plane_jwt_token
|
||||
.clone()
|
||||
.map(|jwt| format!("Bearer {}", jwt));
|
||||
|
||||
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
|
||||
for cert in &config.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(NOTIFY_REQUEST_TIMEOUT)
|
||||
.build()
|
||||
.context("Failed to build http client for compute hook")?;
|
||||
.expect("Failed to construct HTTP client");
|
||||
|
||||
Ok(Self {
|
||||
Self {
|
||||
state: Default::default(),
|
||||
config,
|
||||
authorization_header,
|
||||
neon_local_lock: Default::default(),
|
||||
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
|
||||
client,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// For test environments: use neon_local's LocalEnv to update compute
|
||||
|
||||
@@ -12,7 +12,6 @@ use safekeeper_api::models::SafekeeperUtilization;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use thiserror::Error;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use utils::id::NodeId;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
@@ -228,7 +227,6 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
.instrument(tracing::info_span!("heartbeat_ps", %node_id))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -255,7 +253,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
PageserverState::WarmingUp { .. } => {
|
||||
warming_up += 1;
|
||||
}
|
||||
PageserverState::Offline => offline += 1,
|
||||
PageserverState::Offline { .. } => offline += 1,
|
||||
PageserverState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
@@ -371,7 +369,6 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
.instrument(tracing::info_span!("heartbeat_sk", %node_id))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -394,7 +391,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
let mut offline = 0;
|
||||
for state in new_state.values() {
|
||||
match state {
|
||||
SafekeeperState::Offline => offline += 1,
|
||||
SafekeeperState::Offline { .. } => offline += 1,
|
||||
SafekeeperState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1733,9 +1733,9 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
};
|
||||
|
||||
if *self_addr == leader_addr {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
|
||||
"Leader is stepped down instance".into(),
|
||||
)));
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Leader is stepped down instance"
|
||||
))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1744,17 +1744,19 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
// Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and
|
||||
// include some leeway to get the timeout for proxied requests.
|
||||
const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(PROXIED_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
let client = match client {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to build leader client for forwarding while in stepped down state: {err}"
|
||||
))));
|
||||
}
|
||||
};
|
||||
|
||||
let client = state.service.get_http_client().clone();
|
||||
|
||||
let request: reqwest::Request = match convert_request(
|
||||
req,
|
||||
&client,
|
||||
leader.address,
|
||||
PROXIED_REQUEST_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -1812,7 +1814,6 @@ async fn convert_request(
|
||||
req: hyper::Request<Body>,
|
||||
client: &reqwest::Client,
|
||||
to_address: String,
|
||||
timeout: Duration,
|
||||
) -> Result<reqwest::Request, ApiError> {
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -1867,7 +1868,6 @@ async fn convert_request(
|
||||
.request(method, uri)
|
||||
.headers(headers)
|
||||
.body(body)
|
||||
.timeout(timeout)
|
||||
.build()
|
||||
.map_err(|err| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
|
||||
|
||||
@@ -110,20 +110,7 @@ impl Leadership {
|
||||
) -> Option<GlobalObservedState> {
|
||||
tracing::info!("Sending step down request to {leader:?}");
|
||||
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &self.config.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = match http_client.build() {
|
||||
Ok(http_client) => http_client,
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to build client for leader step-down request: {err}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let client = PeerClient::new(
|
||||
http_client,
|
||||
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
|
||||
self.config.peer_jwt_token.clone(),
|
||||
);
|
||||
|
||||
@@ -283,8 +283,10 @@ impl Secrets {
|
||||
fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
|
||||
if let Some(v) = cli {
|
||||
Some(v.clone())
|
||||
} else if let Ok(v) = std::env::var(env_name) {
|
||||
Some(v)
|
||||
} else {
|
||||
std::env::var(env_name).ok()
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,11 +59,11 @@ impl ResponseErrorMessageExt for reqwest::Response {
|
||||
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
impl PeerClient {
|
||||
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
|
||||
pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
|
||||
Self {
|
||||
uri,
|
||||
jwt,
|
||||
client: http_client,
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1524,14 +1524,25 @@ impl Persistence {
|
||||
/// Load pending operations from db.
|
||||
pub(crate) async fn list_pending_ops(
|
||||
&self,
|
||||
filter_for_sk: Option<NodeId>,
|
||||
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
|
||||
use crate::schema::safekeeper_timeline_pending_ops::dsl;
|
||||
|
||||
const FILTER_VAL_1: i64 = 1;
|
||||
const FILTER_VAL_2: i64 = 2;
|
||||
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
|
||||
let timeline_from_db = self
|
||||
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
|
||||
Box::pin(async move {
|
||||
let from_db: Vec<TimelinePendingOpPersistence> =
|
||||
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
|
||||
dsl::safekeeper_timeline_pending_ops
|
||||
.filter(
|
||||
dsl::sk_id
|
||||
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
|
||||
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
|
||||
)
|
||||
.load(conn)
|
||||
.await?;
|
||||
Ok(from_db)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -686,8 +686,6 @@ impl Reconciler {
|
||||
.await?,
|
||||
);
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-post-generation-inc");
|
||||
|
||||
let dest_conf = build_location_config(
|
||||
&self.shard,
|
||||
&self.config,
|
||||
@@ -762,9 +760,7 @@ impl Reconciler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if the observed state of the attached location was refreshed
|
||||
/// and false otherwise.
|
||||
async fn maybe_refresh_observed(&mut self) -> Result<bool, ReconcileError> {
|
||||
async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
|
||||
// If the attached node has uncertain state, read it from the pageserver before proceeding: this
|
||||
// is important to avoid spurious generation increments.
|
||||
//
|
||||
@@ -774,7 +770,7 @@ impl Reconciler {
|
||||
|
||||
let Some(attached_node) = self.intent.attached.as_ref() else {
|
||||
// Nothing to do
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if matches!(
|
||||
@@ -819,7 +815,7 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reconciling a tenant makes API calls to pageservers until the observed state
|
||||
@@ -835,7 +831,7 @@ impl Reconciler {
|
||||
/// state where it still requires later reconciliation.
|
||||
pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
|
||||
// Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
|
||||
let refreshed = self.maybe_refresh_observed().await?;
|
||||
self.maybe_refresh_observed().await?;
|
||||
|
||||
// Special case: live migration
|
||||
self.maybe_live_migrate().await?;
|
||||
@@ -859,14 +855,8 @@ impl Reconciler {
|
||||
);
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
if refreshed {
|
||||
tracing::info!(
|
||||
node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
|
||||
self.compute_notify().await?;
|
||||
} else {
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
|
||||
}
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
|
||||
}
|
||||
observed => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
|
||||
@@ -101,7 +101,7 @@ impl SafekeeperClient {
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<models::TenantDeleteResult> {
|
||||
) -> Result<models::TimelineDeleteResult> {
|
||||
measured_request!(
|
||||
"delete_tenant",
|
||||
crate::metrics::Method::Delete,
|
||||
|
||||
@@ -1711,7 +1711,7 @@ impl Service {
|
||||
))),
|
||||
config: config.clone(),
|
||||
persistence,
|
||||
compute_hook: Arc::new(ComputeHook::new(config.clone())?),
|
||||
compute_hook: Arc::new(ComputeHook::new(config.clone())),
|
||||
result_tx,
|
||||
heartbeater_ps,
|
||||
heartbeater_sk,
|
||||
|
||||
@@ -35,10 +35,6 @@ impl SafekeeperReconcilers {
|
||||
service: &Arc<Service>,
|
||||
reqs: Vec<ScheduleRequest>,
|
||||
) {
|
||||
tracing::info!(
|
||||
"Scheduling {} pending safekeeper ops loaded from db",
|
||||
reqs.len()
|
||||
);
|
||||
for req in reqs {
|
||||
self.schedule_request(service, req);
|
||||
}
|
||||
@@ -78,7 +74,7 @@ pub(crate) async fn load_schedule_requests(
|
||||
service: &Arc<Service>,
|
||||
safekeepers: &HashMap<NodeId, Safekeeper>,
|
||||
) -> anyhow::Result<Vec<ScheduleRequest>> {
|
||||
let pending_ops = service.persistence.list_pending_ops().await?;
|
||||
let pending_ops = service.persistence.list_pending_ops(None).await?;
|
||||
let mut res = Vec::with_capacity(pending_ops.len());
|
||||
for op_persist in pending_ops {
|
||||
let node_id = NodeId(op_persist.sk_id as u64);
|
||||
@@ -236,14 +232,12 @@ impl SafekeeperReconciler {
|
||||
let kind = req.kind;
|
||||
let tenant_id = req.tenant_id;
|
||||
let timeline_id = req.timeline_id;
|
||||
let node_id = req.safekeeper.skp.id;
|
||||
self.reconcile_one(req, req_cancel)
|
||||
.instrument(tracing::info_span!(
|
||||
"reconcile_one",
|
||||
?kind,
|
||||
%tenant_id,
|
||||
?timeline_id,
|
||||
%node_id,
|
||||
?timeline_id
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -622,7 +622,7 @@ impl TenantShard {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
attached_locs.sort_by_key(|i| i.1);
|
||||
if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
|
||||
if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
|
||||
self.intent.set_attached(scheduler, Some(*node_id));
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ enum LargeObjectKind {
|
||||
|
||||
impl LargeObjectKind {
|
||||
fn from_key(key: &str) -> Self {
|
||||
let fname = key.split('/').next_back().unwrap();
|
||||
let fname = key.split('/').last().unwrap();
|
||||
|
||||
let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
|
||||
return LargeObjectKind::Other;
|
||||
|
||||
@@ -295,8 +295,8 @@ pub struct ControllerClientConfig {
|
||||
}
|
||||
|
||||
impl ControllerClientConfig {
|
||||
pub fn build_client(self, http_client: reqwest::Client) -> control_api::Client {
|
||||
control_api::Client::new(http_client, self.controller_api, Some(self.controller_jwt))
|
||||
pub fn build_client(self) -> control_api::Client {
|
||||
control_api::Client::new(self.controller_api, Some(self.controller_jwt))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ use camino::Utf8PathBuf;
|
||||
use clap::{Parser, Subcommand};
|
||||
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use reqwest::{Certificate, Method, Url};
|
||||
use reqwest::{Method, Url};
|
||||
use storage_controller_client::control_api;
|
||||
use storage_scrubber::garbage::{PurgeMode, find_garbage, purge_garbage};
|
||||
use storage_scrubber::pageserver_physical_gc::{GcMode, pageserver_physical_gc};
|
||||
@@ -41,10 +41,6 @@ struct Cli {
|
||||
/// If set to true, the scrubber will exit with error code on fatal error.
|
||||
#[arg(long, default_value_t = false)]
|
||||
exit_code: bool,
|
||||
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
@@ -150,28 +146,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);
|
||||
|
||||
let ssl_ca_certs = match cli.ssl_ca_file.as_ref() {
|
||||
Some(ssl_ca_file) => {
|
||||
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
let controller_client = cli.controller_api.map(|controller_api| {
|
||||
ControllerClientConfig {
|
||||
controller_api,
|
||||
// Default to no key: this is a convenience when working in a development environment
|
||||
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
|
||||
}
|
||||
.build_client(http_client)
|
||||
.build_client()
|
||||
});
|
||||
|
||||
match cli.command {
|
||||
|
||||
@@ -376,28 +376,6 @@ class PageserverWalReceiverProtocol(StrEnum):
|
||||
raise ValueError(f"Unknown protocol type: {proto}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverTracingConfig:
|
||||
sampling_ratio: tuple[int, int]
|
||||
endpoint: str
|
||||
protocol: str
|
||||
timeout: str
|
||||
|
||||
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
|
||||
value = {
|
||||
"sampling_ratio": {
|
||||
"numerator": self.sampling_ratio[0],
|
||||
"denominator": self.sampling_ratio[1],
|
||||
},
|
||||
"export_config": {
|
||||
"endpoint": self.endpoint,
|
||||
"protocol": self.protocol,
|
||||
"timeout": self.timeout,
|
||||
},
|
||||
}
|
||||
return ("tracing", value)
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
"""
|
||||
Builder object to create a Neon runtime environment
|
||||
@@ -447,7 +425,6 @@ class NeonEnvBuilder:
|
||||
pageserver_virtual_file_io_mode: str | None = None,
|
||||
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
|
||||
pageserver_get_vectored_concurrent_io: str | None = None,
|
||||
pageserver_tracing_config: PageserverTracingConfig | None = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -501,8 +478,6 @@ class NeonEnvBuilder:
|
||||
pageserver_get_vectored_concurrent_io
|
||||
)
|
||||
|
||||
self.pageserver_tracing_config = pageserver_tracing_config
|
||||
|
||||
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
|
||||
pageserver_default_tenant_config_compaction_algorithm
|
||||
)
|
||||
@@ -1163,7 +1138,6 @@ class NeonEnv:
|
||||
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
|
||||
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
|
||||
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
|
||||
self.pageserver_tracing_config = config.pageserver_tracing_config
|
||||
|
||||
# Create the neon_local's `NeonLocalInitConf`
|
||||
cfg: dict[str, Any] = {
|
||||
@@ -1288,14 +1262,6 @@ class NeonEnv:
|
||||
if key not in ps_cfg:
|
||||
ps_cfg[key] = value
|
||||
|
||||
if self.pageserver_tracing_config is not None:
|
||||
key, value = self.pageserver_tracing_config.to_config_key_value()
|
||||
|
||||
if key not in ps_cfg:
|
||||
ps_cfg[key] = value
|
||||
|
||||
ps_cfg[key] = value
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
|
||||
@@ -1318,7 +1284,6 @@ class NeonEnv:
|
||||
"http_port": port.http,
|
||||
"https_port": port.https,
|
||||
"sync": config.safekeepers_enable_fsync,
|
||||
"use_https_safekeeper_api": config.use_https_safekeeper_api,
|
||||
}
|
||||
if config.auth_enabled:
|
||||
sk_cfg["auth_enabled"] = True
|
||||
|
||||
@@ -110,7 +110,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
".*delaying layer flush by \\S+ for compaction backpressure.*",
|
||||
".*stalling layer flushes for compaction backpressure.*",
|
||||
".*layer roll waiting for flush due to compaction backpressure.*",
|
||||
".*BatchSpanProcessor.*",
|
||||
)
|
||||
|
||||
|
||||
@@ -119,7 +118,7 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
# failing to connect to them.
|
||||
".*Call to node.*management API.*failed.*receive body.*",
|
||||
".*Call to node.*management API.*failed.*ReceiveBody.*",
|
||||
".*Call to node.*management API.*failed.*Timeout.*",
|
||||
".*Call to node.*management API still failed after .+ retries, giving up.*",
|
||||
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
|
||||
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
|
||||
# Many tests will start up with a node offline
|
||||
|
||||
@@ -1192,28 +1192,3 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
log.info(f"Got perf info response code: {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
def ingest_aux_files(
|
||||
self,
|
||||
tenant_id: TenantId | TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
aux_files: dict[str, bytes],
|
||||
):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/ingest_aux_files",
|
||||
json={
|
||||
"aux_files": aux_files,
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
def list_aux_files(
|
||||
self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, lsn: Lsn
|
||||
) -> Any:
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/list_aux_files",
|
||||
json={"lsn": str(lsn)},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
@@ -10,7 +10,6 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PageserverTracingConfig,
|
||||
PgBin,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
@@ -112,15 +111,6 @@ def setup_and_run_pagebench_benchmark(
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
|
||||
)
|
||||
|
||||
tracing_config = PageserverTracingConfig(
|
||||
sampling_ratio=(0, 1000),
|
||||
endpoint="http://localhost:4318/v1/traces",
|
||||
protocol="http-binary",
|
||||
timeout="10s",
|
||||
)
|
||||
neon_env_builder.pageserver_tracing_config = tracing_config
|
||||
ratio = tracing_config.sampling_ratio[0] / tracing_config.sampling_ratio[1]
|
||||
params.update(
|
||||
{
|
||||
"pageserver_config_override.page_cache_size": (
|
||||
@@ -128,7 +118,6 @@ def setup_and_run_pagebench_benchmark(
|
||||
{"unit": "byte"},
|
||||
),
|
||||
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
|
||||
"pageserver_config_override.sampling_ratio": (ratio, {"unit": ""}),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport
|
||||
from fixtures.common_types import Lsn
|
||||
@@ -25,11 +26,7 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
# Granularity of ~0.5 sec
|
||||
def measure_replication_lag(
|
||||
master: psycopg2.extensions.cursor,
|
||||
replica: psycopg2.extensions.cursor,
|
||||
timeout_sec: int = 600,
|
||||
):
|
||||
def measure_replication_lag(master, replica, timeout_sec=600):
|
||||
start = time.time()
|
||||
master.execute("SELECT pg_current_wal_flush_lsn()")
|
||||
master_lsn = Lsn(master.fetchall()[0][0])
|
||||
@@ -43,7 +40,7 @@ def measure_replication_lag(
|
||||
raise TimeoutError(f"Replication sync took more than {timeout_sec} sec")
|
||||
|
||||
|
||||
def check_pgbench_still_running(pgbench: subprocess.Popen[str]):
|
||||
def check_pgbench_still_running(pgbench):
|
||||
rc = pgbench.poll()
|
||||
if rc is not None:
|
||||
raise RuntimeError(f"Pgbench terminated early with return code {rc}")
|
||||
@@ -64,8 +61,6 @@ def test_ro_replica_lag(
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: {}", project_id)
|
||||
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
error_occurred = False
|
||||
try:
|
||||
@@ -81,7 +76,6 @@ def test_ro_replica_lag(
|
||||
endpoint_type="read_only",
|
||||
settings={"pg_settings": {"hot_standby_feedback": "on"}},
|
||||
)
|
||||
log.info("Replica endpoint ID: {}", replica["endpoint"]["id"])
|
||||
replica_env = master_env.copy()
|
||||
replica_env["PGHOST"] = replica["endpoint"]["host"]
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
@@ -197,8 +191,6 @@ def test_replication_start_stop(
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: {}", project_id)
|
||||
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
try:
|
||||
branch_id = project["branch"]["id"]
|
||||
@@ -208,15 +200,15 @@ def test_replication_start_stop(
|
||||
)
|
||||
|
||||
replicas = []
|
||||
for i in range(num_replicas):
|
||||
replica = neon_api.create_endpoint(
|
||||
project_id,
|
||||
branch_id,
|
||||
endpoint_type="read_only",
|
||||
settings={"pg_settings": {"hot_standby_feedback": "on"}},
|
||||
for _ in range(num_replicas):
|
||||
replicas.append(
|
||||
neon_api.create_endpoint(
|
||||
project_id,
|
||||
branch_id,
|
||||
endpoint_type="read_only",
|
||||
settings={"pg_settings": {"hot_standby_feedback": "on"}},
|
||||
)
|
||||
)
|
||||
log.info("Replica {} endpoint ID: {}", i + 1, replica["endpoint"]["id"])
|
||||
replicas.append(replica)
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
|
||||
replica_connstr = [
|
||||
|
||||
@@ -249,7 +249,6 @@ def test_forward_compatibility(
|
||||
top_output_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
compatibility_snapshot_dir: Path,
|
||||
compute_reconfigure_listener: ComputeReconfigure,
|
||||
):
|
||||
"""
|
||||
Test that the old binaries can read new data
|
||||
@@ -258,7 +257,6 @@ def test_forward_compatibility(
|
||||
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
|
||||
)
|
||||
|
||||
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
|
||||
neon_env_builder.test_may_use_compatibility_snapshot_binaries = True
|
||||
|
||||
try:
|
||||
|
||||
@@ -4073,101 +4073,6 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
|
||||
assert reconciles_after_restart == 0
|
||||
|
||||
|
||||
@run_only_on_default_postgres("PG version is not interesting here")
|
||||
@pytest.mark.parametrize("restart_storcon", [True, False])
|
||||
def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool):
|
||||
"""
|
||||
Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
|
||||
- restart_storcon: tests whether the pending ops are persisted.
|
||||
if we don't restart, we test that we don't require it to come from the db.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.safekeepers[0].stop()
|
||||
|
||||
# Wait for heartbeater to pick up that the safekeeper is gone
|
||||
# This isn't really neccessary
|
||||
def logged_offline():
|
||||
env.storage_controller.assert_log_contains(
|
||||
"Heartbeat round complete for 3 safekeepers, 1 offline"
|
||||
)
|
||||
|
||||
wait_until(logged_offline)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_tenant(tenant_id, timeline_id)
|
||||
|
||||
env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Call to safekeeper.* management API still failed after.*",
|
||||
".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
|
||||
]
|
||||
)
|
||||
|
||||
if restart_storcon:
|
||||
# Restart the storcon to check that we persist operations
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
env.storage_controller.assert_log_contains("writing pending op for sk id 1")
|
||||
env.safekeepers[0].start()
|
||||
|
||||
# ensure that we applied the operation also for the safekeeper we just brought down
|
||||
def logged_contains_on_sk():
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
|
||||
)
|
||||
|
||||
wait_until(logged_contains_on_sk)
|
||||
|
||||
env.safekeepers[1].stop()
|
||||
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
|
||||
# ensure the safekeeper deleted the timeline
|
||||
def timeline_deleted_on_active_sks():
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
env.safekeepers[2].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_active_sks)
|
||||
|
||||
if restart_storcon:
|
||||
# Restart the storcon to check that we persist operations
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
env.safekeepers[1].start()
|
||||
|
||||
# ensure that there is log msgs for the third safekeeper too
|
||||
def timeline_deleted_on_sk():
|
||||
env.safekeepers[1].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_sk)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("wrong_az", [True, False])
|
||||
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
|
||||
"""
|
||||
@@ -4271,121 +4176,3 @@ def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder,
|
||||
)
|
||||
else:
|
||||
assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == []
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
|
||||
def test_storage_controller_migrate_with_pageserver_restart(
|
||||
neon_env_builder: NeonEnvBuilder, make_httpserver
|
||||
):
|
||||
"""
|
||||
Test that live migrations which fail right after incrementing the generation
|
||||
due to the destination going offline eventually send a compute notification
|
||||
after the destination re-attaches.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Disable transitions to offline
|
||||
"max_offline": "600s",
|
||||
"use_local_compute_notifications": False,
|
||||
}
|
||||
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{make_httpserver.host}:{make_httpserver.port}/"
|
||||
)
|
||||
|
||||
notifications = []
|
||||
|
||||
def notify(request: Request):
|
||||
log.info(f"Received notify-attach: {request}")
|
||||
notifications.append(request.json)
|
||||
|
||||
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(notify)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Call to node.*management API failed.*",
|
||||
".*Call to node.*management API still failed.*",
|
||||
".*Reconcile error.*",
|
||||
".*request.*PUT.*migrate.*",
|
||||
]
|
||||
)
|
||||
|
||||
env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
initial_desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0]
|
||||
log.info(f"{initial_desc=}")
|
||||
primary = env.get_pageserver(initial_desc["node_attached"])
|
||||
secondary = env.get_pageserver(initial_desc["node_secondary"][0])
|
||||
|
||||
# Pause the migration after incrementing the generation in the database
|
||||
env.storage_controller.configure_failpoints(
|
||||
("reconciler-live-migrate-post-generation-inc", "pause")
|
||||
)
|
||||
|
||||
tenant_shard_id = TenantShardId(env.initial_tenant, 0, 0)
|
||||
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
||||
migrate_fut = executor.submit(
|
||||
env.storage_controller.tenant_shard_migrate,
|
||||
tenant_shard_id,
|
||||
secondary.id,
|
||||
config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
|
||||
)
|
||||
|
||||
def has_hit_migration_failpoint():
|
||||
expr = "at failpoint reconciler-live-migrate-post-generation-inc"
|
||||
log.info(expr)
|
||||
assert env.storage_controller.log_contains(expr)
|
||||
|
||||
wait_until(has_hit_migration_failpoint)
|
||||
|
||||
secondary.stop()
|
||||
|
||||
# Eventually migration completes
|
||||
env.storage_controller.configure_failpoints(
|
||||
("reconciler-live-migrate-post-generation-inc", "off")
|
||||
)
|
||||
try:
|
||||
migrate_fut.result()
|
||||
except StorageControllerApiException as err:
|
||||
log.info(f"Migration failed: {err}")
|
||||
except:
|
||||
env.storage_controller.configure_failpoints(
|
||||
("reconciler-live-migrate-post-generation-inc", "off")
|
||||
)
|
||||
raise
|
||||
|
||||
def process_migration_result():
|
||||
dump = env.storage_controller.tenant_shard_dump()
|
||||
observed = dump[0]["observed"]["locations"]
|
||||
|
||||
log.info(f"{observed=} primary={primary.id} secondary={secondary.id}")
|
||||
|
||||
assert observed[str(primary.id)]["conf"]["mode"] == "AttachedStale"
|
||||
assert observed[str(secondary.id)]["conf"] is None
|
||||
|
||||
wait_until(process_migration_result)
|
||||
|
||||
# Start and wait for re-attach to be processed
|
||||
secondary.start()
|
||||
env.storage_controller.poll_node_status(
|
||||
secondary.id,
|
||||
desired_availability=PageserverAvailability.ACTIVE,
|
||||
desired_scheduling_policy=None,
|
||||
max_attempts=10,
|
||||
backoff=1,
|
||||
)
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
assert notifications[-1] == {
|
||||
"tenant_id": str(env.initial_tenant),
|
||||
"stripe_size": None,
|
||||
"shards": [{"node_id": int(secondary.id), "shard_number": 0}],
|
||||
"preferred_az": DEFAULT_AZ_ID,
|
||||
}
|
||||
|
||||
@@ -776,7 +776,6 @@ def test_lsn_lease_storcon(neon_env_builder: NeonEnvBuilder):
|
||||
env.initial_tenant, env.initial_timeline, last_flush_lsn
|
||||
)
|
||||
env.storage_controller.tenant_shard_split(env.initial_tenant, 8)
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=120)
|
||||
# TODO: do we preserve LSN leases across shard splits?
|
||||
env.storage_controller.pageserver_api().timeline_lsn_lease(
|
||||
env.initial_tenant, env.initial_timeline, last_flush_lsn
|
||||
|
||||
@@ -1768,87 +1768,6 @@ def test_pageserver_compaction_detach_ancestor_smoke(neon_env_builder: NeonEnvBu
|
||||
workload_child.validate(env.pageserver.id)
|
||||
|
||||
|
||||
def test_timeline_detach_with_aux_files_with_detach_v1(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Validate that "branches do not inherit their parent" is invariant over detach_ancestor.
|
||||
|
||||
Branches hide parent branch aux files etc by stopping lookup of non-inherited keyspace at the parent-child boundary.
|
||||
We had a bug where detach_ancestor running on a child branch would copy aux files key range from child to parent,
|
||||
thereby making parent aux files reappear.
|
||||
"""
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"gc_period": "1s",
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
|
||||
|
||||
http = env.pageserver.http_client()
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
|
||||
lsn0 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
endpoint.safe_psql(
|
||||
"SELECT pg_create_logical_replication_slot('test_slot_parent_1', 'pgoutput')"
|
||||
)
|
||||
lsn1 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
endpoint.safe_psql(
|
||||
"SELECT pg_create_logical_replication_slot('test_slot_parent_2', 'pgoutput')"
|
||||
)
|
||||
lsn2 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn0).keys()) == set(
|
||||
[]
|
||||
)
|
||||
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn1).keys()) == set(
|
||||
["pg_replslot/test_slot_parent_1/state"]
|
||||
)
|
||||
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
|
||||
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
|
||||
)
|
||||
|
||||
# Restore at LSN1
|
||||
branch_timeline_id = env.create_branch("restore", env.initial_tenant, "main", lsn1)
|
||||
endpoint2 = env.endpoints.create_start("restore", tenant_id=env.initial_tenant)
|
||||
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
|
||||
|
||||
# Add a new slot file to the restore branch (This won't happen in reality because cplane immediately detaches the branch on restore,
|
||||
# but we want to ensure that aux files on the detached branch are NOT inherited during ancestor detach. We could change the behavior
|
||||
# in the future.
|
||||
# TL;DR we should NEVER automatically detach a branch as a background optimization for those tenants that already used the restore
|
||||
# feature before branch detach was introduced because it will clean up the aux files and stop logical replication.
|
||||
endpoint2.safe_psql(
|
||||
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
|
||||
)
|
||||
lsn3 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
|
||||
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
|
||||
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
|
||||
["pg_replslot/test_slot_restore/state"]
|
||||
)
|
||||
|
||||
print("lsn0=", lsn0)
|
||||
print("lsn1=", lsn1)
|
||||
print("lsn2=", lsn2)
|
||||
print("lsn3=", lsn3)
|
||||
# Detach the restore branch so that main doesn't have any child branches.
|
||||
all_reparented = http.detach_ancestor(
|
||||
env.initial_tenant, branch_timeline_id, detach_behavior="v1"
|
||||
)
|
||||
assert all_reparented == set([])
|
||||
|
||||
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
|
||||
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
|
||||
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
|
||||
), "main branch unaffected"
|
||||
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
|
||||
["pg_replslot/test_slot_restore/state"]
|
||||
)
|
||||
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
|
||||
|
||||
|
||||
# TODO:
|
||||
# - branch near existing L1 boundary, image layers?
|
||||
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: bce3e48d8a...35bc1b0cba
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 4ac24a747c...6cea02e23c
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 26c7d3f6de...473f68210d
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 7ec41bf6cd...22533c63fc
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.4",
|
||||
"7ec41bf6cd92a4af751272145fdd590270c491da"
|
||||
"22533c63fc42cdc1dbe138650ba1eca10a70c5d7"
|
||||
],
|
||||
"v16": [
|
||||
"16.8",
|
||||
"26c7d3f6de6f361c8923bb80d7563853b4a04958"
|
||||
"473f68210d52ff8508f71c15b0c77c01296f4ace"
|
||||
],
|
||||
"v15": [
|
||||
"15.12",
|
||||
"4ac24a747cd897119ce9b20547b3b04eba2cacbd"
|
||||
"6cea02e23caa950d5f06932491a91b6af8f54360"
|
||||
],
|
||||
"v14": [
|
||||
"14.17",
|
||||
"bce3e48d8a72e70e72dfee1b7421fecd0f1b00ac"
|
||||
"35bc1b0cba55680e3b37abce4e67a46bb15f3315"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user