Merge commit 'a456e818a' into problame/standby-horizon-leases

Christian Schwarz
2025-08-06 17:49:56 +02:00
36 changed files with 1901 additions and 571 deletions

View File

@@ -31,6 +31,7 @@ config-variables:
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- PREWARM_PGBENCH_SIZE
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID

View File

@@ -219,6 +219,7 @@ jobs:
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
--ignore test_runner/performance/test_perf_many_relations.py
--ignore test_runner/performance/test_perf_oltp_large_tenant.py
--ignore test_runner/performance/test_lfc_prewarm.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -410,6 +411,77 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
prewarm-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run prewarm benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_lfc_prewarm.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)

Cargo.lock generated
View File

@@ -4296,6 +4296,7 @@ dependencies = [
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"pprof",
"rand 0.8.5",
"reqwest",
"serde",
@@ -7564,6 +7565,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]

View File

@@ -201,7 +201,7 @@ tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.g
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] }
toml = "0.8"

View File

@@ -476,6 +476,7 @@ async fn main() -> anyhow::Result<()> {
listen_http_port,
listen_https_port,
availability_zone_id: AvailabilityZone(availability_zone_id),
node_ip_addr: None,
}),
)
.await?;

View File

@@ -1,5 +1,6 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::net::IpAddr;
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -60,6 +61,11 @@ pub struct NodeRegisterRequest {
pub listen_https_port: Option<u16>,
pub availability_zone_id: AvailabilityZone,
// Reachable IP address of the PS/SK registering, if known.
// Hadron Cluster Coordinator will update the DNS record of the registering node
// with this IP address.
pub node_ip_addr: Option<IpAddr>,
}
#[derive(Serialize, Deserialize)]
@@ -545,6 +551,39 @@ pub struct SafekeeperDescribeResponse {
pub scheduling_policy: SkSchedulingPolicy,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TimelineSafekeeperPeer {
pub node_id: NodeId,
pub listen_http_addr: String,
pub http_port: i32,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SCSafekeeperTimeline {
// SC does not know the tenant id.
pub timeline_id: TimelineId,
pub peers: Vec<NodeId>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SCSafekeeperTimelinesResponse {
pub timelines: Vec<SCSafekeeperTimeline>,
pub safekeeper_peers: Vec<TimelineSafekeeperPeer>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SafekeeperTimeline {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub peers: Vec<NodeId>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SafekeeperTimelinesResponse {
pub timelines: Vec<SafekeeperTimeline>,
pub safekeeper_peers: Vec<TimelineSafekeeperPeer>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperSchedulingPolicyRequest {
pub scheduling_policy: SkSchedulingPolicy,

View File

@@ -0,0 +1,73 @@
use std::env::{VarError, var};
use std::error::Error;
use std::net::IpAddr;
use std::str::FromStr;
/// Name of the environment variable containing the reachable IP address of the node. If set, the IP address contained in this
/// environment variable is used as the reachable IP address of the pageserver or safekeeper node during node registration.
/// In a Kubernetes environment, this environment variable should be set by Kubernetes to the Pod IP (specified in the Pod
/// template).
pub const HADRON_NODE_IP_ADDRESS: &str = "HADRON_NODE_IP_ADDRESS";
/// Read the reachable IP address of this pageserver or safekeeper node from the HADRON_NODE_IP_ADDRESS
/// environment variable. In Kubernetes, this variable is set to the Pod IP (specified in the Pod template).
pub fn read_node_ip_addr_from_env() -> Result<Option<IpAddr>, Box<dyn Error>> {
match var(HADRON_NODE_IP_ADDRESS) {
Ok(v) => {
if let Ok(addr) = IpAddr::from_str(&v) {
Ok(Some(addr))
} else {
Err(format!("Invalid IP address string: {v}. Cannot be parsed as either an IPv4 or an IPv6 address.").into())
}
}
Err(VarError::NotPresent) => Ok(None),
Err(e) => Err(e.into()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use std::net::{Ipv4Addr, Ipv6Addr};
#[test]
fn test_read_node_ip_addr_from_env() {
// SAFETY: test code
unsafe {
// Test with a valid IPv4 address
env::set_var(HADRON_NODE_IP_ADDRESS, "192.168.1.1");
let result = read_node_ip_addr_from_env().unwrap();
assert_eq!(result, Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))));
// Test with a valid IPv6 address
env::set_var(
HADRON_NODE_IP_ADDRESS,
"2001:0db8:85a3:0000:0000:8a2e:0370:7334",
);
}
let result = read_node_ip_addr_from_env().unwrap();
assert_eq!(
result,
Some(IpAddr::V6(
Ipv6Addr::from_str("2001:0db8:85a3:0000:0000:8a2e:0370:7334").unwrap()
))
);
// Test with an invalid IP address
// SAFETY: test code
unsafe {
env::set_var(HADRON_NODE_IP_ADDRESS, "invalid_ip");
}
let result = read_node_ip_addr_from_env();
assert!(result.is_err());
// Test with no environment variable set
// SAFETY: test code
unsafe {
env::remove_var(HADRON_NODE_IP_ADDRESS);
}
let result = read_node_ip_addr_from_env().unwrap();
assert_eq!(result, None);
}
}
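
As a usage illustration (not part of this diff): a hypothetical call site showing how the helper above could feed the new `node_ip_addr` field of `NodeRegisterRequest` added earlier in this commit; the surrounding registration code is assumed.

use std::net::IpAddr;

use utils::ip_address::read_node_ip_addr_from_env;

// Hypothetical: resolve the node's reachable IP (if HADRON_NODE_IP_ADDRESS is set) before
// registering with the storage controller. A malformed address fails startup instead of
// registering a bad DNS target.
fn resolve_node_ip_for_registration() -> anyhow::Result<Option<IpAddr>> {
    read_node_ip_addr_from_env()
        .map_err(|err| anyhow::anyhow!("failed to read HADRON_NODE_IP_ADDRESS: {err}"))
}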

View File

@@ -26,6 +26,9 @@ pub mod auth;
// utility functions and helper traits for unified unique id generation/serialization etc.
pub mod id;
// utility functions to obtain reachable IP addresses in PS/SK nodes.
pub mod ip_address;
pub mod shard;
mod hex;

View File

@@ -1,4 +1,5 @@
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;
@@ -7,7 +8,7 @@ use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::info;
use tracing::{info, warn};
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -377,10 +378,11 @@ impl std::fmt::Debug for SecretString {
///
/// TODO: consider upgrading this to a warning, but currently it fires too often.
#[inline]
pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
where
F: Future<Output = O>,
{
pub async fn log_slow<O>(
name: &str,
threshold: Duration,
f: Pin<&mut impl Future<Output = O>>,
) -> O {
monitor_slow_future(
threshold,
threshold, // period = threshold
@@ -394,16 +396,42 @@ where
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
info!(
"slow {name} completed after {:.3}s",
elapsed_total.as_secs_f64()
);
info!("slow {name} completed after {elapsed:.3}s");
} else {
info!(
"slow {name} still running after {:.3}s",
elapsed_total.as_secs_f64()
);
info!("slow {name} still running after {elapsed:.3}s");
}
},
)
.await
}
/// Logs a periodic warning if a future is slow to complete.
#[inline]
pub async fn warn_slow<O>(
name: &str,
threshold: Duration,
f: Pin<&mut impl Future<Output = O>>,
) -> O {
monitor_slow_future(
threshold,
threshold, // period = threshold
f,
|MonitorSlowFutureCallback {
ready,
is_slow,
elapsed_total,
elapsed_since_last_callback: _,
}| {
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
warn!("slow {name} completed after {elapsed:.3}s");
} else {
warn!("slow {name} still running after {elapsed:.3}s");
}
},
)
@@ -416,7 +444,7 @@ where
pub async fn monitor_slow_future<F, O>(
threshold: Duration,
period: Duration,
mut fut: std::pin::Pin<&mut F>,
mut fut: Pin<&mut F>,
mut cb: impl FnMut(MonitorSlowFutureCallback),
) -> O
where

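As a small usage sketch (mirroring how the gRPC client later in this commit calls it), the new `warn_slow` wrapper can be applied to any pinned future; the future and threshold here are placeholders:

use std::pin::pin;
use std::time::Duration;

use utils::logging::warn_slow;

async fn do_work() -> u64 { 42 } // placeholder future

async fn example() -> u64 {
    // Warns once the future has been running for 3s, then every 3s, and again on late completion.
    warn_slow("do_work", Duration::from_secs(3), pin!(do_work())).await
}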
View File

@@ -1,13 +1,16 @@
use std::collections::HashMap;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::instrument;
use tracing::{debug, instrument};
use utils::logging::warn_slow;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
@@ -21,28 +24,40 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
///
/// Normal requests are small, and we don't pipeline them, so we can afford a large number of
/// streams per connection.
///
/// TODO: tune all of these constants, and consider making them configurable.
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
/// with only streams.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();
/// Max number of concurrent unary request clients per shard.
const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
/// transmission delays. This also concentrates large window sizes on a smaller set of
/// streams/connections, presumably reducing memory use.
const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage
/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`.
const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
/// The batch size threshold at which a GetPage request will use the bulk stream pool.
///
/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
const BULK_THRESHOLD_BATCH_SIZE: usize = 5;
/// Max number of pipelined requests per stream.
const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();
/// The overall request call timeout, including retries and pool acquisition.
/// TODO: should we retry forever? Should the caller decide?
const CALL_TIMEOUT: Duration = Duration::from_secs(60);
/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
/// are more throughput-oriented, we have a smaller limit but higher queue depth.
const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
/// The per-request (retry attempt) timeout, including any lazy connection establishment.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus
/// get a larger queue depth.
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// The initial request retry backoff duration. The first retry does not back off.
/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
const BASE_BACKOFF: Duration = Duration::from_millis(5);
/// The maximum request retry backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(5);
/// Threshold and interval for warning about slow operations.
const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
@@ -50,10 +65,19 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
/// * Internal handling of GetPage bidirectional streams.
/// * Automatic retries.
/// * Observability.
///
/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
/// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
/// one stream per backend, but without the TCP connection overhead. In the common case we expect
/// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
/// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
@@ -67,8 +91,6 @@ pub struct PageserverClient {
compression: Option<CompressionEncoding>,
/// The shards for this tenant.
shards: ArcSwap<Shards>,
/// The retry configuration.
retry: Retry,
}
impl PageserverClient {
@@ -94,7 +116,6 @@ impl PageserverClient {
auth_token,
compression,
shards: ArcSwap::new(Arc::new(shards)),
retry: Retry,
})
}
@@ -142,13 +163,15 @@ impl PageserverClient {
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Returns the total size of a database, as # of bytes.
@@ -157,13 +180,15 @@ impl PageserverClient {
&self,
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_db_size(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
@@ -193,6 +218,8 @@ impl PageserverClient {
return Err(tonic::Status::invalid_argument("request attempt must be 0"));
}
debug!("sending request: {req:?}");
// The shards may change while we're fetching pages. We execute the request using a stable
// view of the shards (especially important for requests that span shards), but retry the
// top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
@@ -201,13 +228,16 @@ impl PageserverClient {
//
// TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
// once we figure out how to handle these.
self.retry
.with(async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
Self::get_page_with_shards(req, &self.shards.load_full()).await
})
.await
let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
let shards = self.shards.load_full();
Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
@@ -246,7 +276,7 @@ impl PageserverClient {
req: page_api::GetPageRequest,
shard: &Shard,
) -> tonic::Result<page_api::GetPageResponse> {
let stream = shard.stream(req.request_class.is_bulk()).await;
let mut stream = shard.stream(Self::is_bulk(&req)).await?;
let resp = stream.send(req.clone()).await?;
// Convert per-request errors into a tonic::Status.
@@ -290,13 +320,15 @@ impl PageserverClient {
&self,
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_rel_size(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Fetches an SLRU segment.
@@ -305,13 +337,50 @@ impl PageserverClient {
&self,
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
self.retry
.with(async |_| {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
/// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // pass attempt number, starting at 0
O: Future<Output = tonic::Result<T>>,
{
Retry {
timeout: Some(timeout),
base_backoff: BASE_BACKOFF,
max_backoff: MAX_BACKOFF,
}
.with(f)
.await
}
/// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
async fn with_timeout<T>(
timeout: Duration,
f: impl Future<Output = tonic::Result<T>>,
) -> tonic::Result<T> {
let started = Instant::now();
tokio::time::timeout(timeout, f).await.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
started.elapsed().as_secs_f64()
))
})?
}
/// Returns true if the request is considered a bulk request and should use the bulk pool.
fn is_bulk(req: &page_api::GetPageRequest) -> bool {
req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
}
}
@@ -440,15 +509,23 @@ impl Shards {
}
}
/// A single shard. Uses dedicated resource pools with the following structure:
/// A single shard. Has dedicated resource pools with the following structure:
///
/// * Channel pool: unbounded.
/// * Unary client pool: MAX_UNARY_CLIENTS.
/// * Stream client pool: unbounded.
/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
/// * Bulk channel pool: unbounded.
/// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
/// * Client pool: unbounded.
/// * Stream pool: unbounded.
/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
/// * Bulk stream pool: unbounded.
///
/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
/// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools
/// are bounded, nor do they pipeline requests, so the latency characteristics should be mostly
/// similar (except for TCP transmission time).
///
/// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
/// the code is fairly trivial, so we may as well keep them around for now in case we need them.
struct Shard {
/// The shard ID.
id: ShardIndex,
@@ -456,7 +533,7 @@ struct Shard {
client_pool: Arc<ClientPool>,
/// GetPage stream pool.
stream_pool: Arc<StreamPool>,
/// GetPage stream pool for bulk requests, e.g. prefetches.
/// GetPage stream pool for bulk requests.
bulk_stream_pool: Arc<StreamPool>,
}
@@ -470,50 +547,30 @@ impl Shard {
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
// Common channel pool for unary and stream requests. Bounded by client/stream pools.
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
// Client pool for unary requests.
// Shard pools for unary requests and non-bulk GetPage requests.
let client_pool = ClientPool::new(
channel_pool.clone(),
ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
Some(MAX_UNARY_CLIENTS),
None, // unbounded
);
let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded
// GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
// but shares a channel pool with it (as it's unbounded).
let stream_pool = StreamPool::new(
ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
None, // unbounded, limited by stream pool
),
Some(MAX_STREAMS),
MAX_STREAM_QUEUE_DEPTH,
);
// Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools
// to avoid head-of-line blocking of latency-sensitive requests.
// Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
let bulk_stream_pool = StreamPool::new(
ClientPool::new(
ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
tenant_id,
timeline_id,
shard_id,
auth_token,
compression,
None, // unbounded, limited by stream pool
None, // unbounded,
),
Some(MAX_BULK_STREAMS),
MAX_BULK_STREAM_QUEUE_DEPTH,
None, // unbounded
);
Ok(Self {
@@ -525,19 +582,23 @@ impl Shard {
}
/// Returns a pooled client for this shard.
#[instrument(skip_all)]
async fn client(&self) -> tonic::Result<ClientGuard> {
self.client_pool
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
warn_slow(
"client pool acquisition",
SLOW_THRESHOLD,
pin!(self.client_pool.get()),
)
.await
}
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
/// pool (e.g. for prefetches).
async fn stream(&self, bulk: bool) -> StreamGuard {
match bulk {
false => self.stream_pool.get().await,
true => self.bulk_stream_pool.get().await,
}
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
#[instrument(skip_all, fields(bulk))]
async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
let pool = match bulk {
false => &self.stream_pool,
true => &self.bulk_stream_pool,
};
warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
}
}
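
To summarize the routing logic spread across this file, here is a condensed, simplified sketch (the real `Shard` has more fields, and pool construction is elided); it combines the `is_bulk` check and `Shard::stream` from the diff above, under those stated simplifications:

use std::sync::Arc;

use pageserver_page_api as page_api;

use crate::pool::StreamPool;

/// Requests for >= 5 pages (>32 KB) go to the bulk pool, so the normal pool's gRPC window stays small.
const BULK_THRESHOLD_BATCH_SIZE: usize = 5;

/// Simplified stand-in for the Shard defined above.
struct ShardSketch {
    stream_pool: Arc<StreamPool>,      // latency-sensitive GetPage requests
    bulk_stream_pool: Arc<StreamPool>, // prefetches, sequential scans, vacuum, ...
}

impl ShardSketch {
    async fn get_page(
        &self,
        req: page_api::GetPageRequest,
    ) -> tonic::Result<page_api::GetPageResponse> {
        // Route by batch size, acquire a pooled stream, and send a single request on it.
        let bulk = req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE;
        let pool = if bulk { &self.bulk_stream_pool } else { &self.stream_pool };
        let mut stream = pool.get().await?;
        stream.send(req).await
    }
}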

View File

@@ -9,19 +9,36 @@
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//! per-channel client limit. Channels may be closed when they are no longer used by any clients.
//! per-channel client limit. Channels are closed immediately when empty, and indirectly rely on
//! client/stream idle timeouts.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed
//! from the pool after some time, to free up the channel.
//! single caller at a time, and is returned to the pool when dropped. Idle clients are removed
//! from the pool after a while to free up resources.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it
//! returns a guard that can be used to send a single request, to properly enforce queue depth and
//! route responses. Internally, the pool will reuse or spin up a suitable stream for the request,
//! possibly pipelining multiple requests from multiple callers on the same stream (up to some
//! queue depth). Idle streams may be removed from the pool after a while to free up the client.
//! ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a
//! time, and is returned to the pool when dropped. Idle streams are removed from the pool after
//! a while to free up resources.
//!
//! The stream only supports sending a single, synchronous request at a time, and does not support
//! pipelining multiple requests from different callers onto the same stream -- instead, we scale
//! out concurrent streams to improve throughput. There are many reasons for this design choice:
//!
//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by
//! a single server task, which may block e.g. on layer downloads, LSN waits, etc.
//!
//! * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away
//! (e.g. because of a timeout), the request would still be processed by the server and block
//! requests behind it in the stream. It might even block its own timeout retry.
//!
//! * Stream scheduling becomes significantly simpler and cheaper.
//!
//! * Individual callers can still use client-side batching for pipelining.
//!
//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB
//! per stream (2.5 GB for 100,000 streams), so we can afford to scale out.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
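To make the nesting above concrete, a minimal construction/usage sketch (following the shapes used by `Shard::new` and the stream guard API elsewhere in this commit); the endpoint, IDs, and request are placeholders:

use std::num::NonZero;

use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;

use crate::pool::{ChannelPool, ClientPool, StreamPool};

async fn example(
    tenant_id: TenantId,
    timeline_id: TimelineId,
    shard_id: ShardIndex,
    req: page_api::GetPageRequest,
) -> anyhow::Result<page_api::GetPageResponse> {
    // Channel pool (TCP connections) -> client pool (gRPC clients) -> stream pool (GetPage streams).
    let channels = ChannelPool::new("grpc://pageserver:51051".to_string(), NonZero::new(64).unwrap())?;
    let clients = ClientPool::new(channels, tenant_id, timeline_id, shard_id, None, None, None);
    let streams = StreamPool::new(clients, None); // unbounded

    // Acquire an idle stream (or spin one up) and send a single request on it.
    let mut stream = streams.get().await?;
    Ok(stream.send(req).await?)
}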
@@ -29,33 +46,31 @@
//! TODO: error handling (including custom error types).
//! TODO: observability.
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::StreamExt as _;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use futures::{Stream, StreamExt as _};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::CancellationToken;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};
use tracing::{error, warn};
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
/// Reap channels/clients/streams that have been idle for this long.
/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when
/// empty, and indirectly rely on the client/stream idle timeouts.
///
/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle.
/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we
/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to
/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty
/// channels, and/or stream pool clients.
/// A stream's client will be reaped after 2x the idle threshold (first the stream, then the
/// client), but that's okay -- if the stream closes abruptly (e.g. due to timeout or
/// cancellation), we want to keep its client around in the pool for a while.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
false => Duration::from_secs(180),
true => Duration::from_secs(1), // exercise reaping in tests
@@ -83,8 +98,6 @@ pub struct ChannelPool {
max_clients_per_channel: NonZero<usize>,
/// Open channels.
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
/// Reaps idle channels.
idle_reaper: Reaper,
/// Channel ID generator.
next_channel_id: AtomicUsize,
}
@@ -96,9 +109,6 @@ struct ChannelEntry {
channel: Channel,
/// Number of clients using this channel.
clients: usize,
/// The channel has been idle (no clients) since this time. None if channel is in use.
/// INVARIANT: Some if clients == 0, otherwise None.
idle_since: Option<Instant>,
}
impl ChannelPool {
@@ -108,15 +118,12 @@ impl ChannelPool {
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
let pool = Arc::new(Self {
Ok(Arc::new(Self {
endpoint: endpoint.try_into()?,
max_clients_per_channel,
channels: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_channel_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
Ok(pool)
}))
}
/// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
@@ -137,22 +144,17 @@ impl ChannelPool {
let mut channels = self.channels.lock().unwrap();
// Try to find an existing channel with available capacity. We check entries in BTreeMap
// order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
// with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// order, to fill up the lower-ordered channels first. The client/stream pools also prefer
// clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// channels, and free up higher-ordered channels such that they can be reaped.
for (&id, entry) in channels.iter_mut() {
assert!(
entry.clients <= self.max_clients_per_channel.get(),
"channel overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.clients == 0,
"incorrect channel idle state"
);
assert_ne!(entry.clients, 0, "empty channel not reaped");
if entry.clients < self.max_clients_per_channel.get() {
entry.clients += 1;
entry.idle_since = None;
return ChannelGuard {
pool: Arc::downgrade(self),
id,
@@ -169,7 +171,6 @@ impl ChannelPool {
let entry = ChannelEntry {
channel: channel.clone(),
clients: 1, // account for the guard below
idle_since: None,
};
channels.insert(id, entry);
@@ -181,20 +182,6 @@ impl ChannelPool {
}
}
impl Reapable for ChannelPool {
/// Reaps channels that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.channels.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.clients, 0, "empty channel not marked idle");
return true;
};
assert_eq!(entry.clients, 0, "idle channel has clients");
idle_since >= cutoff
})
}
}
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
@@ -211,7 +198,7 @@ impl ChannelGuard {
}
}
/// Returns the channel to the pool.
/// Returns the channel to the pool. The channel is closed when empty.
impl Drop for ChannelGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
@@ -220,11 +207,12 @@ impl Drop for ChannelGuard {
let mut channels = pool.channels.lock().unwrap();
let entry = channels.get_mut(&self.id).expect("unknown channel");
assert!(entry.idle_since.is_none(), "active channel marked idle");
assert!(entry.clients > 0, "channel underflow");
entry.clients -= 1;
// Reap empty channels immediately.
if entry.clients == 0 {
entry.idle_since = Some(Instant::now()); // mark channel as idle
channels.remove(&self.id);
}
}
}
@@ -253,8 +241,7 @@ pub struct ClientPool {
///
/// The first client in the map will be acquired next. The map is sorted by client ID, which in
/// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
/// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
/// clients are reaped.
/// lower-ordered channels. This allows us to free up and reap higher-ordered channels.
idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
/// Reaps idle clients.
idle_reaper: Reaper,
@@ -310,7 +297,7 @@ impl ClientPool {
/// This is moderately performance-sensitive. It is called for every unary request, but these
/// establish a new gRPC stream per request so they're already expensive. GetPage requests use
/// the `StreamPool` instead.
pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
pub async fn get(self: &Arc<Self>) -> tonic::Result<ClientGuard> {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
@@ -328,7 +315,7 @@ impl ClientPool {
});
}
// Slow path: construct a new client.
// Construct a new client.
let mut channel_guard = self.channel_pool.get();
let client = page_api::Client::new(
channel_guard.take(),
@@ -337,7 +324,8 @@ impl ClientPool {
self.shard_id,
self.auth_token.clone(),
self.compression,
)?;
)
.map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?;
Ok(ClientGuard {
pool: Arc::downgrade(self),
@@ -407,287 +395,187 @@ impl Drop for ClientGuard {
/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
/// a single request and await the response. Internally, requests are multiplexed across streams and
/// channels. This allows proper queue depth enforcement and response routing.
/// Individual streams only send a single request at a time, and do not pipeline multiple callers
/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily
/// to eliminate head-of-line blocking. See the module documentation for more details.
///
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
/// The client pool to acquire clients from. Must be unbounded.
client_pool: Arc<ClientPool>,
/// All pooled streams.
/// Idle pooled streams. Acquired streams are removed from here and returned on drop.
///
/// Incoming requests will be sent over an existing stream with available capacity. If all
/// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
/// stream has an associated Tokio task that processes requests and responses.
streams: Mutex<HashMap<StreamID, StreamEntry>>,
/// The max number of concurrent streams, or None if unbounded.
max_streams: Option<NonZero<usize>>,
/// The max number of concurrent requests per stream.
max_queue_depth: NonZero<usize>,
/// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
/// None if the pool is unbounded.
/// The first stream in the map will be acquired next. The map is sorted by stream ID, which is
/// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer
/// acquiring idle streams from lower-ordered channels, which allows us to free up and reap
/// higher-ordered channels.
idle: Mutex<BTreeMap<StreamID, StreamEntry>>,
/// Limits the max number of concurrent streams. None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Reaps idle streams.
idle_reaper: Reaper,
/// Stream ID generator.
next_stream_id: AtomicUsize,
}
type StreamID = usize;
type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>;
type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>;
type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;
/// The stream ID. Reuses the inner client ID.
type StreamID = ClientID;
/// A pooled stream.
struct StreamEntry {
/// Sends caller requests to the stream task. The stream task exits when this is dropped.
sender: RequestSender,
/// Number of in-flight requests on this stream.
queue_depth: usize,
/// The time when this stream went idle (queue_depth == 0).
/// INVARIANT: Some if queue_depth == 0, otherwise None.
idle_since: Option<Instant>,
/// The bidirectional stream.
stream: BiStream,
/// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`.
idle_since: Instant,
}
/// A bidirectional GetPage stream and its client. Can send requests and receive responses.
struct BiStream {
/// The owning client. Holds onto the channel slot while the stream is alive.
client: ClientGuard,
/// Stream for sending requests. Uses a watch channel, so it can only send a single request at a
/// time, and the caller must await the response before sending another request. This is
/// enforced by `StreamGuard::send`.
sender: watch::Sender<page_api::GetPageRequest>,
/// Stream for receiving responses.
receiver: Pin<Box<dyn Stream<Item = tonic::Result<page_api::GetPageResponse>> + Send>>,
}
impl StreamPool {
/// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
/// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
/// Creates a new stream pool, using the given client pool. It will use up to `max_streams`
/// concurrent streams.
///
/// The client pool must be unbounded. The stream pool will enforce its own limits, and because
/// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
/// The stream pool should generally have its own dedicated client pool (but it can share a
/// channel pool with others since these are always unbounded).
pub fn new(
client_pool: Arc<ClientPool>,
max_streams: Option<NonZero<usize>>,
max_queue_depth: NonZero<usize>,
) -> Arc<Self> {
pub fn new(client_pool: Arc<ClientPool>, max_streams: Option<NonZero<usize>>) -> Arc<Self> {
assert!(client_pool.limiter.is_none(), "bounded client pool");
let pool = Arc::new(Self {
client_pool,
streams: Mutex::default(),
limiter: max_streams.map(|max_streams| {
Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
}),
max_streams,
max_queue_depth,
idle: Mutex::default(),
limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_stream_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
pool
}
/// Acquires an available stream from the pool, or spins up a new stream async if all streams
/// are full. Returns a guard that can be used to send a single request on the stream and await
/// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
/// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight).
/// Acquires an idle stream from the pool, or spins up a new one if none are available. Returns
/// a guard that can be used to send requests and await the responses. Blocks if the pool is
/// full.
///
/// This is very performance-sensitive, as it is on the GetPage hot path.
///
/// TODO: this must do something more sophisticated for performance. We want:
///
/// * Cheap, concurrent access in the common case where we can use a pooled stream.
/// * Quick acquisition of pooled streams with available capacity.
/// * Prefer streams that belong to lower-numbered channels, to reap idle channels.
/// * Prefer filling up existing streams' queue depth before spinning up new streams.
/// * Don't hold a lock while spinning up new streams.
/// * Allow concurrent clients to join onto streams while they're spun up.
/// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
///
/// For now, we just do something simple but inefficient (linear scan under mutex).
pub async fn get(self: &Arc<Self>) -> StreamGuard {
/// TODO: is a `Mutex<BTreeMap>` performant enough? Will it become too contended? We can't
/// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first
/// to free up higher-ordered channels.
pub async fn get(self: &Arc<Self>) -> tonic::Result<StreamGuard> {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
let mut streams = self.streams.lock().unwrap();
// Look for a pooled stream with available capacity.
for (&id, entry) in streams.iter_mut() {
assert!(
entry.queue_depth <= self.max_queue_depth.get(),
"stream queue overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.queue_depth == 0,
"incorrect stream idle state"
);
if entry.queue_depth < self.max_queue_depth.get() {
entry.queue_depth += 1;
entry.idle_since = None;
return StreamGuard {
pool: Arc::downgrade(self),
id,
sender: entry.sender.clone(),
permit,
};
}
// Fast path: acquire an idle stream from the pool.
if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() {
return Ok(StreamGuard {
pool: Arc::downgrade(self),
stream: Some(entry.stream),
can_reuse: true,
permit,
});
}
// No available stream, spin up a new one. We install the stream entry in the pool first and
// return the guard, while spinning up the stream task async. This allows other callers to
// join onto this stream and also create additional streams concurrently if this fills up.
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
let entry = StreamEntry {
sender: req_tx.clone(),
queue_depth: 1, // reserve quota for this caller
idle_since: None,
};
streams.insert(id, entry);
// Spin up a new stream. Uses a watch channel to send a single request at a time, since
// `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead.
let mut client = self.client_pool.get().await?;
if let Some(max_streams) = self.max_streams {
assert!(streams.len() <= max_streams.get(), "stream overflow");
};
let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default());
let req_stream = WatchStream::from_changes(req_rx);
let resp_stream = client.get_pages(req_stream).await?;
let client_pool = self.client_pool.clone();
let pool = Arc::downgrade(self);
tokio::spawn(async move {
if let Err(err) = Self::run_stream(client_pool, req_rx).await {
error!("stream failed: {err}");
}
// Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
if let Some(pool) = pool.upgrade() {
let entry = pool.streams.lock().unwrap().remove(&id);
assert!(entry.is_some(), "unknown stream ID: {id}");
}
});
StreamGuard {
Ok(StreamGuard {
pool: Arc::downgrade(self),
id,
sender: req_tx,
stream: Some(BiStream {
client,
sender: req_tx,
receiver: Box::pin(resp_stream),
}),
can_reuse: true,
permit,
}
}
/// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
/// bidirectional GetPage stream, then forwards requests and responses between callers and the
/// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
/// atomic with pool stream acquisition.
///
/// The task exits when the request channel is closed, or on a stream error. The caller is
/// responsible for removing the stream from the pool on exit.
async fn run_stream(
client_pool: Arc<ClientPool>,
mut caller_rx: RequestReceiver,
) -> anyhow::Result<()> {
// Acquire a client from the pool and create a stream.
let mut client = client_pool.get().await?;
// NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could
// theoretically deadlock if both the client and server block on sends (since we're not
// reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and
// low queue depths, but it was seen to happen with the libpq protocol so better safe than
// sorry. It should never buffer more than the queue depth anyway, but using an unbounded
// channel guarantees that it will never block.
let (req_tx, req_rx) = mpsc::unbounded_channel();
let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the waiting callers will receive an error.
//
// NB: this will leak entries if the server doesn't respond to a request (by request ID).
// It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
// block further use. But we could consider reaping closed channels after some time.
let mut callers = HashMap::new();
// Process requests and responses.
loop {
tokio::select! {
// Receive requests from callers and send them to the stream.
req = caller_rx.recv() => {
// Shut down if request channel is closed.
let Some((req, resp_tx)) = req else {
return Ok(());
};
// Store the response channel by request ID.
if callers.contains_key(&req.request_id) {
// Error on request ID duplicates. Ignore callers that went away.
_ = resp_tx.send(Err(tonic::Status::invalid_argument(
format!("duplicate request ID: {}", req.request_id),
)));
continue;
}
callers.insert(req.request_id, resp_tx);
// Send the request on the stream. Bail out if the stream is closed.
req_tx.send(req).map_err(|_| {
tonic::Status::unavailable("stream closed")
})?;
}
// Receive responses from the stream and send them to callers.
resp = resp_stream.next() => {
// Shut down if the stream is closed, and bail out on stream errors.
let Some(resp) = resp.transpose()? else {
return Ok(())
};
// Send the response to the caller. Ignore errors if the caller went away.
let Some(resp_tx) = callers.remove(&resp.request_id) else {
warn!("received response for unknown request ID: {}", resp.request_id);
continue;
};
_ = resp_tx.send(Ok(resp));
}
}
}
})
}
}
impl Reapable for StreamPool {
/// Reaps streams that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.streams.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
return true;
};
assert_eq!(entry.queue_depth, 0, "idle stream has requests");
idle_since >= cutoff
});
self.idle
.lock()
.unwrap()
.retain(|_, entry| entry.idle_since >= cutoff);
}
}
/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
/// depth. Queue depth is already reserved and will be returned on drop.
/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still
/// in-flight requests on the stream, or the stream failed.
pub struct StreamGuard {
pool: Weak<StreamPool>,
id: StreamID,
sender: RequestSender,
stream: Option<BiStream>, // Some until dropped
can_reuse: bool, // returned to pool if true
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl StreamGuard {
/// Sends a request on the stream and awaits the response. Consumes the guard, since it's only
/// valid for a single request (to enforce queue depth). This also drops the guard on return and
/// returns the queue depth quota to the pool.
/// Sends a request on the stream and awaits the response. If the future is dropped before it
/// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the
/// request and is not returned to the pool. The same is true if the stream errors, in which
/// case the caller can't send further requests on the stream.
///
/// The `GetPageRequest::request_id` must be unique across in-flight requests.
/// We only support sending a single request at a time, to eliminate head-of-line blocking. See
/// module documentation for details.
///
/// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
/// to avoid tearing down the stream for per-request errors. Callers must check this.
pub async fn send(
self,
&mut self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
let (resp_tx, resp_rx) = oneshot::channel();
let req_id = req.request_id;
let stream = self.stream.as_mut().expect("not dropped");
self.sender
.send((req, resp_tx))
.await
// Mark the stream as not reusable while the request is in flight. We can't return the
// stream to the pool until we receive the response, to avoid head-of-line blocking and
// stale responses. Failed streams can't be reused either.
if !self.can_reuse {
return Err(tonic::Status::internal("stream can't be reused"));
}
self.can_reuse = false;
// Send the request and receive the response.
//
// NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests.
stream
.sender
.send(req)
.map_err(|_| tonic::Status::unavailable("stream closed"))?;
resp_rx
let resp = stream
.receiver
.next()
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?
.ok_or_else(|| tonic::Status::unavailable("stream closed"))??;
if resp.request_id != req_id {
return Err(tonic::Status::internal(format!(
"response ID {} does not match request ID {}",
resp.request_id, req_id
)));
}
// Success, mark the stream as reusable.
self.can_reuse = true;
Ok(resp)
}
}
@@ -697,26 +585,21 @@ impl Drop for StreamGuard {
return; // pool was dropped
};
// Release the queue depth reservation on drop. This can prematurely decrement it if dropped
// before the response is received, but that's okay.
//
// TODO: actually, it's probably not okay. Queue depth release should be moved into the
// stream task, such that it continues to account for the queue depth slot until the server
// responds. Otherwise, if a slow request times out and keeps blocking the stream, the
// server will keep waiting on it and we can pile on subsequent requests (including the
// timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
// requests on e.g. LSN waits and layer downloads, instead returning early to free up the
// stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
// blocking. TBD.
let mut streams = pool.streams.lock().unwrap();
let entry = streams.get_mut(&self.id).expect("unknown stream");
assert!(entry.idle_since.is_none(), "active stream marked idle");
assert!(entry.queue_depth > 0, "stream queue underflow");
entry.queue_depth -= 1;
if entry.queue_depth == 0 {
entry.idle_since = Some(Instant::now()); // mark stream as idle
// If the stream isn't reusable, it can't be returned to the pool.
if !self.can_reuse {
return;
}
// Place the idle stream back into the pool.
let entry = StreamEntry {
stream: self.stream.take().expect("dropped once"),
idle_since: Instant::now(),
};
pool.idle
.lock()
.unwrap()
.insert(entry.stream.client.id, entry);
_ = self.permit; // returned on drop, referenced for visibility
}
}
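
A standalone illustration of the watch-channel pattern the stream now uses internally (one in-flight request at a time, seeded with a `Default` request; the `Default` derives are added to `page_api` later in this commit). The request type here is a simplified stand-in:

use futures::StreamExt as _;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;

#[derive(Clone, Debug, Default)]
struct Request {
    id: u64, // simplified stand-in for page_api::GetPageRequest
}

#[tokio::main]
async fn main() {
    // The watch channel requires an initial value (hence the Default derive); from_changes skips
    // it and only yields requests sent afterwards, one at a time.
    let (tx, rx) = watch::channel(Request::default());
    let mut requests = WatchStream::from_changes(rx);

    tx.send(Request { id: 1 }).unwrap();
    assert_eq!(requests.next().await.unwrap().id, 1);
}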

View File

@@ -1,5 +1,6 @@
use std::time::Duration;
use futures::future::pending;
use tokio::time::Instant;
use tracing::{error, info, warn};
@@ -8,60 +9,54 @@ use utils::backoff::exponential_backoff_duration;
/// A retry handler for Pageserver gRPC requests.
///
/// This is used instead of backoff::retry for better control and observability.
pub struct Retry;
pub struct Retry {
/// Timeout across all retry attempts. If None, retries forever.
pub timeout: Option<Duration>,
/// The initial backoff duration. The first retry does not use a backoff.
pub base_backoff: Duration,
/// The maximum backoff duration.
pub max_backoff: Duration,
}
impl Retry {
/// The per-request timeout.
// TODO: tune these, and/or make them configurable. Should we retry forever?
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// The total timeout across all attempts
const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
/// The initial backoff duration.
const BASE_BACKOFF: Duration = Duration::from_millis(10);
/// The maximum backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(10);
/// If true, log successful requests. For debugging.
const LOG_SUCCESS: bool = false;
/// Runs the given async closure with timeouts and retries (exponential backoff), passing the
/// attempt number starting at 0. Logs errors, using the current tracing span for context.
/// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
/// using the current tracing span for context.
///
/// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
/// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
/// Only certain gRPC status codes are retried, see [`Self::should_retry`].
pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // takes attempt number, starting at 0
F: FnMut(usize) -> O, // pass attempt number, starting at 0
O: Future<Output = tonic::Result<T>>,
{
let started = Instant::now();
let deadline = started + Self::TOTAL_TIMEOUT;
let deadline = self.timeout.map(|timeout| started + timeout);
let mut last_error = None;
let mut retries = 0;
loop {
// Set up a future to wait for the backoff (if any) and run the request with a timeout.
// Set up a future to wait for the backoff, if any, and run the closure.
let backoff_and_try = async {
// NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
// https://github.com/tokio-rs/tokio/issues/6866
if let Some(backoff) = Self::backoff_duration(retries) {
if let Some(backoff) = self.backoff_duration(retries) {
tokio::time::sleep(backoff).await;
}
let request_started = Instant::now();
tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
.await
.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
request_started.elapsed().as_secs_f64()
))
})?
f(retries).await
};
// Wait for the backoff and request, or bail out if the total timeout is exceeded.
// Set up a future for the timeout, if any.
let timeout = async {
match deadline {
Some(deadline) => tokio::time::sleep_until(deadline).await,
None => pending().await,
}
};
// Wait for the backoff and request, or bail out if the timeout is exceeded.
let result = tokio::select! {
result = backoff_and_try => result,
_ = tokio::time::sleep_until(deadline) => {
_ = timeout => {
let last_error = last_error.unwrap_or_else(|| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
@@ -79,7 +74,7 @@ impl Retry {
match result {
// Success, return the result.
Ok(result) => {
if retries > 0 || Self::LOG_SUCCESS {
if retries > 0 {
info!(
"request succeeded after {retries} retries in {:.3}s",
started.elapsed().as_secs_f64(),
@@ -112,12 +107,13 @@ impl Retry {
}
}
/// Returns the backoff duration for the given retry attempt, or None for no backoff.
fn backoff_duration(retry: usize) -> Option<Duration> {
/// Returns the backoff duration for the given retry attempt, or None for no backoff. The first
/// attempt and the first retry never back off, so this returns None for 0 and 1 retries.
fn backoff_duration(&self, retries: usize) -> Option<Duration> {
let backoff = exponential_backoff_duration(
retry as u32,
Self::BASE_BACKOFF.as_secs_f64(),
Self::MAX_BACKOFF.as_secs_f64(),
(retries as u32).saturating_sub(1), // first retry does not back off
self.base_backoff.as_secs_f64(),
self.max_backoff.as_secs_f64(),
);
(!backoff.is_zero()).then_some(backoff)
}
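For illustration, a minimal usage sketch of the now-configurable Retry, using only the fields and the with() signature shown above; the client, request, and get_page call are hypothetical stand-ins:
let retry = Retry {
    timeout: Some(Duration::from_secs(60)), // None would retry forever
    base_backoff: Duration::from_millis(10),
    max_backoff: Duration::from_secs(10),
};
let response = retry
    .with(|attempt| {
        // `attempt` starts at 0; the first retry (attempt 1) fires without a backoff,
        // later retries back off exponentially from base_backoff up to max_backoff.
        let mut client = client.clone(); // hypothetical, cheaply cloneable gRPC client
        let request = request.clone();   // hypothetical request value
        async move { client.get_page(request).await } // must return tonic::Result<_>
    })
    .await?;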

View File

@@ -52,7 +52,7 @@ impl From<ProtocolError> for tonic::Status {
}
/// The LSN a request should read at.
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, Default)]
pub struct ReadLsn {
/// The request's read LSN.
pub request_lsn: Lsn,
@@ -332,7 +332,7 @@ impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
}
/// Requests one or more pages.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct GetPageRequest {
/// A request ID. Will be included in the response. Should be unique for in-flight requests on
/// the stream.
@@ -433,12 +433,13 @@ impl From<RequestID> for proto::RequestId {
}
/// A GetPage request class.
#[derive(Clone, Copy, Debug, strum_macros::Display)]
#[derive(Clone, Copy, Debug, Default, strum_macros::Display)]
pub enum GetPageClass {
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.
Unknown,
/// A normal request. This is the default.
#[default]
Normal,
/// A prefetch request. NB: can only be classified on pg < 18.
Prefetch,
@@ -446,19 +447,6 @@ pub enum GetPageClass {
Background,
}
impl GetPageClass {
/// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than
/// latency-sensitive).
pub fn is_bulk(&self) -> bool {
match self {
Self::Unknown => false,
Self::Normal => false,
Self::Prefetch => true,
Self::Background => true,
}
}
}
impl From<proto::GetPageClass> for GetPageClass {
fn from(pb: proto::GetPageClass) -> Self {
match pb {

View File

@@ -16,6 +16,7 @@ futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
pprof.workspace = true
rand.workspace = true
reqwest.workspace = true
serde.workspace = true

View File

@@ -0,0 +1,127 @@
use std::sync::Arc;
use anyhow::anyhow;
use futures::StreamExt;
use tonic::transport::Endpoint;
use tracing::info;
use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
/// Starts a large number of idle gRPC GetPage streams.
#[derive(clap::Parser)]
pub(crate) struct Args {
/// The Pageserver to connect to. Must use grpc://.
#[clap(long, default_value = "grpc://localhost:51051")]
server: String,
/// The Pageserver HTTP API.
#[clap(long, default_value = "http://localhost:9898")]
http_server: String,
/// The number of streams to open.
#[clap(long, default_value = "100000")]
count: usize,
/// Number of streams per connection.
#[clap(long, default_value = "100")]
per_connection: usize,
/// Send a single GetPage request on each stream.
#[clap(long, default_value_t = false)]
send_request: bool,
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(main_impl(args))
}
async fn main_impl(args: Args) -> anyhow::Result<()> {
// Discover a tenant and timeline to use.
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(),
args.http_server.clone(),
None,
));
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,
crate::util::cli::targets::Spec {
limit_to_first_n_targets: Some(1),
targets: None,
},
)
.await?;
let ttid = timelines
.first()
.ok_or_else(|| anyhow!("no timelines found"))?;
// Set up the initial client.
let endpoint = Endpoint::from_shared(args.server.clone())?;
let connect = async || {
pageserver_page_api::Client::new(
endpoint.connect().await?,
ttid.tenant_id,
ttid.timeline_id,
ShardIndex::unsharded(),
None,
None,
)
};
let mut client = connect().await?;
let mut streams = Vec::with_capacity(args.count);
// Create streams.
for i in 0..args.count {
if i % 100 == 0 {
info!("opened {}/{} streams", i, args.count);
}
if i % args.per_connection == 0 && i > 0 {
client = connect().await?;
}
let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel();
let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Send request if specified.
if args.send_request {
req_tx.send(GetPageRequest {
request_id: 1.into(),
request_class: GetPageClass::Normal,
read_lsn: ReadLsn {
request_lsn: Lsn::MAX,
not_modified_since_lsn: Some(Lsn(1)),
},
rel: RelTag {
spcnode: 1664, // pg_global
dbnode: 0, // shared database
relnode: 1262, // pg_authid
forknum: 0, // init
},
block_numbers: vec![0],
})?;
let resp = resp_stream
.next()
.await
.transpose()?
.ok_or_else(|| anyhow!("no response"))?;
if resp.status_code != GetPageStatusCode::Ok {
return Err(anyhow!("{} response", resp.status_code));
}
}
// Hold onto streams to avoid closing them.
streams.push((req_tx, resp_stream));
}
info!("opened {} streams, sleeping", args.count);
// Block forever, to hold the idle streams open for inspection.
futures::future::pending::<()>().await;
Ok(())
}
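Tangentially, the Default derives added to ReadLsn, GetPageRequest, and GetPageClass earlier in this diff would let a call site like the request construction above spell out only the fields it cares about; a hypothetical sketch:
let request = GetPageRequest {
    request_id: 1.into(),
    read_lsn: ReadLsn {
        request_lsn: Lsn::MAX,
        not_modified_since_lsn: Some(Lsn(1)),
    },
    rel: RelTag { spcnode: 1664, dbnode: 0, relnode: 1262, forknum: 0 },
    block_numbers: vec![0],
    // Everything else falls back to its default, e.g. request_class becomes
    // GetPageClass::Normal via the new #[default] attribute.
    ..Default::default()
};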

View File

@@ -1,4 +1,7 @@
use std::fs::File;
use clap::Parser;
use tracing::info;
use utils::logging;
/// Re-usable pieces of code that aren't CLI-specific.
@@ -17,38 +20,73 @@ mod cmd {
pub(super) mod aux_files;
pub(super) mod basebackup;
pub(super) mod getpage_latest_lsn;
pub(super) mod idle_streams;
pub(super) mod ondemand_download_churn;
pub(super) mod trigger_initial_size_calculation;
}
/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
struct Args {
/// Takes a CPU profile of the client and writes it to profile.svg. The benchmark must exit cleanly before it's
/// written, e.g. via --runtime.
#[arg(long)]
profile: bool,
#[command(subcommand)]
subcommand: Subcommand,
}
#[derive(clap::Subcommand)]
enum Subcommand {
Basebackup(cmd::basebackup::Args),
GetPageLatestLsn(cmd::getpage_latest_lsn::Args),
TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args),
OndemandDownloadChurn(cmd::ondemand_download_churn::Args),
AuxFiles(cmd::aux_files::Args),
IdleStreams(cmd::idle_streams::Args),
}
fn main() {
fn main() -> anyhow::Result<()> {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
let args = Args::parse();
match args {
Args::Basebackup(args) => cmd::basebackup::main(args),
Args::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args),
Args::TriggerInitialSizeCalculation(args) => {
// Start a CPU profile if requested.
let mut profiler = None;
if args.profile {
profiler = Some(
pprof::ProfilerGuardBuilder::default()
.frequency(1000)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()?,
);
}
match args.subcommand {
Subcommand::Basebackup(args) => cmd::basebackup::main(args),
Subcommand::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args),
Subcommand::TriggerInitialSizeCalculation(args) => {
cmd::trigger_initial_size_calculation::main(args)
}
Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
Args::AuxFiles(args) => cmd::aux_files::main(args),
Subcommand::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
Subcommand::AuxFiles(args) => cmd::aux_files::main(args),
Subcommand::IdleStreams(args) => cmd::idle_streams::main(args),
}?;
// Generate a CPU flamegraph if requested.
if let Some(profiler) = profiler {
let report = profiler.report().build()?;
drop(profiler); // stop profiling
let file = File::create("profile.svg")?;
report.flamegraph(file)?;
info!("wrote CPU profile flamegraph to profile.svg")
}
.unwrap()
Ok(())
}

View File

@@ -194,6 +194,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
listen_http_port: m.http_port,
listen_https_port: m.https_port,
availability_zone_id: az_id.expect("Checked above"),
node_ip_addr: None,
})
}
Err(e) => {

View File

@@ -10,6 +10,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use bytes::Bytes;
use enumset::EnumSet;
use futures::future::join_all;
use futures::{StreamExt, TryFutureExt};
@@ -46,6 +47,7 @@ use pageserver_api::shard::{ShardCount, TenantShardId};
use postgres_ffi::PgMajorVersion;
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
use scopeguard::defer;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tenant_size_model::svg::SvgBranchKind;
use tenant_size_model::{SizeResult, StorageModel};
@@ -57,6 +59,7 @@ use utils::auth::SwappableJwtAuth;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
use crate::config::PageServerConf;
use crate::context;
@@ -77,6 +80,7 @@ use crate::tenant::remote_timeline_client::{
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::ValuesReconstructState;
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
@@ -2712,6 +2716,16 @@ async fn deletion_queue_flush(
}
}
/// Check whether `GetPage@Lsn` succeeds; useful for manual debugging.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct GetPageResponse {
pub page: Bytes,
pub layers_visited: u32,
pub delta_layers_visited: u32,
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
}
async fn getpage_at_lsn_handler(
request: Request<Body>,
cancel: CancellationToken,
@@ -2762,21 +2776,24 @@ async fn getpage_at_lsn_handler_inner(
// Use last_record_lsn if no lsn is provided
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
let page = timeline.get(key.0, lsn, &ctx).await?;
if touch {
json_response(StatusCode::OK, ())
} else {
Result::<_, ApiError>::Ok(
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(hyper::Body::from(page))
.unwrap(),
)
let mut reconstruct_state = ValuesReconstructState::new_with_debug(IoConcurrency::sequential());
let page = timeline.debug_get(key.0, lsn, &ctx, &mut reconstruct_state).await?;
let response = GetPageResponse {
page,
layers_visited: reconstruct_state.get_layers_visited(),
delta_layers_visited: reconstruct_state.get_delta_layers_visited(),
records: reconstruct_state.debug_state.records.clone(),
img: reconstruct_state.debug_state.img.clone(),
};
json_response(StatusCode::OK, response)
}
}
.instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.instrument(info_span!("timeline_debug_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}

View File

@@ -3429,7 +3429,13 @@ impl TenantShard {
.collect_vec();
for timeline in timelines {
timeline.maybe_freeze_ephemeral_layer().await;
// Include a span with the timeline ID. The parent span already has the tenant ID.
let span =
info_span!("maybe_freeze_ephemeral_layer", timeline_id = %timeline.timeline_id);
timeline
.maybe_freeze_ephemeral_layer()
.instrument(span)
.await;
}
}

View File

@@ -75,7 +75,7 @@ where
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
///
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub(crate) struct ValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
@@ -308,6 +308,9 @@ pub struct ValuesReconstructState {
layers_visited: u32,
delta_layers_visited: u32,
pub(crate) enable_debug: bool,
pub(crate) debug_state: ValueReconstructState,
pub(crate) io_concurrency: IoConcurrency,
num_active_ios: Arc<AtomicUsize>,
@@ -657,6 +660,23 @@ impl ValuesReconstructState {
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency,
enable_debug: false,
debug_state: ValueReconstructState::default(),
num_active_ios: Arc::new(AtomicUsize::new(0)),
read_path: None,
}
}
pub(crate) fn new_with_debug(io_concurrency: IoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency,
enable_debug: true,
debug_state: ValueReconstructState::default(),
num_active_ios: Arc::new(AtomicUsize::new(0)),
read_path: None,
}
@@ -670,6 +690,12 @@ impl ValuesReconstructState {
self.io_concurrency.spawn_io(fut).await;
}
pub(crate) fn set_debug_state(&mut self, debug_state: &ValueReconstructState) {
if self.enable_debug {
self.debug_state = debug_state.clone();
}
}
pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
self.layers_visited += 1;
if let ReadableLayer::PersistentLayer(layer) = layer {

View File

@@ -1254,6 +1254,57 @@ impl Timeline {
}
}
#[inline(always)]
pub(crate) async fn debug_get(
&self,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
reconstruct_state: &mut ValuesReconstructState,
) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
}
// This check is debug-only because of the cost of hashing, and because it's a double-check: we
// already checked the key against the shard_identity when looking up the Timeline from
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let vectored_res = self
.debug_get_vectored_impl(query, reconstruct_state, ctx)
.await;
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => Err(PageReconstructError::MissingKey(Box::new(
MissingKeyError {
keyspace: KeySpace::single(key..key.next()),
shard: self.shard_identity.get_shard_number(&key),
original_hwm_lsn: lsn,
ancestor_lsn: None,
backtrace: None,
read_path: None,
query: None,
},
))),
}
}
pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u32 = 100;
/// Look up multiple page versions at a given LSN
@@ -1548,6 +1599,98 @@ impl Timeline {
Ok(results)
}
// A copy of the get_vectored_impl method, except that we store the base image and WAL records into `reconstruct_state`.
// This is only used by the HTTP getpage handler for debugging purposes.
pub(super) async fn debug_get_vectored_impl(
&self,
query: VersionedKeySpaceQuery,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),
query.high_watermark_lsn()?,
))
} else {
None
};
reconstruct_state.read_path = read_path;
let traversal_res: Result<(), _> = self
.get_vectored_reconstruct_data(query.clone(), reconstruct_state, ctx)
.await;
if let Err(err) = traversal_res {
// Wait for all the spawned IOs to complete.
// See comments on `spawn_io` inside `storage_layer` for more details.
let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
.into_values()
.map(|state| state.collect_pending_ios())
.collect::<FuturesUnordered<_>>();
while collect_futs.next().await.is_some() {}
return Err(err);
};
let reconstruct_state = Arc::new(Mutex::new(reconstruct_state));
let futs = FuturesUnordered::new();
for (key, state) in std::mem::take(&mut reconstruct_state.lock().unwrap().keys) {
let req_lsn_for_key = query.map_key_to_lsn(&key);
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
let rc_clone = Arc::clone(&reconstruct_state);
async move {
assert_eq!(state.situation, ValueReconstructSituation::Complete);
let converted = match state.collect_pending_ios().await {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
}
};
DELTAS_PER_READ_GLOBAL.observe(converted.num_deltas() as f64);
// The walredo module expects the records to be descending in terms of Lsn.
// And we submit the IOs in that order, so there should be no need to sort here.
debug_assert!(
converted
.records
.is_sorted_by_key(|(lsn, _)| std::cmp::Reverse(*lsn)),
"{converted:?}"
);
{
let mut guard = rc_clone.lock().unwrap();
guard.set_debug_state(&converted);
}
(
key,
walredo_self
.reconstruct_value(
key,
req_lsn_for_key,
converted,
RedoAttemptType::ReadPage,
)
.await,
)
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
Ok(results)
}
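A minimal sketch of how this debug read path is driven, mirroring the HTTP handler shown earlier in this diff (all names are from this change):
let mut reconstruct_state = ValuesReconstructState::new_with_debug(IoConcurrency::sequential());
let page = timeline.debug_get(key, lsn, &ctx, &mut reconstruct_state).await?;
// Besides the reconstructed page, the debug state retains the raw reconstruction inputs:
let records = &reconstruct_state.debug_state.records; // (Lsn, NeonWalRecord) pairs that were applied
let img = &reconstruct_state.debug_state.img;         // optional base image they were applied to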
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -1932,6 +2075,8 @@ impl Timeline {
// an ephemeral layer open forever when idle. It also freezes layers if the global limit on
// ephemeral layer bytes has been breached.
pub(super) async fn maybe_freeze_ephemeral_layer(&self) {
debug_assert_current_span_has_tenant_and_timeline_id();
let Ok(mut write_guard) = self.write_lock.try_lock() else {
// If the write lock is held, there is an active wal receiver: rolling open layers
// is their responsibility while they hold this lock.

View File

@@ -421,7 +421,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
{
if (resp->tag != T_NeonGetPageResponse && resp->tag != T_NeonErrorResponse)
{
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=" UINT64_FORMAT ", ring_flush=" UINT64_FORMAT ", ring_unused=" UINT64_FORMAT "",
resp->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (neon_protocol_version >= 3)
@@ -438,7 +438,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
getpage_resp->req.blkno != slot->buftag.blockNum)
{
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Receive unexpected getpage response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
"Receive unexpected getpage response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), slot->buftag.forkNum, slot->buftag.blockNum);
}
@@ -447,7 +447,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since));
}
@@ -496,9 +496,9 @@ communicator_prefetch_pump_state(void)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
slot->my_ring_index, MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -789,9 +789,9 @@ prefetch_read(PrefetchRequest *slot)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
"Incorrect prefetch read: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
slot->my_ring_index, MyPState->ring_receive);
}
/*
@@ -816,9 +816,9 @@ prefetch_read(PrefetchRequest *slot)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
slot->my_ring_index, MyPState->ring_receive);
}
/* update prefetch state */
@@ -852,8 +852,8 @@ prefetch_read(PrefetchRequest *slot)
* and the prefetch queue was flushed during the receive call
*/
neon_shard_log(shard_no, LOG,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long) my_ring_index,
"No response from reading prefetch entry " UINT64_FORMAT ": %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
buftag.forkNum, buftag.blockNum);
return false;
@@ -1844,7 +1844,7 @@ nm_to_string(NeonMessage *msg)
NeonDbSizeResponse *msg_resp = (NeonDbSizeResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeResponse\"");
appendStringInfo(&s, ", \"db_size\": %ld}",
appendStringInfo(&s, ", \"db_size\": " INT64_FORMAT "}",
msg_resp->db_size);
appendStringInfoChar(&s, '}');
@@ -2045,7 +2045,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
exists_resp->req.forknum != request.forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum);
}
@@ -2058,14 +2058,14 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid %lx] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
resp->reqid,
RelFileInfoFmt(rinfo),
forkNum,
@@ -2241,7 +2241,7 @@ Retry:
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[shard %d, reqid " UINT64_HEX_FORMAT "] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo),
forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)),
errdetail("page server returned error: %s",
@@ -2294,7 +2294,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
relsize_resp->req.forknum != forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum);
}
@@ -2307,14 +2307,14 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid %lx] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
resp->reqid,
RelFileInfoFmt(rinfo),
forknum,
@@ -2364,7 +2364,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
dbsize_resp->req.dbNode != dbNode)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}",
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode);
}
@@ -2377,14 +2377,14 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid %lx] could not read db size of db %u from page server at lsn %X/%08X",
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read db size of db %u from page server at lsn %X/%08X",
resp->reqid,
dbNode, LSN_FORMAT_ARGS(request_lsns->effective_request_lsn)),
errdetail("page server returned error: %s",
@@ -2455,7 +2455,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
slru_resp->req.segno != segno)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}",
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno);
}
@@ -2469,14 +2469,14 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid %lx] could not read SLRU %d segment %llu at lsn %X/%08X",
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read SLRU %d segment %llu at lsn %X/%08X",
resp->reqid,
kind,
(unsigned long long) segno,

View File

@@ -165,4 +165,8 @@ extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
extern TimeLineID GetWALInsertionTimeLine(void);
#endif
/* format codes not present in PG17 and earlier, but available in PG18+ */
#define INT64_HEX_FORMAT "%" INT64_MODIFIER "x"
#define UINT64_HEX_FORMAT "%" INT64_MODIFIER "x"
#endif /* NEON_PGVERSIONCOMPAT_H */

View File

@@ -6,10 +6,10 @@
use std::error::Error as _;
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use reqwest::{IntoUrl, Method, Response, StatusCode};
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperStatus, SafekeeperUtilization,
TimelineCreateRequest, TimelineStatus,
TimelineCreateRequest,
};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
@@ -161,13 +161,12 @@ impl Client {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
) -> Result<Response> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
self.get(&uri).await
}
pub async fn snapshot(

View File

@@ -23,6 +23,7 @@ use safekeeper::defaults::{
DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::hadron;
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
@@ -252,6 +253,10 @@ struct Args {
/// Run in development mode (disables security checks)
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
/* BEGIN_HADRON */
#[arg(long)]
enable_pull_timeline_on_startup: bool,
/* END_HADRON */
}
// Like PathBufValueParser, but allows empty string.
@@ -435,6 +440,11 @@ async fn main() -> anyhow::Result<()> {
use_https_safekeeper_api: args.use_https_safekeeper_api,
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
force_metric_collection_on_scrape: args.force_metric_collection_on_scrape,
/* BEGIN_HADRON */
advertise_pg_addr_tenant_only: None,
enable_pull_timeline_on_startup: args.enable_pull_timeline_on_startup,
hcc_base_url: None,
/* END_HADRON */
});
// initialize sentry if SENTRY_DSN is provided
@@ -529,6 +539,20 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
// Load all timelines from disk to memory.
global_timelines.init().await?;
/* BEGIN_HADRON */
if conf.enable_pull_timeline_on_startup && global_timelines.timelines_count() == 0 {
match hadron::hcc_pull_timelines(&conf, global_timelines.clone()).await {
Ok(_) => {
info!("Successfully pulled all timelines from peer safekeepers");
}
Err(e) => {
error!("Failed to pull timelines from peer safekeepers: {:?}", e);
return Err(e);
}
}
}
/* END_HADRON */
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");

388
safekeeper/src/hadron.rs Normal file
View File

@@ -0,0 +1,388 @@
use pem::Pem;
use safekeeper_api::models::PullTimelineRequest;
use std::{collections::HashMap, env::VarError, net::IpAddr, sync::Arc, time::Duration};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::{backoff, id::TenantTimelineId, ip_address};
use anyhow::Result;
use pageserver_api::controller_api::{
AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse,
};
use crate::{
GlobalTimelines, SafeKeeperConf,
metrics::{
SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS,
SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS,
},
pull_timeline,
timelines_global_map::DeleteOrExclude,
};
// Extract information from the SafeKeeperConf to build a NodeRegisterRequest used to register the safekeeper with the HCC.
fn build_node_registeration_request(
conf: &SafeKeeperConf,
node_ip_addr: Option<IpAddr>,
) -> Result<NodeRegisterRequest> {
let advertise_pg_addr_with_port = conf
.advertise_pg_addr_tenant_only
.as_deref()
.expect("advertise_pg_addr_tenant_only is required to register with HCC");
// Extract host/port from the string.
let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at(
advertise_pg_addr_with_port
.rfind(':')
.ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?,
);
// Need the `[1..]` to remove the leading ':'.
let pg_port = pg_port_str[1..]
.parse::<u16>()
.map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?;
let (_, http_port_str) = conf.listen_http_addr.split_at(
conf.listen_http_addr
.rfind(':')
.ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?,
);
let http_port = http_port_str[1..]
.parse::<u16>()
.map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?;
Ok(NodeRegisterRequest {
node_id: conf.my_id,
listen_pg_addr: advertise_host_addr.to_string(),
listen_pg_port: pg_port,
listen_http_addr: advertise_host_addr.to_string(),
listen_http_port: http_port,
node_ip_addr,
availability_zone_id: AvailabilityZone("todo".to_string()),
listen_grpc_addr: None,
listen_grpc_port: None,
listen_https_port: None,
})
}
// Retrieve the JWT token used for authenticating with HCC from the environment variable.
// Returns None if the token cannot be retrieved.
fn get_hcc_auth_token() -> Option<String> {
match std::env::var("HCC_AUTH_TOKEN") {
Ok(v) => {
tracing::info!("Loaded JWT token for authentication with HCC");
Some(v)
}
Err(VarError::NotPresent) => {
tracing::info!("No JWT token for authentication with HCC detected");
None
}
Err(_) => {
tracing::info!(
"Failed to either load to detect non-present HCC_AUTH_TOKEN environment variable"
);
None
}
}
}
async fn send_safekeeper_register_request(
request_url: &Url,
auth_token: &Option<String>,
request: &NodeRegisterRequest,
) -> Result<()> {
let client = reqwest::Client::new();
let mut req_builder = client
.post(request_url.clone())
.header("Content-Type", "application/json");
if let Some(token) = auth_token {
req_builder = req_builder.bearer_auth(token);
}
req_builder
.json(&request)
.send()
.await?
.error_for_status()?;
Ok(())
}
/// Registers this safekeeper with the HCC.
pub async fn register(conf: &SafeKeeperConf) -> Result<()> {
match conf.hcc_base_url.as_ref() {
None => {
tracing::info!("HCC base URL is not set, skipping registration");
Ok(())
}
Some(hcc_base_url) => {
// The following operations acquiring the auth token and the node IP address both read environment
// variables. It's fine for now as this `register()` function is only called once during startup.
// If we start to talk to HCC more regularly in the safekeeper we should probably consider
// refactoring things into a "HadronClusterCoordinatorClient" struct.
let auth_token = get_hcc_auth_token();
let node_ip_addr =
ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
let request = build_node_registeration_request(conf, node_ip_addr)?;
let cancel = CancellationToken::new();
let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?;
backoff::retry(
|| async {
send_safekeeper_register_request(&request_url, &auth_token, &request).await
},
|_| false,
3,
u32::MAX,
"Calling the HCC safekeeper register API",
&cancel,
)
.await
.ok_or(anyhow::anyhow!(
"Error in forever retry loop. This error should never be surfaced."
))?
}
}
}
async fn safekeeper_list_timelines_request(
conf: &SafeKeeperConf,
) -> Result<pageserver_api::controller_api::SafekeeperTimelinesResponse> {
if conf.hcc_base_url.is_none() {
tracing::info!("HCC base URL is not set, skipping registration");
return Err(anyhow::anyhow!("HCC base URL is not set"));
}
// Acquiring the auth token reads an environment variable. That's fine for now because this
// function is only called during startup recovery. If we start to talk to HCC more regularly
// in the safekeeper, we should probably consider refactoring things into a
// "HadronClusterCoordinatorClient" struct.
let auth_token = get_hcc_auth_token();
let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0);
let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?;
let client = reqwest::Client::new();
let mut req_builder = client
.get(request_url.clone())
.header("Content-Type", "application/json")
.query(&[("id", conf.my_id.0)]);
if let Some(token) = auth_token {
req_builder = req_builder.bearer_auth(token);
}
let response = req_builder
.send()
.await?
.error_for_status()?
.json::<pageserver_api::controller_api::SafekeeperTimelinesResponse>()
.await?;
Ok(response)
}
// Returns true on success, false otherwise.
pub async fn hcc_pull_timeline(
timeline: SafekeeperTimeline,
conf: &SafeKeeperConf,
global_timelines: Arc<GlobalTimelines>,
nodeid_http: &HashMap<u64, String>,
) -> bool {
let mut request = PullTimelineRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
http_hosts: Vec::new(),
ignore_tombstone: None,
};
for host in timeline.peers {
if host.0 == conf.my_id.0 {
continue;
}
if let Some(http_host) = nodeid_http.get(&host.0) {
request.http_hosts.push(http_host.clone());
}
}
let ca_certs = match conf
.ssl_ca_certs
.iter()
.map(Pem::contents)
.map(reqwest::Certificate::from_der)
.collect::<Result<Vec<_>, _>>()
{
Ok(result) => result,
Err(_) => {
return false;
}
};
match pull_timeline::handle_request(
request,
conf.sk_auth_token.clone(),
ca_certs,
global_timelines.clone(),
true,
)
.await
{
Ok(resp) => {
tracing::info!(
"Completed pulling tenant {} timeline {} from SK {:?}",
timeline.tenant_id,
timeline.timeline_id,
resp.safekeeper_host
);
return true;
}
Err(e) => {
tracing::error!(
"Failed to pull tenant {} timeline {} from SK {}",
timeline.tenant_id,
timeline.timeline_id,
e
);
let ttid = TenantTimelineId {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
};
// Revert the failed timeline pull.
// Note that deleting a timeline that does not exist also returns Ok.
match global_timelines
.delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal)
.await
{
Ok(dr) => {
tracing::info!(
"Deleted tenant {} timeline {} DirExists: {}",
timeline.tenant_id,
timeline.timeline_id,
dr.dir_existed,
);
}
Err(e) => {
tracing::error!(
"Failed to delete tenant {} timeline {} from global_timelines: {}",
timeline.tenant_id,
timeline.timeline_id,
e
);
}
}
}
}
false
}
pub async fn hcc_pull_timeline_till_success(
timeline: SafekeeperTimeline,
conf: &SafeKeeperConf,
global_timelines: Arc<GlobalTimelines>,
nodeid_http: &HashMap<u64, String>,
) {
const MAX_PULL_TIMELINE_RETRIES: u64 = 100;
for i in 0..MAX_PULL_TIMELINE_RETRIES {
if hcc_pull_timeline(
timeline.clone(),
conf,
global_timelines.clone(),
nodeid_http,
)
.await
{
SK_RECOVERY_PULL_TIMELINE_OKS.inc();
return;
}
tracing::error!(
"Failed to pull timeline {} from SK peers, retrying {}/{}",
timeline.timeline_id,
i + 1,
MAX_PULL_TIMELINE_RETRIES
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
SK_RECOVERY_PULL_TIMELINE_ERRORS.inc();
}
pub async fn hcc_pull_timelines(
conf: &SafeKeeperConf,
global_timelines: Arc<GlobalTimelines>,
) -> Result<()> {
let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer();
tracing::info!("Start pulling timelines from SK peers");
let mut response = SafekeeperTimelinesResponse {
timelines: Vec::new(),
safekeeper_peers: Vec::new(),
};
for i in 0..100 {
match safekeeper_list_timelines_request(conf).await {
Ok(timelines) => {
response = timelines;
// Stop retrying once we have a successful listing.
break;
}
Err(e) => {
tracing::error!("Failed to list timelines from HCC: {}", e);
if i == 99 {
return Err(e);
}
}
}
sleep(Duration::from_millis(100)).await;
}
let mut nodeid_http = HashMap::new();
for sk in response.safekeeper_peers {
nodeid_http.insert(
sk.node_id.0,
format!("http://{}:{}", sk.listen_http_addr, sk.http_port),
);
}
tracing::info!("Received {} timelines from HCC", response.timelines.len());
for timeline in response.timelines {
let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS
.with_label_values(&[
&timeline.tenant_id.to_string(),
&timeline.timeline_id.to_string(),
])
.start_timer();
hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http)
.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use utils::id::NodeId;
#[test]
fn test_build_node_registeration_request() {
// Test that:
// 1. We always extract the host name and port used to register with the HCC from the
// `advertise_pg_addr` if it is set.
// 2. The correct ports are extracted from `advertise_pg_addr` and `listen_http_addr`.
let mut conf = SafeKeeperConf::dummy();
conf.my_id = NodeId(1);
conf.advertise_pg_addr_tenant_only =
Some("safe-keeper-1.safe-keeper.hadron.svc.cluster.local:5454".to_string());
// `listen_pg_addr` and `listen_pg_addr_tenant_only` are not used for node registration. Set them to a different
// host and port values and make sure that they don't show up in the node registration request.
conf.listen_pg_addr = "0.0.0.0:5456".to_string();
conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string());
conf.listen_http_addr = "0.0.0.0:7676".to_string();
let node_ip_addr: Option<IpAddr> = Some("127.0.0.1".parse().unwrap());
let request = build_node_registeration_request(&conf, node_ip_addr).unwrap();
assert_eq!(request.node_id, NodeId(1));
assert_eq!(
request.listen_pg_addr,
"safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
);
assert_eq!(request.listen_pg_port, 5454);
assert_eq!(
request.listen_http_addr,
"safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
);
assert_eq!(request.listen_http_port, 7676);
assert_eq!(
request.node_ip_addr,
Some(IpAddr::V4("127.0.0.1".parse().unwrap()))
);
}
}

View File

@@ -241,9 +241,14 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
})?;
let resp =
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
.await?;
let resp = pull_timeline::handle_request(
data,
conf.sk_auth_token.clone(),
ca_certs,
global_timelines,
false,
)
.await?;
json_response(StatusCode::OK, resp)
}

View File

@@ -10,6 +10,7 @@ use pem::Pem;
use remote_storage::RemoteStorageConfig;
use storage_broker::Uri;
use tokio::runtime::Runtime;
use url::Url;
use utils::auth::SwappableJwtAuth;
use utils::id::NodeId;
use utils::logging::SecretString;
@@ -20,6 +21,7 @@ pub mod control_file;
pub mod control_file_upgrade;
pub mod copy_timeline;
pub mod debug_dump;
pub mod hadron;
pub mod handler;
pub mod http;
pub mod metrics;
@@ -100,6 +102,11 @@ pub struct SafeKeeperConf {
pub advertise_pg_addr: Option<String>,
pub availability_zone: Option<String>,
pub no_sync: bool,
/* BEGIN_HADRON */
pub advertise_pg_addr_tenant_only: Option<String>,
pub enable_pull_timeline_on_startup: bool,
pub hcc_base_url: Option<Url>,
/* END_HADRON */
pub broker_endpoint: Uri,
pub broker_keepalive_interval: Duration,
pub heartbeat_timeout: Duration,
@@ -185,6 +192,11 @@ impl SafeKeeperConf {
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
force_metric_collection_on_scrape: true,
/* BEGIN_HADRON */
advertise_pg_addr_tenant_only: None,
enable_pull_timeline_on_startup: false,
hcc_base_url: None,
/* END_HADRON */
}
}
}

View File

@@ -85,6 +85,43 @@ pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_wal_storage_limit_errors counter")
});
pub static SK_RECOVERY_PULL_TIMELINE_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_recovery_pull_timeline_errors",
concat!(
"Number of errors due to pull_timeline errors during SK lost disk recovery.",
"An increase in this metric indicates pull timelines runs into error."
)
)
.expect("Failed to register safekeeper_recovery_pull_timeline_errors counter")
});
pub static SK_RECOVERY_PULL_TIMELINE_OKS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_recovery_pull_timeline_oks",
concat!(
"Number of successful pull_timeline during SK lost disk recovery.",
"An increase in this metric indicates pull timelines is successful."
)
)
.expect("Failed to register safekeeper_recovery_pull_timeline_oks counter")
});
pub static SK_RECOVERY_PULL_TIMELINES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_recovery_pull_timelines_seconds",
"Seconds to pull timelines",
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_recovery_pull_timelines_seconds histogram")
});
pub static SK_RECOVERY_PULL_TIMELINE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"safekeeper_recovery_pull_timeline_seconds",
"Seconds to pull timeline",
&["tenant_id", "timeline_id"],
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_recovery_pull_timeline_seconds histogram vec")
});
/* END_HADRON */
pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(

View File

@@ -8,6 +8,7 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use http::StatusCode;
use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::GenericRemoteStorage;
@@ -21,10 +22,11 @@ use tokio::fs::OpenOptions;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::sleep;
use tokio_tar::{Archive, Builder, Header};
use tokio_util::io::{CopyToBytes, SinkWriter};
use tokio_util::sync::PollSender;
use tracing::{error, info, instrument};
use tracing::{error, info, instrument, warn};
use utils::crashsafe::fsync_async_opt;
use utils::id::{NodeId, TenantTimelineId};
use utils::logging::SecretString;
@@ -449,6 +451,7 @@ pub async fn handle_request(
sk_auth_token: Option<SecretString>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
wait_for_peer_timeline_status: bool,
) -> Result<PullTimelineResponse, ApiError> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
@@ -472,37 +475,100 @@ pub async fn handle_request(
let http_hosts = request.http_hosts.clone();
// Figure out statuses of potential donors.
let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
futures::future::join_all(http_hosts.iter().map(|url| async {
let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
let info = cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await?;
Ok(info)
}))
.await;
let mut statuses = Vec::new();
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(status) => {
statuses.push((status, i));
}
Err(e) => {
info!("error fetching status from {}: {e}", http_hosts[i]);
if !wait_for_peer_timeline_status {
let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
futures::future::join_all(http_hosts.iter().map(|url| async {
let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
let resp = cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await?;
let info: TimelineStatus = resp
.json()
.await
.context("Failed to deserialize timeline status")
.map_err(|e| mgmt_api::Error::ReceiveErrorBody(e.to_string()))?;
Ok(info)
}))
.await;
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(status) => {
statuses.push((status, i));
}
Err(e) => {
info!("error fetching status from {}: {e}", http_hosts[i]);
}
}
}
}
// Allow missing responses from up to one safekeeper (say due to downtime)
// e.g. if we created a timeline on PS A and B, with C being offline. Then B goes
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)));
// Allow missing responses from up to one safekeeper (say due to downtime)
// e.g. if we created a timeline on safekeepers A and B, with C being offline. Then B goes
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)));
}
} else {
let mut retry = true;
// We must get status from all other peers.
// Otherwise, we may run into a split-brain scenario.
while retry {
statuses.clear();
retry = false;
for (i, url) in http_hosts.iter().enumerate() {
let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
match cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await
{
Ok(resp) => {
if resp.status() == StatusCode::NOT_FOUND {
warn!(
"Timeline {} not found on peer SK {}, no need to pull it",
TenantTimelineId::new(request.tenant_id, request.timeline_id),
url
);
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
}
let info: TimelineStatus = resp
.json()
.await
.context("Failed to deserialize timeline status")
.map_err(ApiError::InternalServerError)?;
statuses.push((info, i));
}
Err(e) => {
match e {
// If we get a 404, it means the timeline doesn't exist on this safekeeper.
// We can ignore this error.
mgmt_api::Error::ApiError(status, _)
if status == StatusCode::NOT_FOUND =>
{
warn!(
"Timeline {} not found on peer SK {}, no need to pull it",
TenantTimelineId::new(request.tenant_id, request.timeline_id),
url
);
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
}
_ => {}
}
retry = true;
error!("Failed to get timeline status from {}: {:#}", url, e);
}
}
}
sleep(std::time::Duration::from_millis(100)).await;
}
}
// Find the most advanced safekeeper
@@ -511,6 +577,12 @@ pub async fn handle_request(
.max_by_key(|(status, _)| {
(
status.acceptor_state.epoch,
/* BEGIN_HADRON */
// We need to pull from the SK with the highest term.
// This is because another compute may come online and have the other two SKs vote the same highest term again.
// Then there would be two computes running with the same term.
status.acceptor_state.term,
/* END_HADRON */
status.flush_lsn,
status.commit_lsn,
)

View File

@@ -191,6 +191,11 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
force_metric_collection_on_scrape: true,
/* BEGIN_HADRON */
enable_pull_timeline_on_startup: false,
advertise_pg_addr_tenant_only: None,
hcc_base_url: None,
/* END_HADRON */
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -34,7 +34,9 @@ class NeonAPI:
self.retries524 = 0
self.retries4xx = 0
def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response:
def __request(
self, method: str | bytes, endpoint: str, retry404: bool = False, **kwargs: Any
) -> requests.Response:
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
@@ -55,10 +57,12 @@ class NeonAPI:
resp.raise_for_status()
break
elif resp.status_code >= 400:
if resp.status_code == 422:
if resp.json()["message"] == "branch not ready yet":
retry = True
self.retries4xx += 1
if resp.status_code == 404 and retry404:
retry = True
self.retries4xx += 1
elif resp.status_code == 422 and resp.json()["message"] == "branch not ready yet":
retry = True
self.retries4xx += 1
elif resp.status_code == 423 and resp.json()["message"] in {
"endpoint is in some transitive state, could not suspend",
"project already has running conflicting operations, scheduling of new ones is prohibited",
@@ -66,7 +70,7 @@ class NeonAPI:
retry = True
self.retries4xx += 1
elif resp.status_code == 524:
log.info("The request was timed out, trying to get operations")
log.info("The request was timed out")
retry = True
self.retries524 += 1
if retry:
@@ -203,6 +207,9 @@ class NeonAPI:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches/{branch_id}",
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
retry404=True,
headers={
"Accept": "application/json",
},
@@ -307,6 +314,10 @@ class NeonAPI:
if endpoint_type:
data["endpoint"]["type"] = endpoint_type
if settings:
# otherwise we get 400 "settings must not be nil"
# TODO(myrrc): fix on cplane side
if "pg_settings" not in settings:
settings["pg_settings"] = {}
data["endpoint"]["settings"] = settings
resp = self.__request(

View File

@@ -0,0 +1,168 @@
from __future__ import annotations
import os
import timeit
import traceback
from concurrent.futures import ThreadPoolExecutor as Exec
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, Any, cast
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI, connection_parameters_to_env
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import Endpoint, PgBin
from fixtures.pg_version import PgVersion
from performance.test_perf_pgbench import utc_now_timestamp
# These tests compare write-heavy and read-heavy workload performance of an ordinary endpoint
# against an endpoint that saves its LFC and prewarms from it on startup.
def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
pg_bin = neon_compare.pg_bin
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
for ep in [ep_normal, ep_prewarmed]:
connstr: str = ep.connstr()
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"])
ep.safe_psql("CREATE EXTENSION neon")
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
name: str = cast("str", ep.branch_name)
neon_compare.zenbenchmark.record_pg_bench_result(name, res)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_compare_prewarmed_pgbench_perf_benchmark(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
project = neon_api.create_project(pg_version, name)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
err = False
try:
benchmark_impl(pg_bin, neon_api, project, zenbenchmark)
except Exception as e:
err = True
log.error(f"Caught exception: {e}")
log.error(traceback.format_exc())
finally:
assert not err
neon_api.delete_project(project_id)
def benchmark_impl(
pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker
):
pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424") # 50GB
offload_secs = 20
test_duration_min = 5
pgbench_duration = f"-T{test_duration_min * 60}"
# The prewarm API is not publicly exposed, so to measure a fully prewarmed
# endpoint we wait a fixed amount of time after it restarts.
# The number here is empirical, based on manual runs on staging.
prewarmed_sleep_secs = 180
branch_id = project["branch"]["id"]
project_id = project["project"]["id"]
normal_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
normal_id = project["endpoints"][0]["id"]
prewarmed_branch_id = neon_api.create_branch(
project_id, "prewarmed", parent_id=branch_id, add_endpoint=False
)["branch"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
ep_prewarmed = neon_api.create_endpoint(
project_id,
prewarmed_branch_id,
endpoint_type="read_write",
settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs},
)
neon_api.wait_for_operation_to_finish(project_id)
prewarmed_env = normal_env.copy()
prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"]
prewarmed_id = ep_prewarmed["endpoint"]["id"]
def bench(endpoint_name, endpoint_id, env):
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env)
sleep(offload_secs * 2) # ensure LFC is offloaded after pgbench finishes
neon_api.restart_endpoint(project_id, endpoint_id)
sleep(prewarmed_sleep_secs)
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
zenbenchmark.record_pg_bench_result(endpoint_name, res)
with Exec(max_workers=2) as exe:
exe.submit(bench, "normal", normal_id, normal_env)
exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env)
def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
sql = [
"CREATE EXTENSION neon",
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
]
for ep in [ep_normal, ep_prewarmed]:
ep.safe_psql_many(sql)
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
with neon_compare.record_duration(f"{ep.branch_name}_run_duration"):
ep.safe_psql("SELECT count(*) from foo")

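Both local comparison tests above drive the same prewarm cycle before measuring. A minimal sketch of that cycle as a helper, using only the fixture calls shown in the tests; the helper itself is illustrative and not part of the suite, and it assumes the endpoint was created with autoprewarm=True.

    from fixtures.neon_fixtures import Endpoint

    def restart_with_prewarm(ep: Endpoint) -> None:
        # Persist the current LFC state, restart the endpoint, and block until
        # the autoprewarm pass has repopulated the cache before measuring.
        client = ep.http_client()
        client.offload_lfc()
        ep.stop()
        ep.start()
        client.prewarm_lfc_wait()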
View File

@@ -13,7 +13,6 @@ from typing import TYPE_CHECKING, Any
import pytest
from fixtures.log_helper import log
from requests import HTTPError
if TYPE_CHECKING:
from pathlib import Path
@@ -153,26 +152,11 @@ class NeonBranch:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
parent_id: str = res["branch"]["parent_id"]
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
target_time = datetime.now() + timedelta(seconds=30)
while datetime.now() < target_time:
try:
parent_def = self.neon_api.get_branch_details(self.project_id, parent_id)
except HTTPError as he:
if he.response.status_code == 404:
log.info("Branch not found, waiting...")
time.sleep(1)
else:
raise HTTPError(he) from he
else:
break
else:
raise RuntimeError(f"Branch {parent_id} not found")
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
parent = NeonBranch(self.project, parent_def, True)
parent = NeonBranch(
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
)
self.project.branches[parent_id] = parent
self.parent = parent
parent.children[self.id] = self

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import sys
import tarfile
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
@@ -198,3 +199,115 @@ def test_wal_restore_http(neon_env_builder: NeonEnvBuilder, broken_tenant: bool)
# the table is back now!
restored = env.endpoints.create_start("main")
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
# BEGIN_HADRON
# TODO: re-enable once CM python is integrated.
# def clear_directory(directory):
# for item in os.listdir(directory):
# item_path = os.path.join(directory, item)
# if os.path.isdir(item_path):
# log.info(f"removing SK directory: {item_path}")
# shutil.rmtree(item_path)
# else:
# log.info(f"removing SK file: {item_path}")
# os.remove(item_path)
# def test_sk_pull_timelines(
# neon_env_builder: NeonEnvBuilder,
# ):
# DBNAME = "regression"
# superuser_name = "databricks_superuser"
# neon_env_builder.num_safekeepers = 3
# neon_env_builder.num_pageservers = 4
# neon_env_builder.safekeeper_extra_opts = ["--enable-pull-timeline-on-startup"]
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
# env = neon_env_builder.init_start(initial_tenant_shard_count=4)
# env.compute_manager.start(base_port=env.compute_manager_port)
# test_creator = "test_creator"
# test_metastore_id = uuid4()
# test_account_id = uuid4()
# test_workspace_id = 1
# test_workspace_url = "http://test_workspace_url"
# test_metadata_version = 1
# test_metadata = {
# "state": "INSTANCE_PROVISIONING",
# "admin_rolename": "admin",
# "admin_password_scram": "abc123456",
# }
# test_instance_name_1 = "test_instance_1"
# test_instance_read_write_compute_pool_1 = {
# "instance_name": test_instance_name_1,
# "compute_pool_name": "compute_pool_1",
# "creator": test_creator,
# "capacity": 2.0,
# "node_count": 1,
# "metadata_version": 0,
# "metadata": {
# "state": "INSTANCE_PROVISIONING",
# },
# }
# test_instance_1_readable_secondaries_enabled = False
# # Test creation
# create_instance_with_retries(
# env,
# test_instance_name_1,
# test_creator,
# test_metastore_id,
# test_account_id,
# test_workspace_id,
# test_workspace_url,
# test_instance_read_write_compute_pool_1,
# test_metadata_version,
# test_metadata,
# test_instance_1_readable_secondaries_enabled,
# )
# instance = env.compute_manager.get_instance_by_name(test_instance_name_1, test_workspace_id)
# log.info(f"haoyu Instance created: {instance}")
# assert instance["instance_name"] == test_instance_name_1
# test_instance_id = instance["instance_id"]
# instance_detail = env.compute_manager.describe_instance(test_instance_id)
# log.info(f"haoyu Instance detail: {instance_detail}")
# env.initial_tenant = instance_detail[0]["tenant_id"]
# env.initial_timeline = instance_detail[0]["timeline_id"]
# # Connect to postgres and create a database called "regression".
# endpoint = env.endpoints.create_start("main")
# endpoint.safe_psql(f"CREATE ROLE {superuser_name}")
# endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
# endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
# # Write some data. ~20 MB.
# num_rows = 0
# for _i in range(0, 20000):
# endpoint.safe_psql(
# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
# )
# num_rows += 1
# log.info(f"SKs {env.storage_controller.hcc_sk_node_list()}")
# env.safekeepers[0].stop(immediate=True)
# clear_directory(env.safekeepers[0].data_dir)
# env.safekeepers[0].start()
# # PG can still write data. ~20 MB.
# for _i in range(0, 20000):
# endpoint.safe_psql(
# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
# )
# num_rows += 1
# tuples = endpoint.safe_psql("SELECT COUNT(*) FROM usertable;")
# assert tuples[0][0] == num_rows
# endpoint.stop_and_destroy()
# END_HADRON

View File

@@ -98,7 +98,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { version = "0.1", features = ["net", "sync"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }