Compare commits


6 Commits

Author SHA1 Message Date
Konstantin Knizhnik
7c8e87056b Move check of page LSN to check_page_lsn() function 2025-07-14 18:03:58 +03:00
Konstantin Knizhnik
1db119c657 Use GetCurrentReplayRecPtr instead of GetXLogReplayRecPtr in the check for returned page LSN 2025-07-14 08:44:05 +03:00
Konstantin Knizhnik
00826b4082 Handle case of 0 replay_lsn 2025-07-13 20:41:49 +03:00
Konstantin Knizhnik
b726293ec3 Fix indentation 2025-07-13 16:55:29 +03:00
Konstantin Knizhnik
c42c38138e Set request_lsn=max(gc_cutoff,request_lsn) 2025-07-12 15:55:41 +03:00
Konstantin Knizhnik
af61b7238d Set request_lsn=max(gc_cutoff,request_lsn) 2025-07-12 15:55:35 +03:00
68 changed files with 886 additions and 2314 deletions

View File

@@ -27,4 +27,4 @@
!storage_controller/
!vendor/postgres-*/
!workspace_hack/
!build-tools/patches
!build_tools/patches

View File

@@ -31,7 +31,6 @@ config-variables:
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- PREWARM_PGBENCH_SIZE
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID

View File

@@ -176,11 +176,7 @@ runs:
fi
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
# We don't use code coverage for regression tests (the step is disabled),
# so there's no need to collect it.
# Ref https://github.com/neondatabase/neon/issues/4540
# cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=()
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
else
cov_prefix=()
fi

View File

@@ -150,7 +150,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v15 build
id: cache_pg_15
@@ -162,7 +162,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v16 build
id: cache_pg_16
@@ -174,7 +174,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v17 build
id: cache_pg_17
@@ -186,7 +186,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Build all
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables

View File

@@ -219,7 +219,6 @@ jobs:
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
--ignore test_runner/performance/test_perf_many_relations.py
--ignore test_runner/performance/test_perf_oltp_large_tenant.py
--ignore test_runner/performance/test_lfc_prewarm.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -411,77 +410,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
prewarm-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run prewarm benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_lfc_prewarm.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)

View File

@@ -72,7 +72,7 @@ jobs:
ARCHS: ${{ inputs.archs || '["x64","arm64"]' }}
DEBIANS: ${{ inputs.debians || '["bullseye","bookworm"]' }}
IMAGE_TAG: |
${{ hashFiles('build-tools/Dockerfile',
${{ hashFiles('build-tools.Dockerfile',
'.github/workflows/build-build-tools-image.yml') }}
run: |
echo "archs=${ARCHS}" | tee -a ${GITHUB_OUTPUT}
@@ -144,7 +144,7 @@ jobs:
- uses: docker/build-push-action@471d1dc4e07e5cdedd4c2171150001c434f0b7a4 # v6.15.0
with:
file: build-tools/Dockerfile
file: build-tools.Dockerfile
context: .
provenance: false
push: true

Cargo.lock (generated)
View File

@@ -4296,7 +4296,6 @@ dependencies = [
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"pprof",
"rand 0.8.5",
"reqwest",
"serde",
@@ -7564,7 +7563,6 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]

View File

@@ -201,7 +201,7 @@ tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.g
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] }
toml = "0.8"

View File

@@ -35,7 +35,7 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused=on\ntimeout=15\ntries=5\nretry-on-host-error=on\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
COPY build-tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \

View File

@@ -9,7 +9,7 @@
#
# build-tools: This contains Rust compiler toolchain and other tools needed at compile
# time. This is also used for the storage builds. This image is defined in
# build-tools/Dockerfile.
# build-tools.Dockerfile.
#
# build-deps: Contains C compiler, other build tools, and compile-time dependencies
# needed to compile PostgreSQL and most extensions. (Some extensions need
@@ -115,7 +115,7 @@ ARG EXTENSIONS=all
FROM $BASE_IMAGE_SHA AS build-deps
ARG DEBIAN_VERSION
# Keep in sync with build-tools/Dockerfile
# Keep in sync with build-tools.Dockerfile
ENV PROTOC_VERSION=25.1
# Use strict mode for bash to catch errors early
@@ -1790,7 +1790,7 @@ RUN set -e \
#########################################################################################
FROM build-deps AS exporters
ARG TARGETARCH
# Keep sql_exporter version same as in build-tools/Dockerfile and
# Keep sql_exporter version same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py
# See comment on the top of the file regading `echo`, `-e` and `\n`
RUN if [ "$TARGETARCH" = "amd64" ]; then\

View File

@@ -1286,7 +1286,9 @@ impl ComputeNode {
// In case of error, log and fail the check, but don't crash.
// We're playing it safe because these errors could be transient
// and we don't yet retry.
// and we don't yet retry. Also being careful here allows us to
// be backwards compatible with safekeepers that don't have the
// TIMELINE_STATUS API yet.
if responses.len() < quorum {
error!(
"failed sync safekeepers check {:?} {:?} {:?}",
@@ -2485,7 +2487,7 @@ pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
serde_json::to_string(&extensions).expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err}"),
Err(err) => error!("could not get installed extensions: {err:?}"),
}
Ok(())
}
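The two error! lines above differ only in how the error is formatted: {err} uses Display, {err:?} uses Debug, and for anyhow-style errors the Debug form also prints the cause chain. A minimal illustration of the difference, with a hypothetical load_config helper that is not part of this diff:

use anyhow::{Context, Result};

fn load_config() -> Result<String> {
    // Nested error, so the two formats produce visibly different output.
    std::fs::read_to_string("/nonexistent/config.toml").context("failed to load config")
}

fn main() {
    if let Err(err) = load_config() {
        // Display ({err}): only the outermost message, e.g.
        //   failed to load config
        println!("{err}");
        // Debug ({err:?}): the full cause chain, e.g.
        //   failed to load config
        //   Caused by: No such file or directory (os error 2)
        println!("{err:?}");
    }
}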

View File

@@ -2,7 +2,6 @@ use std::collections::HashMap;
use anyhow::Result;
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use tokio_postgres::error::Error as PostgresError;
use tokio_postgres::{Client, Config, NoTls};
use crate::metrics::INSTALLED_EXTENSIONS;
@@ -11,7 +10,7 @@ use crate::metrics::INSTALLED_EXTENSIONS;
/// and to make database listing query here more explicit.
///
/// Limit the number of databases to 500 to avoid excessive load.
async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
async fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
// `pg_database.datconnlimit = -2` means that the database is in the
// invalid state
let databases = client
@@ -38,9 +37,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
/// Same extension can be installed in multiple databases with different versions,
/// so we report a separate metric (number of databases where it is installed)
/// for each extension version.
pub async fn get_installed_extensions(
mut conf: Config,
) -> Result<InstalledExtensions, PostgresError> {
pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExtensions> {
conf.application_name("compute_ctl:get_installed_extensions");
let databases: Vec<String> = {
let (mut client, connection) = conf.connect(NoTls).await?;

View File

@@ -464,7 +464,7 @@ impl Endpoint {
conf.append("max_connections", "100");
conf.append("wal_level", "logical");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreceiver will send a feedback message to the wal sender.
// It also defines how often the walreciever will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");
conf.append("listen_addresses", &self.pg_address.ip().to_string());
conf.append("port", &self.pg_address.port().to_string());

View File

@@ -476,7 +476,6 @@ async fn main() -> anyhow::Result<()> {
listen_http_port,
listen_https_port,
availability_zone_id: AvailabilityZone(availability_zone_id),
node_ip_addr: None,
}),
)
.await?;

View File

@@ -75,7 +75,7 @@ CLI examples:
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
For local S3 installations, refer to their documentation for name format and credentials.
For local S3 installations, refer to the their documentation for name format and credentials.
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
Required sections are:

View File

@@ -1,6 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::net::IpAddr;
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -61,11 +60,6 @@ pub struct NodeRegisterRequest {
pub listen_https_port: Option<u16>,
pub availability_zone_id: AvailabilityZone,
// Reachable IP address of the PS/SK registering, if known.
// Hadron Cluster Coordiantor will update the DNS record of the registering node
// with this IP address.
pub node_ip_addr: Option<IpAddr>,
}
#[derive(Serialize, Deserialize)]
@@ -551,39 +545,6 @@ pub struct SafekeeperDescribeResponse {
pub scheduling_policy: SkSchedulingPolicy,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TimelineSafekeeperPeer {
pub node_id: NodeId,
pub listen_http_addr: String,
pub http_port: i32,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SCSafekeeperTimeline {
// SC does not know the tenant id.
pub timeline_id: TimelineId,
pub peers: Vec<NodeId>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SCSafekeeperTimelinesResponse {
pub timelines: Vec<SCSafekeeperTimeline>,
pub safekeeper_peers: Vec<TimelineSafekeeperPeer>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SafekeeperTimeline {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub peers: Vec<NodeId>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SafekeeperTimelinesResponse {
pub timelines: Vec<SafekeeperTimeline>,
pub safekeeper_peers: Vec<TimelineSafekeeperPeer>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperSchedulingPolicyRequest {
pub scheduling_policy: SkSchedulingPolicy,

View File

@@ -1,73 +0,0 @@
use std::env::{VarError, var};
use std::error::Error;
use std::net::IpAddr;
use std::str::FromStr;
/// Name of the environment variable containing the reachable IP address of the node. If set, the IP address contained in this
/// environment variable is used as the reachable IP address of the pageserver or safekeeper node during node registration.
/// In a Kubernetes environment, this environment variable should be set by Kubernetes to the Pod IP (specified in the Pod
/// template).
pub const HADRON_NODE_IP_ADDRESS: &str = "HADRON_NODE_IP_ADDRESS";
/// Read the reachable IP address of this page server from env var HADRON_NODE_IP_ADDRESS.
/// In Kubernetes this environment variable is set to the Pod IP (specified in the Pod template).
pub fn read_node_ip_addr_from_env() -> Result<Option<IpAddr>, Box<dyn Error>> {
match var(HADRON_NODE_IP_ADDRESS) {
Ok(v) => {
if let Ok(addr) = IpAddr::from_str(&v) {
Ok(Some(addr))
} else {
Err(format!("Invalid IP address string: {v}. Cannot be parsed as either an IPv4 or an IPv6 address.").into())
}
}
Err(VarError::NotPresent) => Ok(None),
Err(e) => Err(e.into()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use std::net::{Ipv4Addr, Ipv6Addr};
#[test]
fn test_read_node_ip_addr_from_env() {
// SAFETY: test code
unsafe {
// Test with a valid IPv4 address
env::set_var(HADRON_NODE_IP_ADDRESS, "192.168.1.1");
let result = read_node_ip_addr_from_env().unwrap();
assert_eq!(result, Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))));
// Test with a valid IPv6 address
env::set_var(
HADRON_NODE_IP_ADDRESS,
"2001:0db8:85a3:0000:0000:8a2e:0370:7334",
);
}
let result = read_node_ip_addr_from_env().unwrap();
assert_eq!(
result,
Some(IpAddr::V6(
Ipv6Addr::from_str("2001:0db8:85a3:0000:0000:8a2e:0370:7334").unwrap()
))
);
// Test with an invalid IP address
// SAFETY: test code
unsafe {
env::set_var(HADRON_NODE_IP_ADDRESS, "invalid_ip");
}
let result = read_node_ip_addr_from_env();
assert!(result.is_err());
// Test with no environment variable set
// SAFETY: test code
unsafe {
env::remove_var(HADRON_NODE_IP_ADDRESS);
}
let result = read_node_ip_addr_from_env().unwrap();
assert_eq!(result, None);
}
}

View File

@@ -26,9 +26,6 @@ pub mod auth;
// utility functions and helper traits for unified unique id generation/serialization etc.
pub mod id;
// utility functions to obtain reachable IP addresses in PS/SK nodes.
pub mod ip_address;
pub mod shard;
mod hex;

View File

@@ -1,5 +1,4 @@
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;
@@ -8,7 +7,7 @@ use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::{info, warn};
use tracing::info;
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -378,11 +377,10 @@ impl std::fmt::Debug for SecretString {
///
/// TODO: consider upgrading this to a warning, but currently it fires too often.
#[inline]
pub async fn log_slow<O>(
name: &str,
threshold: Duration,
f: Pin<&mut impl Future<Output = O>>,
) -> O {
pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
where
F: Future<Output = O>,
{
monitor_slow_future(
threshold,
threshold, // period = threshold
@@ -396,42 +394,16 @@ pub async fn log_slow<O>(
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
info!("slow {name} completed after {elapsed:.3}s");
info!(
"slow {name} completed after {:.3}s",
elapsed_total.as_secs_f64()
);
} else {
info!("slow {name} still running after {elapsed:.3}s");
}
},
)
.await
}
/// Logs a periodic warning if a future is slow to complete.
#[inline]
pub async fn warn_slow<O>(
name: &str,
threshold: Duration,
f: Pin<&mut impl Future<Output = O>>,
) -> O {
monitor_slow_future(
threshold,
threshold, // period = threshold
f,
|MonitorSlowFutureCallback {
ready,
is_slow,
elapsed_total,
elapsed_since_last_callback: _,
}| {
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
warn!("slow {name} completed after {elapsed:.3}s");
} else {
warn!("slow {name} still running after {elapsed:.3}s");
info!(
"slow {name} still running after {:.3}s",
elapsed_total.as_secs_f64()
);
}
},
)
@@ -444,7 +416,7 @@ pub async fn warn_slow<O>(
pub async fn monitor_slow_future<F, O>(
threshold: Duration,
period: Duration,
mut fut: Pin<&mut F>,
mut fut: std::pin::Pin<&mut F>,
mut cb: impl FnMut(MonitorSlowFutureCallback),
) -> O
where
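For context, log_slow (like the removed warn_slow) takes a name, a threshold, and a pinned future, polls it via monitor_slow_future, and logs when the future is still running after the threshold and again when it finally completes. A minimal usage sketch, assuming a hypothetical fetch_page helper and a tokio runtime:

use std::pin::pin;
use std::time::Duration;

use utils::logging::log_slow;

// Hypothetical slow operation standing in for a real pageserver call.
async fn fetch_page() -> Vec<u8> {
    tokio::time::sleep(Duration::from_secs(5)).await;
    vec![0u8; 8192]
}

async fn example() {
    // Pin the future on the stack and hand it to log_slow; once the 3s threshold
    // passes it periodically logs "slow fetch_page still running after ...s" and
    // finally "slow fetch_page completed after ...s".
    let page = log_slow("fetch_page", Duration::from_secs(3), pin!(fetch_page())).await;
    assert_eq!(page.len(), 8192);
}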

View File

@@ -1,16 +1,13 @@
use std::collections::HashMap;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::{debug, instrument};
use utils::logging::warn_slow;
use tracing::instrument;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
@@ -24,40 +21,28 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
///
/// Normal requests are small, and we don't pipeline them, so we can afford a large number of
/// streams per connection.
///
/// TODO: tune all of these constants, and consider making them configurable.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
/// with only streams.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
/// transmission delays. This also concentrates large window sizes on a smaller set of
/// streams/connections, presumably reducing memory use.
const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of concurrent unary request clients per shard.
const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
/// The batch size threshold at which a GetPage request will use the bulk stream pool.
///
/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
const BULK_THRESHOLD_BATCH_SIZE: usize = 5;
/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage
/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`.
const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
/// The overall request call timeout, including retries and pool acquisition.
/// TODO: should we retry forever? Should the caller decide?
const CALL_TIMEOUT: Duration = Duration::from_secs(60);
/// Max number of pipelined requests per stream.
const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();
/// The per-request (retry attempt) timeout, including any lazy connection establishment.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
/// are more throughput-oriented, we have a smaller limit but higher queue depth.
const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
/// The initial request retry backoff duration. The first retry does not back off.
/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
const BASE_BACKOFF: Duration = Duration::from_millis(5);
/// The maximum request retry backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(5);
/// Threshold and interval for warning about slow operation.
const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus
/// get a larger queue depth.
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
@@ -65,19 +50,10 @@ const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams.
/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
/// * Automatic retries.
/// * Observability.
///
/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
/// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
/// one stream per backend, but without the TCP connection overhead. In the common case we expect
/// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
/// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
@@ -91,6 +67,8 @@ pub struct PageserverClient {
compression: Option<CompressionEncoding>,
/// The shards for this tenant.
shards: ArcSwap<Shards>,
/// The retry configuration.
retry: Retry,
}
impl PageserverClient {
@@ -116,6 +94,7 @@ impl PageserverClient {
auth_token,
compression,
shards: ArcSwap::new(Arc::new(shards)),
retry: Retry,
})
}
@@ -163,15 +142,13 @@ impl PageserverClient {
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
}
/// Returns the total size of a database, as # of bytes.
@@ -180,15 +157,13 @@ impl PageserverClient {
&self,
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_db_size(req).await
})
.await
}
/// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
@@ -218,8 +193,6 @@ impl PageserverClient {
return Err(tonic::Status::invalid_argument("request attempt must be 0"));
}
debug!("sending request: {req:?}");
// The shards may change while we're fetching pages. We execute the request using a stable
// view of the shards (especially important for requests that span shards), but retry the
// top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
@@ -228,16 +201,13 @@ impl PageserverClient {
//
// TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
// once we figure out how to handle these.
let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
let shards = self.shards.load_full();
Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
self.retry
.with(async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
Self::get_page_with_shards(req, &self.shards.load_full()).await
})
.await
}
/// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
@@ -276,7 +246,7 @@ impl PageserverClient {
req: page_api::GetPageRequest,
shard: &Shard,
) -> tonic::Result<page_api::GetPageResponse> {
let mut stream = shard.stream(Self::is_bulk(&req)).await?;
let stream = shard.stream(req.request_class.is_bulk()).await;
let resp = stream.send(req.clone()).await?;
// Convert per-request errors into a tonic::Status.
@@ -320,15 +290,13 @@ impl PageserverClient {
&self,
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_rel_size(req).await
})
.await
}
/// Fetches an SLRU segment.
@@ -337,50 +305,13 @@ impl PageserverClient {
&self,
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
/// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // pass attempt number, starting at 0
O: Future<Output = tonic::Result<T>>,
{
Retry {
timeout: Some(timeout),
base_backoff: BASE_BACKOFF,
max_backoff: MAX_BACKOFF,
}
.with(f)
.await
}
/// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
async fn with_timeout<T>(
timeout: Duration,
f: impl Future<Output = tonic::Result<T>>,
) -> tonic::Result<T> {
let started = Instant::now();
tokio::time::timeout(timeout, f).await.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
started.elapsed().as_secs_f64()
))
})?
}
/// Returns true if the request is considered a bulk request and should use the bulk pool.
fn is_bulk(req: &page_api::GetPageRequest) -> bool {
req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
self.retry
.with(async |_| {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
}
}
@@ -509,23 +440,15 @@ impl Shards {
}
}
/// A single shard. Has dedicated resource pools with the following structure:
/// A single shard. Uses dedicated resource pools with the following structure:
///
/// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
/// * Client pool: unbounded.
/// * Stream pool: unbounded.
/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
/// * Channel pool: unbounded.
/// * Unary client pool: MAX_UNARY_CLIENTS.
/// * Stream client pool: unbounded.
/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
/// * Bulk channel pool: unbounded.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: unbounded.
///
/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
/// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools
/// are bounded, nor do they pipeline requests, so the latency characteristics should be mostly
/// similar (except for TCP transmission time).
///
/// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
/// the code is fairly trivial, so we may as well keep them around for now in case we need them.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
struct Shard {
/// The shard ID.
id: ShardIndex,
@@ -533,7 +456,7 @@ struct Shard {
client_pool: Arc<ClientPool>,
/// GetPage stream pool.
stream_pool: Arc<StreamPool>,
/// GetPage stream pool for bulk requests.
/// GetPage stream pool for bulk requests, e.g. prefetches.
bulk_stream_pool: Arc<StreamPool>,
}
@@ -547,30 +470,50 @@ impl Shard {
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
// Shard pools for unary requests and non-bulk GetPage requests.
// Common channel pool for unary and stream requests. Bounded by client/stream pools.
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
// Client pool for unary requests.
let client_pool = ClientPool::new(
ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
None, // unbounded
Some(MAX_UNARY_CLIENTS),
);
let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded
// Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
// GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
// but shares a channel pool with it (as it's unbounded).
let stream_pool = StreamPool::new(
ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
None, // unbounded, limited by stream pool
),
Some(MAX_STREAMS),
MAX_STREAM_QUEUE_DEPTH,
);
// Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools
// to avoid head-of-line blocking of latency-sensitive requests.
let bulk_stream_pool = StreamPool::new(
ClientPool::new(
ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
tenant_id,
timeline_id,
shard_id,
auth_token,
compression,
None, // unbounded,
None, // unbounded, limited by stream pool
),
None, // unbounded
Some(MAX_BULK_STREAMS),
MAX_BULK_STREAM_QUEUE_DEPTH,
);
Ok(Self {
@@ -582,23 +525,19 @@ impl Shard {
}
/// Returns a pooled client for this shard.
#[instrument(skip_all)]
async fn client(&self) -> tonic::Result<ClientGuard> {
warn_slow(
"client pool acquisition",
SLOW_THRESHOLD,
pin!(self.client_pool.get()),
)
.await
self.client_pool
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
}
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
#[instrument(skip_all, fields(bulk))]
async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
let pool = match bulk {
false => &self.stream_pool,
true => &self.bulk_stream_pool,
};
warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
/// pool (e.g. for prefetches).
async fn stream(&self, bulk: bool) -> StreamGuard {
match bulk {
false => self.stream_pool.get().await,
true => self.bulk_stream_pool.get().await,
}
}
}
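As a back-of-the-envelope check on the new constants above: per shard, the normal GetPage pool allows MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH requests in flight and the bulk pool MAX_BULK_STREAMS * MAX_BULK_STREAM_QUEUE_DEPTH, which is also the product the stream pool uses to size its semaphore. Spelled out (values copied from the diff; the snippet itself is only illustrative):

const MAX_STREAMS: usize = 64;
const MAX_STREAM_QUEUE_DEPTH: usize = 2;
const MAX_BULK_STREAMS: usize = 16;
const MAX_BULK_STREAM_QUEUE_DEPTH: usize = 4;

fn main() {
    // Normal GetPage requests in flight per shard: 64 streams * 2 pipelined requests = 128.
    assert_eq!(MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH, 128);
    // Bulk (e.g. prefetch) requests in flight per shard: 16 streams * 4 pipelined requests = 64.
    assert_eq!(MAX_BULK_STREAMS * MAX_BULK_STREAM_QUEUE_DEPTH, 64);
    // On top of that, up to MAX_UNARY_CLIENTS (64) concurrent unary requests per shard.
}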

View File

@@ -9,36 +9,19 @@
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//! per-channel client limit. Channels are closed immediately when empty, and indirectly rely on
//! client/stream idle timeouts.
//! per-channel client limit. Channels may be closed when they are no longer used by any clients.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//! single caller at a time, and is returned to the pool when dropped. Idle clients are removed
//! from the pool after a while to free up resources.
//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed
//! from the pool after some time, to free up the channel.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//! ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a
//! time, and is returned to the pool when dropped. Idle streams are removed from the pool after
//! a while to free up resources.
//!
//! The stream only supports sending a single, synchronous request at a time, and does not support
//! pipelining multiple requests from different callers onto the same stream -- instead, we scale
//! out concurrent streams to improve throughput. There are many reasons for this design choice:
//!
//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by
//! a single server task, which may block e.g. on layer downloads, LSN waits, etc.
//!
//! * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away
//! (e.g. because of a timeout), the request would still be processed by the server and block
//! requests behind it in the stream. It might even block its own timeout retry.
//!
//! * Stream scheduling becomes significantly simpler and cheaper.
//!
//! * Individual callers can still use client-side batching for pipelining.
//!
//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB
//! per stream (2.5 GB for 100,000 streams), so we can afford to scale out.
//! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it
//! returns a guard that can be used to send a single request, to properly enforce queue depth and
//! route responses. Internally, the pool will reuse or spin up a suitable stream for the request,
//! possibly pipelining multiple requests from multiple callers on the same stream (up to some
//! queue depth). Idle streams may be removed from the pool after a while to free up the client.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
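A rough caller-side sketch of the flow the module comment above describes, assuming it lives inside this crate (the StreamPool/StreamGuard names and page_api types are taken from the diff below; the helper itself is illustrative, not part of the change):

use std::sync::Arc;

use pageserver_page_api as page_api;

async fn send_one_getpage(
    stream_pool: &Arc<StreamPool>,
    req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
    // get() returns a guard with queue-depth quota already reserved; internally the
    // pool reuses a stream with spare capacity or spins up a new stream (and, via the
    // nested pools, a client and channel) as needed.
    let guard = stream_pool.get().await;
    // send() consumes the guard, so each guard carries exactly one request; per-request
    // errors may still arrive as GetPageResponse::status_code rather than tonic::Status,
    // which callers are expected to check.
    guard.send(req).await
}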
@@ -46,31 +29,33 @@
//! TODO: error handling (including custom error types).
//! TODO: observability.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::{Stream, StreamExt as _};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
use tokio_stream::wrappers::WatchStream;
use futures::StreamExt as _;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};
use tracing::{error, warn};
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when
/// empty, and indirectly rely on the client/stream idle timeouts.
/// Reap channels/clients/streams that have been idle for this long.
///
/// A stream's client will be reaped after 2x the idle threshold (first stream the client), but
/// that's okay -- if the stream closes abruptly (e.g. due to timeout or cancellation), we want to
/// keep its client around in the pool for a while.
/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle.
/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we
/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to
/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty
/// channels, and/or stream pool clients.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
false => Duration::from_secs(180),
true => Duration::from_secs(1), // exercise reaping in tests
@@ -98,6 +83,8 @@ pub struct ChannelPool {
max_clients_per_channel: NonZero<usize>,
/// Open channels.
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
/// Reaps idle channels.
idle_reaper: Reaper,
/// Channel ID generator.
next_channel_id: AtomicUsize,
}
@@ -109,6 +96,9 @@ struct ChannelEntry {
channel: Channel,
/// Number of clients using this channel.
clients: usize,
/// The channel has been idle (no clients) since this time. None if channel is in use.
/// INVARIANT: Some if clients == 0, otherwise None.
idle_since: Option<Instant>,
}
impl ChannelPool {
@@ -118,12 +108,15 @@ impl ChannelPool {
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
Ok(Arc::new(Self {
let pool = Arc::new(Self {
endpoint: endpoint.try_into()?,
max_clients_per_channel,
channels: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_channel_id: AtomicUsize::default(),
}))
});
pool.idle_reaper.spawn(&pool);
Ok(pool)
}
/// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
@@ -144,17 +137,22 @@ impl ChannelPool {
let mut channels = self.channels.lock().unwrap();
// Try to find an existing channel with available capacity. We check entries in BTreeMap
// order, to fill up the lower-ordered channels first. The client/stream pools also prefer
// clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
// with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// channels, and free up higher-ordered channels such that they can be reaped.
for (&id, entry) in channels.iter_mut() {
assert!(
entry.clients <= self.max_clients_per_channel.get(),
"channel overflow"
);
assert_ne!(entry.clients, 0, "empty channel not reaped");
assert_eq!(
entry.idle_since.is_some(),
entry.clients == 0,
"incorrect channel idle state"
);
if entry.clients < self.max_clients_per_channel.get() {
entry.clients += 1;
entry.idle_since = None;
return ChannelGuard {
pool: Arc::downgrade(self),
id,
@@ -171,6 +169,7 @@ impl ChannelPool {
let entry = ChannelEntry {
channel: channel.clone(),
clients: 1, // account for the guard below
idle_since: None,
};
channels.insert(id, entry);
@@ -182,6 +181,20 @@ impl ChannelPool {
}
}
impl Reapable for ChannelPool {
/// Reaps channels that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.channels.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.clients, 0, "empty channel not marked idle");
return true;
};
assert_eq!(entry.clients, 0, "idle channel has clients");
idle_since >= cutoff
})
}
}
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
@@ -198,7 +211,7 @@ impl ChannelGuard {
}
}
/// Returns the channel to the pool. The channel is closed when empty.
/// Returns the channel to the pool.
impl Drop for ChannelGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
@@ -207,12 +220,11 @@ impl Drop for ChannelGuard {
let mut channels = pool.channels.lock().unwrap();
let entry = channels.get_mut(&self.id).expect("unknown channel");
assert!(entry.idle_since.is_none(), "active channel marked idle");
assert!(entry.clients > 0, "channel underflow");
entry.clients -= 1;
// Reap empty channels immediately.
if entry.clients == 0 {
channels.remove(&self.id);
entry.idle_since = Some(Instant::now()); // mark channel as idle
}
}
}
@@ -241,7 +253,8 @@ pub struct ClientPool {
///
/// The first client in the map will be acquired next. The map is sorted by client ID, which in
/// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
/// lower-ordered channels. This allows us to free up and reap higher-ordered channels.
/// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
/// clients are reaped.
idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
/// Reaps idle clients.
idle_reaper: Reaper,
@@ -297,7 +310,7 @@ impl ClientPool {
/// This is moderately performance-sensitive. It is called for every unary request, but these
/// establish a new gRPC stream per request so they're already expensive. GetPage requests use
/// the `StreamPool` instead.
pub async fn get(self: &Arc<Self>) -> tonic::Result<ClientGuard> {
pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
@@ -315,7 +328,7 @@ impl ClientPool {
});
}
// Construct a new client.
// Slow path: construct a new client.
let mut channel_guard = self.channel_pool.get();
let client = page_api::Client::new(
channel_guard.take(),
@@ -324,8 +337,7 @@ impl ClientPool {
self.shard_id,
self.auth_token.clone(),
self.compression,
)
.map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?;
)?;
Ok(ClientGuard {
pool: Arc::downgrade(self),
@@ -395,187 +407,287 @@ impl Drop for ClientGuard {
/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams only send a single request at a time, and do not pipeline multiple callers
/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily
/// to eliminate head-of-line blocking. See the module documentation for more details.
/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
/// a single request and await the response. Internally, requests are multiplexed across streams and
/// channels. This allows proper queue depth enforcement and response routing.
///
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
/// The client pool to acquire clients from. Must be unbounded.
client_pool: Arc<ClientPool>,
/// Idle pooled streams. Acquired streams are removed from here and returned on drop.
/// All pooled streams.
///
/// The first stream in the map will be acquired next. The map is sorted by stream ID, which is
/// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer
/// acquiring idle streams from lower-ordered channels, which allows us to free up and reap
/// higher-ordered channels.
idle: Mutex<BTreeMap<StreamID, StreamEntry>>,
/// Limits the max number of concurrent streams. None if the pool is unbounded.
/// Incoming requests will be sent over an existing stream with available capacity. If all
/// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
/// stream has an associated Tokio task that processes requests and responses.
streams: Mutex<HashMap<StreamID, StreamEntry>>,
/// The max number of concurrent streams, or None if unbounded.
max_streams: Option<NonZero<usize>>,
/// The max number of concurrent requests per stream.
max_queue_depth: NonZero<usize>,
/// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
/// None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Reaps idle streams.
idle_reaper: Reaper,
/// Stream ID generator.
next_stream_id: AtomicUsize,
}
/// The stream ID. Reuses the inner client ID.
type StreamID = ClientID;
type StreamID = usize;
type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>;
type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>;
type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;
/// A pooled stream.
struct StreamEntry {
/// The bidirectional stream.
stream: BiStream,
/// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`.
idle_since: Instant,
}
/// A bidirectional GetPage stream and its client. Can send requests and receive responses.
struct BiStream {
/// The owning client. Holds onto the channel slot while the stream is alive.
client: ClientGuard,
/// Stream for sending requests. Uses a watch channel, so it can only send a single request at a
/// time, and the caller must await the response before sending another request. This is
/// enforced by `StreamGuard::send`.
sender: watch::Sender<page_api::GetPageRequest>,
/// Stream for receiving responses.
receiver: Pin<Box<dyn Stream<Item = tonic::Result<page_api::GetPageResponse>> + Send>>,
/// Sends caller requests to the stream task. The stream task exits when this is dropped.
sender: RequestSender,
/// Number of in-flight requests on this stream.
queue_depth: usize,
/// The time when this stream went idle (queue_depth == 0).
/// INVARIANT: Some if queue_depth == 0, otherwise None.
idle_since: Option<Instant>,
}
impl StreamPool {
/// Creates a new stream pool, using the given client pool. It will use up to `max_streams`
/// concurrent streams.
/// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
/// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
///
/// The client pool must be unbounded. The stream pool will enforce its own limits, and because
/// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
/// The stream pool should generally have its own dedicated client pool (but it can share a
/// channel pool with others since these are always unbounded).
pub fn new(client_pool: Arc<ClientPool>, max_streams: Option<NonZero<usize>>) -> Arc<Self> {
pub fn new(
client_pool: Arc<ClientPool>,
max_streams: Option<NonZero<usize>>,
max_queue_depth: NonZero<usize>,
) -> Arc<Self> {
assert!(client_pool.limiter.is_none(), "bounded client pool");
let pool = Arc::new(Self {
client_pool,
idle: Mutex::default(),
limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))),
streams: Mutex::default(),
limiter: max_streams.map(|max_streams| {
Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
}),
max_streams,
max_queue_depth,
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_stream_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
pool
}
/// Acquires an available stream from the pool, or spins up a new stream if all streams are
/// full. Returns a guard that can be used to send requests and await the responses. Blocks if
/// the pool is full.
/// Acquires an available stream from the pool, or spins up a new stream async if all streams
/// are full. Returns a guard that can be used to send a single request on the stream and await
/// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
/// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight).
///
/// This is very performance-sensitive, as it is on the GetPage hot path.
///
/// TODO: is a `Mutex<BTreeMap>` performant enough? Will it become too contended? We can't
/// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first
/// to free up higher-ordered channels.
pub async fn get(self: &Arc<Self>) -> tonic::Result<StreamGuard> {
/// TODO: this must do something more sophisticated for performance. We want:
///
/// * Cheap, concurrent access in the common case where we can use a pooled stream.
/// * Quick acquisition of pooled streams with available capacity.
/// * Prefer streams that belong to lower-numbered channels, to reap idle channels.
/// * Prefer filling up existing streams' queue depth before spinning up new streams.
/// * Don't hold a lock while spinning up new streams.
/// * Allow concurrent clients to join onto streams while they're spun up.
/// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
///
/// For now, we just do something simple but inefficient (linear scan under mutex).
pub async fn get(self: &Arc<Self>) -> StreamGuard {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
let mut streams = self.streams.lock().unwrap();
// Fast path: acquire an idle stream from the pool.
if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() {
return Ok(StreamGuard {
pool: Arc::downgrade(self),
stream: Some(entry.stream),
can_reuse: true,
permit,
});
// Look for a pooled stream with available capacity.
for (&id, entry) in streams.iter_mut() {
assert!(
entry.queue_depth <= self.max_queue_depth.get(),
"stream queue overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.queue_depth == 0,
"incorrect stream idle state"
);
if entry.queue_depth < self.max_queue_depth.get() {
entry.queue_depth += 1;
entry.idle_since = None;
return StreamGuard {
pool: Arc::downgrade(self),
id,
sender: entry.sender.clone(),
permit,
};
}
}
// Spin up a new stream. Uses a watch channel to send a single request at a time, since
// `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead.
let mut client = self.client_pool.get().await?;
// No available stream, spin up a new one. We install the stream entry in the pool first and
// return the guard, while spinning up the stream task async. This allows other callers to
// join onto this stream and also create additional streams concurrently if this fills up.
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
let entry = StreamEntry {
sender: req_tx.clone(),
queue_depth: 1, // reserve quota for this caller
idle_since: None,
};
streams.insert(id, entry);
let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default());
let req_stream = WatchStream::from_changes(req_rx);
let resp_stream = client.get_pages(req_stream).await?;
if let Some(max_streams) = self.max_streams {
assert!(streams.len() <= max_streams.get(), "stream overflow");
};
Ok(StreamGuard {
let client_pool = self.client_pool.clone();
let pool = Arc::downgrade(self);
tokio::spawn(async move {
if let Err(err) = Self::run_stream(client_pool, req_rx).await {
error!("stream failed: {err}");
}
// Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
if let Some(pool) = pool.upgrade() {
let entry = pool.streams.lock().unwrap().remove(&id);
assert!(entry.is_some(), "unknown stream ID: {id}");
}
});
StreamGuard {
pool: Arc::downgrade(self),
stream: Some(BiStream {
client,
sender: req_tx,
receiver: Box::pin(resp_stream),
}),
can_reuse: true,
id,
sender: req_tx,
permit,
})
}
}
/// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
/// bidirectional GetPage stream, then forwards requests and responses between callers and the
/// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
/// atomic with pool stream acquisition.
///
/// The task exits when the request channel is closed, or on a stream error. The caller is
/// responsible for removing the stream from the pool on exit.
async fn run_stream(
client_pool: Arc<ClientPool>,
mut caller_rx: RequestReceiver,
) -> anyhow::Result<()> {
// Acquire a client from the pool and create a stream.
let mut client = client_pool.get().await?;
// NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could
// theoretically deadlock if both the client and server block on sends (since we're not
// reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and
// low queue depths, but it was seen to happen with the libpq protocol so better safe than
// sorry. It should never buffer more than the queue depth anyway, but using an unbounded
// channel guarantees that it will never block.
let (req_tx, req_rx) = mpsc::unbounded_channel();
let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the waiting callers will receive an error.
//
// NB: this will leak entries if the server doesn't respond to a request (by request ID).
// It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
// block further use. But we could consider reaping closed channels after some time.
let mut callers = HashMap::new();
// Process requests and responses.
loop {
tokio::select! {
// Receive requests from callers and send them to the stream.
req = caller_rx.recv() => {
// Shut down if request channel is closed.
let Some((req, resp_tx)) = req else {
return Ok(());
};
// Store the response channel by request ID.
if callers.contains_key(&req.request_id) {
// Error on request ID duplicates. Ignore callers that went away.
_ = resp_tx.send(Err(tonic::Status::invalid_argument(
format!("duplicate request ID: {}", req.request_id),
)));
continue;
}
callers.insert(req.request_id, resp_tx);
// Send the request on the stream. Bail out if the stream is closed.
req_tx.send(req).map_err(|_| {
tonic::Status::unavailable("stream closed")
})?;
}
// Receive responses from the stream and send them to callers.
resp = resp_stream.next() => {
// Shut down if the stream is closed, and bail out on stream errors.
let Some(resp) = resp.transpose()? else {
return Ok(())
};
// Send the response to the caller. Ignore errors if the caller went away.
let Some(resp_tx) = callers.remove(&resp.request_id) else {
warn!("received response for unknown request ID: {}", resp.request_id);
continue;
};
_ = resp_tx.send(Ok(resp));
}
}
}
}
}
impl Reapable for StreamPool {
/// Reaps streams that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.idle
.lock()
.unwrap()
.retain(|_, entry| entry.idle_since >= cutoff);
self.streams.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
return true;
};
assert_eq!(entry.queue_depth, 0, "idle stream has requests");
idle_since >= cutoff
});
}
}
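For context, a minimal sketch of how a background task might drive `reap_idle`; the `Arc<StreamPool>` handle, the 60s idle timeout, and the 10s tick interval are illustrative assumptions, not taken from this change.
// Hypothetical reaper loop; assumes the pool is shared via Arc and that
// tokio::time::Instant is the Instant type expected by reap_idle.
async fn reap_loop(pool: std::sync::Arc<StreamPool>) {
    const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
    let mut ticker = tokio::time::interval(std::time::Duration::from_secs(10));
    loop {
        ticker.tick().await;
        // Drop streams that have been idle since before the cutoff.
        pool.reap_idle(tokio::time::Instant::now() - IDLE_TIMEOUT);
    }
}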
/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still
/// in-flight requests on the stream, or the stream failed.
/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
/// depth. Queue depth is already reserved and will be returned on drop.
pub struct StreamGuard {
pool: Weak<StreamPool>,
stream: Option<BiStream>, // Some until dropped
can_reuse: bool, // returned to pool if true
id: StreamID,
sender: RequestSender,
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl StreamGuard {
/// Sends a request on the stream and awaits the response. If the future is dropped before it
/// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the
/// request and is not returned to the pool. The same is true if the stream errors, in which
/// case the caller can't send further requests on the stream.
/// Sends a request on the stream and awaits the response. Consumes the guard, since it's only
/// valid for a single request (to enforce queue depth). This also drops the guard on return and
/// returns the queue depth quota to the pool.
///
/// We only support sending a single request at a time, to eliminate head-of-line blocking. See
/// module documentation for details.
/// The `GetPageRequest::request_id` must be unique across in-flight requests.
///
/// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
/// to avoid tearing down the stream for per-request errors. Callers must check this.
pub async fn send(
&mut self,
self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
let req_id = req.request_id;
let stream = self.stream.as_mut().expect("not dropped");
let (resp_tx, resp_rx) = oneshot::channel();
// Mark the stream as not reusable while the request is in flight. We can't return the
// stream to the pool until we receive the response, to avoid head-of-line blocking and
// stale responses. Failed streams can't be reused either.
if !self.can_reuse {
return Err(tonic::Status::internal("stream can't be reused"));
}
self.can_reuse = false;
// Send the request and receive the response.
//
// NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests.
stream
.sender
.send(req)
self.sender
.send((req, resp_tx))
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?;
let resp = stream
.receiver
.next()
resp_rx
.await
.ok_or_else(|| tonic::Status::unavailable("stream closed"))??;
if resp.request_id != req_id {
return Err(tonic::Status::internal(format!(
"response ID {} does not match request ID {}",
resp.request_id, req_id
)));
}
// Success, mark the stream as reusable.
self.can_reuse = true;
Ok(resp)
.map_err(|_| tonic::Status::unavailable("stream closed"))?
}
}
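As a usage illustration, a minimal caller sketch under assumptions not spelled out in this diff: that `StreamPool::get()` fallibly yields a `StreamGuard`, that its error converts into a `tonic::Status`, and that the caller builds the `page_api::GetPageRequest` elsewhere.
// Sketch only: `pool` and `req` are placeholders supplied by the caller.
async fn get_page_once(
    pool: &std::sync::Arc<StreamPool>,
    req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
    // Reserves queue depth quota; may join an existing stream or spin up a new one.
    let guard = pool.get().await?;
    // The guard is consumed: one request per guard, quota is returned on drop.
    let resp = guard.send(req).await?;
    // Per-request failures are reported via status_code rather than tonic::Status.
    if resp.status_code != page_api::GetPageStatusCode::Ok {
        return Err(tonic::Status::internal(format!("{} response", resp.status_code)));
    }
    Ok(resp)
}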
@@ -585,21 +697,26 @@ impl Drop for StreamGuard {
return; // pool was dropped
};
// If the stream isn't reusable, it can't be returned to the pool.
if !self.can_reuse {
return;
// Release the queue depth reservation on drop. This can prematurely decrement it if dropped
// before the response is received, but that's okay.
//
// TODO: actually, it's probably not okay. Queue depth release should be moved into the
// stream task, such that it continues to account for the queue depth slot until the server
// responds. Otherwise, if a slow request times out and keeps blocking the stream, the
// server will keep waiting on it and we can pile on subsequent requests (including the
// timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
// requests on e.g. LSN waits and layer downloads, instead returning early to free up the
// stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
// blocking. TBD.
let mut streams = pool.streams.lock().unwrap();
let entry = streams.get_mut(&self.id).expect("unknown stream");
assert!(entry.idle_since.is_none(), "active stream marked idle");
assert!(entry.queue_depth > 0, "stream queue underflow");
entry.queue_depth -= 1;
if entry.queue_depth == 0 {
entry.idle_since = Some(Instant::now()); // mark stream as idle
}
// Place the idle stream back into the pool.
let entry = StreamEntry {
stream: self.stream.take().expect("dropped once"),
idle_since: Instant::now(),
};
pool.idle
.lock()
.unwrap()
.insert(entry.stream.client.id, entry);
_ = self.permit; // returned on drop, referenced for visibility
}
}

View File

@@ -1,6 +1,5 @@
use std::time::Duration;
use futures::future::pending;
use tokio::time::Instant;
use tracing::{error, info, warn};
@@ -9,54 +8,60 @@ use utils::backoff::exponential_backoff_duration;
/// A retry handler for Pageserver gRPC requests.
///
/// This is used instead of backoff::retry for better control and observability.
pub struct Retry {
/// Timeout across all retry attempts. If None, retries forever.
pub timeout: Option<Duration>,
/// The initial backoff duration. The first retry does not use a backoff.
pub base_backoff: Duration,
/// The maximum backoff duration.
pub max_backoff: Duration,
}
pub struct Retry;
impl Retry {
/// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
/// using the current tracing span for context.
/// The per-request timeout.
// TODO: tune these, and/or make them configurable. Should we retry forever?
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// The total timeout across all attempts
const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
/// The initial backoff duration.
const BASE_BACKOFF: Duration = Duration::from_millis(10);
/// The maximum backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(10);
/// If true, log successful requests. For debugging.
const LOG_SUCCESS: bool = false;
/// Runs the given async closure with timeouts and retries (exponential backoff), passing the
/// attempt number starting at 0. Logs errors, using the current tracing span for context.
///
/// Only certain gRPC status codes are retried, see [`Self::should_retry`].
/// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
/// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // pass attempt number, starting at 0
F: FnMut(usize) -> O, // takes attempt number, starting at 0
O: Future<Output = tonic::Result<T>>,
{
let started = Instant::now();
let deadline = self.timeout.map(|timeout| started + timeout);
let deadline = started + Self::TOTAL_TIMEOUT;
let mut last_error = None;
let mut retries = 0;
loop {
// Set up a future to wait for the backoff, if any, and run the closure.
// Set up a future to wait for the backoff (if any) and run the request with a timeout.
let backoff_and_try = async {
// NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
// https://github.com/tokio-rs/tokio/issues/6866
if let Some(backoff) = self.backoff_duration(retries) {
if let Some(backoff) = Self::backoff_duration(retries) {
tokio::time::sleep(backoff).await;
}
f(retries).await
let request_started = Instant::now();
tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
.await
.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
request_started.elapsed().as_secs_f64()
))
})?
};
// Set up a future for the timeout, if any.
let timeout = async {
match deadline {
Some(deadline) => tokio::time::sleep_until(deadline).await,
None => pending().await,
}
};
// Wait for the backoff and request, or bail out if the timeout is exceeded.
// Wait for the backoff and request, or bail out if the total timeout is exceeded.
let result = tokio::select! {
result = backoff_and_try => result,
_ = timeout => {
_ = tokio::time::sleep_until(deadline) => {
let last_error = last_error.unwrap_or_else(|| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
@@ -74,7 +79,7 @@ impl Retry {
match result {
// Success, return the result.
Ok(result) => {
if retries > 0 {
if retries > 0 || Self::LOG_SUCCESS {
info!(
"request succeeded after {retries} retries in {:.3}s",
started.elapsed().as_secs_f64(),
@@ -107,13 +112,12 @@ impl Retry {
}
}
/// Returns the backoff duration for the given retry attempt, or None for no backoff. The first
/// attempt and first retry never backs off, so this returns None for 0 and 1 retries.
fn backoff_duration(&self, retries: usize) -> Option<Duration> {
/// Returns the backoff duration for the given retry attempt, or None for no backoff.
fn backoff_duration(retry: usize) -> Option<Duration> {
let backoff = exponential_backoff_duration(
(retries as u32).saturating_sub(1), // first retry does not back off
self.base_backoff.as_secs_f64(),
self.max_backoff.as_secs_f64(),
retry as u32,
Self::BASE_BACKOFF.as_secs_f64(),
Self::MAX_BACKOFF.as_secs_f64(),
);
(!backoff.is_zero()).then_some(backoff)
}
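A minimal call-site sketch, assuming the zero-sized `Retry` struct above and some async gRPC call `do_request(attempt)` returning `tonic::Result<Response>`; both names are placeholders. Each attempt is bounded by `REQUEST_TIMEOUT`, the whole loop by `TOTAL_TIMEOUT`, and the backoff grows exponentially from `BASE_BACKOFF` toward `MAX_BACKOFF`.
// Placeholder request type and function, for illustration only.
async fn fetch_with_retries() -> tonic::Result<Response> {
    Retry
        .with(|attempt| async move {
            // `attempt` starts at 0; only retriable status codes cause another attempt.
            do_request(attempt).await
        })
        .await
}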

View File

@@ -49,7 +49,7 @@ impl From<ProtocolError> for tonic::Status {
}
/// The LSN a request should read at.
#[derive(Clone, Copy, Debug, Default)]
#[derive(Clone, Copy, Debug)]
pub struct ReadLsn {
/// The request's read LSN.
pub request_lsn: Lsn,
@@ -329,7 +329,7 @@ impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
}
/// Requests one or more pages.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct GetPageRequest {
/// A request ID. Will be included in the response. Should be unique for in-flight requests on
/// the stream.
@@ -430,13 +430,12 @@ impl From<RequestID> for proto::RequestId {
}
/// A GetPage request class.
#[derive(Clone, Copy, Debug, Default, strum_macros::Display)]
#[derive(Clone, Copy, Debug, strum_macros::Display)]
pub enum GetPageClass {
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.
Unknown,
/// A normal request. This is the default.
#[default]
Normal,
/// A prefetch request. NB: can only be classified on pg < 18.
Prefetch,
@@ -444,6 +443,19 @@ pub enum GetPageClass {
Background,
}
impl GetPageClass {
/// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than
/// latency-sensitive).
pub fn is_bulk(&self) -> bool {
match self {
Self::Unknown => false,
Self::Normal => false,
Self::Prefetch => true,
Self::Background => true,
}
}
}
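One hypothetical use of `is_bulk()`, purely illustrative (per-class queue depths and the constants below are not part of this change): a client could give throughput-oriented classes a deeper queue than latency-sensitive ones.
// Illustrative only; the depths are made-up numbers.
fn queue_depth_for(class: GetPageClass) -> usize {
    if class.is_bulk() { 16 } else { 2 }
}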
impl From<proto::GetPageClass> for GetPageClass {
fn from(pb: proto::GetPageClass) -> Self {
match pb {

View File

@@ -16,7 +16,6 @@ futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
pprof.workspace = true
rand.workspace = true
reqwest.workspace = true
serde.workspace = true

View File

@@ -1,127 +0,0 @@
use std::sync::Arc;
use anyhow::anyhow;
use futures::StreamExt;
use tonic::transport::Endpoint;
use tracing::info;
use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
/// Starts a large number of idle gRPC GetPage streams.
#[derive(clap::Parser)]
pub(crate) struct Args {
/// The Pageserver to connect to. Must use grpc://.
#[clap(long, default_value = "grpc://localhost:51051")]
server: String,
/// The Pageserver HTTP API.
#[clap(long, default_value = "http://localhost:9898")]
http_server: String,
/// The number of streams to open.
#[clap(long, default_value = "100000")]
count: usize,
/// Number of streams per connection.
#[clap(long, default_value = "100")]
per_connection: usize,
/// Send a single GetPage request on each stream.
#[clap(long, default_value_t = false)]
send_request: bool,
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(main_impl(args))
}
async fn main_impl(args: Args) -> anyhow::Result<()> {
// Discover a tenant and timeline to use.
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(),
args.http_server.clone(),
None,
));
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,
crate::util::cli::targets::Spec {
limit_to_first_n_targets: Some(1),
targets: None,
},
)
.await?;
let ttid = timelines
.first()
.ok_or_else(|| anyhow!("no timelines found"))?;
// Set up the initial client.
let endpoint = Endpoint::from_shared(args.server.clone())?;
let connect = async || {
pageserver_page_api::Client::new(
endpoint.connect().await?,
ttid.tenant_id,
ttid.timeline_id,
ShardIndex::unsharded(),
None,
None,
)
};
let mut client = connect().await?;
let mut streams = Vec::with_capacity(args.count);
// Create streams.
for i in 0..args.count {
if i % 100 == 0 {
info!("opened {}/{} streams", i, args.count);
}
if i % args.per_connection == 0 && i > 0 {
client = connect().await?;
}
let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel();
let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Send request if specified.
if args.send_request {
req_tx.send(GetPageRequest {
request_id: 1.into(),
request_class: GetPageClass::Normal,
read_lsn: ReadLsn {
request_lsn: Lsn::MAX,
not_modified_since_lsn: Some(Lsn(1)),
},
rel: RelTag {
spcnode: 1664, // pg_global
dbnode: 0, // shared database
relnode: 1262, // pg_authid
forknum: 0, // init
},
block_numbers: vec![0],
})?;
let resp = resp_stream
.next()
.await
.transpose()?
.ok_or_else(|| anyhow!("no response"))?;
if resp.status_code != GetPageStatusCode::Ok {
return Err(anyhow!("{} response", resp.status_code));
}
}
// Hold onto streams to avoid closing them.
streams.push((req_tx, resp_stream));
}
info!("opened {} streams, sleeping", args.count);
// Block forever, to hold the idle streams open for inspection.
futures::future::pending::<()>().await;
Ok(())
}

View File

@@ -1,7 +1,4 @@
use std::fs::File;
use clap::Parser;
use tracing::info;
use utils::logging;
/// Re-usable pieces of code that aren't CLI-specific.
@@ -20,73 +17,38 @@ mod cmd {
pub(super) mod aux_files;
pub(super) mod basebackup;
pub(super) mod getpage_latest_lsn;
pub(super) mod idle_streams;
pub(super) mod ondemand_download_churn;
pub(super) mod trigger_initial_size_calculation;
}
/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
struct Args {
/// Takes a client CPU profile into profile.svg. The benchmark must exit cleanly before it's
/// written, e.g. via --runtime.
#[arg(long)]
profile: bool,
#[command(subcommand)]
subcommand: Subcommand,
}
#[derive(clap::Subcommand)]
enum Subcommand {
enum Args {
Basebackup(cmd::basebackup::Args),
GetPageLatestLsn(cmd::getpage_latest_lsn::Args),
TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args),
OndemandDownloadChurn(cmd::ondemand_download_churn::Args),
AuxFiles(cmd::aux_files::Args),
IdleStreams(cmd::idle_streams::Args),
}
fn main() -> anyhow::Result<()> {
fn main() {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)?;
)
.unwrap();
logging::replace_panic_hook_with_tracing_panic_hook().forget();
let args = Args::parse();
// Start a CPU profile if requested.
let mut profiler = None;
if args.profile {
profiler = Some(
pprof::ProfilerGuardBuilder::default()
.frequency(1000)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()?,
);
}
match args.subcommand {
Subcommand::Basebackup(args) => cmd::basebackup::main(args),
Subcommand::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args),
Subcommand::TriggerInitialSizeCalculation(args) => {
match args {
Args::Basebackup(args) => cmd::basebackup::main(args),
Args::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args),
Args::TriggerInitialSizeCalculation(args) => {
cmd::trigger_initial_size_calculation::main(args)
}
Subcommand::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
Subcommand::AuxFiles(args) => cmd::aux_files::main(args),
Subcommand::IdleStreams(args) => cmd::idle_streams::main(args),
}?;
// Generate a CPU flamegraph if requested.
if let Some(profiler) = profiler {
let report = profiler.report().build()?;
drop(profiler); // stop profiling
let file = File::create("profile.svg")?;
report.flamegraph(file)?;
info!("wrote CPU profile flamegraph to profile.svg")
Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
Args::AuxFiles(args) => cmd::aux_files::main(args),
}
Ok(())
.unwrap()
}

View File

@@ -194,7 +194,6 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
listen_http_port: m.http_port,
listen_https_port: m.https_port,
availability_zone_id: az_id.expect("Checked above"),
node_ip_addr: None,
})
}
Err(e) => {

View File

@@ -1,5 +1,5 @@
//! The validator is responsible for validating DeletionLists for execution,
//! based on whether the generation in the DeletionList is still the latest
//! based on whether the generation in the DeletionList is still the latest
//! generation for a tenant.
//!
//! The purpose of validation is to ensure split-brain safety in the cluster

View File

@@ -10,7 +10,6 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use bytes::Bytes;
use enumset::EnumSet;
use futures::future::join_all;
use futures::{StreamExt, TryFutureExt};
@@ -47,7 +46,6 @@ use pageserver_api::shard::{ShardCount, TenantShardId};
use postgres_ffi::PgMajorVersion;
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
use scopeguard::defer;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tenant_size_model::svg::SvgBranchKind;
use tenant_size_model::{SizeResult, StorageModel};
@@ -59,7 +57,6 @@ use utils::auth::SwappableJwtAuth;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
use crate::config::PageServerConf;
use crate::context;
@@ -80,7 +77,6 @@ use crate::tenant::remote_timeline_client::{
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::ValuesReconstructState;
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
@@ -2712,16 +2708,6 @@ async fn deletion_queue_flush(
}
}
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct GetPageResponse {
pub page: Bytes,
pub layers_visited: u32,
pub delta_layers_visited: u32,
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
}
async fn getpage_at_lsn_handler(
request: Request<Body>,
cancel: CancellationToken,
@@ -2772,24 +2758,21 @@ async fn getpage_at_lsn_handler_inner(
// Use last_record_lsn if no lsn is provided
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
let page = timeline.get(key.0, lsn, &ctx).await?;
if touch {
json_response(StatusCode::OK, ())
} else {
let mut reconstruct_state = ValuesReconstructState::new_with_debug(IoConcurrency::sequential());
let page = timeline.debug_get(key.0, lsn, &ctx, &mut reconstruct_state).await?;
let response = GetPageResponse {
page,
layers_visited: reconstruct_state.get_layers_visited(),
delta_layers_visited: reconstruct_state.get_delta_layers_visited(),
records: reconstruct_state.debug_state.records.clone(),
img: reconstruct_state.debug_state.img.clone(),
};
json_response(StatusCode::OK, response)
Result::<_, ApiError>::Ok(
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(hyper::Body::from(page))
.unwrap(),
)
}
}
.instrument(info_span!("timeline_debug_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}

View File

@@ -2167,7 +2167,7 @@ impl PageServerHandler {
fn effective_request_lsn(
timeline: &Timeline,
last_record_lsn: Lsn,
request_lsn: Lsn,
mut request_lsn: Lsn,
not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
) -> Result<Lsn, PageStreamError> {
@@ -2195,12 +2195,16 @@ impl PageServerHandler {
if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
let gc_info = &timeline.gc_info.read().unwrap();
if !gc_info.lsn_covered_by_lease(request_lsn) {
return Err(
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
).into())
// While the request was in flight, the replica's apply LSN may have advanced.
// latest_gc_cutoff_lsn is a conservative estimate of min(redo_lsn) across all replicas,
// so it is safe to move request_lsn forward to latest_gc_cutoff_lsn.
// If the replica's lease has expired and latest_gc_cutoff_lsn > redo_lsn for this replica,
// the page LSN check at the replica protects it from getting a too-new version of the page.
warn!(
"Tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
);
request_lsn = **latest_gc_cutoff_lsn;
}
}
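In effect, within this branch the request LSN is clamped to the GC cutoff, matching the commit message `request_lsn=max(gc_cutoff,request_lsn)`. A standalone sketch of the clamp, assuming `Lsn`'s `Ord` implementation:
// Since this branch only runs when request_lsn < latest_gc_cutoff_lsn,
// assigning the cutoff is equivalent to taking the maximum of the two.
fn clamp_request_lsn(request_lsn: Lsn, latest_gc_cutoff_lsn: Lsn) -> Lsn {
    std::cmp::max(request_lsn, latest_gc_cutoff_lsn)
}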

View File

@@ -3393,13 +3393,7 @@ impl TenantShard {
.collect_vec();
for timeline in timelines {
// Include a span with the timeline ID. The parent span already has the tenant ID.
let span =
info_span!("maybe_freeze_ephemeral_layer", timeline_id = %timeline.timeline_id);
timeline
.maybe_freeze_ephemeral_layer()
.instrument(span)
.await;
timeline.maybe_freeze_ephemeral_layer().await;
}
}

View File

@@ -75,7 +75,7 @@ where
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
///
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default)]
pub(crate) struct ValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
@@ -308,9 +308,6 @@ pub struct ValuesReconstructState {
layers_visited: u32,
delta_layers_visited: u32,
pub(crate) enable_debug: bool,
pub(crate) debug_state: ValueReconstructState,
pub(crate) io_concurrency: IoConcurrency,
num_active_ios: Arc<AtomicUsize>,
@@ -660,23 +657,6 @@ impl ValuesReconstructState {
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency,
enable_debug: false,
debug_state: ValueReconstructState::default(),
num_active_ios: Arc::new(AtomicUsize::new(0)),
read_path: None,
}
}
pub(crate) fn new_with_debug(io_concurrency: IoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency,
enable_debug: true,
debug_state: ValueReconstructState::default(),
num_active_ios: Arc::new(AtomicUsize::new(0)),
read_path: None,
}
@@ -690,12 +670,6 @@ impl ValuesReconstructState {
self.io_concurrency.spawn_io(fut).await;
}
pub(crate) fn set_debug_state(&mut self, debug_state: &ValueReconstructState) {
if self.enable_debug {
self.debug_state = debug_state.clone();
}
}
pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
self.layers_visited += 1;
if let ReadableLayer::PersistentLayer(layer) = layer {

View File

@@ -1253,57 +1253,6 @@ impl Timeline {
}
}
#[inline(always)]
pub(crate) async fn debug_get(
&self,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
reconstruct_state: &mut ValuesReconstructState,
) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
}
// This check is debug-only because of the cost of hashing, and because it's a double-check: we
// already checked the key against the shard_identity when looking up the Timeline from
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let vectored_res = self
.debug_get_vectored_impl(query, reconstruct_state, ctx)
.await;
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => Err(PageReconstructError::MissingKey(Box::new(
MissingKeyError {
keyspace: KeySpace::single(key..key.next()),
shard: self.shard_identity.get_shard_number(&key),
original_hwm_lsn: lsn,
ancestor_lsn: None,
backtrace: None,
read_path: None,
query: None,
},
))),
}
}
pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u32 = 100;
/// Look up multiple page versions at a given LSN
@@ -1598,98 +1547,6 @@ impl Timeline {
Ok(results)
}
// A copy of the get_vectored_impl method except that we store the image and wal records into `reconstruct_state`.
// This is only used in the http getpage call for debugging purpose.
pub(super) async fn debug_get_vectored_impl(
&self,
query: VersionedKeySpaceQuery,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),
query.high_watermark_lsn()?,
))
} else {
None
};
reconstruct_state.read_path = read_path;
let traversal_res: Result<(), _> = self
.get_vectored_reconstruct_data(query.clone(), reconstruct_state, ctx)
.await;
if let Err(err) = traversal_res {
// Wait for all the spawned IOs to complete.
// See comments on `spawn_io` inside `storage_layer` for more details.
let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
.into_values()
.map(|state| state.collect_pending_ios())
.collect::<FuturesUnordered<_>>();
while collect_futs.next().await.is_some() {}
return Err(err);
};
let reconstruct_state = Arc::new(Mutex::new(reconstruct_state));
let futs = FuturesUnordered::new();
for (key, state) in std::mem::take(&mut reconstruct_state.lock().unwrap().keys) {
let req_lsn_for_key = query.map_key_to_lsn(&key);
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
let rc_clone = Arc::clone(&reconstruct_state);
async move {
assert_eq!(state.situation, ValueReconstructSituation::Complete);
let converted = match state.collect_pending_ios().await {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
}
};
DELTAS_PER_READ_GLOBAL.observe(converted.num_deltas() as f64);
// The walredo module expects the records to be descending in terms of Lsn.
// And we submit the IOs in that order, so there should be no need to sort here.
debug_assert!(
converted
.records
.is_sorted_by_key(|(lsn, _)| std::cmp::Reverse(*lsn)),
"{converted:?}"
);
{
let mut guard = rc_clone.lock().unwrap();
guard.set_debug_state(&converted);
}
(
key,
walredo_self
.reconstruct_value(
key,
req_lsn_for_key,
converted,
RedoAttemptType::ReadPage,
)
.await,
)
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
Ok(results)
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -2036,8 +1893,6 @@ impl Timeline {
// an ephemeral layer open forever when idle. It also freezes layers if the global limit on
// ephemeral layer bytes has been breached.
pub(super) async fn maybe_freeze_ephemeral_layer(&self) {
debug_assert_current_span_has_tenant_and_timeline_id();
let Ok(mut write_guard) = self.write_lock.try_lock() else {
// If the write lock is held, there is an active wal receiver: rolling open layers
// is their responsibility while they hold this lock.

View File

@@ -1,6 +1,6 @@
//! A utilization metric which is used to decide on which pageserver to put the next tenant.
//!
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain its openapi spec as the
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain its openapi spec as the
//! truth.
use std::path::Path;

View File

@@ -1069,7 +1069,7 @@ impl WalIngest {
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
// read it, like GetNewMultiXactId(). This is different from how nextXid is
// incremented! nextXid skips over < FirstNormalTransactionId when the value
// incremented! nextXid skips over < FirstNormalTransactionId when the value
// is stored, so it's never 0 in a checkpoint.
//
// I don't know why it's done that way, it seems less error-prone to skip over 0

View File

@@ -421,7 +421,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
{
if (resp->tag != T_NeonGetPageResponse && resp->tag != T_NeonErrorResponse)
{
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=" UINT64_FORMAT ", ring_flush=" UINT64_FORMAT ", ring_unused=" UINT64_FORMAT "",
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
resp->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (neon_protocol_version >= 3)
@@ -438,7 +438,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
getpage_resp->req.blkno != slot->buftag.blockNum)
{
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Receive unexpected getpage response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
"Receive unexpected getpage response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), slot->buftag.forkNum, slot->buftag.blockNum);
}
@@ -447,7 +447,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since));
}
@@ -496,9 +496,9 @@ communicator_prefetch_pump_state(void)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -789,9 +789,9 @@ prefetch_read(PrefetchRequest *slot)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch read: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
(long)slot->my_ring_index, (long)MyPState->ring_receive);
}
/*
@@ -816,9 +816,9 @@ prefetch_read(PrefetchRequest *slot)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
@@ -852,8 +852,8 @@ prefetch_read(PrefetchRequest *slot)
* and the prefetch queue was flushed during the receive call
*/
neon_shard_log(shard_no, LOG,
"No response from reading prefetch entry " UINT64_FORMAT ": %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
my_ring_index,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long) my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
buftag.forkNum, buftag.blockNum);
return false;
@@ -1045,6 +1045,34 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that the page LSN returned by the pageserver to a replica is not beyond the replay LSN.
* This can happen only in case of a deteriorated lease.
*/
static bool
check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr)
{
if (RecoveryInProgress())
{
XLogRecPtr page_lsn = PageGetLSN((Page)resp->page);
#if PG_VERSION_NUM >= 150000
XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL);
#else
/*
* PG14 doesn't have the GetCurrentReplayRecPtr() function, which returns the end of the currently applied record,
* and GetXLogReplayRecPtr() returns the end of the WAL records that were already applied.
* So we have to use this hack with resp->req.lsn, which is expected to contain the end record pointer in this case.
* But it works only for protocol version 3.
*/
XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->req.hdr.lsn);
#endif
if (replay_lsn_ptr)
*replay_lsn_ptr = replay_lsn;
return replay_lsn == 0 || page_lsn <= replay_lsn;
}
return true;
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1068,7 +1096,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1101,8 +1129,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
resp = (NeonGetPageResponse*)slot->response;
/*
* Ignore "in-future" responses caused by a deteriorated lease
*/
if (!check_page_lsn(resp, NULL))
{
continue;
}
memcpy(buffers[i], resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -1844,7 +1880,7 @@ nm_to_string(NeonMessage *msg)
NeonDbSizeResponse *msg_resp = (NeonDbSizeResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeResponse\"");
appendStringInfo(&s, ", \"db_size\": " INT64_FORMAT "}",
appendStringInfo(&s, ", \"db_size\": %ld}",
msg_resp->db_size);
appendStringInfoChar(&s, '}');
@@ -2045,7 +2081,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
exists_resp->req.forknum != request.forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum);
}
@@ -2058,14 +2094,14 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[reqid %lx] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
resp->reqid,
RelFileInfoFmt(rinfo),
forkNum,
@@ -2227,6 +2263,15 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
XLogRecPtr replay_lsn;
if (!check_page_lsn(getpage_resp, &replay_lsn))
{
/* An alternative to throwing an error is to repeat the query with request_lsn=replay_lsn */
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("No version of page %u of relation %u/%u/%u.%u is available at the page server at replay LSN %X/%X, request LSN %X/%X, latest version is at LSN %X/%X",
blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(PageGetLSN((Page)getpage_resp->page)))));
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2241,7 +2286,7 @@ Retry:
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[shard %d, reqid " UINT64_HEX_FORMAT "] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo),
forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)),
errdetail("page server returned error: %s",
@@ -2294,7 +2339,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
relsize_resp->req.forknum != forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum);
}
@@ -2307,14 +2352,14 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[reqid %lx] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
resp->reqid,
RelFileInfoFmt(rinfo),
forknum,
@@ -2364,7 +2409,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
dbsize_resp->req.dbNode != dbNode)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u}",
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode);
}
@@ -2377,14 +2422,14 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read db size of db %u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[reqid %lx] could not read db size of db %u from page server at lsn %X/%08X",
resp->reqid,
dbNode, LSN_FORMAT_ARGS(request_lsns->effective_request_lsn)),
errdetail("page server returned error: %s",
@@ -2455,7 +2500,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
slru_resp->req.segno != segno)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}",
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno);
}
@@ -2469,14 +2514,14 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read SLRU %d segment %llu at lsn %X/%08X",
errmsg(NEON_TAG "[reqid %lx] could not read SLRU %d segment %llu at lsn %X/%08X",
resp->reqid,
kind,
(unsigned long long) segno,

View File

@@ -165,8 +165,4 @@ extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
extern TimeLineID GetWALInsertionTimeLine(void);
#endif
/* format codes not present in PG17-; but available in PG18+ */
#define INT64_HEX_FORMAT "%" INT64_MODIFIER "x"
#define UINT64_HEX_FORMAT "%" INT64_MODIFIER "x"
#endif /* NEON_PGVERSIONCOMPAT_H */

209
poetry.lock generated
View File

@@ -2,123 +2,127 @@
[[package]]
name = "aiohappyeyeballs"
version = "2.6.1"
version = "2.3.5"
description = "Happy Eyeballs for asyncio"
optional = false
python-versions = ">=3.9"
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "aiohappyeyeballs-2.6.1-py3-none-any.whl", hash = "sha256:f349ba8f4b75cb25c99c5c2d84e997e485204d2902a9597802b0371f09331fb8"},
{file = "aiohappyeyeballs-2.6.1.tar.gz", hash = "sha256:c3f9d0113123803ccadfdf3f0faa505bc78e6a72d1cc4806cbd719826e943558"},
{file = "aiohappyeyeballs-2.3.5-py3-none-any.whl", hash = "sha256:4d6dea59215537dbc746e93e779caea8178c866856a721c9c660d7a5a7b8be03"},
{file = "aiohappyeyeballs-2.3.5.tar.gz", hash = "sha256:6fa48b9f1317254f122a07a131a86b71ca6946ca989ce6326fff54a99a920105"},
]
[[package]]
name = "aiohttp"
version = "3.12.14"
version = "3.10.11"
description = "Async http client/server framework (asyncio)"
optional = false
python-versions = ">=3.9"
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "aiohttp-3.12.14-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:906d5075b5ba0dd1c66fcaaf60eb09926a9fef3ca92d912d2a0bbdbecf8b1248"},
{file = "aiohttp-3.12.14-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c875bf6fc2fd1a572aba0e02ef4e7a63694778c5646cdbda346ee24e630d30fb"},
{file = "aiohttp-3.12.14-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fbb284d15c6a45fab030740049d03c0ecd60edad9cd23b211d7e11d3be8d56fd"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38e360381e02e1a05d36b223ecab7bc4a6e7b5ab15760022dc92589ee1d4238c"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:aaf90137b5e5d84a53632ad95ebee5c9e3e7468f0aab92ba3f608adcb914fa95"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e532a25e4a0a2685fa295a31acf65e027fbe2bea7a4b02cdfbbba8a064577663"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:eab9762c4d1b08ae04a6c77474e6136da722e34fdc0e6d6eab5ee93ac29f35d1"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:abe53c3812b2899889a7fca763cdfaeee725f5be68ea89905e4275476ffd7e61"},
{file = "aiohttp-3.12.14-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5760909b7080aa2ec1d320baee90d03b21745573780a072b66ce633eb77a8656"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:02fcd3f69051467bbaa7f84d7ec3267478c7df18d68b2e28279116e29d18d4f3"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_armv7l.whl", hash = "sha256:4dcd1172cd6794884c33e504d3da3c35648b8be9bfa946942d353b939d5f1288"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:224d0da41355b942b43ad08101b1b41ce633a654128ee07e36d75133443adcda"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:e387668724f4d734e865c1776d841ed75b300ee61059aca0b05bce67061dcacc"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_s390x.whl", hash = "sha256:dec9cde5b5a24171e0b0a4ca064b1414950904053fb77c707efd876a2da525d8"},
{file = "aiohttp-3.12.14-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:bbad68a2af4877cc103cd94af9160e45676fc6f0c14abb88e6e092b945c2c8e3"},
{file = "aiohttp-3.12.14-cp310-cp310-win32.whl", hash = "sha256:ee580cb7c00bd857b3039ebca03c4448e84700dc1322f860cf7a500a6f62630c"},
{file = "aiohttp-3.12.14-cp310-cp310-win_amd64.whl", hash = "sha256:cf4f05b8cea571e2ccc3ca744e35ead24992d90a72ca2cf7ab7a2efbac6716db"},
{file = "aiohttp-3.12.14-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:f4552ff7b18bcec18b60a90c6982049cdb9dac1dba48cf00b97934a06ce2e597"},
{file = "aiohttp-3.12.14-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:8283f42181ff6ccbcf25acaae4e8ab2ff7e92b3ca4a4ced73b2c12d8cd971393"},
{file = "aiohttp-3.12.14-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:040afa180ea514495aaff7ad34ec3d27826eaa5d19812730fe9e529b04bb2179"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b413c12f14c1149f0ffd890f4141a7471ba4b41234fe4fd4a0ff82b1dc299dbb"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:1d6f607ce2e1a93315414e3d448b831238f1874b9968e1195b06efaa5c87e245"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:565e70d03e924333004ed101599902bba09ebb14843c8ea39d657f037115201b"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4699979560728b168d5ab63c668a093c9570af2c7a78ea24ca5212c6cdc2b641"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ad5fdf6af93ec6c99bf800eba3af9a43d8bfd66dce920ac905c817ef4a712afe"},
{file = "aiohttp-3.12.14-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4ac76627c0b7ee0e80e871bde0d376a057916cb008a8f3ffc889570a838f5cc7"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:798204af1180885651b77bf03adc903743a86a39c7392c472891649610844635"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_armv7l.whl", hash = "sha256:4f1205f97de92c37dd71cf2d5bcfb65fdaed3c255d246172cce729a8d849b4da"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:76ae6f1dd041f85065d9df77c6bc9c9703da9b5c018479d20262acc3df97d419"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:a194ace7bc43ce765338ca2dfb5661489317db216ea7ea700b0332878b392cab"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_s390x.whl", hash = "sha256:16260e8e03744a6fe3fcb05259eeab8e08342c4c33decf96a9dad9f1187275d0"},
{file = "aiohttp-3.12.14-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:8c779e5ebbf0e2e15334ea404fcce54009dc069210164a244d2eac8352a44b28"},
{file = "aiohttp-3.12.14-cp311-cp311-win32.whl", hash = "sha256:a289f50bf1bd5be227376c067927f78079a7bdeccf8daa6a9e65c38bae14324b"},
{file = "aiohttp-3.12.14-cp311-cp311-win_amd64.whl", hash = "sha256:0b8a69acaf06b17e9c54151a6c956339cf46db4ff72b3ac28516d0f7068f4ced"},
{file = "aiohttp-3.12.14-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:a0ecbb32fc3e69bc25efcda7d28d38e987d007096cbbeed04f14a6662d0eee22"},
{file = "aiohttp-3.12.14-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:0400f0ca9bb3e0b02f6466421f253797f6384e9845820c8b05e976398ac1d81a"},
{file = "aiohttp-3.12.14-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a56809fed4c8a830b5cae18454b7464e1529dbf66f71c4772e3cfa9cbec0a1ff"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:27f2e373276e4755691a963e5d11756d093e346119f0627c2d6518208483fb6d"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:ca39e433630e9a16281125ef57ece6817afd1d54c9f1bf32e901f38f16035869"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9c748b3f8b14c77720132b2510a7d9907a03c20ba80f469e58d5dfd90c079a1c"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f0a568abe1b15ce69d4cc37e23020720423f0728e3cb1f9bcd3f53420ec3bfe7"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9888e60c2c54eaf56704b17feb558c7ed6b7439bca1e07d4818ab878f2083660"},
{file = "aiohttp-3.12.14-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3006a1dc579b9156de01e7916d38c63dc1ea0679b14627a37edf6151bc530088"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:aa8ec5c15ab80e5501a26719eb48a55f3c567da45c6ea5bb78c52c036b2655c7"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:39b94e50959aa07844c7fe2206b9f75d63cc3ad1c648aaa755aa257f6f2498a9"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:04c11907492f416dad9885d503fbfc5dcb6768d90cad8639a771922d584609d3"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:88167bd9ab69bb46cee91bd9761db6dfd45b6e76a0438c7e884c3f8160ff21eb"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:791504763f25e8f9f251e4688195e8b455f8820274320204f7eafc467e609425"},
{file = "aiohttp-3.12.14-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:2785b112346e435dd3a1a67f67713a3fe692d288542f1347ad255683f066d8e0"},
{file = "aiohttp-3.12.14-cp312-cp312-win32.whl", hash = "sha256:15f5f4792c9c999a31d8decf444e79fcfd98497bf98e94284bf390a7bb8c1729"},
{file = "aiohttp-3.12.14-cp312-cp312-win_amd64.whl", hash = "sha256:3b66e1a182879f579b105a80d5c4bd448b91a57e8933564bf41665064796a338"},
{file = "aiohttp-3.12.14-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:3143a7893d94dc82bc409f7308bc10d60285a3cd831a68faf1aa0836c5c3c767"},
{file = "aiohttp-3.12.14-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:3d62ac3d506cef54b355bd34c2a7c230eb693880001dfcda0bf88b38f5d7af7e"},
{file = "aiohttp-3.12.14-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:48e43e075c6a438937c4de48ec30fa8ad8e6dfef122a038847456bfe7b947b63"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:077b4488411a9724cecc436cbc8c133e0d61e694995b8de51aaf351c7578949d"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:d8c35632575653f297dcbc9546305b2c1133391089ab925a6a3706dfa775ccab"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6b8ce87963f0035c6834b28f061df90cf525ff7c9b6283a8ac23acee6502afd4"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f0a2cf66e32a2563bb0766eb24eae7e9a269ac0dc48db0aae90b575dc9583026"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cdea089caf6d5cde975084a884c72d901e36ef9c2fd972c9f51efbbc64e96fbd"},
{file = "aiohttp-3.12.14-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8a7865f27db67d49e81d463da64a59365ebd6b826e0e4847aa111056dcb9dc88"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:0ab5b38a6a39781d77713ad930cb5e7feea6f253de656a5f9f281a8f5931b086"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:9b3b15acee5c17e8848d90a4ebc27853f37077ba6aec4d8cb4dbbea56d156933"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:e4c972b0bdaac167c1e53e16a16101b17c6d0ed7eac178e653a07b9f7fad7151"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:7442488b0039257a3bdbc55f7209587911f143fca11df9869578db6c26feeeb8"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:f68d3067eecb64c5e9bab4a26aa11bd676f4c70eea9ef6536b0a4e490639add3"},
{file = "aiohttp-3.12.14-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f88d3704c8b3d598a08ad17d06006cb1ca52a1182291f04979e305c8be6c9758"},
{file = "aiohttp-3.12.14-cp313-cp313-win32.whl", hash = "sha256:a3c99ab19c7bf375c4ae3debd91ca5d394b98b6089a03231d4c580ef3c2ae4c5"},
{file = "aiohttp-3.12.14-cp313-cp313-win_amd64.whl", hash = "sha256:3f8aad695e12edc9d571f878c62bedc91adf30c760c8632f09663e5f564f4baa"},
{file = "aiohttp-3.12.14-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:b8cc6b05e94d837bcd71c6531e2344e1ff0fb87abe4ad78a9261d67ef5d83eae"},
{file = "aiohttp-3.12.14-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d1dcb015ac6a3b8facd3677597edd5ff39d11d937456702f0bb2b762e390a21b"},
{file = "aiohttp-3.12.14-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3779ed96105cd70ee5e85ca4f457adbce3d9ff33ec3d0ebcdf6c5727f26b21b3"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:717a0680729b4ebd7569c1dcd718c46b09b360745fd8eb12317abc74b14d14d0"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:b5dd3a2ef7c7e968dbbac8f5574ebeac4d2b813b247e8cec28174a2ba3627170"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:4710f77598c0092239bc12c1fcc278a444e16c7032d91babf5abbf7166463f7b"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f3e9f75ae842a6c22a195d4a127263dbf87cbab729829e0bd7857fb1672400b2"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5f9c8d55d6802086edd188e3a7d85a77787e50d56ce3eb4757a3205fa4657922"},
{file = "aiohttp-3.12.14-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:79b29053ff3ad307880d94562cca80693c62062a098a5776ea8ef5ef4b28d140"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:23e1332fff36bebd3183db0c7a547a1da9d3b4091509f6d818e098855f2f27d3"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_armv7l.whl", hash = "sha256:a564188ce831fd110ea76bcc97085dd6c625b427db3f1dbb14ca4baa1447dcbc"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:a7a1b4302f70bb3ec40ca86de82def532c97a80db49cac6a6700af0de41af5ee"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:1b07ccef62950a2519f9bfc1e5b294de5dd84329f444ca0b329605ea787a3de5"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:938bd3ca6259e7e48b38d84f753d548bd863e0c222ed6ee6ace3fd6752768a84"},
{file = "aiohttp-3.12.14-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:8bc784302b6b9f163b54c4e93d7a6f09563bd01ff2b841b29ed3ac126e5040bf"},
{file = "aiohttp-3.12.14-cp39-cp39-win32.whl", hash = "sha256:a3416f95961dd7d5393ecff99e3f41dc990fb72eda86c11f2a60308ac6dcd7a0"},
{file = "aiohttp-3.12.14-cp39-cp39-win_amd64.whl", hash = "sha256:196858b8820d7f60578f8b47e5669b3195c21d8ab261e39b1d705346458f445f"},
{file = "aiohttp-3.12.14.tar.gz", hash = "sha256:6e06e120e34d93100de448fd941522e11dafa78ef1a893c179901b7d66aa29f2"},
{file = "aiohttp-3.10.11-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:5077b1a5f40ffa3ba1f40d537d3bec4383988ee51fbba6b74aa8fb1bc466599e"},
{file = "aiohttp-3.10.11-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:8d6a14a4d93b5b3c2891fca94fa9d41b2322a68194422bef0dd5ec1e57d7d298"},
{file = "aiohttp-3.10.11-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ffbfde2443696345e23a3c597049b1dd43049bb65337837574205e7368472177"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:20b3d9e416774d41813bc02fdc0663379c01817b0874b932b81c7f777f67b217"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2b943011b45ee6bf74b22245c6faab736363678e910504dd7531a58c76c9015a"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48bc1d924490f0d0b3658fe5c4b081a4d56ebb58af80a6729d4bd13ea569797a"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e12eb3f4b1f72aaaf6acd27d045753b18101524f72ae071ae1c91c1cd44ef115"},
{file = "aiohttp-3.10.11-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f14ebc419a568c2eff3c1ed35f634435c24ead2fe19c07426af41e7adb68713a"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:72b191cdf35a518bfc7ca87d770d30941decc5aaf897ec8b484eb5cc8c7706f3"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:5ab2328a61fdc86424ee540d0aeb8b73bbcad7351fb7cf7a6546fc0bcffa0038"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:aa93063d4af05c49276cf14e419550a3f45258b6b9d1f16403e777f1addf4519"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_s390x.whl", hash = "sha256:30283f9d0ce420363c24c5c2421e71a738a2155f10adbb1a11a4d4d6d2715cfc"},
{file = "aiohttp-3.10.11-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e5358addc8044ee49143c546d2182c15b4ac3a60be01c3209374ace05af5733d"},
{file = "aiohttp-3.10.11-cp310-cp310-win32.whl", hash = "sha256:e1ffa713d3ea7cdcd4aea9cddccab41edf6882fa9552940344c44e59652e1120"},
{file = "aiohttp-3.10.11-cp310-cp310-win_amd64.whl", hash = "sha256:778cbd01f18ff78b5dd23c77eb82987ee4ba23408cbed233009fd570dda7e674"},
{file = "aiohttp-3.10.11-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:80ff08556c7f59a7972b1e8919f62e9c069c33566a6d28586771711e0eea4f07"},
{file = "aiohttp-3.10.11-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2c8f96e9ee19f04c4914e4e7a42a60861066d3e1abf05c726f38d9d0a466e695"},
{file = "aiohttp-3.10.11-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:fb8601394d537da9221947b5d6e62b064c9a43e88a1ecd7414d21a1a6fba9c24"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2ea224cf7bc2d8856d6971cea73b1d50c9c51d36971faf1abc169a0d5f85a382"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:db9503f79e12d5d80b3efd4d01312853565c05367493379df76d2674af881caa"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0f449a50cc33f0384f633894d8d3cd020e3ccef81879c6e6245c3c375c448625"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:82052be3e6d9e0c123499127782a01a2b224b8af8c62ab46b3f6197035ad94e9"},
{file = "aiohttp-3.10.11-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:20063c7acf1eec550c8eb098deb5ed9e1bb0521613b03bb93644b810986027ac"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:489cced07a4c11488f47aab1f00d0c572506883f877af100a38f1fedaa884c3a"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:ea9b3bab329aeaa603ed3bf605f1e2a6f36496ad7e0e1aa42025f368ee2dc07b"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:ca117819d8ad113413016cb29774b3f6d99ad23c220069789fc050267b786c16"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_s390x.whl", hash = "sha256:2dfb612dcbe70fb7cdcf3499e8d483079b89749c857a8f6e80263b021745c730"},
{file = "aiohttp-3.10.11-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f9b615d3da0d60e7d53c62e22b4fd1c70f4ae5993a44687b011ea3a2e49051b8"},
{file = "aiohttp-3.10.11-cp311-cp311-win32.whl", hash = "sha256:29103f9099b6068bbdf44d6a3d090e0a0b2be6d3c9f16a070dd9d0d910ec08f9"},
{file = "aiohttp-3.10.11-cp311-cp311-win_amd64.whl", hash = "sha256:236b28ceb79532da85d59aa9b9bf873b364e27a0acb2ceaba475dc61cffb6f3f"},
{file = "aiohttp-3.10.11-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:7480519f70e32bfb101d71fb9a1f330fbd291655a4c1c922232a48c458c52710"},
{file = "aiohttp-3.10.11-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:f65267266c9aeb2287a6622ee2bb39490292552f9fbf851baabc04c9f84e048d"},
{file = "aiohttp-3.10.11-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:7400a93d629a0608dc1d6c55f1e3d6e07f7375745aaa8bd7f085571e4d1cee97"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f34b97e4b11b8d4eb2c3a4f975be626cc8af99ff479da7de49ac2c6d02d35725"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1e7b825da878464a252ccff2958838f9caa82f32a8dbc334eb9b34a026e2c636"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f9f92a344c50b9667827da308473005f34767b6a2a60d9acff56ae94f895f385"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc6f1ab987a27b83c5268a17218463c2ec08dbb754195113867a27b166cd6087"},
{file = "aiohttp-3.10.11-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1dc0f4ca54842173d03322793ebcf2c8cc2d34ae91cc762478e295d8e361e03f"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:7ce6a51469bfaacff146e59e7fb61c9c23006495d11cc24c514a455032bcfa03"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:aad3cd91d484d065ede16f3cf15408254e2469e3f613b241a1db552c5eb7ab7d"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:f4df4b8ca97f658c880fb4b90b1d1ec528315d4030af1ec763247ebfd33d8b9a"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:2e4e18a0a2d03531edbc06c366954e40a3f8d2a88d2b936bbe78a0c75a3aab3e"},
{file = "aiohttp-3.10.11-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:6ce66780fa1a20e45bc753cda2a149daa6dbf1561fc1289fa0c308391c7bc0a4"},
{file = "aiohttp-3.10.11-cp312-cp312-win32.whl", hash = "sha256:a919c8957695ea4c0e7a3e8d16494e3477b86f33067478f43106921c2fef15bb"},
{file = "aiohttp-3.10.11-cp312-cp312-win_amd64.whl", hash = "sha256:b5e29706e6389a2283a91611c91bf24f218962717c8f3b4e528ef529d112ee27"},
{file = "aiohttp-3.10.11-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:703938e22434d7d14ec22f9f310559331f455018389222eed132808cd8f44127"},
{file = "aiohttp-3.10.11-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9bc50b63648840854e00084c2b43035a62e033cb9b06d8c22b409d56eb098413"},
{file = "aiohttp-3.10.11-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:5f0463bf8b0754bc744e1feb61590706823795041e63edf30118a6f0bf577461"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f6c6dec398ac5a87cb3a407b068e1106b20ef001c344e34154616183fe684288"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:bcaf2d79104d53d4dcf934f7ce76d3d155302d07dae24dff6c9fffd217568067"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:25fd5470922091b5a9aeeb7e75be609e16b4fba81cdeaf12981393fb240dd10e"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bbde2ca67230923a42161b1f408c3992ae6e0be782dca0c44cb3206bf330dee1"},
{file = "aiohttp-3.10.11-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:249c8ff8d26a8b41a0f12f9df804e7c685ca35a207e2410adbd3e924217b9006"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:878ca6a931ee8c486a8f7b432b65431d095c522cbeb34892bee5be97b3481d0f"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:8663f7777ce775f0413324be0d96d9730959b2ca73d9b7e2c2c90539139cbdd6"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:6cd3f10b01f0c31481fba8d302b61603a2acb37b9d30e1d14e0f5a58b7b18a31"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:4e8d8aad9402d3aa02fdc5ca2fe68bcb9fdfe1f77b40b10410a94c7f408b664d"},
{file = "aiohttp-3.10.11-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:38e3c4f80196b4f6c3a85d134a534a56f52da9cb8d8e7af1b79a32eefee73a00"},
{file = "aiohttp-3.10.11-cp313-cp313-win32.whl", hash = "sha256:fc31820cfc3b2863c6e95e14fcf815dc7afe52480b4dc03393c4873bb5599f71"},
{file = "aiohttp-3.10.11-cp313-cp313-win_amd64.whl", hash = "sha256:4996ff1345704ffdd6d75fb06ed175938c133425af616142e7187f28dc75f14e"},
{file = "aiohttp-3.10.11-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:74baf1a7d948b3d640badeac333af581a367ab916b37e44cf90a0334157cdfd2"},
{file = "aiohttp-3.10.11-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:473aebc3b871646e1940c05268d451f2543a1d209f47035b594b9d4e91ce8339"},
{file = "aiohttp-3.10.11-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:c2f746a6968c54ab2186574e15c3f14f3e7f67aef12b761e043b33b89c5b5f95"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d110cabad8360ffa0dec8f6ec60e43286e9d251e77db4763a87dcfe55b4adb92"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e0099c7d5d7afff4202a0c670e5b723f7718810000b4abcbc96b064129e64bc7"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0316e624b754dbbf8c872b62fe6dcb395ef20c70e59890dfa0de9eafccd2849d"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5a5f7ab8baf13314e6b2485965cbacb94afff1e93466ac4d06a47a81c50f9cca"},
{file = "aiohttp-3.10.11-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c891011e76041e6508cbfc469dd1a8ea09bc24e87e4c204e05f150c4c455a5fa"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:9208299251370ee815473270c52cd3f7069ee9ed348d941d574d1457d2c73e8b"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:459f0f32c8356e8125f45eeff0ecf2b1cb6db1551304972702f34cd9e6c44658"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:14cdc8c1810bbd4b4b9f142eeee23cda528ae4e57ea0923551a9af4820980e39"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_s390x.whl", hash = "sha256:971aa438a29701d4b34e4943e91b5e984c3ae6ccbf80dd9efaffb01bd0b243a9"},
{file = "aiohttp-3.10.11-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:9a309c5de392dfe0f32ee57fa43ed8fc6ddf9985425e84bd51ed66bb16bce3a7"},
{file = "aiohttp-3.10.11-cp38-cp38-win32.whl", hash = "sha256:9ec1628180241d906a0840b38f162a3215114b14541f1a8711c368a8739a9be4"},
{file = "aiohttp-3.10.11-cp38-cp38-win_amd64.whl", hash = "sha256:9c6e0ffd52c929f985c7258f83185d17c76d4275ad22e90aa29f38e211aacbec"},
{file = "aiohttp-3.10.11-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:cdc493a2e5d8dc79b2df5bec9558425bcd39aff59fc949810cbd0832e294b106"},
{file = "aiohttp-3.10.11-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b3e70f24e7d0405be2348da9d5a7836936bf3a9b4fd210f8c37e8d48bc32eca6"},
{file = "aiohttp-3.10.11-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:968b8fb2a5eee2770eda9c7b5581587ef9b96fbdf8dcabc6b446d35ccc69df01"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:deef4362af9493d1382ef86732ee2e4cbc0d7c005947bd54ad1a9a16dd59298e"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:686b03196976e327412a1b094f4120778c7c4b9cff9bce8d2fdfeca386b89829"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3bf6d027d9d1d34e1c2e1645f18a6498c98d634f8e373395221121f1c258ace8"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:099fd126bf960f96d34a760e747a629c27fb3634da5d05c7ef4d35ef4ea519fc"},
{file = "aiohttp-3.10.11-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c73c4d3dae0b4644bc21e3de546530531d6cdc88659cdeb6579cd627d3c206aa"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:0c5580f3c51eea91559db3facd45d72e7ec970b04528b4709b1f9c2555bd6d0b"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:fdf6429f0caabfd8a30c4e2eaecb547b3c340e4730ebfe25139779b9815ba138"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:d97187de3c276263db3564bb9d9fad9e15b51ea10a371ffa5947a5ba93ad6777"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:0acafb350cfb2eba70eb5d271f55e08bd4502ec35e964e18ad3e7d34d71f7261"},
{file = "aiohttp-3.10.11-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c13ed0c779911c7998a58e7848954bd4d63df3e3575f591e321b19a2aec8df9f"},
{file = "aiohttp-3.10.11-cp39-cp39-win32.whl", hash = "sha256:22b7c540c55909140f63ab4f54ec2c20d2635c0289cdd8006da46f3327f971b9"},
{file = "aiohttp-3.10.11-cp39-cp39-win_amd64.whl", hash = "sha256:7b26b1551e481012575dab8e3727b16fe7dd27eb2711d2e63ced7368756268fb"},
{file = "aiohttp-3.10.11.tar.gz", hash = "sha256:9dc2b8f3dcab2e39e0fa309c8da50c3b55e6f34ab25f1a71d3288f24924d33a7"},
]
[package.dependencies]
aiohappyeyeballs = ">=2.5.0"
aiosignal = ">=1.4.0"
aiohappyeyeballs = ">=2.3.0"
aiosignal = ">=1.1.2"
attrs = ">=17.3.0"
frozenlist = ">=1.1.1"
multidict = ">=4.5,<7.0"
propcache = ">=0.2.0"
yarl = ">=1.17.0,<2.0"
yarl = ">=1.12.0,<2.0"
[package.extras]
speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.3.0)", "brotlicffi ; platform_python_implementation != \"CPython\""]
speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""]
[[package]]
name = "aiopg"
@@ -141,19 +145,18 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "aiosignal"
version = "1.4.0"
version = "1.3.1"
description = "aiosignal: a list of registered asynchronous callbacks"
optional = false
python-versions = ">=3.9"
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "aiosignal-1.4.0-py3-none-any.whl", hash = "sha256:053243f8b92b990551949e63930a839ff0cf0b0ebbe0597b0f3fb19e1a0fe82e"},
{file = "aiosignal-1.4.0.tar.gz", hash = "sha256:f47eecd9468083c2029cc99945502cb7708b082c232f9aca65da147157b251c7"},
{file = "aiosignal-1.3.1-py3-none-any.whl", hash = "sha256:f8376fb07dd1e86a584e4fcdec80b36b7f81aac666ebc724e2c090300dd83b17"},
{file = "aiosignal-1.3.1.tar.gz", hash = "sha256:54cd96e15e1649b75d6c87526a6ff0b6c1b0dd3459f43d9ca11d48c339b68cfc"},
]
[package.dependencies]
frozenlist = ">=1.1.0"
typing-extensions = {version = ">=4.2", markers = "python_version < \"3.13\""}
[[package]]
name = "allure-pytest"
@@ -3844,4 +3847,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "6a1e8ba06b8194bf28d87fd5e184e2ddc2b4a19dffcbe3953b26da3d55c9212f"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"

View File

@@ -63,19 +63,16 @@ impl<P: QueueProcessing> BatchQueue<P> {
}
}
pub fn enqueue(&self, req: P::Req) -> (u64, oneshot::Receiver<ProcResult<P>>) {
self.inner.lock_propagate_poison().register_job(req)
}
/// Perform a single request-response process, this may be batched internally.
///
/// This function is not cancel safe.
pub async fn call<R>(
&self,
id: u64,
mut rx: oneshot::Receiver<ProcResult<P>>,
req: P::Req,
cancelled: impl Future<Output = R>,
) -> Result<P::Res, BatchQueueError<P::Err, R>> {
let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
let mut cancelled = pin!(cancelled);
let resp: Option<Result<P::Res, P::Err>> = loop {
// try become the leader, or try wait for success.
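Splitting `enqueue` from `call` lets a caller register a job, keep the returned id and oneshot receiver, and only later decide whether to drive the queue at all — which is what makes the best-effort key deletion further down possible. A minimal stand-in for the await-with-cancellation half, assuming a tokio runtime (`rt`, `macros`, `sync` features); the id bookkeeping, batching and leader election of the real `BatchQueue` are left out:

use std::future::Future;

use tokio::sync::oneshot;

// Await a previously registered job's response while racing a cancellation
// future; on cancellation the job is simply abandoned.
async fn await_job<T>(rx: oneshot::Receiver<T>, cancelled: impl Future<Output = ()>) -> Option<T> {
    tokio::select! {
        res = rx => res.ok(),
        () = cancelled => None,
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    // A real worker would drain the queue and answer jobs in batches.
    tokio::spawn(async move {
        let _ = tx.send(42u64);
    });
    assert_eq!(await_job(rx, std::future::pending()).await, Some(42));
}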

View File

@@ -14,8 +14,8 @@ use std::time::{Duration, Instant};
use hashlink::{LruCache, linked_hash_map::RawEntryMut};
use tracing::debug;
use super::Cache;
use super::common::Cached;
use super::{Cache, timed_lru};
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:
@@ -30,7 +30,7 @@ use super::common::Cached;
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`Cached`] for more information.
/// See [`timed_lru::Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
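The properties listed above boil down to: every entry records when it was created and when it expires, a lookup that finds an expired entry drops it instead of returning it, and callers may also want the creation time back (see the `get_with_created_at` hunk below). A small std-only sketch of just that expiry behaviour; the capacity limit, LRU ordering and borrowed-key lookups of the real `TimedLru` are omitted:

use std::collections::HashMap;
use std::hash::Hash;
use std::time::{Duration, Instant};

struct Entry<V> {
    value: V,
    created_at: Instant,
    expires_at: Instant,
}

struct TimedCache<K, V> {
    ttl: Duration,
    map: HashMap<K, Entry<V>>,
}

impl<K: Hash + Eq, V: Clone> TimedCache<K, V> {
    fn new(ttl: Duration) -> Self {
        Self { ttl, map: HashMap::new() }
    }

    fn insert(&mut self, key: K, value: V) {
        let now = Instant::now();
        self.map.insert(key, Entry { value, created_at: now, expires_at: now + self.ttl });
    }

    // A successful lookup of an expired entry evicts it; otherwise the value is
    // returned together with its creation time.
    fn get_with_created_at(&mut self, key: &K) -> Option<(V, Instant)> {
        let expired = self.map.get(key).is_some_and(|e| e.expires_at <= Instant::now());
        if expired {
            self.map.remove(key);
            return None;
        }
        self.map.get(key).map(|e| (e.value.clone(), e.created_at))
    }
}

fn main() {
    let mut cache = TimedCache::new(Duration::from_millis(50));
    cache.insert("key", 1u32);
    assert!(cache.get_with_created_at(&"key").is_some());
    std::thread::sleep(Duration::from_millis(60));
    assert!(cache.get_with_created_at(&"key").is_none());
}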
@@ -217,18 +217,15 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
}
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
/// Retrieve a cached entry in convenient wrapper, alongside timing information.
pub(crate) fn get_with_created_at<Q>(
&self,
key: &Q,
) -> Option<Cached<&Self, (<Self as Cache>::Value, Instant)>>
/// Retrieve a cached entry in convenient wrapper.
pub(crate) fn get<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>>
where
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| Cached {
token: Some((self, key.clone())),
value: (entry.value.clone(), entry.created_at),
value: entry.value.clone(),
})
}
}

View File

@@ -6,7 +6,6 @@ use std::time::Duration;
use futures::FutureExt;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use metrics::MeasuredCounterPairGuard;
use postgres_client::RawCancelToken;
use postgres_client::tls::MakeTlsConnect;
use redis::{Cmd, FromRedisValue, SetExpiry, SetOptions, Value};
@@ -24,59 +23,30 @@ use crate::context::RequestContext;
use crate::control_plane::ControlPlaneApi;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::{
CancelChannelSizeGauge, CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind,
};
use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind};
use crate::pqproto::CancelKeyData;
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError};
use crate::util::run_until;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: Duration = Duration::from_secs(600);
const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570);
const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600);
const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570);
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
Store {
StoreCancelKey {
key: CancelKeyData,
value: Box<str>,
expire: Duration,
expire: std::time::Duration,
},
Refresh {
key: CancelKeyData,
expire: Duration,
},
Get {
GetCancelData {
key: CancelKeyData,
},
GetOld {
GetCancelDataOld {
key: CancelKeyData,
},
Delete {
key: CancelKeyData,
},
}
impl CancelKeyOp {
fn redis_msg_kind(&self) -> RedisMsgKind {
match self {
CancelKeyOp::Store { .. } => RedisMsgKind::Set,
CancelKeyOp::Refresh { .. } => RedisMsgKind::Expire,
CancelKeyOp::Get { .. } => RedisMsgKind::Get,
CancelKeyOp::GetOld { .. } => RedisMsgKind::HGet,
CancelKeyOp::Delete { .. } => RedisMsgKind::Unlink,
}
}
fn metric_guard(&self) -> MeasuredCounterPairGuard<'static, CancelChannelSizeGauge> {
Metrics::get()
.proxy
.cancel_channel_size
.guard(self.redis_msg_kind())
}
}
#[derive(thiserror::Error, Debug, Clone)]
@@ -132,16 +102,12 @@ impl Pipeline {
self.inner.add_command(cmd);
self.replies += 1;
}
fn add_command_ignore_reply(&mut self, cmd: Cmd) {
self.inner.add_command(cmd).ignore();
}
}
impl CancelKeyOp {
fn register(&self, pipe: &mut Pipeline) {
match self {
CancelKeyOp::Store { key, value, expire } => {
CancelKeyOp::StoreCancelKey { key, value, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::set_options(
&key,
@@ -149,22 +115,14 @@ impl CancelKeyOp {
SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())),
));
}
CancelKeyOp::Refresh { key, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::expire(&key, expire.as_secs() as i64));
}
CancelKeyOp::GetOld { key } => {
CancelKeyOp::GetCancelDataOld { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::hget(key, "data"));
}
CancelKeyOp::Get { key } => {
CancelKeyOp::GetCancelData { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::get(key));
}
CancelKeyOp::Delete { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command_ignore_reply(Cmd::unlink(key));
}
}
}
}
@@ -301,11 +259,14 @@ impl CancellationHandler {
return Err(CancelError::InternalError);
};
let op = CancelKeyOp::Get { key };
let (id, rx) = tx.enqueue((op.metric_guard(), op));
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Get);
let op = CancelKeyOp::GetCancelData { key };
let result = timeout(
TIMEOUT,
tx.call(id, rx, std::future::pending::<Infallible>()),
tx.call((guard, op), std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
@@ -323,11 +284,14 @@ impl CancellationHandler {
&& let Some(errcode) = err.code()
&& errcode == "WRONGTYPE"
{
let op = CancelKeyOp::GetOld { key };
let (id, rx) = tx.enqueue((op.metric_guard(), op));
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HGet);
let op = CancelKeyOp::GetCancelDataOld { key };
timeout(
TIMEOUT,
tx.call(id, rx, std::future::pending::<Infallible>()),
tx.call((guard, op), std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
@@ -509,105 +473,39 @@ impl Session {
let mut cancel = pin!(cancel);
#[derive(Copy, Clone, PartialEq, Eq)]
enum State {
Set,
Refresh,
Delete,
}
let mut state = State::Set;
loop {
let op = match state {
State::Set => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registering cancellation key"
);
CancelKeyOp::Store {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_TTL,
}
}
State::Refresh => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"refreshing cancellation key"
);
CancelKeyOp::Refresh {
key: self.key,
expire: CANCEL_KEY_TTL,
}
}
State::Delete => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"deleting cancellation key"
);
CancelKeyOp::Delete { key: self.key }
}
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Set);
let op = CancelKeyOp::StoreCancelKey {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_TTL,
};
let (id, rx) = tx.enqueue((op.metric_guard(), op));
if state == State::Delete {
// The key deletion is just best effort. We enqueue the command,
// but don't drive the queue and wait for a response.
break;
}
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registering cancellation key"
);
match tx.call(id, rx, cancel.as_mut()).await {
// SET returns OK
Ok(Value::Okay) => {
match tx.call((guard, op), cancel.as_mut()).await {
Ok(_) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registered cancellation key"
);
state = State::Refresh;
}
// EXPIRE returns 1
Ok(Value::Int(1)) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"refreshed cancellation key"
);
// wait before continuing.
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
}
Ok(_) => {
// Any other response likely means the key expired.
tracing::warn!(src=%self.key, "refreshing cancellation key failed");
// Re-enter the SET loop to repush full data.
state = State::Set;
}
// retry immediately.
Err(BatchQueueError::Result(error)) => {
tracing::warn!(?error, "error refreshing cancellation key");
// Small delay to prevent busy loop with high cpu and logging.
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
tracing::warn!(?error, "error registering cancellation key");
}
Err(BatchQueueError::Cancelled(Err(_cancelled))) => {
state = State::Delete;
continue;
}
}
// wait before continuing. break immediately if cancelled.
if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut())
.await
.is_err()
{
state = State::Delete;
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
}
}
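The loop above reads most easily as a three-state machine: a successful SET moves to Refresh, a successful EXPIRE (with the 570 s refresh interval sitting comfortably inside the 600 s TTL) stays in Refresh, any unexpected reply falls back to Set to re-push the full payload, and cancellation routes through a best-effort Delete. A pure sketch of those transitions; Redis errors, which in the real loop retry the current state after a short sleep, are folded into the catch-all arm:

#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum State {
    Set,
    Refresh,
    Delete,
}

enum Outcome {
    StoredOk,   // SET answered OK
    RefreshOk,  // EXPIRE answered 1
    Unexpected, // any other reply: the key likely expired
    Cancelled,  // the session ended
}

fn next(state: State, outcome: Outcome) -> State {
    match (state, outcome) {
        (_, Outcome::Cancelled) => State::Delete,
        (_, Outcome::Unexpected) => State::Set,
        (State::Set, Outcome::StoredOk) => State::Refresh,
        (State::Refresh, Outcome::RefreshOk) => State::Refresh,
        // Errors and mismatched replies keep the current state and retry.
        (state, _) => state,
    }
}

fn main() {
    assert_eq!(next(State::Set, Outcome::StoredOk), State::Refresh);
    assert_eq!(next(State::Refresh, Outcome::RefreshOk), State::Refresh);
    assert_eq!(next(State::Refresh, Outcome::Unexpected), State::Set);
    assert_eq!(next(State::Refresh, Outcome::Cancelled), State::Delete);
}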

View File

@@ -23,13 +23,12 @@ use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse};
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl,
};
use crate::metrics::Metrics;
use crate::proxy::retry::CouldRetry;
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{compute, http, scram};
@@ -383,31 +382,16 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
macro_rules! check_cache {
() => {
if let Some(cached) = self.caches.node_info.get_with_created_at(&key) {
let (cached, (info, created_at)) = cached.take_value();
return match info {
Err(mut msg) => {
info!(key = &*key, "found cached wake_compute error");
if let Some(cached) = self.caches.node_info.get(&key) {
let (cached, info) = cached.take_value();
let info = info.map_err(|c| {
info!(key = &*key, "found cached wake_compute error");
WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c)))
})?;
// if retry_delay_ms is set, reduce it by the amount of time it spent in cache
if let Some(status) = &mut msg.status {
if let Some(retry_info) = &mut status.details.retry_info {
retry_info.retry_delay_ms = retry_info
.retry_delay_ms
.saturating_sub(created_at.elapsed().as_millis() as u64)
}
}
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
msg,
)))
}
Ok(info) => {
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
Ok(cached.map(|()| info))
}
};
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
return Ok(cached.map(|()| info));
}
};
}
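One side of this hunk does not replay a cached wake_compute error's retry_delay_ms verbatim: the time the entry has already spent in the cache is subtracted first, saturating at zero. A tiny sketch of that adjustment:

use std::time::{Duration, Instant};

// Reduce the control-plane retry delay by the time the error has already sat
// in the cache, so callers don't wait the full delay a second time.
fn remaining_retry_delay_ms(retry_delay_ms: u64, cached_at: Instant) -> u64 {
    retry_delay_ms.saturating_sub(cached_at.elapsed().as_millis() as u64)
}

fn main() {
    let cached_at = Instant::now() - Duration::from_millis(400);
    // A 1000 ms delay cached 400 ms ago has at most 600 ms left.
    assert!(remaining_retry_delay_ms(1000, cached_at) <= 600);
    // Entries older than the delay itself saturate at zero.
    assert_eq!(remaining_retry_delay_ms(100, cached_at), 0);
}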
@@ -450,29 +434,42 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
Ok(cached.map(|()| node))
}
Err(err) => match err {
WakeComputeError::ControlPlane(ControlPlaneError::Message(ref msg)) => {
let retry_info = msg.status.as_ref().and_then(|s| s.details.retry_info);
WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => {
let Some(status) = &err.status else {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
};
// If we can retry this error, do not cache it,
// unless we were given a retry delay.
if msg.could_retry() && retry_info.is_none() {
return Err(err);
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |x| x.reason);
// if we can retry this error, do not cache it.
if reason.can_retry() {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
}
// at this point, we should only have quota errors.
debug!(
key = &*key,
"created a cache entry for the wake compute error"
);
let ttl = retry_info.map_or(Duration::from_secs(30), |r| {
Duration::from_millis(r.retry_delay_ms)
});
self.caches.node_info.insert_ttl(
key,
Err(err.clone()),
Duration::from_secs(30),
);
self.caches.node_info.insert_ttl(key, Err(msg.clone()), ttl);
Err(err)
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)))
}
err => Err(err),
err => return Err(err),
},
}
}
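The two sides of this hunk also differ in when a wake_compute error gets cached at all. In the variant that consults could_retry and retry_info, a retryable error without an explicit delay is never cached, while anything else is cached for either the supplied retry delay or a 30-second default. A sketch of that decision as a pure function (the name is made up for illustration):

use std::time::Duration;

// None means "don't cache"; Some(ttl) is the cache lifetime for the error.
fn wake_error_cache_ttl(could_retry: bool, retry_delay_ms: Option<u64>) -> Option<Duration> {
    match (could_retry, retry_delay_ms) {
        (true, None) => None,
        (_, Some(ms)) => Some(Duration::from_millis(ms)),
        (false, None) => Some(Duration::from_secs(30)),
    }
}

fn main() {
    assert_eq!(wake_error_cache_ttl(true, None), None);
    assert_eq!(wake_error_cache_ttl(true, Some(1500)), Some(Duration::from_millis(1500)));
    assert_eq!(wake_error_cache_ttl(false, None), Some(Duration::from_secs(30)));
}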

View File

@@ -43,35 +43,28 @@ impl UserFacingError for ControlPlaneError {
}
impl ReportableError for ControlPlaneError {
fn get_error_kind(&self) -> ErrorKind {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ControlPlaneError::Message(e) => match e.get_reason() {
Reason::RoleProtected
| Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::EndpointDisabled
| Reason::BranchNotFound
| Reason::InvalidEphemeralEndpointOptions => ErrorKind::User,
Reason::RoleProtected => ErrorKind::User,
Reason::ResourceNotFound => ErrorKind::User,
Reason::ProjectNotFound => ErrorKind::User,
Reason::EndpointNotFound => ErrorKind::User,
Reason::BranchNotFound => ErrorKind::User,
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded
| Reason::ActiveTimeQuotaExceeded
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded
| Reason::ActiveEndpointsLimitExceeded => ErrorKind::Quota,
Reason::ConcurrencyLimitReached
| Reason::LockAlreadyTaken
| Reason::RunningOperations
| Reason::EndpointIdle
| Reason::ProjectUnderMaintenance
| Reason::Unknown => ErrorKind::ControlPlane,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota,
Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota,
Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota,
Reason::WrittenDataQuotaExceeded => ErrorKind::Quota,
Reason::DataTransferQuotaExceeded => ErrorKind::Quota,
Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota,
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
Reason::RunningOperations => ErrorKind::ControlPlane,
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane,
Reason::Unknown => ErrorKind::ControlPlane,
},
ControlPlaneError::Transport(_) => ErrorKind::ControlPlane,
ControlPlaneError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}
}
}
@@ -127,10 +120,10 @@ impl UserFacingError for GetAuthInfoError {
}
impl ReportableError for GetAuthInfoError {
fn get_error_kind(&self) -> ErrorKind {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
Self::BadSecret => ErrorKind::ControlPlane,
Self::ApiError(_) => ErrorKind::ControlPlane,
Self::BadSecret => crate::error::ErrorKind::ControlPlane,
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane,
}
}
}

View File

@@ -126,16 +126,10 @@ pub(crate) enum Reason {
/// or that the subject doesn't have enough permissions to access the requested endpoint.
#[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound,
/// EndpointDisabled indicates that the endpoint has been disabled and does not accept connections.
#[serde(rename = "ENDPOINT_DISABLED")]
EndpointDisabled,
/// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested branch.
#[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound,
/// InvalidEphemeralEndpointOptions indicates that the specified LSN or timestamp are wrong.
#[serde(rename = "INVALID_EPHEMERAL_OPTIONS")]
InvalidEphemeralEndpointOptions,
/// RateLimitExceeded indicates that the rate limit for the operation has been exceeded.
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded,
@@ -158,9 +152,6 @@ pub(crate) enum Reason {
/// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded.
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
ActiveEndpointsLimitExceeded,
/// RunningOperations indicates that the project already has some running operations
/// and scheduling of new ones is prohibited.
#[serde(rename = "RUNNING_OPERATIONS")]
@@ -171,13 +162,9 @@ pub(crate) enum Reason {
/// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken.
#[serde(rename = "LOCK_ALREADY_TAKEN")]
LockAlreadyTaken,
/// EndpointIdle indicates that the endpoint cannot become active, because it's idle.
#[serde(rename = "ENDPOINT_IDLE")]
EndpointIdle,
/// ProjectUnderMaintenance indicates that the project is currently ongoing maintenance,
/// and thus cannot accept connections.
#[serde(rename = "PROJECT_UNDER_MAINTENANCE")]
ProjectUnderMaintenance,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
ActiveEndpointsLimitExceeded,
#[default]
#[serde(other)]
Unknown,
@@ -197,15 +184,13 @@ impl Reason {
pub(crate) fn can_retry(self) -> bool {
match self {
// do not retry role protected errors
// not a transient error
// not a transitive error
Reason::RoleProtected => false,
// on retry, it will still not be found or valid
// on retry, it will still not be found
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::EndpointDisabled
| Reason::BranchNotFound
| Reason::InvalidEphemeralEndpointOptions => false,
| Reason::BranchNotFound => false,
// we were asked to go away
Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded
@@ -215,13 +200,11 @@ impl Reason {
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded
| Reason::ActiveEndpointsLimitExceeded => false,
// transient error. control plane is currently busy
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations
| Reason::ConcurrencyLimitReached
| Reason::LockAlreadyTaken
| Reason::EndpointIdle
| Reason::ProjectUnderMaintenance => true,
| Reason::LockAlreadyTaken => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}
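can_retry encodes the policy spelled out in the comments: permission and not-found style failures will fail again, quota and rate-limit responses are an explicit "go away", and only busy-control-plane conditions are worth retrying. A generic wrapper around such a hint might look like the sketch below; the names are illustrative, not the proxy's API:

use std::thread::sleep;
use std::time::Duration;

fn retry_with_hint<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    can_retry: impl Fn(&E) -> bool,
    max_attempts: u32,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt + 1 < max_attempts && can_retry(&e) => {
                attempt += 1;
                // Crude linear backoff; the real proxy has its own retry timing.
                sleep(Duration::from_millis(50 * u64::from(attempt)));
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut calls = 0;
    // A "concurrency limit reached"-style error clears up after a couple of tries.
    let res: Result<u32, &str> = retry_with_hint(
        || {
            calls += 1;
            if calls < 3 { Err("busy") } else { Ok(7) }
        },
        |e| *e == "busy",
        5,
    );
    assert_eq!(res, Ok(7));
    assert_eq!(calls, 3);
}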

View File

@@ -376,9 +376,7 @@ pub enum Waiting {
pub enum RedisMsgKind {
Set,
Get,
Expire,
HGet,
Unlink,
}
#[derive(Default, Clone)]

View File

@@ -110,7 +110,7 @@ where
debug!(error = ?err, COULD_NOT_CONNECT);
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just received this from cplane and not from the cache, we shouldn't retry.
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(

View File

@@ -195,18 +195,15 @@ impl NeonOptions {
// proxy options:
/// `PARAMS_COMPAT` allows opting in to forwarding all startup parameters from client to compute.
pub const PARAMS_COMPAT: &'static str = "proxy_params_compat";
pub const PARAMS_COMPAT: &str = "proxy_params_compat";
// cplane options:
/// `LSN` allows provisioning an ephemeral compute with time-travel to the provided LSN.
const LSN: &'static str = "lsn";
/// `TIMESTAMP` allows provisioning an ephemeral compute with time-travel to the provided timestamp.
const TIMESTAMP: &'static str = "timestamp";
const LSN: &str = "lsn";
/// `ENDPOINT_TYPE` allows configuring an ephemeral compute to be read_only or read_write.
const ENDPOINT_TYPE: &'static str = "endpoint_type";
const ENDPOINT_TYPE: &str = "endpoint_type";
pub(crate) fn parse_params(params: &StartupMessageParams) -> Self {
params
@@ -231,7 +228,6 @@ impl NeonOptions {
// This is not a cplane option, we know it does not create ephemeral computes.
Self::PARAMS_COMPAT => false,
Self::LSN => true,
Self::TIMESTAMP => true,
Self::ENDPOINT_TYPE => true,
// err on the side of caution. any cplane options we don't know about
// might lead to ephemeral computes.
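The match above is an allow-list in disguise: only options the proxy knows are harmless (such as proxy_params_compat) are treated as non-ephemeral, the known cplane options (lsn, endpoint_type, and on one side of this hunk timestamp) are flagged, and anything unrecognised is conservatively assumed to create an ephemeral compute. Sketched as a free function with a hypothetical name:

fn option_creates_ephemeral_compute(key: &str) -> bool {
    match key {
        // Known to be proxy-only: never provisions anything.
        "proxy_params_compat" => false,
        // Known cplane options that provision ephemeral computes.
        "lsn" | "timestamp" | "endpoint_type" => true,
        // Err on the side of caution for anything we don't recognise.
        _ => true,
    }
}

fn main() {
    assert!(!option_creates_ephemeral_compute("proxy_params_compat"));
    assert!(option_creates_ephemeral_compute("lsn"));
    assert!(option_creates_ephemeral_compute("some_future_option"));
}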

View File

@@ -7,16 +7,8 @@ pub async fn run_until_cancelled<F: Future>(
f: F,
cancellation_token: &CancellationToken,
) -> Option<F::Output> {
run_until(f, cancellation_token.cancelled()).await.ok()
}
/// Runs the future `f` unless interrupted by future `condition`.
pub async fn run_until<F1: Future, F2: Future>(
f: F1,
condition: F2,
) -> Result<F1::Output, F2::Output> {
match select(pin!(f), pin!(condition)).await {
Either::Left((f1, _)) => Ok(f1),
Either::Right((f2, _)) => Err(f2),
match select(pin!(f), pin!(cancellation_token.cancelled())).await {
Either::Left((f, _)) => Some(f),
Either::Right(((), _)) => None,
}
}
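One side of this hunk generalises run_until_cancelled into run_until, which races any future against any interrupting future and reports which one finished first via Result. A small usage sketch, assuming the futures crate and a tokio runtime with the rt, time and macros features; the helper body mirrors the generalised version above:

use std::future::Future;
use std::pin::pin;
use std::time::Duration;

use futures::future::{Either, select};

async fn run_until<F1: Future, F2: Future>(
    f: F1,
    condition: F2,
) -> Result<F1::Output, F2::Output> {
    match select(pin!(f), pin!(condition)).await {
        Either::Left((out, _)) => Ok(out),
        Either::Right((out, _)) => Err(out),
    }
}

#[tokio::main]
async fn main() {
    // The "work" takes longer than the interrupt, so we land in Err(()).
    let res = run_until(
        tokio::time::sleep(Duration::from_secs(5)),
        tokio::time::sleep(Duration::from_millis(10)),
    )
    .await;
    assert!(res.is_err());
}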

View File

@@ -32,7 +32,7 @@ psutil = "^5.9.4"
types-psutil = "^5.9.5.12"
types-toml = "^0.10.8.6"
pytest-httpserver = "^1.0.8"
aiohttp = "3.12.14"
aiohttp = "3.10.11"
pytest-rerunfailures = "^15.0"
types-pytest-lazy-fixture = "^0.6.3.3"
pytest-split = "^0.8.1"

View File

@@ -6,10 +6,10 @@
use std::error::Error as _;
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, Response, StatusCode};
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperStatus, SafekeeperUtilization,
TimelineCreateRequest,
TimelineCreateRequest, TimelineStatus,
};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
@@ -161,12 +161,13 @@ impl Client {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Response> {
) -> Result<TimelineStatus> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
self.get(&uri).await
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn snapshot(

View File

@@ -23,7 +23,6 @@ use safekeeper::defaults::{
DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::hadron;
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
@@ -253,10 +252,6 @@ struct Args {
/// Run in development mode (disables security checks)
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
/* BEGIN_HADRON */
#[arg(long)]
enable_pull_timeline_on_startup: bool,
/* END_HADRON */
}
// Like PathBufValueParser, but allows empty string.
@@ -440,11 +435,6 @@ async fn main() -> anyhow::Result<()> {
use_https_safekeeper_api: args.use_https_safekeeper_api,
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
force_metric_collection_on_scrape: args.force_metric_collection_on_scrape,
/* BEGIN_HADRON */
advertise_pg_addr_tenant_only: None,
enable_pull_timeline_on_startup: args.enable_pull_timeline_on_startup,
hcc_base_url: None,
/* END_HADRON */
});
// initialize sentry if SENTRY_DSN is provided
@@ -539,20 +529,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
// Load all timelines from disk to memory.
global_timelines.init().await?;
/* BEGIN_HADRON */
if conf.enable_pull_timeline_on_startup && global_timelines.timelines_count() == 0 {
match hadron::hcc_pull_timelines(&conf, global_timelines.clone()).await {
Ok(_) => {
info!("Successfully pulled all timelines from peer safekeepers");
}
Err(e) => {
error!("Failed to pull timelines from peer safekeepers: {:?}", e);
return Err(e);
}
}
}
/* END_HADRON */
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");

View File

@@ -1,388 +0,0 @@
use pem::Pem;
use safekeeper_api::models::PullTimelineRequest;
use std::{collections::HashMap, env::VarError, net::IpAddr, sync::Arc, time::Duration};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::{backoff, id::TenantTimelineId, ip_address};
use anyhow::Result;
use pageserver_api::controller_api::{
AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse,
};
use crate::{
GlobalTimelines, SafeKeeperConf,
metrics::{
SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS,
SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS,
},
pull_timeline,
timelines_global_map::DeleteOrExclude,
};
// Extract information from the SafeKeeperConf to build a NodeRegisterRequest used to register the safekeeper with the HCC.
fn build_node_registeration_request(
conf: &SafeKeeperConf,
node_ip_addr: Option<IpAddr>,
) -> Result<NodeRegisterRequest> {
let advertise_pg_addr_with_port = conf
.advertise_pg_addr_tenant_only
.as_deref()
.expect("advertise_pg_addr_tenant_only is required to register with HCC");
// Extract host/port from the string.
let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at(
advertise_pg_addr_with_port
.rfind(':')
.ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?,
);
// Need the `[1..]` to remove the leading ':'.
let pg_port = pg_port_str[1..]
.parse::<u16>()
.map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?;
let (_, http_port_str) = conf.listen_http_addr.split_at(
conf.listen_http_addr
.rfind(':')
.ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?,
);
let http_port = http_port_str[1..]
.parse::<u16>()
.map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?;
Ok(NodeRegisterRequest {
node_id: conf.my_id,
listen_pg_addr: advertise_host_addr.to_string(),
listen_pg_port: pg_port,
listen_http_addr: advertise_host_addr.to_string(),
listen_http_port: http_port,
node_ip_addr,
availability_zone_id: AvailabilityZone("todo".to_string()),
listen_grpc_addr: None,
listen_grpc_port: None,
listen_https_port: None,
})
}
// Retrieve the JWT token used for authenticating with HCC from the environment variable.
// Returns None if the token cannot be retrieved.
fn get_hcc_auth_token() -> Option<String> {
match std::env::var("HCC_AUTH_TOKEN") {
Ok(v) => {
tracing::info!("Loaded JWT token for authentication with HCC");
Some(v)
}
Err(VarError::NotPresent) => {
tracing::info!("No JWT token for authentication with HCC detected");
None
}
Err(_) => {
tracing::info!(
"Failed to either load to detect non-present HCC_AUTH_TOKEN environment variable"
);
None
}
}
}
async fn send_safekeeper_register_request(
request_url: &Url,
auth_token: &Option<String>,
request: &NodeRegisterRequest,
) -> Result<()> {
let client = reqwest::Client::new();
let mut req_builder = client
.post(request_url.clone())
.header("Content-Type", "application/json");
if let Some(token) = auth_token {
req_builder = req_builder.bearer_auth(token);
}
req_builder
.json(&request)
.send()
.await?
.error_for_status()?;
Ok(())
}
/// Registers this safe keeper with the HCC.
pub async fn register(conf: &SafeKeeperConf) -> Result<()> {
match conf.hcc_base_url.as_ref() {
None => {
tracing::info!("HCC base URL is not set, skipping registration");
Ok(())
}
Some(hcc_base_url) => {
// The following operations acquiring the auth token and the node IP address both read environment
// variables. It's fine for now as this `register()` function is only called once during startup.
// If we start to talk to HCC more regularly in the safekeeper we should probably consider
// refactoring things into a "HadronClusterCoordinatorClient" struct.
let auth_token = get_hcc_auth_token();
let node_ip_addr =
ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
let request = build_node_registeration_request(conf, node_ip_addr)?;
let cancel = CancellationToken::new();
let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?;
backoff::retry(
|| async {
send_safekeeper_register_request(&request_url, &auth_token, &request).await
},
|_| false,
3,
u32::MAX,
"Calling the HCC safekeeper register API",
&cancel,
)
.await
.ok_or(anyhow::anyhow!(
"Error in forever retry loop. This error should never be surfaced."
))?
}
}
}
async fn safekeeper_list_timelines_request(
conf: &SafeKeeperConf,
) -> Result<pageserver_api::controller_api::SafekeeperTimelinesResponse> {
if conf.hcc_base_url.is_none() {
tracing::info!("HCC base URL is not set, skipping registration");
return Err(anyhow::anyhow!("HCC base URL is not set"));
}
// The following operations acquiring the auth token and the node IP address both read environment
// variables. It's fine for now as this `register()` function is only called once during startup.
// If we start to talk to HCC more regularly in the safekeeper we should probably consider
// refactoring things into a "HadronClusterCoordinatorClient" struct.
let auth_token = get_hcc_auth_token();
let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0);
let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?;
let client = reqwest::Client::new();
let mut req_builder = client
.get(request_url.clone())
.header("Content-Type", "application/json")
.query(&[("id", conf.my_id.0)]);
if let Some(token) = auth_token {
req_builder = req_builder.bearer_auth(token);
}
let response = req_builder
.send()
.await?
.error_for_status()?
.json::<pageserver_api::controller_api::SafekeeperTimelinesResponse>()
.await?;
Ok(response)
}
// Returns true on success, false otherwise.
pub async fn hcc_pull_timeline(
timeline: SafekeeperTimeline,
conf: &SafeKeeperConf,
global_timelines: Arc<GlobalTimelines>,
nodeid_http: &HashMap<u64, String>,
) -> bool {
let mut request = PullTimelineRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
http_hosts: Vec::new(),
ignore_tombstone: None,
};
for host in timeline.peers {
if host.0 == conf.my_id.0 {
continue;
}
if let Some(http_host) = nodeid_http.get(&host.0) {
request.http_hosts.push(http_host.clone());
}
}
let ca_certs = match conf
.ssl_ca_certs
.iter()
.map(Pem::contents)
.map(reqwest::Certificate::from_der)
.collect::<Result<Vec<_>, _>>()
{
Ok(result) => result,
Err(_) => {
return false;
}
};
match pull_timeline::handle_request(
request,
conf.sk_auth_token.clone(),
ca_certs,
global_timelines.clone(),
true,
)
.await
{
Ok(resp) => {
tracing::info!(
"Completed pulling tenant {} timeline {} from SK {:?}",
timeline.tenant_id,
timeline.timeline_id,
resp.safekeeper_host
);
return true;
}
Err(e) => {
tracing::error!(
"Failed to pull tenant {} timeline {} from SK {}",
timeline.tenant_id,
timeline.timeline_id,
e
);
let ttid = TenantTimelineId {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
};
// Revert the failed timeline pull.
// Note that a not-found timeline also returns OK.
match global_timelines
.delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal)
.await
{
Ok(dr) => {
tracing::info!(
"Deleted tenant {} timeline {} DirExists: {}",
timeline.tenant_id,
timeline.timeline_id,
dr.dir_existed,
);
}
Err(e) => {
tracing::error!(
"Failed to delete tenant {} timeline {} from global_timelines: {}",
timeline.tenant_id,
timeline.timeline_id,
e
);
}
}
}
}
false
}
pub async fn hcc_pull_timeline_till_success(
timeline: SafekeeperTimeline,
conf: &SafeKeeperConf,
global_timelines: Arc<GlobalTimelines>,
nodeid_http: &HashMap<u64, String>,
) {
const MAX_PULL_TIMELINE_RETRIES: u64 = 100;
for i in 0..MAX_PULL_TIMELINE_RETRIES {
if hcc_pull_timeline(
timeline.clone(),
conf,
global_timelines.clone(),
nodeid_http,
)
.await
{
SK_RECOVERY_PULL_TIMELINE_OKS.inc();
return;
}
tracing::error!(
"Failed to pull timeline {} from SK peers, retrying {}/{}",
timeline.timeline_id,
i + 1,
MAX_PULL_TIMELINE_RETRIES
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
SK_RECOVERY_PULL_TIMELINE_ERRORS.inc();
}
pub async fn hcc_pull_timelines(
conf: &SafeKeeperConf,
global_timelines: Arc<GlobalTimelines>,
) -> Result<()> {
let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer();
tracing::info!("Start pulling timelines from SK peers");
let mut response = SafekeeperTimelinesResponse {
timelines: Vec::new(),
safekeeper_peers: Vec::new(),
};
for i in 0..100 {
match safekeeper_list_timelines_request(conf).await {
Ok(timelines) => {
response = timelines;
}
Err(e) => {
tracing::error!("Failed to list timelines from HCC: {}", e);
if i == 99 {
return Err(e);
}
}
}
sleep(Duration::from_millis(100)).await;
}
let mut nodeid_http = HashMap::new();
for sk in response.safekeeper_peers {
nodeid_http.insert(
sk.node_id.0,
format!("http://{}:{}", sk.listen_http_addr, sk.http_port),
);
}
tracing::info!("Received {} timelines from HCC", response.timelines.len());
for timeline in response.timelines {
let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS
.with_label_values(&[
&timeline.tenant_id.to_string(),
&timeline.timeline_id.to_string(),
])
.start_timer();
hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http)
.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use utils::id::NodeId;
#[test]
fn test_build_node_registeration_request() {
// Test that:
// 1. We always extract the host name and port used to register with the HCC from the
// `advertise_pg_addr` if it is set.
// 2. The correct ports are extracted from `advertise_pg_addr` and `listen_http_addr`.
let mut conf = SafeKeeperConf::dummy();
conf.my_id = NodeId(1);
conf.advertise_pg_addr_tenant_only =
Some("safe-keeper-1.safe-keeper.hadron.svc.cluster.local:5454".to_string());
// `listen_pg_addr` and `listen_pg_addr_tenant_only` are not used for node registration. Set them to a different
// host and port values and make sure that they don't show up in the node registration request.
conf.listen_pg_addr = "0.0.0.0:5456".to_string();
conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string());
conf.listen_http_addr = "0.0.0.0:7676".to_string();
let node_ip_addr: Option<IpAddr> = Some("127.0.0.1".parse().unwrap());
let request = build_node_registeration_request(&conf, node_ip_addr).unwrap();
assert_eq!(request.node_id, NodeId(1));
assert_eq!(
request.listen_pg_addr,
"safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
);
assert_eq!(request.listen_pg_port, 5454);
assert_eq!(
request.listen_http_addr,
"safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
);
assert_eq!(request.listen_http_port, 7676);
assert_eq!(
request.node_ip_addr,
Some(IpAddr::V4("127.0.0.1".parse().unwrap()))
);
}
}
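The registration request above derives both its PG and HTTP ports by splitting the configured address at the last ':' rather than the first, so a dotted service hostname keeps its full name and only the trailing port is peeled off; the unit test pins down exactly that behaviour. The same idea as a tiny standalone helper (the function name is made up):

fn split_host_port(addr: &str) -> Option<(&str, u16)> {
    let idx = addr.rfind(':')?;
    let port = addr[idx + 1..].parse().ok()?;
    Some((&addr[..idx], port))
}

fn main() {
    assert_eq!(
        split_host_port("safe-keeper-1.safe-keeper.hadron.svc.cluster.local:5454"),
        Some(("safe-keeper-1.safe-keeper.hadron.svc.cluster.local", 5454)),
    );
    assert_eq!(split_host_port("no-port-here"), None);
}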

View File

@@ -241,14 +241,9 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
})?;
let resp = pull_timeline::handle_request(
data,
conf.sk_auth_token.clone(),
ca_certs,
global_timelines,
false,
)
.await?;
let resp =
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
.await?;
json_response(StatusCode::OK, resp)
}

View File

@@ -10,7 +10,6 @@ use pem::Pem;
use remote_storage::RemoteStorageConfig;
use storage_broker::Uri;
use tokio::runtime::Runtime;
use url::Url;
use utils::auth::SwappableJwtAuth;
use utils::id::NodeId;
use utils::logging::SecretString;
@@ -21,7 +20,6 @@ pub mod control_file;
pub mod control_file_upgrade;
pub mod copy_timeline;
pub mod debug_dump;
pub mod hadron;
pub mod handler;
pub mod http;
pub mod metrics;
@@ -102,11 +100,6 @@ pub struct SafeKeeperConf {
pub advertise_pg_addr: Option<String>,
pub availability_zone: Option<String>,
pub no_sync: bool,
/* BEGIN_HADRON */
pub advertise_pg_addr_tenant_only: Option<String>,
pub enable_pull_timeline_on_startup: bool,
pub hcc_base_url: Option<Url>,
/* END_HADRON */
pub broker_endpoint: Uri,
pub broker_keepalive_interval: Duration,
pub heartbeat_timeout: Duration,
@@ -192,11 +185,6 @@ impl SafeKeeperConf {
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
force_metric_collection_on_scrape: true,
/* BEGIN_HADRON */
advertise_pg_addr_tenant_only: None,
enable_pull_timeline_on_startup: false,
hcc_base_url: None,
/* END_HADRON */
}
}
}

View File

@@ -85,43 +85,6 @@ pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_wal_storage_limit_errors counter")
});
pub static SK_RECOVERY_PULL_TIMELINE_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_recovery_pull_timeline_errors",
concat!(
"Number of errors due to pull_timeline errors during SK lost disk recovery.",
"An increase in this metric indicates pull timelines runs into error."
)
)
.expect("Failed to register safekeeper_recovery_pull_timeline_errors counter")
});
pub static SK_RECOVERY_PULL_TIMELINE_OKS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_recovery_pull_timeline_oks",
concat!(
"Number of successful pull_timeline during SK lost disk recovery.",
"An increase in this metric indicates pull timelines is successful."
)
)
.expect("Failed to register safekeeper_recovery_pull_timeline_oks counter")
});
pub static SK_RECOVERY_PULL_TIMELINES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_recovery_pull_timelines_seconds",
"Seconds to pull timelines",
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_recovery_pull_timelines_seconds histogram")
});
pub static SK_RECOVERY_PULL_TIMELINE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"safekeeper_recovery_pull_timeline_seconds",
"Seconds to pull timeline",
&["tenant_id", "timeline_id"],
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_recovery_pull_timeline_seconds histogram vec")
});
/* END_HADRON */
pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(

View File

@@ -8,7 +8,6 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use http::StatusCode;
use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::GenericRemoteStorage;
@@ -22,11 +21,10 @@ use tokio::fs::OpenOptions;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::sleep;
use tokio_tar::{Archive, Builder, Header};
use tokio_util::io::{CopyToBytes, SinkWriter};
use tokio_util::sync::PollSender;
use tracing::{error, info, instrument, warn};
use tracing::{error, info, instrument};
use utils::crashsafe::fsync_async_opt;
use utils::id::{NodeId, TenantTimelineId};
use utils::logging::SecretString;
@@ -451,7 +449,6 @@ pub async fn handle_request(
sk_auth_token: Option<SecretString>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
wait_for_peer_timeline_status: bool,
) -> Result<PullTimelineResponse, ApiError> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
@@ -475,100 +472,37 @@ pub async fn handle_request(
let http_hosts = request.http_hosts.clone();
// Figure out statuses of potential donors.
let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
futures::future::join_all(http_hosts.iter().map(|url| async {
let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
let info = cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await?;
Ok(info)
}))
.await;
let mut statuses = Vec::new();
if !wait_for_peer_timeline_status {
let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
futures::future::join_all(http_hosts.iter().map(|url| async {
let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
let resp = cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await?;
let info: TimelineStatus = resp
.json()
.await
.context("Failed to deserialize timeline status")
.map_err(|e| mgmt_api::Error::ReceiveErrorBody(e.to_string()))?;
Ok(info)
}))
.await;
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(status) => {
statuses.push((status, i));
}
Err(e) => {
info!("error fetching status from {}: {e}", http_hosts[i]);
}
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(status) => {
statuses.push((status, i));
}
Err(e) => {
info!("error fetching status from {}: {e}", http_hosts[i]);
}
}
}
// Allow missing responses from up to one safekeeper (say due to downtime)
// e.g. if we created a timeline on PS A and B, with C being offline. Then B goes
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)));
}
} else {
let mut retry = true;
// We must get status from all other peers.
// Otherwise, we may run into split-brain scenario.
while retry {
statuses.clear();
retry = false;
for (i, url) in http_hosts.iter().enumerate() {
let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
match cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await
{
Ok(resp) => {
if resp.status() == StatusCode::NOT_FOUND {
warn!(
"Timeline {} not found on peer SK {}, no need to pull it",
TenantTimelineId::new(request.tenant_id, request.timeline_id),
url
);
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
}
let info: TimelineStatus = resp
.json()
.await
.context("Failed to deserialize timeline status")
.map_err(ApiError::InternalServerError)?;
statuses.push((info, i));
}
Err(e) => {
match e {
// If we get a 404, it means the timeline doesn't exist on this safekeeper.
// We can ignore this error.
mgmt_api::Error::ApiError(status, _)
if status == StatusCode::NOT_FOUND =>
{
warn!(
"Timeline {} not found on peer SK {}, no need to pull it",
TenantTimelineId::new(request.tenant_id, request.timeline_id),
url
);
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
}
_ => {}
}
retry = true;
error!("Failed to get timeline status from {}: {:#}", url, e);
}
}
}
sleep(std::time::Duration::from_millis(100)).await;
}
// Allow missing responses from up to one safekeeper (say due to downtime)
// e.g. if we created a timeline on PS A and B, with C being offline. Then B goes
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)));
}
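For illustration, a minimal Python sketch of the two status-gathering policies visible in this hunk, assuming a fetch_status(host) helper that returns a status object, raises NotFound on a 404, and raises some other exception on any other failure; the helper and all names are illustrative, only the tolerance rule and the poll-all-peers loop mirror the code above:

import time

class NotFound(Exception):
    """Stand-in for a 404 from a peer safekeeper."""

def gather_statuses(hosts, fetch_status, wait_for_all_peers=False):
    # Default path: tolerate a missing response from at most one safekeeper,
    # e.g. one peer being down while the timeline still exists on the others.
    if not wait_for_all_peers:
        statuses = []
        for i, host in enumerate(hosts):
            try:
                statuses.append((fetch_status(host), i))
            except Exception as e:
                print(f"error fetching status from {host}: {e}")
        min_required = max(len(hosts) - 1, 1)
        if len(statuses) < min_required:
            raise RuntimeError(
                f"only got {len(statuses)} successful status responses, required {min_required}"
            )
        return statuses
    # Strict path: keep polling until every peer answers, so the decision is
    # never made on a partial view (split brain); a 404 means the timeline is
    # absent on a peer and there is nothing to pull.
    while True:
        statuses = []
        retry = False
        for i, host in enumerate(hosts):
            try:
                statuses.append((fetch_status(host), i))
            except NotFound:
                return None
            except Exception as e:
                print(f"failed to get timeline status from {host}: {e}")
                retry = True
        if not retry:
            return statuses
        time.sleep(0.1)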
// Find the most advanced safekeeper
@@ -577,12 +511,6 @@ pub async fn handle_request(
.max_by_key(|(status, _)| {
(
status.acceptor_state.epoch,
/* BEGIN_HADRON */
// We need to pull from the SK with the highest term.
// This is because another compute may come online and vote the same highest term again on the other two SKs.
// Then, there will be 2 computes running on the same term.
status.acceptor_state.term,
/* END_HADRON */
status.flush_lsn,
status.commit_lsn,
)
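For illustration, the donor-selection ordering from the hunk above as a small Python sketch: the most advanced safekeeper wins by comparing (epoch, term, flush_lsn, commit_lsn) when the BEGIN_HADRON block is present, and (epoch, flush_lsn, commit_lsn) without it; statuses are modelled as plain dicts here, which is an assumption of the sketch:

def choose_donor(statuses, include_term=True):
    """Pick the most advanced safekeeper to pull from.

    statuses is a list of (status, index) pairs; each status is a dict with
    the fields shown in the hunk above.
    """
    def key(item):
        status, _ = item
        if include_term:
            # With the hadron block: also compare terms, so that a compute
            # re-voting the same highest term on the other safekeepers cannot
            # leave two computes running on the same term.
            return (status["epoch"], status["term"],
                    status["flush_lsn"], status["commit_lsn"])
        return (status["epoch"], status["flush_lsn"], status["commit_lsn"])
    return max(statuses, key=key)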

View File

@@ -191,11 +191,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
force_metric_collection_on_scrape: true,
/* BEGIN_HADRON */
enable_pull_timeline_on_startup: false,
advertise_pg_addr_tenant_only: None,
hcc_base_url: None,
/* END_HADRON */
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -34,9 +34,7 @@ class NeonAPI:
self.retries524 = 0
self.retries4xx = 0
def __request(
self, method: str | bytes, endpoint: str, retry404: bool = False, **kwargs: Any
) -> requests.Response:
def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response:
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
@@ -57,12 +55,10 @@ class NeonAPI:
resp.raise_for_status()
break
elif resp.status_code >= 400:
if resp.status_code == 404 and retry404:
retry = True
self.retries4xx += 1
elif resp.status_code == 422 and resp.json()["message"] == "branch not ready yet":
retry = True
self.retries4xx += 1
if resp.status_code == 422:
if resp.json()["message"] == "branch not ready yet":
retry = True
self.retries4xx += 1
elif resp.status_code == 423 and resp.json()["message"] in {
"endpoint is in some transitive state, could not suspend",
"project already has running conflicting operations, scheduling of new ones is prohibited",
@@ -70,7 +66,7 @@ class NeonAPI:
retry = True
self.retries4xx += 1
elif resp.status_code == 524:
log.info("The request was timed out")
log.info("The request was timed out, trying to get operations")
retry = True
self.retries524 += 1
if retry:
@@ -207,9 +203,6 @@ class NeonAPI:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches/{branch_id}",
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
retry404=True,
headers={
"Accept": "application/json",
},
@@ -314,10 +307,6 @@ class NeonAPI:
if endpoint_type:
data["endpoint"]["type"] = endpoint_type
if settings:
# otherwise we get 400 "settings must not be nil"
# TODO(myrrc): fix on cplane side
if "pg_settings" not in settings:
settings["pg_settings"] = {}
data["endpoint"]["settings"] = settings
resp = self.__request(
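For illustration, a standalone Python sketch of the retry policy these hunks adjust, assuming send() is a callable that performs one HTTP request and returns a requests-style response; the status codes and messages are the ones shown above, while the helper itself and its parameters are illustrative:

import time

RETRYABLE_423_MESSAGES = {
    "endpoint is in some transitive state, could not suspend",
    "project already has running conflicting operations, scheduling of new ones is prohibited",
}

def request_with_retries(send, retry404=False, delay=1.0, max_attempts=10):
    for _ in range(max_attempts):
        resp = send()
        if resp.status_code < 400:
            return resp
        retry = False
        if resp.status_code == 404 and retry404:
            retry = True          # e.g. branch details not visible yet
        elif resp.status_code == 422 and resp.json().get("message") == "branch not ready yet":
            retry = True
        elif resp.status_code == 423 and resp.json().get("message") in RETRYABLE_423_MESSAGES:
            retry = True
        elif resp.status_code == 524:
            retry = True          # the gateway timed the request out
        if not retry:
            resp.raise_for_status()
            return resp
        time.sleep(delay)
    raise RuntimeError("request did not succeed within the retry budget")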

View File

@@ -728,7 +728,7 @@ class NeonEnvBuilder:
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
# However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
# controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# So, from_repo_dir patches up the storcon database.
# So, from_repo_dir patches up the storcon database.
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
assert not patch_script_path.exists()
patch_script = ""

View File

@@ -1,168 +0,0 @@
from __future__ import annotations
import os
import timeit
import traceback
from concurrent.futures import ThreadPoolExecutor as Exec
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, Any, cast
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI, connection_parameters_to_env
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import Endpoint, PgBin
from fixtures.pg_version import PgVersion
from performance.test_perf_pgbench import utc_now_timestamp
# These tests compare performance for write-heavy and read-heavy workloads of an ordinary endpoint
# compared to an endpoint which saves its LFC and prewarms using it on startup.
def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
pg_bin = neon_compare.pg_bin
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
for ep in [ep_normal, ep_prewarmed]:
connstr: str = ep.connstr()
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"])
ep.safe_psql("CREATE EXTENSION neon")
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
name: str = cast("str", ep.branch_name)
neon_compare.zenbenchmark.record_pg_bench_result(name, res)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_compare_prewarmed_pgbench_perf_benchmark(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
project = neon_api.create_project(pg_version, name)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
err = False
try:
benchmark_impl(pg_bin, neon_api, project, zenbenchmark)
except Exception as e:
err = True
log.error(f"Caught exception: {e}")
log.error(traceback.format_exc())
finally:
assert not err
neon_api.delete_project(project_id)
def benchmark_impl(
pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker
):
pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424") # 50GB
offload_secs = 20
test_duration_min = 5
pgbench_duration = f"-T{test_duration_min * 60}"
# prewarm API is not publicly exposed. In order to test performance of a
# fully prewarmed endpoint, wait after it restarts.
# The number here is empirical, based on manual runs on staging
prewarmed_sleep_secs = 180
branch_id = project["branch"]["id"]
project_id = project["project"]["id"]
normal_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
normal_id = project["endpoints"][0]["id"]
prewarmed_branch_id = neon_api.create_branch(
project_id, "prewarmed", parent_id=branch_id, add_endpoint=False
)["branch"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
ep_prewarmed = neon_api.create_endpoint(
project_id,
prewarmed_branch_id,
endpoint_type="read_write",
settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs},
)
neon_api.wait_for_operation_to_finish(project_id)
prewarmed_env = normal_env.copy()
prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"]
prewarmed_id = ep_prewarmed["endpoint"]["id"]
def bench(endpoint_name, endpoint_id, env):
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env)
sleep(offload_secs * 2) # ensure LFC is offloaded after pgbench finishes
neon_api.restart_endpoint(project_id, endpoint_id)
sleep(prewarmed_sleep_secs)
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
zenbenchmark.record_pg_bench_result(endpoint_name, res)
with Exec(max_workers=2) as exe:
exe.submit(bench, "normal", normal_id, normal_env)
exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env)
def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
sql = [
"CREATE EXTENSION neon",
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
]
for ep in [ep_normal, ep_prewarmed]:
ep.safe_psql_many(sql)
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
with neon_compare.record_duration(f"{ep.branch_name}_run_duration"):
ep.safe_psql("SELECT count(*) from foo")

View File

@@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any
import pytest
from fixtures.log_helper import log
from requests import HTTPError
if TYPE_CHECKING:
from pathlib import Path
@@ -152,11 +153,26 @@ class NeonBranch:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
parent_id: str = res["branch"]["parent_id"]
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
target_time = datetime.now() + timedelta(seconds=30)
while datetime.now() < target_time:
try:
parent_def = self.neon_api.get_branch_details(self.project_id, parent_id)
except HTTPError as he:
if he.response.status_code == 404:
log.info("Branch not found, waiting...")
time.sleep(1)
else:
raise HTTPError(he) from he
else:
break
else:
raise RuntimeError(f"Branch {parent_id} not found")
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
parent = NeonBranch(
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
)
parent = NeonBranch(self.project, parent_def, True)
self.project.branches[parent_id] = parent
self.parent = parent
parent.children[self.id] = self
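For illustration, the retry-until-deadline pattern added here, extracted into a standalone helper; fetch() stands in for get_branch_details, and the 30-second budget and 1-second poll mirror the loop above:

import time
from datetime import datetime, timedelta

from requests import HTTPError

def get_with_404_retries(fetch, deadline_seconds=30, poll_seconds=1.0):
    """Call fetch() until it stops returning 404 or the deadline expires.

    Works around a freshly-reset parent branch not being visible immediately
    (see the LKB-279 reference above).
    """
    deadline = datetime.now() + timedelta(seconds=deadline_seconds)
    while datetime.now() < deadline:
        try:
            return fetch()
        except HTTPError as err:
            if err.response is not None and err.response.status_code == 404:
                time.sleep(poll_seconds)
                continue
            raise
    raise RuntimeError("resource still returning 404 after the retry deadline")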

View File

@@ -24,7 +24,10 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
[
".*get_values_reconstruct_data for layer .*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*: layer load failed, assuming permanent failure:.*",
".*failed to get checkpoint bytes.*",
".*failed to get control bytes.*",

View File

@@ -687,7 +687,7 @@ def test_sharding_compaction(
for _i in range(0, 10):
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
# these should result in image layers each time we write some data into a shard, and also shards
# receiving less data hitting their "empty image layer" path (where they should skip writing the layer,
# rather than asserting)
workload.churn_rows(64)

View File

@@ -217,7 +217,7 @@ if SQL_EXPORTER is None:
self, logs_dir: Path, config_file: Path, collector_file: Path, port: int
) -> None:
# NOTE: Keep the version the same as in
# compute/compute-node.Dockerfile and build-tools/Dockerfile.
# compute/compute-node.Dockerfile and build-tools.Dockerfile.
#
# The "host" network mode allows sql_exporter to talk to the
# endpoint which is running on the host.

View File

@@ -76,6 +76,7 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
"""Tests tenants with and without wal acceptors"""
tenant_1, _ = env.create_tenant()
tenant_2, _ = env.create_tenant()

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
import sys
import tarfile
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
@@ -199,115 +198,3 @@ def test_wal_restore_http(neon_env_builder: NeonEnvBuilder, broken_tenant: bool)
# the table is back now!
restored = env.endpoints.create_start("main")
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
# BEGIN_HADRON
# TODO: re-enable once CM python is integrated.
# def clear_directory(directory):
# for item in os.listdir(directory):
# item_path = os.path.join(directory, item)
# if os.path.isdir(item_path):
# log.info(f"removing SK directory: {item_path}")
# shutil.rmtree(item_path)
# else:
# log.info(f"removing SK file: {item_path}")
# os.remove(item_path)
# def test_sk_pull_timelines(
# neon_env_builder: NeonEnvBuilder,
# ):
# DBNAME = "regression"
# superuser_name = "databricks_superuser"
# neon_env_builder.num_safekeepers = 3
# neon_env_builder.num_pageservers = 4
# neon_env_builder.safekeeper_extra_opts = ["--enable-pull-timeline-on-startup"]
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
# env = neon_env_builder.init_start(initial_tenant_shard_count=4)
# env.compute_manager.start(base_port=env.compute_manager_port)
# test_creator = "test_creator"
# test_metastore_id = uuid4()
# test_account_id = uuid4()
# test_workspace_id = 1
# test_workspace_url = "http://test_workspace_url"
# test_metadata_version = 1
# test_metadata = {
# "state": "INSTANCE_PROVISIONING",
# "admin_rolename": "admin",
# "admin_password_scram": "abc123456",
# }
# test_instance_name_1 = "test_instance_1"
# test_instance_read_write_compute_pool_1 = {
# "instance_name": test_instance_name_1,
# "compute_pool_name": "compute_pool_1",
# "creator": test_creator,
# "capacity": 2.0,
# "node_count": 1,
# "metadata_version": 0,
# "metadata": {
# "state": "INSTANCE_PROVISIONING",
# },
# }
# test_instance_1_readable_secondaries_enabled = False
# # Test creation
# create_instance_with_retries(
# env,
# test_instance_name_1,
# test_creator,
# test_metastore_id,
# test_account_id,
# test_workspace_id,
# test_workspace_url,
# test_instance_read_write_compute_pool_1,
# test_metadata_version,
# test_metadata,
# test_instance_1_readable_secondaries_enabled,
# )
# instance = env.compute_manager.get_instance_by_name(test_instance_name_1, test_workspace_id)
# log.info(f"haoyu Instance created: {instance}")
# assert instance["instance_name"] == test_instance_name_1
# test_instance_id = instance["instance_id"]
# instance_detail = env.compute_manager.describe_instance(test_instance_id)
# log.info(f"haoyu Instance detail: {instance_detail}")
# env.initial_tenant = instance_detail[0]["tenant_id"]
# env.initial_timeline = instance_detail[0]["timeline_id"]
# # Connect to postgres and create a database called "regression".
# endpoint = env.endpoints.create_start("main")
# endpoint.safe_psql(f"CREATE ROLE {superuser_name}")
# endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
# endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
# # Write some data. ~20 MB.
# num_rows = 0
# for _i in range(0, 20000):
# endpoint.safe_psql(
# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
# )
# num_rows += 1
# log.info(f"SKs {env.storage_controller.hcc_sk_node_list()}")
# env.safekeepers[0].stop(immediate=True)
# clear_directory(env.safekeepers[0].data_dir)
# env.safekeepers[0].start()
# # PG can still write data. ~20 MB.
# for _i in range(0, 20000):
# endpoint.safe_psql(
# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
# )
# num_rows += 1
# tuples = endpoint.safe_psql("SELECT COUNT(*) FROM usertable;")
# assert tuples[0][0] == num_rows
# endpoint.stop_and_destroy()
# END_HADRON

View File

@@ -98,7 +98,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net", "sync"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }