diff --git a/.github/actionlint.yml b/.github/actionlint.yml index 3142a36fa0..25b2fc702a 100644 --- a/.github/actionlint.yml +++ b/.github/actionlint.yml @@ -31,6 +31,7 @@ config-variables: - NEON_PROD_AWS_ACCOUNT_ID - PGREGRESS_PG16_PROJECT_ID - PGREGRESS_PG17_PROJECT_ID + - PREWARM_PGBENCH_SIZE - REMOTE_STORAGE_AZURE_CONTAINER - REMOTE_STORAGE_AZURE_REGION - SLACK_CICD_CHANNEL_ID diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index 79371ec704..df80bad579 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -219,6 +219,7 @@ jobs: --ignore test_runner/performance/test_cumulative_statistics_persistence.py --ignore test_runner/performance/test_perf_many_relations.py --ignore test_runner/performance/test_perf_oltp_large_tenant.py + --ignore test_runner/performance/test_lfc_prewarm.py env: BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }} VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}" @@ -410,6 +411,77 @@ jobs: env: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} + prewarm-test: + if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }} + permissions: + contents: write + statuses: write + id-token: write # aws-actions/configure-aws-credentials + env: + PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }} + POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install + DEFAULT_PG_VERSION: 17 + TEST_OUTPUT: /tmp/test_output + BUILD_TYPE: remote + SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }} + PLATFORM: "neon-staging" + + runs-on: [ self-hosted, us-east-2, x64 ] + container: + image: ghcr.io/neondatabase/build-tools:pinned-bookworm + credentials: + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + options: --init + + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit + + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 + with: + aws-region: eu-central-1 + role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + role-duration-seconds: 18000 # 5 hours + + - name: Download Neon artifact + uses: ./.github/actions/download + with: + name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact + path: /tmp/neon/ + prefix: latest + aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + + - name: Run prewarm benchmark + uses: ./.github/actions/run-python-test-set + with: + build_type: ${{ env.BUILD_TYPE }} + test_selection: performance/test_lfc_prewarm.py + run_in_parallel: false + save_perf_report: ${{ env.SAVE_PERF_REPORT }} + extra_params: -m remote_cluster --timeout 5400 + pg_version: ${{ env.DEFAULT_PG_VERSION }} + aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}" + PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}" + NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }} + + - name: Create Allure report + id: create-allure-report + if: ${{ !cancelled() }} + uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} + generate-matrices: if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }} # Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday) diff --git a/Cargo.lock b/Cargo.lock index 0fec8183f1..f91ae705c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4296,6 +4296,7 @@ dependencies = [ "pageserver_client", "pageserver_client_grpc", "pageserver_page_api", + "pprof", "rand 0.8.5", "reqwest", "serde", @@ -7564,6 +7565,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0d521ee4d9..df2064a4a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,7 +201,7 @@ tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.g tokio-io-timeout = "1.2.0" tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["sync"] } tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] } toml = "0.8" diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 24fd34a87a..fcc5549beb 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -476,6 +476,7 @@ async fn main() -> anyhow::Result<()> { listen_http_port, listen_https_port, availability_zone_id: AvailabilityZone(availability_zone_id), + node_ip_addr: None, }), ) .await?; diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index b02c6a613a..8f86b03f72 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Display; +use std::net::IpAddr; use std::str::FromStr; use std::time::{Duration, Instant}; @@ -60,6 +61,11 @@ pub struct NodeRegisterRequest { pub listen_https_port: Option, pub availability_zone_id: AvailabilityZone, + + // Reachable IP address of the PS/SK registering, if known. + // Hadron Cluster Coordiantor will update the DNS record of the registering node + // with this IP address. + pub node_ip_addr: Option, } #[derive(Serialize, Deserialize)] @@ -545,6 +551,39 @@ pub struct SafekeeperDescribeResponse { pub scheduling_policy: SkSchedulingPolicy, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct TimelineSafekeeperPeer { + pub node_id: NodeId, + pub listen_http_addr: String, + pub http_port: i32, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SCSafekeeperTimeline { + // SC does not know the tenant id. + pub timeline_id: TimelineId, + pub peers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SCSafekeeperTimelinesResponse { + pub timelines: Vec, + pub safekeeper_peers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SafekeeperTimeline { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub peers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SafekeeperTimelinesResponse { + pub timelines: Vec, + pub safekeeper_peers: Vec, +} + #[derive(Serialize, Deserialize, Clone)] pub struct SafekeeperSchedulingPolicyRequest { pub scheduling_policy: SkSchedulingPolicy, diff --git a/libs/utils/src/ip_address.rs b/libs/utils/src/ip_address.rs new file mode 100644 index 0000000000..d0834d0ba5 --- /dev/null +++ b/libs/utils/src/ip_address.rs @@ -0,0 +1,73 @@ +use std::env::{VarError, var}; +use std::error::Error; +use std::net::IpAddr; +use std::str::FromStr; + +/// Name of the environment variable containing the reachable IP address of the node. If set, the IP address contained in this +/// environment variable is used as the reachable IP address of the pageserver or safekeeper node during node registration. +/// In a Kubernetes environment, this environment variable should be set by Kubernetes to the Pod IP (specified in the Pod +/// template). +pub const HADRON_NODE_IP_ADDRESS: &str = "HADRON_NODE_IP_ADDRESS"; + +/// Read the reachable IP address of this page server from env var HADRON_NODE_IP_ADDRESS. +/// In Kubernetes this environment variable is set to the Pod IP (specified in the Pod template). +pub fn read_node_ip_addr_from_env() -> Result, Box> { + match var(HADRON_NODE_IP_ADDRESS) { + Ok(v) => { + if let Ok(addr) = IpAddr::from_str(&v) { + Ok(Some(addr)) + } else { + Err(format!("Invalid IP address string: {v}. Cannot be parsed as either an IPv4 or an IPv6 address.").into()) + } + } + Err(VarError::NotPresent) => Ok(None), + Err(e) => Err(e.into()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use std::net::{Ipv4Addr, Ipv6Addr}; + + #[test] + fn test_read_node_ip_addr_from_env() { + // SAFETY: test code + unsafe { + // Test with a valid IPv4 address + env::set_var(HADRON_NODE_IP_ADDRESS, "192.168.1.1"); + let result = read_node_ip_addr_from_env().unwrap(); + assert_eq!(result, Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)))); + + // Test with a valid IPv6 address + env::set_var( + HADRON_NODE_IP_ADDRESS, + "2001:0db8:85a3:0000:0000:8a2e:0370:7334", + ); + } + let result = read_node_ip_addr_from_env().unwrap(); + assert_eq!( + result, + Some(IpAddr::V6( + Ipv6Addr::from_str("2001:0db8:85a3:0000:0000:8a2e:0370:7334").unwrap() + )) + ); + + // Test with an invalid IP address + // SAFETY: test code + unsafe { + env::set_var(HADRON_NODE_IP_ADDRESS, "invalid_ip"); + } + let result = read_node_ip_addr_from_env(); + assert!(result.is_err()); + + // Test with no environment variable set + // SAFETY: test code + unsafe { + env::remove_var(HADRON_NODE_IP_ADDRESS); + } + let result = read_node_ip_addr_from_env().unwrap(); + assert_eq!(result, None); + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 2b81da017d..69771be5dc 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -26,6 +26,9 @@ pub mod auth; // utility functions and helper traits for unified unique id generation/serialization etc. pub mod id; +// utility functions to obtain reachable IP addresses in PS/SK nodes. +pub mod ip_address; + pub mod shard; mod hex; diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 5828a400a0..d67c0f123b 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,4 +1,5 @@ use std::future::Future; +use std::pin::Pin; use std::str::FromStr; use std::time::Duration; @@ -7,7 +8,7 @@ use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, VariantNames}; use tokio::time::Instant; -use tracing::info; +use tracing::{info, warn}; /// Logs a critical error, similarly to `tracing::error!`. This will: /// @@ -377,10 +378,11 @@ impl std::fmt::Debug for SecretString { /// /// TODO: consider upgrading this to a warning, but currently it fires too often. #[inline] -pub async fn log_slow(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O -where - F: Future, -{ +pub async fn log_slow( + name: &str, + threshold: Duration, + f: Pin<&mut impl Future>, +) -> O { monitor_slow_future( threshold, threshold, // period = threshold @@ -394,16 +396,42 @@ where if !is_slow { return; } + let elapsed = elapsed_total.as_secs_f64(); if ready { - info!( - "slow {name} completed after {:.3}s", - elapsed_total.as_secs_f64() - ); + info!("slow {name} completed after {elapsed:.3}s"); } else { - info!( - "slow {name} still running after {:.3}s", - elapsed_total.as_secs_f64() - ); + info!("slow {name} still running after {elapsed:.3}s"); + } + }, + ) + .await +} + +/// Logs a periodic warning if a future is slow to complete. +#[inline] +pub async fn warn_slow( + name: &str, + threshold: Duration, + f: Pin<&mut impl Future>, +) -> O { + monitor_slow_future( + threshold, + threshold, // period = threshold + f, + |MonitorSlowFutureCallback { + ready, + is_slow, + elapsed_total, + elapsed_since_last_callback: _, + }| { + if !is_slow { + return; + } + let elapsed = elapsed_total.as_secs_f64(); + if ready { + warn!("slow {name} completed after {elapsed:.3}s"); + } else { + warn!("slow {name} still running after {elapsed:.3}s"); } }, ) @@ -416,7 +444,7 @@ where pub async fn monitor_slow_future( threshold: Duration, period: Duration, - mut fut: std::pin::Pin<&mut F>, + mut fut: Pin<&mut F>, mut cb: impl FnMut(MonitorSlowFutureCallback), ) -> O where diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 7049fbdb96..3a9edc7092 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -1,13 +1,16 @@ use std::collections::HashMap; use std::num::NonZero; +use std::pin::pin; use std::sync::Arc; +use std::time::{Duration, Instant}; use anyhow::anyhow; use arc_swap::ArcSwap; use futures::stream::FuturesUnordered; use futures::{FutureExt as _, StreamExt as _}; use tonic::codec::CompressionEncoding; -use tracing::instrument; +use tracing::{debug, instrument}; +use utils::logging::warn_slow; use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; use crate::retry::Retry; @@ -21,28 +24,40 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber}; /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up /// when full. /// +/// Normal requests are small, and we don't pipeline them, so we can afford a large number of +/// streams per connection. +/// /// TODO: tune all of these constants, and consider making them configurable. -/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels -/// with only streams. -const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); +const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(64).unwrap(); -/// Max number of concurrent unary request clients per shard. -const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); +/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a +/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and +/// transmission delays. This also concentrates large window sizes on a smaller set of +/// streams/connections, presumably reducing memory use. +const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); -/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage -/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. -const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); +/// The batch size threshold at which a GetPage request will use the bulk stream pool. +/// +/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window +/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool. +const BULK_THRESHOLD_BATCH_SIZE: usize = 5; -/// Max number of pipelined requests per stream. -const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); +/// The overall request call timeout, including retries and pool acquisition. +/// TODO: should we retry forever? Should the caller decide? +const CALL_TIMEOUT: Duration = Duration::from_secs(60); -/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these -/// are more throughput-oriented, we have a smaller limit but higher queue depth. -const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); +/// The per-request (retry attempt) timeout, including any lazy connection establishment. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); -/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus -/// get a larger queue depth. -const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); +/// The initial request retry backoff duration. The first retry does not back off. +/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support. +const BASE_BACKOFF: Duration = Duration::from_millis(5); + +/// The maximum request retry backoff duration. +const MAX_BACKOFF: Duration = Duration::from_secs(5); + +/// Threshold and interval for warning about slow operation. +const SLOW_THRESHOLD: Duration = Duration::from_secs(3); /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the /// basic `page_api::Client` gRPC client, and supports: @@ -50,10 +65,19 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); /// * Sharded tenants across multiple Pageservers. /// * Pooling of connections, clients, and streams for efficient resource use. /// * Concurrent use by many callers. -/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling. +/// * Internal handling of GetPage bidirectional streams. /// * Automatic retries. /// * Observability. /// +/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These +/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all +/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly +/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000 +/// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with +/// one stream per backend, but without the TCP connection overhead. In the common case we expect +/// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits, +/// read coalescing, sharding (backends typically only talk to one shard at a time), etc. +/// /// TODO: this client does not support base backups or LSN leases, as these are only used by /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards. pub struct PageserverClient { @@ -67,8 +91,6 @@ pub struct PageserverClient { compression: Option, /// The shards for this tenant. shards: ArcSwap, - /// The retry configuration. - retry: Retry, } impl PageserverClient { @@ -94,7 +116,6 @@ impl PageserverClient { auth_token, compression, shards: ArcSwap::new(Arc::new(shards)), - retry: Retry, }) } @@ -142,13 +163,15 @@ impl PageserverClient { &self, req: page_api::CheckRelExistsRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // Relation metadata is only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.check_rel_exists(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // Relation metadata is only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) } /// Returns the total size of a database, as # of bytes. @@ -157,13 +180,15 @@ impl PageserverClient { &self, req: page_api::GetDbSizeRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // Relation metadata is only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.get_db_size(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // Relation metadata is only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) } /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the @@ -193,6 +218,8 @@ impl PageserverClient { return Err(tonic::Status::invalid_argument("request attempt must be 0")); } + debug!("sending request: {req:?}"); + // The shards may change while we're fetching pages. We execute the request using a stable // view of the shards (especially important for requests that span shards), but retry the // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary @@ -201,13 +228,16 @@ impl PageserverClient { // // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this // once we figure out how to handle these. - self.retry - .with(async |attempt| { - let mut req = req.clone(); - req.request_id.attempt = attempt as u32; - Self::get_page_with_shards(req, &self.shards.load_full()).await - }) - .await + let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| { + let mut req = req.clone(); + req.request_id.attempt = attempt as u32; + let shards = self.shards.load_full(); + Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await + }) + .await?; + + debug!("received response: {resp:?}"); + Ok(resp) } /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of @@ -246,7 +276,7 @@ impl PageserverClient { req: page_api::GetPageRequest, shard: &Shard, ) -> tonic::Result { - let stream = shard.stream(req.request_class.is_bulk()).await; + let mut stream = shard.stream(Self::is_bulk(&req)).await?; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -290,13 +320,15 @@ impl PageserverClient { &self, req: page_api::GetRelSizeRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // Relation metadata is only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.get_rel_size(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // Relation metadata is only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) } /// Fetches an SLRU segment. @@ -305,13 +337,50 @@ impl PageserverClient { &self, req: page_api::GetSlruSegmentRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // SLRU segments are only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.get_slru_segment(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // SLRU segments are only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) + } + + /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status + /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout. + async fn with_retries(timeout: Duration, f: F) -> tonic::Result + where + F: FnMut(usize) -> O, // pass attempt number, starting at 0 + O: Future>, + { + Retry { + timeout: Some(timeout), + base_backoff: BASE_BACKOFF, + max_backoff: MAX_BACKOFF, + } + .with(f) + .await + } + + /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout. + async fn with_timeout( + timeout: Duration, + f: impl Future>, + ) -> tonic::Result { + let started = Instant::now(); + tokio::time::timeout(timeout, f).await.map_err(|_| { + tonic::Status::deadline_exceeded(format!( + "request timed out after {:.3}s", + started.elapsed().as_secs_f64() + )) + })? + } + + /// Returns true if the request is considered a bulk request and should use the bulk pool. + fn is_bulk(req: &page_api::GetPageRequest) -> bool { + req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE } } @@ -440,15 +509,23 @@ impl Shards { } } -/// A single shard. Uses dedicated resource pools with the following structure: +/// A single shard. Has dedicated resource pools with the following structure: /// -/// * Channel pool: unbounded. -/// * Unary client pool: MAX_UNARY_CLIENTS. -/// * Stream client pool: unbounded. -/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH. -/// * Bulk channel pool: unbounded. +/// * Channel pool: MAX_CLIENTS_PER_CHANNEL. +/// * Client pool: unbounded. +/// * Stream pool: unbounded. +/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL. /// * Bulk client pool: unbounded. -/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. +/// * Bulk stream pool: unbounded. +/// +/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests. +/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a +/// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools +/// are bounded, nor do they pipeline requests, so the latency characteristics should be mostly +/// similar (except for TCP transmission time). +/// +/// TODO: since we never use bounded pools, we could consider removing the pool limiters. However, +/// the code is fairly trivial, so we may as well keep them around for now in case we need them. struct Shard { /// The shard ID. id: ShardIndex, @@ -456,7 +533,7 @@ struct Shard { client_pool: Arc, /// GetPage stream pool. stream_pool: Arc, - /// GetPage stream pool for bulk requests, e.g. prefetches. + /// GetPage stream pool for bulk requests. bulk_stream_pool: Arc, } @@ -470,50 +547,30 @@ impl Shard { auth_token: Option, compression: Option, ) -> anyhow::Result { - // Common channel pool for unary and stream requests. Bounded by client/stream pools. - let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; - - // Client pool for unary requests. + // Shard pools for unary requests and non-bulk GetPage requests. let client_pool = ClientPool::new( - channel_pool.clone(), + ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?, tenant_id, timeline_id, shard_id, auth_token.clone(), compression, - Some(MAX_UNARY_CLIENTS), + None, // unbounded ); + let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded - // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients, - // but shares a channel pool with it (as it's unbounded). - let stream_pool = StreamPool::new( - ClientPool::new( - channel_pool.clone(), - tenant_id, - timeline_id, - shard_id, - auth_token.clone(), - compression, - None, // unbounded, limited by stream pool - ), - Some(MAX_STREAMS), - MAX_STREAM_QUEUE_DEPTH, - ); - - // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools - // to avoid head-of-line blocking of latency-sensitive requests. + // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.). let bulk_stream_pool = StreamPool::new( ClientPool::new( - ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?, + ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?, tenant_id, timeline_id, shard_id, auth_token, compression, - None, // unbounded, limited by stream pool + None, // unbounded, ), - Some(MAX_BULK_STREAMS), - MAX_BULK_STREAM_QUEUE_DEPTH, + None, // unbounded ); Ok(Self { @@ -525,19 +582,23 @@ impl Shard { } /// Returns a pooled client for this shard. + #[instrument(skip_all)] async fn client(&self) -> tonic::Result { - self.client_pool - .get() - .await - .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) + warn_slow( + "client pool acquisition", + SLOW_THRESHOLD, + pin!(self.client_pool.get()), + ) + .await } - /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream - /// pool (e.g. for prefetches). - async fn stream(&self, bulk: bool) -> StreamGuard { - match bulk { - false => self.stream_pool.get().await, - true => self.bulk_stream_pool.get().await, - } + /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool. + #[instrument(skip_all, fields(bulk))] + async fn stream(&self, bulk: bool) -> tonic::Result { + let pool = match bulk { + false => &self.stream_pool, + true => &self.bulk_stream_pool, + }; + warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await } } diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 906872e091..98a649b4c8 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -9,19 +9,36 @@ //! //! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients //! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a -//! per-channel client limit. Channels may be closed when they are no longer used by any clients. +//! per-channel client limit. Channels are closed immediately when empty, and indirectly rely on +//! client/stream idle timeouts. //! //! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared) //! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a -//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed -//! from the pool after some time, to free up the channel. +//! single caller at a time, and is returned to the pool when dropped. Idle clients are removed +//! from the pool after a while to free up resources. //! //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the -//! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it -//! returns a guard that can be used to send a single request, to properly enforce queue depth and -//! route responses. Internally, the pool will reuse or spin up a suitable stream for the request, -//! possibly pipelining multiple requests from multiple callers on the same stream (up to some -//! queue depth). Idle streams may be removed from the pool after a while to free up the client. +//! ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a +//! time, and is returned to the pool when dropped. Idle streams are removed from the pool after +//! a while to free up resources. +//! +//! The stream only supports sending a single, synchronous request at a time, and does not support +//! pipelining multiple requests from different callers onto the same stream -- instead, we scale +//! out concurrent streams to improve throughput. There are many reasons for this design choice: +//! +//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by +//! a single server task, which may block e.g. on layer downloads, LSN waits, etc. +//! +//! * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away +//! (e.g. because of a timeout), the request would still be processed by the server and block +//! requests behind it in the stream. It might even block its own timeout retry. +//! +//! * Stream scheduling becomes significantly simpler and cheaper. +//! +//! * Individual callers can still use client-side batching for pipelining. +//! +//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB +//! per stream (2.5 GB for 100,000 streams), so we can afford to scale out. //! //! Each channel corresponds to one TCP connection. Each client unary request and each stream //! corresponds to one HTTP/2 stream and server task. @@ -29,33 +46,31 @@ //! TODO: error handling (including custom error types). //! TODO: observability. -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; use std::num::NonZero; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::{Duration, Instant}; -use futures::StreamExt as _; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; +use futures::{Stream, StreamExt as _}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch}; +use tokio_stream::wrappers::WatchStream; use tokio_util::sync::CancellationToken; use tonic::codec::CompressionEncoding; use tonic::transport::{Channel, Endpoint}; -use tracing::{error, warn}; use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -/// Reap channels/clients/streams that have been idle for this long. +/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when +/// empty, and indirectly rely on the client/stream idle timeouts. /// -/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to -/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle. -/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we -/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to -/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty -/// channels, and/or stream pool clients. +/// A stream's client will be reaped after 2x the idle threshold (first stream the client), but +/// that's okay -- if the stream closes abruptly (e.g. due to timeout or cancellation), we want to +/// keep its client around in the pool for a while. const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) { false => Duration::from_secs(180), true => Duration::from_secs(1), // exercise reaping in tests @@ -83,8 +98,6 @@ pub struct ChannelPool { max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, - /// Reaps idle channels. - idle_reaper: Reaper, /// Channel ID generator. next_channel_id: AtomicUsize, } @@ -96,9 +109,6 @@ struct ChannelEntry { channel: Channel, /// Number of clients using this channel. clients: usize, - /// The channel has been idle (no clients) since this time. None if channel is in use. - /// INVARIANT: Some if clients == 0, otherwise None. - idle_since: Option, } impl ChannelPool { @@ -108,15 +118,12 @@ impl ChannelPool { E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { - let pool = Arc::new(Self { + Ok(Arc::new(Self { endpoint: endpoint.try_into()?, max_clients_per_channel, channels: Mutex::default(), - idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), next_channel_id: AtomicUsize::default(), - }); - pool.idle_reaper.spawn(&pool); - Ok(pool) + })) } /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel. @@ -137,22 +144,17 @@ impl ChannelPool { let mut channels = self.channels.lock().unwrap(); // Try to find an existing channel with available capacity. We check entries in BTreeMap - // order, to fill up the lower-ordered channels first. The ClientPool also prefers clients - // with lower-ordered channel IDs first. This will cluster clients in lower-ordered + // order, to fill up the lower-ordered channels first. The client/stream pools also prefer + // clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered // channels, and free up higher-ordered channels such that they can be reaped. for (&id, entry) in channels.iter_mut() { assert!( entry.clients <= self.max_clients_per_channel.get(), "channel overflow" ); - assert_eq!( - entry.idle_since.is_some(), - entry.clients == 0, - "incorrect channel idle state" - ); + assert_ne!(entry.clients, 0, "empty channel not reaped"); if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; - entry.idle_since = None; return ChannelGuard { pool: Arc::downgrade(self), id, @@ -169,7 +171,6 @@ impl ChannelPool { let entry = ChannelEntry { channel: channel.clone(), clients: 1, // account for the guard below - idle_since: None, }; channels.insert(id, entry); @@ -181,20 +182,6 @@ impl ChannelPool { } } -impl Reapable for ChannelPool { - /// Reaps channels that have been idle since before the cutoff. - fn reap_idle(&self, cutoff: Instant) { - self.channels.lock().unwrap().retain(|_, entry| { - let Some(idle_since) = entry.idle_since else { - assert_ne!(entry.clients, 0, "empty channel not marked idle"); - return true; - }; - assert_eq!(entry.clients, 0, "idle channel has clients"); - idle_since >= cutoff - }) - } -} - /// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`, /// since the gRPC client requires an owned `Channel`. pub struct ChannelGuard { @@ -211,7 +198,7 @@ impl ChannelGuard { } } -/// Returns the channel to the pool. +/// Returns the channel to the pool. The channel is closed when empty. impl Drop for ChannelGuard { fn drop(&mut self) { let Some(pool) = self.pool.upgrade() else { @@ -220,11 +207,12 @@ impl Drop for ChannelGuard { let mut channels = pool.channels.lock().unwrap(); let entry = channels.get_mut(&self.id).expect("unknown channel"); - assert!(entry.idle_since.is_none(), "active channel marked idle"); assert!(entry.clients > 0, "channel underflow"); entry.clients -= 1; + + // Reap empty channels immediately. if entry.clients == 0 { - entry.idle_since = Some(Instant::now()); // mark channel as idle + channels.remove(&self.id); } } } @@ -253,8 +241,7 @@ pub struct ClientPool { /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from - /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle - /// clients are reaped. + /// lower-ordered channels. This allows us to free up and reap higher-ordered channels. idle: Mutex>, /// Reaps idle clients. idle_reaper: Reaper, @@ -310,7 +297,7 @@ impl ClientPool { /// This is moderately performance-sensitive. It is called for every unary request, but these /// establish a new gRPC stream per request so they're already expensive. GetPage requests use /// the `StreamPool` instead. - pub async fn get(self: &Arc) -> anyhow::Result { + pub async fn get(self: &Arc) -> tonic::Result { // Acquire a permit if the pool is bounded. let mut permit = None; if let Some(limiter) = self.limiter.clone() { @@ -328,7 +315,7 @@ impl ClientPool { }); } - // Slow path: construct a new client. + // Construct a new client. let mut channel_guard = self.channel_pool.get(); let client = page_api::Client::new( channel_guard.take(), @@ -337,7 +324,8 @@ impl ClientPool { self.shard_id, self.auth_token.clone(), self.compression, - )?; + ) + .map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?; Ok(ClientGuard { pool: Arc::downgrade(self), @@ -407,287 +395,187 @@ impl Drop for ClientGuard { /// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream /// acquires a client from the inner `ClientPool` for the stream's lifetime. /// -/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send -/// a single request and await the response. Internally, requests are multiplexed across streams and -/// channels. This allows proper queue depth enforcement and response routing. +/// Individual streams only send a single request at a time, and do not pipeline multiple callers +/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily +/// to eliminate head-of-line blocking. See the module documentation for more details. /// /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { /// The client pool to acquire clients from. Must be unbounded. client_pool: Arc, - /// All pooled streams. + /// Idle pooled streams. Acquired streams are removed from here and returned on drop. /// - /// Incoming requests will be sent over an existing stream with available capacity. If all - /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each - /// stream has an associated Tokio task that processes requests and responses. - streams: Mutex>, - /// The max number of concurrent streams, or None if unbounded. - max_streams: Option>, - /// The max number of concurrent requests per stream. - max_queue_depth: NonZero, - /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. - /// None if the pool is unbounded. + /// The first stream in the map will be acquired next. The map is sorted by stream ID, which is + /// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer + /// acquiring idle streams from lower-ordered channels, which allows us to free up and reap + /// higher-ordered channels. + idle: Mutex>, + /// Limits the max number of concurrent streams. None if the pool is unbounded. limiter: Option>, /// Reaps idle streams. idle_reaper: Reaper, - /// Stream ID generator. - next_stream_id: AtomicUsize, } -type StreamID = usize; -type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>; -type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>; -type ResponseSender = oneshot::Sender>; +/// The stream ID. Reuses the inner client ID. +type StreamID = ClientID; +/// A pooled stream. struct StreamEntry { - /// Sends caller requests to the stream task. The stream task exits when this is dropped. - sender: RequestSender, - /// Number of in-flight requests on this stream. - queue_depth: usize, - /// The time when this stream went idle (queue_depth == 0). - /// INVARIANT: Some if queue_depth == 0, otherwise None. - idle_since: Option, + /// The bidirectional stream. + stream: BiStream, + /// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`. + idle_since: Instant, +} + +/// A bidirectional GetPage stream and its client. Can send requests and receive responses. +struct BiStream { + /// The owning client. Holds onto the channel slot while the stream is alive. + client: ClientGuard, + /// Stream for sending requests. Uses a watch channel, so it can only send a single request at a + /// time, and the caller must await the response before sending another request. This is + /// enforced by `StreamGuard::send`. + sender: watch::Sender, + /// Stream for receiving responses. + receiver: Pin> + Send>>, } impl StreamPool { - /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth` - /// concurrent requests on each stream, and use up to `max_streams` concurrent streams. + /// Creates a new stream pool, using the given client pool. It will use up to `max_streams` + /// concurrent streams. /// /// The client pool must be unbounded. The stream pool will enforce its own limits, and because /// streams are long-lived they can cause persistent starvation if they exhaust the client pool. /// The stream pool should generally have its own dedicated client pool (but it can share a /// channel pool with others since these are always unbounded). - pub fn new( - client_pool: Arc, - max_streams: Option>, - max_queue_depth: NonZero, - ) -> Arc { + pub fn new(client_pool: Arc, max_streams: Option>) -> Arc { assert!(client_pool.limiter.is_none(), "bounded client pool"); let pool = Arc::new(Self { client_pool, - streams: Mutex::default(), - limiter: max_streams.map(|max_streams| { - Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) - }), - max_streams, - max_queue_depth, + idle: Mutex::default(), + limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))), idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), - next_stream_id: AtomicUsize::default(), }); pool.idle_reaper.spawn(&pool); pool } - /// Acquires an available stream from the pool, or spins up a new stream async if all streams - /// are full. Returns a guard that can be used to send a single request on the stream and await - /// the response, with queue depth quota already acquired. Blocks if the pool is at capacity - /// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight). + /// Acquires an available stream from the pool, or spins up a new stream if all streams are + /// full. Returns a guard that can be used to send requests and await the responses. Blocks if + /// the pool is full. /// /// This is very performance-sensitive, as it is on the GetPage hot path. /// - /// TODO: this must do something more sophisticated for performance. We want: - /// - /// * Cheap, concurrent access in the common case where we can use a pooled stream. - /// * Quick acquisition of pooled streams with available capacity. - /// * Prefer streams that belong to lower-numbered channels, to reap idle channels. - /// * Prefer filling up existing streams' queue depth before spinning up new streams. - /// * Don't hold a lock while spinning up new streams. - /// * Allow concurrent clients to join onto streams while they're spun up. - /// * Allow spinning up multiple streams concurrently, but don't overshoot limits. - /// - /// For now, we just do something simple but inefficient (linear scan under mutex). - pub async fn get(self: &Arc) -> StreamGuard { + /// TODO: is a `Mutex` performant enough? Will it become too contended? We can't + /// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first + /// to free up higher-ordered channels. + pub async fn get(self: &Arc) -> tonic::Result { // Acquire a permit if the pool is bounded. let mut permit = None; if let Some(limiter) = self.limiter.clone() { permit = Some(limiter.acquire_owned().await.expect("never closed")); } - let mut streams = self.streams.lock().unwrap(); - // Look for a pooled stream with available capacity. - for (&id, entry) in streams.iter_mut() { - assert!( - entry.queue_depth <= self.max_queue_depth.get(), - "stream queue overflow" - ); - assert_eq!( - entry.idle_since.is_some(), - entry.queue_depth == 0, - "incorrect stream idle state" - ); - if entry.queue_depth < self.max_queue_depth.get() { - entry.queue_depth += 1; - entry.idle_since = None; - return StreamGuard { - pool: Arc::downgrade(self), - id, - sender: entry.sender.clone(), - permit, - }; - } + // Fast path: acquire an idle stream from the pool. + if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() { + return Ok(StreamGuard { + pool: Arc::downgrade(self), + stream: Some(entry.stream), + can_reuse: true, + permit, + }); } - // No available stream, spin up a new one. We install the stream entry in the pool first and - // return the guard, while spinning up the stream task async. This allows other callers to - // join onto this stream and also create additional streams concurrently if this fills up. - let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); - let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); - let entry = StreamEntry { - sender: req_tx.clone(), - queue_depth: 1, // reserve quota for this caller - idle_since: None, - }; - streams.insert(id, entry); + // Spin up a new stream. Uses a watch channel to send a single request at a time, since + // `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead. + let mut client = self.client_pool.get().await?; - if let Some(max_streams) = self.max_streams { - assert!(streams.len() <= max_streams.get(), "stream overflow"); - }; + let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default()); + let req_stream = WatchStream::from_changes(req_rx); + let resp_stream = client.get_pages(req_stream).await?; - let client_pool = self.client_pool.clone(); - let pool = Arc::downgrade(self); - - tokio::spawn(async move { - if let Err(err) = Self::run_stream(client_pool, req_rx).await { - error!("stream failed: {err}"); - } - // Remove stream from pool on exit. Weak reference to avoid holding the pool alive. - if let Some(pool) = pool.upgrade() { - let entry = pool.streams.lock().unwrap().remove(&id); - assert!(entry.is_some(), "unknown stream ID: {id}"); - } - }); - - StreamGuard { + Ok(StreamGuard { pool: Arc::downgrade(self), - id, - sender: req_tx, + stream: Some(BiStream { + client, + sender: req_tx, + receiver: Box::pin(resp_stream), + }), + can_reuse: true, permit, - } - } - - /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a - /// bidirectional GetPage stream, then forwards requests and responses between callers and the - /// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be - /// atomic with pool stream acquisition. - /// - /// The task exits when the request channel is closed, or on a stream error. The caller is - /// responsible for removing the stream from the pool on exit. - async fn run_stream( - client_pool: Arc, - mut caller_rx: RequestReceiver, - ) -> anyhow::Result<()> { - // Acquire a client from the pool and create a stream. - let mut client = client_pool.get().await?; - - // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could - // theoretically deadlock if both the client and server block on sends (since we're not - // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and - // low queue depths, but it was seen to happen with the libpq protocol so better safe than - // sorry. It should never buffer more than the queue depth anyway, but using an unbounded - // channel guarantees that it will never block. - let (req_tx, req_rx) = mpsc::unbounded_channel(); - let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); - let mut resp_stream = client.get_pages(req_stream).await?; - - // Track caller response channels by request ID. If the task returns early, these response - // channels will be dropped and the waiting callers will receive an error. - // - // NB: this will leak entries if the server doesn't respond to a request (by request ID). - // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and - // block further use. But we could consider reaping closed channels after some time. - let mut callers = HashMap::new(); - - // Process requests and responses. - loop { - tokio::select! { - // Receive requests from callers and send them to the stream. - req = caller_rx.recv() => { - // Shut down if request channel is closed. - let Some((req, resp_tx)) = req else { - return Ok(()); - }; - - // Store the response channel by request ID. - if callers.contains_key(&req.request_id) { - // Error on request ID duplicates. Ignore callers that went away. - _ = resp_tx.send(Err(tonic::Status::invalid_argument( - format!("duplicate request ID: {}", req.request_id), - ))); - continue; - } - callers.insert(req.request_id, resp_tx); - - // Send the request on the stream. Bail out if the stream is closed. - req_tx.send(req).map_err(|_| { - tonic::Status::unavailable("stream closed") - })?; - } - - // Receive responses from the stream and send them to callers. - resp = resp_stream.next() => { - // Shut down if the stream is closed, and bail out on stream errors. - let Some(resp) = resp.transpose()? else { - return Ok(()) - }; - - // Send the response to the caller. Ignore errors if the caller went away. - let Some(resp_tx) = callers.remove(&resp.request_id) else { - warn!("received response for unknown request ID: {}", resp.request_id); - continue; - }; - _ = resp_tx.send(Ok(resp)); - } - } - } + }) } } impl Reapable for StreamPool { /// Reaps streams that have been idle since before the cutoff. fn reap_idle(&self, cutoff: Instant) { - self.streams.lock().unwrap().retain(|_, entry| { - let Some(idle_since) = entry.idle_since else { - assert_ne!(entry.queue_depth, 0, "empty stream not marked idle"); - return true; - }; - assert_eq!(entry.queue_depth, 0, "idle stream has requests"); - idle_since >= cutoff - }); + self.idle + .lock() + .unwrap() + .retain(|_, entry| entry.idle_since >= cutoff); } } -/// A pooled stream reference. Can be used to send a single request, to properly enforce queue -/// depth. Queue depth is already reserved and will be returned on drop. +/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still +/// in-flight requests on the stream, or the stream failed. pub struct StreamGuard { pool: Weak, - id: StreamID, - sender: RequestSender, + stream: Option, // Some until dropped + can_reuse: bool, // returned to pool if true permit: Option, // None if pool is unbounded } impl StreamGuard { - /// Sends a request on the stream and awaits the response. Consumes the guard, since it's only - /// valid for a single request (to enforce queue depth). This also drops the guard on return and - /// returns the queue depth quota to the pool. + /// Sends a request on the stream and awaits the response. If the future is dropped before it + /// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the + /// request and is not returned to the pool. The same is true if the stream errors, in which + /// case the caller can't send further requests on the stream. /// - /// The `GetPageRequest::request_id` must be unique across in-flight requests. + /// We only support sending a single request at a time, to eliminate head-of-line blocking. See + /// module documentation for details. /// /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status` /// to avoid tearing down the stream for per-request errors. Callers must check this. pub async fn send( - self, + &mut self, req: page_api::GetPageRequest, ) -> tonic::Result { - let (resp_tx, resp_rx) = oneshot::channel(); + let req_id = req.request_id; + let stream = self.stream.as_mut().expect("not dropped"); - self.sender - .send((req, resp_tx)) - .await + // Mark the stream as not reusable while the request is in flight. We can't return the + // stream to the pool until we receive the response, to avoid head-of-line blocking and + // stale responses. Failed streams can't be reused either. + if !self.can_reuse { + return Err(tonic::Status::internal("stream can't be reused")); + } + self.can_reuse = false; + + // Send the request and receive the response. + // + // NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests. + stream + .sender + .send(req) .map_err(|_| tonic::Status::unavailable("stream closed"))?; - resp_rx + let resp = stream + .receiver + .next() .await - .map_err(|_| tonic::Status::unavailable("stream closed"))? + .ok_or_else(|| tonic::Status::unavailable("stream closed"))??; + + if resp.request_id != req_id { + return Err(tonic::Status::internal(format!( + "response ID {} does not match request ID {}", + resp.request_id, req_id + ))); + } + + // Success, mark the stream as reusable. + self.can_reuse = true; + + Ok(resp) } } @@ -697,26 +585,21 @@ impl Drop for StreamGuard { return; // pool was dropped }; - // Release the queue depth reservation on drop. This can prematurely decrement it if dropped - // before the response is received, but that's okay. - // - // TODO: actually, it's probably not okay. Queue depth release should be moved into the - // stream task, such that it continues to account for the queue depth slot until the server - // responds. Otherwise, if a slow request times out and keeps blocking the stream, the - // server will keep waiting on it and we can pile on subsequent requests (including the - // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking - // requests on e.g. LSN waits and layer downloads, instead returning early to free up the - // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line - // blocking. TBD. - let mut streams = pool.streams.lock().unwrap(); - let entry = streams.get_mut(&self.id).expect("unknown stream"); - assert!(entry.idle_since.is_none(), "active stream marked idle"); - assert!(entry.queue_depth > 0, "stream queue underflow"); - entry.queue_depth -= 1; - if entry.queue_depth == 0 { - entry.idle_since = Some(Instant::now()); // mark stream as idle + // If the stream isn't reusable, it can't be returned to the pool. + if !self.can_reuse { + return; } + // Place the idle stream back into the pool. + let entry = StreamEntry { + stream: self.stream.take().expect("dropped once"), + idle_since: Instant::now(), + }; + pool.idle + .lock() + .unwrap() + .insert(entry.stream.client.id, entry); + _ = self.permit; // returned on drop, referenced for visibility } } diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs index a1e0b8636f..8a138711e8 100644 --- a/pageserver/client_grpc/src/retry.rs +++ b/pageserver/client_grpc/src/retry.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use futures::future::pending; use tokio::time::Instant; use tracing::{error, info, warn}; @@ -8,60 +9,54 @@ use utils::backoff::exponential_backoff_duration; /// A retry handler for Pageserver gRPC requests. /// /// This is used instead of backoff::retry for better control and observability. -pub struct Retry; +pub struct Retry { + /// Timeout across all retry attempts. If None, retries forever. + pub timeout: Option, + /// The initial backoff duration. The first retry does not use a backoff. + pub base_backoff: Duration, + /// The maximum backoff duration. + pub max_backoff: Duration, +} impl Retry { - /// The per-request timeout. - // TODO: tune these, and/or make them configurable. Should we retry forever? - const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); - /// The total timeout across all attempts - const TOTAL_TIMEOUT: Duration = Duration::from_secs(60); - /// The initial backoff duration. - const BASE_BACKOFF: Duration = Duration::from_millis(10); - /// The maximum backoff duration. - const MAX_BACKOFF: Duration = Duration::from_secs(10); - /// If true, log successful requests. For debugging. - const LOG_SUCCESS: bool = false; - - /// Runs the given async closure with timeouts and retries (exponential backoff), passing the - /// attempt number starting at 0. Logs errors, using the current tracing span for context. + /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors, + /// using the current tracing span for context. /// - /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default - /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`]. + /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. pub async fn with(&self, mut f: F) -> tonic::Result where - F: FnMut(usize) -> O, // takes attempt number, starting at 0 + F: FnMut(usize) -> O, // pass attempt number, starting at 0 O: Future>, { let started = Instant::now(); - let deadline = started + Self::TOTAL_TIMEOUT; + let deadline = self.timeout.map(|timeout| started + timeout); let mut last_error = None; let mut retries = 0; loop { - // Set up a future to wait for the backoff (if any) and run the request with a timeout. + // Set up a future to wait for the backoff, if any, and run the closure. let backoff_and_try = async { // NB: sleep() always sleeps 1ms, even when given a 0 argument. See: // https://github.com/tokio-rs/tokio/issues/6866 - if let Some(backoff) = Self::backoff_duration(retries) { + if let Some(backoff) = self.backoff_duration(retries) { tokio::time::sleep(backoff).await; } - let request_started = Instant::now(); - tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries)) - .await - .map_err(|_| { - tonic::Status::deadline_exceeded(format!( - "request timed out after {:.3}s", - request_started.elapsed().as_secs_f64() - )) - })? + f(retries).await }; - // Wait for the backoff and request, or bail out if the total timeout is exceeded. + // Set up a future for the timeout, if any. + let timeout = async { + match deadline { + Some(deadline) => tokio::time::sleep_until(deadline).await, + None => pending().await, + } + }; + + // Wait for the backoff and request, or bail out if the timeout is exceeded. let result = tokio::select! { result = backoff_and_try => result, - _ = tokio::time::sleep_until(deadline) => { + _ = timeout => { let last_error = last_error.unwrap_or_else(|| { tonic::Status::deadline_exceeded(format!( "request timed out after {:.3}s", @@ -79,7 +74,7 @@ impl Retry { match result { // Success, return the result. Ok(result) => { - if retries > 0 || Self::LOG_SUCCESS { + if retries > 0 { info!( "request succeeded after {retries} retries in {:.3}s", started.elapsed().as_secs_f64(), @@ -112,12 +107,13 @@ impl Retry { } } - /// Returns the backoff duration for the given retry attempt, or None for no backoff. - fn backoff_duration(retry: usize) -> Option { + /// Returns the backoff duration for the given retry attempt, or None for no backoff. The first + /// attempt and first retry never backs off, so this returns None for 0 and 1 retries. + fn backoff_duration(&self, retries: usize) -> Option { let backoff = exponential_backoff_duration( - retry as u32, - Self::BASE_BACKOFF.as_secs_f64(), - Self::MAX_BACKOFF.as_secs_f64(), + (retries as u32).saturating_sub(1), // first retry does not back off + self.base_backoff.as_secs_f64(), + self.max_backoff.as_secs_f64(), ); (!backoff.is_zero()).then_some(backoff) } diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 46ebefcaf6..a194dc76c3 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -52,7 +52,7 @@ impl From for tonic::Status { } /// The LSN a request should read at. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Default)] pub struct ReadLsn { /// The request's read LSN. pub request_lsn: Lsn, @@ -332,7 +332,7 @@ impl From for proto::GetDbSizeResponse { } /// Requests one or more pages. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct GetPageRequest { /// A request ID. Will be included in the response. Should be unique for in-flight requests on /// the stream. @@ -433,12 +433,13 @@ impl From for proto::RequestId { } /// A GetPage request class. -#[derive(Clone, Copy, Debug, strum_macros::Display)] +#[derive(Clone, Copy, Debug, Default, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. Unknown, /// A normal request. This is the default. + #[default] Normal, /// A prefetch request. NB: can only be classified on pg < 18. Prefetch, @@ -446,19 +447,6 @@ pub enum GetPageClass { Background, } -impl GetPageClass { - /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than - /// latency-sensitive). - pub fn is_bulk(&self) -> bool { - match self { - Self::Unknown => false, - Self::Normal => false, - Self::Prefetch => true, - Self::Background => true, - } - } -} - impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb { diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 4086213830..609fef2b4f 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -16,6 +16,7 @@ futures.workspace = true hdrhistogram.workspace = true humantime.workspace = true humantime-serde.workspace = true +pprof.workspace = true rand.workspace = true reqwest.workspace = true serde.workspace = true diff --git a/pageserver/pagebench/src/cmd/idle_streams.rs b/pageserver/pagebench/src/cmd/idle_streams.rs new file mode 100644 index 0000000000..73bc9f3f46 --- /dev/null +++ b/pageserver/pagebench/src/cmd/idle_streams.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use futures::StreamExt; +use tonic::transport::Endpoint; +use tracing::info; + +use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag}; +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; +use utils::shard::ShardIndex; + +/// Starts a large number of idle gRPC GetPage streams. +#[derive(clap::Parser)] +pub(crate) struct Args { + /// The Pageserver to connect to. Must use grpc://. + #[clap(long, default_value = "grpc://localhost:51051")] + server: String, + /// The Pageserver HTTP API. + #[clap(long, default_value = "http://localhost:9898")] + http_server: String, + /// The number of streams to open. + #[clap(long, default_value = "100000")] + count: usize, + /// Number of streams per connection. + #[clap(long, default_value = "100")] + per_connection: usize, + /// Send a single GetPage request on each stream. + #[clap(long, default_value_t = false)] + send_request: bool, +} + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + rt.block_on(main_impl(args)) +} + +async fn main_impl(args: Args) -> anyhow::Result<()> { + // Discover a tenant and timeline to use. + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + reqwest::Client::new(), + args.http_server.clone(), + None, + )); + let timelines: Vec = crate::util::cli::targets::discover( + &mgmt_api_client, + crate::util::cli::targets::Spec { + limit_to_first_n_targets: Some(1), + targets: None, + }, + ) + .await?; + let ttid = timelines + .first() + .ok_or_else(|| anyhow!("no timelines found"))?; + + // Set up the initial client. + let endpoint = Endpoint::from_shared(args.server.clone())?; + + let connect = async || { + pageserver_page_api::Client::new( + endpoint.connect().await?, + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + None, + ) + }; + + let mut client = connect().await?; + let mut streams = Vec::with_capacity(args.count); + + // Create streams. + for i in 0..args.count { + if i % 100 == 0 { + info!("opened {}/{} streams", i, args.count); + } + if i % args.per_connection == 0 && i > 0 { + client = connect().await?; + } + + let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel(); + let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); + let mut resp_stream = client.get_pages(req_stream).await?; + + // Send request if specified. + if args.send_request { + req_tx.send(GetPageRequest { + request_id: 1.into(), + request_class: GetPageClass::Normal, + read_lsn: ReadLsn { + request_lsn: Lsn::MAX, + not_modified_since_lsn: Some(Lsn(1)), + }, + rel: RelTag { + spcnode: 1664, // pg_global + dbnode: 0, // shared database + relnode: 1262, // pg_authid + forknum: 0, // init + }, + block_numbers: vec![0], + })?; + let resp = resp_stream + .next() + .await + .transpose()? + .ok_or_else(|| anyhow!("no response"))?; + if resp.status_code != GetPageStatusCode::Ok { + return Err(anyhow!("{} response", resp.status_code)); + } + } + + // Hold onto streams to avoid closing them. + streams.push((req_tx, resp_stream)); + } + + info!("opened {} streams, sleeping", args.count); + + // Block forever, to hold the idle streams open for inspection. + futures::future::pending::<()>().await; + + Ok(()) +} diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 5527557450..ceca58e032 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -1,4 +1,7 @@ +use std::fs::File; + use clap::Parser; +use tracing::info; use utils::logging; /// Re-usable pieces of code that aren't CLI-specific. @@ -17,38 +20,73 @@ mod cmd { pub(super) mod aux_files; pub(super) mod basebackup; pub(super) mod getpage_latest_lsn; + pub(super) mod idle_streams; pub(super) mod ondemand_download_churn; pub(super) mod trigger_initial_size_calculation; } /// Component-level performance test for pageserver. #[derive(clap::Parser)] -enum Args { +struct Args { + /// Takes a client CPU profile into profile.svg. The benchmark must exit cleanly before it's + /// written, e.g. via --runtime. + #[arg(long)] + profile: bool, + + #[command(subcommand)] + subcommand: Subcommand, +} + +#[derive(clap::Subcommand)] +enum Subcommand { Basebackup(cmd::basebackup::Args), GetPageLatestLsn(cmd::getpage_latest_lsn::Args), TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args), OndemandDownloadChurn(cmd::ondemand_download_churn::Args), AuxFiles(cmd::aux_files::Args), + IdleStreams(cmd::idle_streams::Args), } -fn main() { +fn main() -> anyhow::Result<()> { logging::init( logging::LogFormat::Plain, logging::TracingErrorLayerEnablement::Disabled, logging::Output::Stderr, - ) - .unwrap(); + )?; logging::replace_panic_hook_with_tracing_panic_hook().forget(); let args = Args::parse(); - match args { - Args::Basebackup(args) => cmd::basebackup::main(args), - Args::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args), - Args::TriggerInitialSizeCalculation(args) => { + + // Start a CPU profile if requested. + let mut profiler = None; + if args.profile { + profiler = Some( + pprof::ProfilerGuardBuilder::default() + .frequency(1000) + .blocklist(&["libc", "libgcc", "pthread", "vdso"]) + .build()?, + ); + } + + match args.subcommand { + Subcommand::Basebackup(args) => cmd::basebackup::main(args), + Subcommand::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args), + Subcommand::TriggerInitialSizeCalculation(args) => { cmd::trigger_initial_size_calculation::main(args) } - Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args), - Args::AuxFiles(args) => cmd::aux_files::main(args), + Subcommand::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args), + Subcommand::AuxFiles(args) => cmd::aux_files::main(args), + Subcommand::IdleStreams(args) => cmd::idle_streams::main(args), + }?; + + // Generate a CPU flamegraph if requested. + if let Some(profiler) = profiler { + let report = profiler.report().build()?; + drop(profiler); // stop profiling + let file = File::create("profile.svg")?; + report.flamegraph(file)?; + info!("wrote CPU profile flamegraph to profile.svg") } - .unwrap() + + Ok(()) } diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index f1f9aaf43c..be1de43d18 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -194,6 +194,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { listen_http_port: m.http_port, listen_https_port: m.https_port, availability_zone_id: az_id.expect("Checked above"), + node_ip_addr: None, }) } Err(e) => { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ca1a278dd3..9875c58118 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result, anyhow}; +use bytes::Bytes; use enumset::EnumSet; use futures::future::join_all; use futures::{StreamExt, TryFutureExt}; @@ -46,6 +47,7 @@ use pageserver_api::shard::{ShardCount, TenantShardId}; use postgres_ffi::PgMajorVersion; use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError}; use scopeguard::defer; +use serde::{Deserialize, Serialize}; use serde_json::json; use tenant_size_model::svg::SvgBranchKind; use tenant_size_model::{SizeResult, StorageModel}; @@ -57,6 +59,7 @@ use utils::auth::SwappableJwtAuth; use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; use crate::config::PageServerConf; use crate::context; @@ -77,6 +80,7 @@ use crate::tenant::remote_timeline_client::{ }; use crate::tenant::secondary::SecondaryController; use crate::tenant::size::ModelInputs; +use crate::tenant::storage_layer::ValuesReconstructState; use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName}; use crate::tenant::timeline::layer_manager::LayerManagerLockHolder; use crate::tenant::timeline::offload::{OffloadError, offload_timeline}; @@ -2712,6 +2716,16 @@ async fn deletion_queue_flush( } } +/// Try if `GetPage@Lsn` is successful, useful for manual debugging. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +struct GetPageResponse { + pub page: Bytes, + pub layers_visited: u32, + pub delta_layers_visited: u32, + pub records: Vec<(Lsn, NeonWalRecord)>, + pub img: Option<(Lsn, Bytes)>, +} + async fn getpage_at_lsn_handler( request: Request, cancel: CancellationToken, @@ -2762,21 +2776,24 @@ async fn getpage_at_lsn_handler_inner( // Use last_record_lsn if no lsn is provided let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); - let page = timeline.get(key.0, lsn, &ctx).await?; if touch { json_response(StatusCode::OK, ()) } else { - Result::<_, ApiError>::Ok( - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/octet-stream") - .body(hyper::Body::from(page)) - .unwrap(), - ) + let mut reconstruct_state = ValuesReconstructState::new_with_debug(IoConcurrency::sequential()); + let page = timeline.debug_get(key.0, lsn, &ctx, &mut reconstruct_state).await?; + let response = GetPageResponse { + page, + layers_visited: reconstruct_state.get_layers_visited(), + delta_layers_visited: reconstruct_state.get_delta_layers_visited(), + records: reconstruct_state.debug_state.records.clone(), + img: reconstruct_state.debug_state.img.clone(), + }; + + json_response(StatusCode::OK, response) } } - .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) + .instrument(info_span!("timeline_debug_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) .await } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 03dcda82e3..2ea4ede99d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3429,7 +3429,13 @@ impl TenantShard { .collect_vec(); for timeline in timelines { - timeline.maybe_freeze_ephemeral_layer().await; + // Include a span with the timeline ID. The parent span already has the tenant ID. + let span = + info_span!("maybe_freeze_ephemeral_layer", timeline_id = %timeline.timeline_id); + timeline + .maybe_freeze_ephemeral_layer() + .instrument(span) + .await; } } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 9fbb9d2438..43ea8fffa3 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -75,7 +75,7 @@ where /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data' /// call, to collect more records. /// -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub(crate) struct ValueReconstructState { pub(crate) records: Vec<(Lsn, NeonWalRecord)>, pub(crate) img: Option<(Lsn, Bytes)>, @@ -308,6 +308,9 @@ pub struct ValuesReconstructState { layers_visited: u32, delta_layers_visited: u32, + pub(crate) enable_debug: bool, + pub(crate) debug_state: ValueReconstructState, + pub(crate) io_concurrency: IoConcurrency, num_active_ios: Arc, @@ -657,6 +660,23 @@ impl ValuesReconstructState { layers_visited: 0, delta_layers_visited: 0, io_concurrency, + enable_debug: false, + debug_state: ValueReconstructState::default(), + num_active_ios: Arc::new(AtomicUsize::new(0)), + read_path: None, + } + } + + pub(crate) fn new_with_debug(io_concurrency: IoConcurrency) -> Self { + Self { + keys: HashMap::new(), + keys_done: KeySpaceRandomAccum::new(), + keys_with_image_coverage: None, + layers_visited: 0, + delta_layers_visited: 0, + io_concurrency, + enable_debug: true, + debug_state: ValueReconstructState::default(), num_active_ios: Arc::new(AtomicUsize::new(0)), read_path: None, } @@ -670,6 +690,12 @@ impl ValuesReconstructState { self.io_concurrency.spawn_io(fut).await; } + pub(crate) fn set_debug_state(&mut self, debug_state: &ValueReconstructState) { + if self.enable_debug { + self.debug_state = debug_state.clone(); + } + } + pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) { self.layers_visited += 1; if let ReadableLayer::PersistentLayer(layer) = layer { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 5921afdbde..c457ffc365 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1254,6 +1254,57 @@ impl Timeline { } } + #[inline(always)] + pub(crate) async fn debug_get( + &self, + key: Key, + lsn: Lsn, + ctx: &RequestContext, + reconstruct_state: &mut ValuesReconstructState, + ) -> Result { + if !lsn.is_valid() { + return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN"))); + } + + // This check is debug-only because of the cost of hashing, and because it's a double-check: we + // already checked the key against the shard_identity when looking up the Timeline from + // page_service. + debug_assert!(!self.shard_identity.is_key_disposable(&key)); + + let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn); + let vectored_res = self + .debug_get_vectored_impl(query, reconstruct_state, ctx) + .await; + + let key_value = vectored_res?.pop_first(); + match key_value { + Some((got_key, value)) => { + if got_key != key { + error!( + "Expected {}, but singular vectored get returned {}", + key, got_key + ); + Err(PageReconstructError::Other(anyhow!( + "Singular vectored get returned wrong key" + ))) + } else { + value + } + } + None => Err(PageReconstructError::MissingKey(Box::new( + MissingKeyError { + keyspace: KeySpace::single(key..key.next()), + shard: self.shard_identity.get_shard_number(&key), + original_hwm_lsn: lsn, + ancestor_lsn: None, + backtrace: None, + read_path: None, + query: None, + }, + ))), + } + } + pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u32 = 100; /// Look up multiple page versions at a given LSN @@ -1548,6 +1599,98 @@ impl Timeline { Ok(results) } + // A copy of the get_vectored_impl method except that we store the image and wal records into `reconstruct_state`. + // This is only used in the http getpage call for debugging purpose. + pub(super) async fn debug_get_vectored_impl( + &self, + query: VersionedKeySpaceQuery, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result>, GetVectoredError> { + if query.is_empty() { + return Ok(BTreeMap::default()); + } + + let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() { + Some(ReadPath::new( + query.total_keyspace(), + query.high_watermark_lsn()?, + )) + } else { + None + }; + + reconstruct_state.read_path = read_path; + + let traversal_res: Result<(), _> = self + .get_vectored_reconstruct_data(query.clone(), reconstruct_state, ctx) + .await; + + if let Err(err) = traversal_res { + // Wait for all the spawned IOs to complete. + // See comments on `spawn_io` inside `storage_layer` for more details. + let mut collect_futs = std::mem::take(&mut reconstruct_state.keys) + .into_values() + .map(|state| state.collect_pending_ios()) + .collect::>(); + while collect_futs.next().await.is_some() {} + return Err(err); + }; + + let reconstruct_state = Arc::new(Mutex::new(reconstruct_state)); + let futs = FuturesUnordered::new(); + + for (key, state) in std::mem::take(&mut reconstruct_state.lock().unwrap().keys) { + let req_lsn_for_key = query.map_key_to_lsn(&key); + futs.push({ + let walredo_self = self.myself.upgrade().expect("&self method holds the arc"); + let rc_clone = Arc::clone(&reconstruct_state); + + async move { + assert_eq!(state.situation, ValueReconstructSituation::Complete); + + let converted = match state.collect_pending_ios().await { + Ok(ok) => ok, + Err(err) => { + return (key, Err(err)); + } + }; + DELTAS_PER_READ_GLOBAL.observe(converted.num_deltas() as f64); + + // The walredo module expects the records to be descending in terms of Lsn. + // And we submit the IOs in that order, so, there shuold be no need to sort here. + debug_assert!( + converted + .records + .is_sorted_by_key(|(lsn, _)| std::cmp::Reverse(*lsn)), + "{converted:?}" + ); + { + let mut guard = rc_clone.lock().unwrap(); + guard.set_debug_state(&converted); + } + ( + key, + walredo_self + .reconstruct_value( + key, + req_lsn_for_key, + converted, + RedoAttemptType::ReadPage, + ) + .await, + ) + } + }); + } + + let results = futs + .collect::>>() + .await; + + Ok(results) + } + /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev. pub(crate) fn get_last_record_lsn(&self) -> Lsn { self.last_record_lsn.load().last @@ -1932,6 +2075,8 @@ impl Timeline { // an ephemeral layer open forever when idle. It also freezes layers if the global limit on // ephemeral layer bytes has been breached. pub(super) async fn maybe_freeze_ephemeral_layer(&self) { + debug_assert_current_span_has_tenant_and_timeline_id(); + let Ok(mut write_guard) = self.write_lock.try_lock() else { // If the write lock is held, there is an active wal receiver: rolling open layers // is their responsibility while they hold this lock. diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index bd53855eab..158b8940a3 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -421,7 +421,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp) { if (resp->tag != T_NeonGetPageResponse && resp->tag != T_NeonErrorResponse) { - neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld", + neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=" UINT64_FORMAT ", ring_flush=" UINT64_FORMAT ", ring_unused=" UINT64_FORMAT "", resp->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused); } if (neon_protocol_version >= 3) @@ -438,7 +438,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp) getpage_resp->req.blkno != slot->buftag.blockNum) { NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC, - "Receive unexpected getpage response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", + "Receive unexpected getpage response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), slot->buftag.forkNum, slot->buftag.blockNum); } @@ -447,7 +447,7 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp) resp->lsn != slot->request_lsns.request_lsn || resp->not_modified_since != slot->request_lsns.not_modified_since) { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)); } @@ -496,9 +496,9 @@ communicator_prefetch_pump_state(void) slot->my_ring_index != MyPState->ring_receive) { neon_shard_log(slot->shard_no, PANIC, - "Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu", + "Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "", slot->status, slot->response, - (long) slot->my_ring_index, (long) MyPState->ring_receive); + slot->my_ring_index, MyPState->ring_receive); } /* update prefetch state */ MyPState->n_responses_buffered += 1; @@ -789,9 +789,9 @@ prefetch_read(PrefetchRequest *slot) slot->my_ring_index != MyPState->ring_receive) { neon_shard_log(slot->shard_no, PANIC, - "Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu", + "Incorrect prefetch read: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "", slot->status, slot->response, - (long)slot->my_ring_index, (long)MyPState->ring_receive); + slot->my_ring_index, MyPState->ring_receive); } /* @@ -816,9 +816,9 @@ prefetch_read(PrefetchRequest *slot) slot->my_ring_index != MyPState->ring_receive) { neon_shard_log(shard_no, PANIC, - "Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu", + "Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "", slot->status, slot->response, - (long) slot->my_ring_index, (long) MyPState->ring_receive); + slot->my_ring_index, MyPState->ring_receive); } /* update prefetch state */ @@ -852,8 +852,8 @@ prefetch_read(PrefetchRequest *slot) * and the prefetch queue was flushed during the receive call */ neon_shard_log(shard_no, LOG, - "No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect", - (long) my_ring_index, + "No response from reading prefetch entry " UINT64_FORMAT ": %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect", + my_ring_index, RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)), buftag.forkNum, buftag.blockNum); return false; @@ -1844,7 +1844,7 @@ nm_to_string(NeonMessage *msg) NeonDbSizeResponse *msg_resp = (NeonDbSizeResponse *) msg; appendStringInfoString(&s, "{\"type\": \"NeonDbSizeResponse\""); - appendStringInfo(&s, ", \"db_size\": %ld}", + appendStringInfo(&s, ", \"db_size\": " INT64_FORMAT "}", msg_resp->db_size); appendStringInfoChar(&s, '}'); @@ -2045,7 +2045,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r exists_resp->req.forknum != request.forknum) { NEON_PANIC_CONNECTION_STATE(0, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", + "Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum); } @@ -2058,14 +2058,14 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r { if (!equal_requests(resp, &request.hdr)) { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); } } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", + errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", resp->reqid, RelFileInfoFmt(rinfo), forkNum, @@ -2241,7 +2241,7 @@ Retry: case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", + errmsg(NEON_TAG "[shard %d, reqid " UINT64_HEX_FORMAT "] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)), errdetail("page server returned error: %s", @@ -2294,7 +2294,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns * relsize_resp->req.forknum != forknum) { NEON_PANIC_CONNECTION_STATE(0, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", + "Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum); } @@ -2307,14 +2307,14 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns * { if (!equal_requests(resp, &request.hdr)) { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); } } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", + errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", resp->reqid, RelFileInfoFmt(rinfo), forknum, @@ -2364,7 +2364,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns) dbsize_resp->req.dbNode != dbNode) { NEON_PANIC_CONNECTION_STATE(0, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}", + "Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode); } @@ -2377,14 +2377,14 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns) { if (!equal_requests(resp, &request.hdr)) { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); } } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read db size of db %u from page server at lsn %X/%08X", + errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read db size of db %u from page server at lsn %X/%08X", resp->reqid, dbNode, LSN_FORMAT_ARGS(request_lsns->effective_request_lsn)), errdetail("page server returned error: %s", @@ -2455,7 +2455,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re slru_resp->req.segno != segno) { NEON_PANIC_CONNECTION_STATE(0, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}", + "Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno, request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno); } @@ -2469,14 +2469,14 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re { if (!equal_requests(resp, &request.hdr)) { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); } } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read SLRU %d segment %llu at lsn %X/%08X", + errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read SLRU %d segment %llu at lsn %X/%08X", resp->reqid, kind, (unsigned long long) segno, diff --git a/pgxn/neon/neon_pgversioncompat.h b/pgxn/neon/neon_pgversioncompat.h index 787bd552f8..c7574ef0f9 100644 --- a/pgxn/neon/neon_pgversioncompat.h +++ b/pgxn/neon/neon_pgversioncompat.h @@ -165,4 +165,8 @@ extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags); extern TimeLineID GetWALInsertionTimeLine(void); #endif +/* format codes not present in PG17-; but available in PG18+ */ +#define INT64_HEX_FORMAT "%" INT64_MODIFIER "x" +#define UINT64_HEX_FORMAT "%" INT64_MODIFIER "x" + #endif /* NEON_PGVERSIONCOMPAT_H */ diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index b4bb193a4b..3c8db3029e 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -6,10 +6,10 @@ use std::error::Error as _; use http_utils::error::HttpErrorBody; -use reqwest::{IntoUrl, Method, StatusCode}; +use reqwest::{IntoUrl, Method, Response, StatusCode}; use safekeeper_api::models::{ self, PullTimelineRequest, PullTimelineResponse, SafekeeperStatus, SafekeeperUtilization, - TimelineCreateRequest, TimelineStatus, + TimelineCreateRequest, }; use utils::id::{NodeId, TenantId, TimelineId}; use utils::logging::SecretString; @@ -161,13 +161,12 @@ impl Client { &self, tenant_id: TenantId, timeline_id: TimelineId, - ) -> Result { + ) -> Result { let uri = format!( "{}/v1/tenant/{}/timeline/{}", self.mgmt_api_endpoint, tenant_id, timeline_id ); - let resp = self.get(&uri).await?; - resp.json().await.map_err(Error::ReceiveBody) + self.get(&uri).await } pub async fn snapshot( diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index b2d5976ef4..79cf2f9149 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -23,6 +23,7 @@ use safekeeper::defaults::{ DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE, }; +use safekeeper::hadron; use safekeeper::wal_backup::WalBackup; use safekeeper::{ BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, @@ -252,6 +253,10 @@ struct Args { /// Run in development mode (disables security checks) #[arg(long, help = "Run in development mode (disables security checks)")] dev: bool, + /* BEGIN_HADRON */ + #[arg(long)] + enable_pull_timeline_on_startup: bool, + /* END_HADRON */ } // Like PathBufValueParser, but allows empty string. @@ -435,6 +440,11 @@ async fn main() -> anyhow::Result<()> { use_https_safekeeper_api: args.use_https_safekeeper_api, enable_tls_wal_service_api: args.enable_tls_wal_service_api, force_metric_collection_on_scrape: args.force_metric_collection_on_scrape, + /* BEGIN_HADRON */ + advertise_pg_addr_tenant_only: None, + enable_pull_timeline_on_startup: args.enable_pull_timeline_on_startup, + hcc_base_url: None, + /* END_HADRON */ }); // initialize sentry if SENTRY_DSN is provided @@ -529,6 +539,20 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { // Load all timelines from disk to memory. global_timelines.init().await?; + /* BEGIN_HADRON */ + if conf.enable_pull_timeline_on_startup && global_timelines.timelines_count() == 0 { + match hadron::hcc_pull_timelines(&conf, global_timelines.clone()).await { + Ok(_) => { + info!("Successfully pulled all timelines from peer safekeepers"); + } + Err(e) => { + error!("Failed to pull timelines from peer safekeepers: {:?}", e); + return Err(e); + } + } + } + /* END_HADRON */ + // Run everything in current thread rt, if asked. if conf.current_thread_runtime { info!("running in current thread runtime"); diff --git a/safekeeper/src/hadron.rs b/safekeeper/src/hadron.rs new file mode 100644 index 0000000000..b41bf2c3da --- /dev/null +++ b/safekeeper/src/hadron.rs @@ -0,0 +1,388 @@ +use pem::Pem; +use safekeeper_api::models::PullTimelineRequest; +use std::{collections::HashMap, env::VarError, net::IpAddr, sync::Arc, time::Duration}; +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; +use url::Url; +use utils::{backoff, id::TenantTimelineId, ip_address}; + +use anyhow::Result; +use pageserver_api::controller_api::{ + AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse, +}; + +use crate::{ + GlobalTimelines, SafeKeeperConf, + metrics::{ + SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS, + SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS, + }, + pull_timeline, + timelines_global_map::DeleteOrExclude, +}; + +// Extract information in the SafeKeeperConf to build a NodeRegisterRequest used to register the safekeeper with the HCC. +fn build_node_registeration_request( + conf: &SafeKeeperConf, + node_ip_addr: Option, +) -> Result { + let advertise_pg_addr_with_port = conf + .advertise_pg_addr_tenant_only + .as_deref() + .expect("advertise_pg_addr_tenant_only is required to register with HCC"); + + // Extract host/port from the string. + let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at( + advertise_pg_addr_with_port + .rfind(':') + .ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?, + ); + // Need the `[1..]` to remove the leading ':'. + let pg_port = pg_port_str[1..] + .parse::() + .map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?; + + let (_, http_port_str) = conf.listen_http_addr.split_at( + conf.listen_http_addr + .rfind(':') + .ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?, + ); + let http_port = http_port_str[1..] + .parse::() + .map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?; + + Ok(NodeRegisterRequest { + node_id: conf.my_id, + listen_pg_addr: advertise_host_addr.to_string(), + listen_pg_port: pg_port, + listen_http_addr: advertise_host_addr.to_string(), + listen_http_port: http_port, + node_ip_addr, + availability_zone_id: AvailabilityZone("todo".to_string()), + listen_grpc_addr: None, + listen_grpc_port: None, + listen_https_port: None, + }) +} + +// Retrieve the JWT token used for authenticating with HCC from the environment variable. +// Returns None if the token cannot be retrieved. +fn get_hcc_auth_token() -> Option { + match std::env::var("HCC_AUTH_TOKEN") { + Ok(v) => { + tracing::info!("Loaded JWT token for authentication with HCC"); + Some(v) + } + Err(VarError::NotPresent) => { + tracing::info!("No JWT token for authentication with HCC detected"); + None + } + Err(_) => { + tracing::info!( + "Failed to either load to detect non-present HCC_AUTH_TOKEN environment variable" + ); + None + } + } +} + +async fn send_safekeeper_register_request( + request_url: &Url, + auth_token: &Option, + request: &NodeRegisterRequest, +) -> Result<()> { + let client = reqwest::Client::new(); + let mut req_builder = client + .post(request_url.clone()) + .header("Content-Type", "application/json"); + if let Some(token) = auth_token { + req_builder = req_builder.bearer_auth(token); + } + req_builder + .json(&request) + .send() + .await? + .error_for_status()?; + Ok(()) +} + +/// Registers this safe keeper with the HCC. +pub async fn register(conf: &SafeKeeperConf) -> Result<()> { + match conf.hcc_base_url.as_ref() { + None => { + tracing::info!("HCC base URL is not set, skipping registration"); + Ok(()) + } + Some(hcc_base_url) => { + // The following operations acquiring the auth token and the node IP address both read environment + // variables. It's fine for now as this `register()` function is only called once during startup. + // If we start to talk to HCC more regularly in the safekeeper we should probably consider + // refactoring things into a "HadronClusterCoordinatorClient" struct. + let auth_token = get_hcc_auth_token(); + let node_ip_addr = + ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address."); + + let request = build_node_registeration_request(conf, node_ip_addr)?; + let cancel = CancellationToken::new(); + let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?; + + backoff::retry( + || async { + send_safekeeper_register_request(&request_url, &auth_token, &request).await + }, + |_| false, + 3, + u32::MAX, + "Calling the HCC safekeeper register API", + &cancel, + ) + .await + .ok_or(anyhow::anyhow!( + "Error in forever retry loop. This error should never be surfaced." + ))? + } + } +} + +async fn safekeeper_list_timelines_request( + conf: &SafeKeeperConf, +) -> Result { + if conf.hcc_base_url.is_none() { + tracing::info!("HCC base URL is not set, skipping registration"); + return Err(anyhow::anyhow!("HCC base URL is not set")); + } + + // The following operations acquiring the auth token and the node IP address both read environment + // variables. It's fine for now as this `register()` function is only called once during startup. + // If we start to talk to HCC more regularly in the safekeeper we should probably consider + // refactoring things into a "HadronClusterCoordinatorClient" struct. + let auth_token = get_hcc_auth_token(); + let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0); + let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?; + + let client = reqwest::Client::new(); + let mut req_builder = client + .get(request_url.clone()) + .header("Content-Type", "application/json") + .query(&[("id", conf.my_id.0)]); + if let Some(token) = auth_token { + req_builder = req_builder.bearer_auth(token); + } + let response = req_builder + .send() + .await? + .error_for_status()? + .json::() + .await?; + Ok(response) +} + +// Returns true on success, false otherwise. +pub async fn hcc_pull_timeline( + timeline: SafekeeperTimeline, + conf: &SafeKeeperConf, + global_timelines: Arc, + nodeid_http: &HashMap, +) -> bool { + let mut request = PullTimelineRequest { + tenant_id: timeline.tenant_id, + timeline_id: timeline.timeline_id, + http_hosts: Vec::new(), + ignore_tombstone: None, + }; + for host in timeline.peers { + if host.0 == conf.my_id.0 { + continue; + } + if let Some(http_host) = nodeid_http.get(&host.0) { + request.http_hosts.push(http_host.clone()); + } + } + + let ca_certs = match conf + .ssl_ca_certs + .iter() + .map(Pem::contents) + .map(reqwest::Certificate::from_der) + .collect::, _>>() + { + Ok(result) => result, + Err(_) => { + return false; + } + }; + match pull_timeline::handle_request( + request, + conf.sk_auth_token.clone(), + ca_certs, + global_timelines.clone(), + true, + ) + .await + { + Ok(resp) => { + tracing::info!( + "Completed pulling tenant {} timeline {} from SK {:?}", + timeline.tenant_id, + timeline.timeline_id, + resp.safekeeper_host + ); + return true; + } + Err(e) => { + tracing::error!( + "Failed to pull tenant {} timeline {} from SK {}", + timeline.tenant_id, + timeline.timeline_id, + e + ); + + let ttid = TenantTimelineId { + tenant_id: timeline.tenant_id, + timeline_id: timeline.timeline_id, + }; + // Revert the failed timeline pull. + // Notice that not found timeline returns OK also. + match global_timelines + .delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal) + .await + { + Ok(dr) => { + tracing::info!( + "Deleted tenant {} timeline {} DirExists: {}", + timeline.tenant_id, + timeline.timeline_id, + dr.dir_existed, + ); + } + Err(e) => { + tracing::error!( + "Failed to delete tenant {} timeline {} from global_timelines: {}", + timeline.tenant_id, + timeline.timeline_id, + e + ); + } + } + } + } + false +} + +pub async fn hcc_pull_timeline_till_success( + timeline: SafekeeperTimeline, + conf: &SafeKeeperConf, + global_timelines: Arc, + nodeid_http: &HashMap, +) { + const MAX_PULL_TIMELINE_RETRIES: u64 = 100; + for i in 0..MAX_PULL_TIMELINE_RETRIES { + if hcc_pull_timeline( + timeline.clone(), + conf, + global_timelines.clone(), + nodeid_http, + ) + .await + { + SK_RECOVERY_PULL_TIMELINE_OKS.inc(); + return; + } + tracing::error!( + "Failed to pull timeline {} from SK peers, retrying {}/{}", + timeline.timeline_id, + i + 1, + MAX_PULL_TIMELINE_RETRIES + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + SK_RECOVERY_PULL_TIMELINE_ERRORS.inc(); +} + +pub async fn hcc_pull_timelines( + conf: &SafeKeeperConf, + global_timelines: Arc, +) -> Result<()> { + let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer(); + tracing::info!("Start pulling timelines from SK peers"); + + let mut response = SafekeeperTimelinesResponse { + timelines: Vec::new(), + safekeeper_peers: Vec::new(), + }; + for i in 0..100 { + match safekeeper_list_timelines_request(conf).await { + Ok(timelines) => { + response = timelines; + } + Err(e) => { + tracing::error!("Failed to list timelines from HCC: {}", e); + if i == 99 { + return Err(e); + } + } + } + sleep(Duration::from_millis(100)).await; + } + + let mut nodeid_http = HashMap::new(); + for sk in response.safekeeper_peers { + nodeid_http.insert( + sk.node_id.0, + format!("http://{}:{}", sk.listen_http_addr, sk.http_port), + ); + } + tracing::info!("Received {} timelines from HCC", response.timelines.len()); + for timeline in response.timelines { + let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS + .with_label_values(&[ + &timeline.tenant_id.to_string(), + &timeline.timeline_id.to_string(), + ]) + .start_timer(); + hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http) + .await; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use utils::id::NodeId; + + #[test] + fn test_build_node_registeration_request() { + // Test that: + // 1. We always extract the host name and port used to register with the HCC from the + // `advertise_pg_addr` if it is set. + // 2. The correct ports are extracted from `advertise_pg_addr` and `listen_http_addr`. + let mut conf = SafeKeeperConf::dummy(); + conf.my_id = NodeId(1); + conf.advertise_pg_addr_tenant_only = + Some("safe-keeper-1.safe-keeper.hadron.svc.cluster.local:5454".to_string()); + // `listen_pg_addr` and `listen_pg_addr_tenant_only` are not used for node registration. Set them to a different + // host and port values and make sure that they don't show up in the node registration request. + conf.listen_pg_addr = "0.0.0.0:5456".to_string(); + conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string()); + conf.listen_http_addr = "0.0.0.0:7676".to_string(); + let node_ip_addr: Option = Some("127.0.0.1".parse().unwrap()); + + let request = build_node_registeration_request(&conf, node_ip_addr).unwrap(); + assert_eq!(request.node_id, NodeId(1)); + assert_eq!( + request.listen_pg_addr, + "safe-keeper-1.safe-keeper.hadron.svc.cluster.local" + ); + assert_eq!(request.listen_pg_port, 5454); + assert_eq!( + request.listen_http_addr, + "safe-keeper-1.safe-keeper.hadron.svc.cluster.local" + ); + assert_eq!(request.listen_http_port, 7676); + assert_eq!( + request.node_ip_addr, + Some(IpAddr::V4("127.0.0.1".parse().unwrap())) + ); + } +} diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 4b061c65d9..a0ee2facb5 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -241,9 +241,14 @@ async fn timeline_pull_handler(mut request: Request) -> Result, pub availability_zone: Option, pub no_sync: bool, + /* BEGIN_HADRON */ + pub advertise_pg_addr_tenant_only: Option, + pub enable_pull_timeline_on_startup: bool, + pub hcc_base_url: Option, + /* END_HADRON */ pub broker_endpoint: Uri, pub broker_keepalive_interval: Duration, pub heartbeat_timeout: Duration, @@ -185,6 +192,11 @@ impl SafeKeeperConf { use_https_safekeeper_api: false, enable_tls_wal_service_api: false, force_metric_collection_on_scrape: true, + /* BEGIN_HADRON */ + advertise_pg_addr_tenant_only: None, + enable_pull_timeline_on_startup: false, + hcc_base_url: None, + /* END_HADRON */ } } } diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 1f98651e71..e1af51c115 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -85,6 +85,43 @@ pub static WAL_STORAGE_LIMIT_ERRORS: Lazy = Lazy::new(|| { ) .expect("Failed to register safekeeper_wal_storage_limit_errors counter") }); +pub static SK_RECOVERY_PULL_TIMELINE_ERRORS: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_recovery_pull_timeline_errors", + concat!( + "Number of errors due to pull_timeline errors during SK lost disk recovery.", + "An increase in this metric indicates pull timelines runs into error." + ) + ) + .expect("Failed to register safekeeper_recovery_pull_timeline_errors counter") +}); +pub static SK_RECOVERY_PULL_TIMELINE_OKS: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_recovery_pull_timeline_oks", + concat!( + "Number of successful pull_timeline during SK lost disk recovery.", + "An increase in this metric indicates pull timelines is successful." + ) + ) + .expect("Failed to register safekeeper_recovery_pull_timeline_oks counter") +}); +pub static SK_RECOVERY_PULL_TIMELINES_SECONDS: Lazy = Lazy::new(|| { + register_histogram!( + "safekeeper_recovery_pull_timelines_seconds", + "Seconds to pull timelines", + DISK_FSYNC_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_recovery_pull_timelines_seconds histogram") +}); +pub static SK_RECOVERY_PULL_TIMELINE_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + "safekeeper_recovery_pull_timeline_seconds", + "Seconds to pull timeline", + &["tenant_id", "timeline_id"], + DISK_FSYNC_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_recovery_pull_timeline_seconds histogram vec") +}); /* END_HADRON */ pub static PERSIST_CONTROL_FILE_SECONDS: Lazy = Lazy::new(|| { register_histogram!( diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 1c9e5bade5..b4c4877b2c 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; +use http::StatusCode; use http_utils::error::ApiError; use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo}; use remote_storage::GenericRemoteStorage; @@ -21,10 +22,11 @@ use tokio::fs::OpenOptions; use tokio::io::AsyncWrite; use tokio::sync::mpsc; use tokio::task; +use tokio::time::sleep; use tokio_tar::{Archive, Builder, Header}; use tokio_util::io::{CopyToBytes, SinkWriter}; use tokio_util::sync::PollSender; -use tracing::{error, info, instrument}; +use tracing::{error, info, instrument, warn}; use utils::crashsafe::fsync_async_opt; use utils::id::{NodeId, TenantTimelineId}; use utils::logging::SecretString; @@ -449,6 +451,7 @@ pub async fn handle_request( sk_auth_token: Option, ssl_ca_certs: Vec, global_timelines: Arc, + wait_for_peer_timeline_status: bool, ) -> Result { let existing_tli = global_timelines.get(TenantTimelineId::new( request.tenant_id, @@ -472,37 +475,100 @@ pub async fn handle_request( let http_hosts = request.http_hosts.clone(); // Figure out statuses of potential donors. - let responses: Vec> = - futures::future::join_all(http_hosts.iter().map(|url| async { - let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone()); - let info = cclient - .timeline_status(request.tenant_id, request.timeline_id) - .await?; - Ok(info) - })) - .await; - let mut statuses = Vec::new(); - for (i, response) in responses.into_iter().enumerate() { - match response { - Ok(status) => { - statuses.push((status, i)); - } - Err(e) => { - info!("error fetching status from {}: {e}", http_hosts[i]); + if !wait_for_peer_timeline_status { + let responses: Vec> = + futures::future::join_all(http_hosts.iter().map(|url| async { + let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone()); + let resp = cclient + .timeline_status(request.tenant_id, request.timeline_id) + .await?; + let info: TimelineStatus = resp + .json() + .await + .context("Failed to deserialize timeline status") + .map_err(|e| mgmt_api::Error::ReceiveErrorBody(e.to_string()))?; + Ok(info) + })) + .await; + + for (i, response) in responses.into_iter().enumerate() { + match response { + Ok(status) => { + statuses.push((status, i)); + } + Err(e) => { + info!("error fetching status from {}: {e}", http_hosts[i]); + } } } - } - // Allow missing responses from up to one safekeeper (say due to downtime) - // e.g. if we created a timeline on PS A and B, with C being offline. Then B goes - // offline and C comes online. Then we want a pull on C with A and B as hosts to work. - let min_required_successful = (http_hosts.len() - 1).max(1); - if statuses.len() < min_required_successful { - return Err(ApiError::InternalServerError(anyhow::anyhow!( - "only got {} successful status responses. required: {min_required_successful}", - statuses.len() - ))); + // Allow missing responses from up to one safekeeper (say due to downtime) + // e.g. if we created a timeline on PS A and B, with C being offline. Then B goes + // offline and C comes online. Then we want a pull on C with A and B as hosts to work. + let min_required_successful = (http_hosts.len() - 1).max(1); + if statuses.len() < min_required_successful { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "only got {} successful status responses. required: {min_required_successful}", + statuses.len() + ))); + } + } else { + let mut retry = true; + // We must get status from all other peers. + // Otherwise, we may run into split-brain scenario. + while retry { + statuses.clear(); + retry = false; + for (i, url) in http_hosts.iter().enumerate() { + let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone()); + match cclient + .timeline_status(request.tenant_id, request.timeline_id) + .await + { + Ok(resp) => { + if resp.status() == StatusCode::NOT_FOUND { + warn!( + "Timeline {} not found on peer SK {}, no need to pull it", + TenantTimelineId::new(request.tenant_id, request.timeline_id), + url + ); + return Ok(PullTimelineResponse { + safekeeper_host: None, + }); + } + let info: TimelineStatus = resp + .json() + .await + .context("Failed to deserialize timeline status") + .map_err(ApiError::InternalServerError)?; + statuses.push((info, i)); + } + Err(e) => { + match e { + // If we get a 404, it means the timeline doesn't exist on this safekeeper. + // We can ignore this error. + mgmt_api::Error::ApiError(status, _) + if status == StatusCode::NOT_FOUND => + { + warn!( + "Timeline {} not found on peer SK {}, no need to pull it", + TenantTimelineId::new(request.tenant_id, request.timeline_id), + url + ); + return Ok(PullTimelineResponse { + safekeeper_host: None, + }); + } + _ => {} + } + retry = true; + error!("Failed to get timeline status from {}: {:#}", url, e); + } + } + } + sleep(std::time::Duration::from_millis(100)).await; + } } // Find the most advanced safekeeper @@ -511,6 +577,12 @@ pub async fn handle_request( .max_by_key(|(status, _)| { ( status.acceptor_state.epoch, + /* BEGIN_HADRON */ + // We need to pull from the SK with the highest term. + // This is because another compute may come online and vote the same highest term again on the other two SKs. + // Then, there will be 2 computes running on the same term. + status.acceptor_state.term, + /* END_HADRON */ status.flush_lsn, status.commit_lsn, ) diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 280cd790a4..393df6228e 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -191,6 +191,11 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { use_https_safekeeper_api: false, enable_tls_wal_service_api: false, force_metric_collection_on_scrape: true, + /* BEGIN_HADRON */ + enable_pull_timeline_on_startup: false, + advertise_pg_addr_tenant_only: None, + hcc_base_url: None, + /* END_HADRON */ }; let mut global = GlobalMap::new(disk, conf.clone())?; diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index 9d85b9a332..bb618325e0 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -34,7 +34,9 @@ class NeonAPI: self.retries524 = 0 self.retries4xx = 0 - def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response: + def __request( + self, method: str | bytes, endpoint: str, retry404: bool = False, **kwargs: Any + ) -> requests.Response: kwargs["headers"] = kwargs.get("headers", {}) kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}" @@ -55,10 +57,12 @@ class NeonAPI: resp.raise_for_status() break elif resp.status_code >= 400: - if resp.status_code == 422: - if resp.json()["message"] == "branch not ready yet": - retry = True - self.retries4xx += 1 + if resp.status_code == 404 and retry404: + retry = True + self.retries4xx += 1 + elif resp.status_code == 422 and resp.json()["message"] == "branch not ready yet": + retry = True + self.retries4xx += 1 elif resp.status_code == 423 and resp.json()["message"] in { "endpoint is in some transitive state, could not suspend", "project already has running conflicting operations, scheduling of new ones is prohibited", @@ -66,7 +70,7 @@ class NeonAPI: retry = True self.retries4xx += 1 elif resp.status_code == 524: - log.info("The request was timed out, trying to get operations") + log.info("The request was timed out") retry = True self.retries524 += 1 if retry: @@ -203,6 +207,9 @@ class NeonAPI: resp = self.__request( "GET", f"/projects/{project_id}/branches/{branch_id}", + # XXX Retry get parent details to work around the issue + # https://databricks.atlassian.net/browse/LKB-279 + retry404=True, headers={ "Accept": "application/json", }, @@ -307,6 +314,10 @@ class NeonAPI: if endpoint_type: data["endpoint"]["type"] = endpoint_type if settings: + # otherwise we get 400 "settings must not be nil" + # TODO(myrrc): fix on cplane side + if "pg_settings" not in settings: + settings["pg_settings"] = {} data["endpoint"]["settings"] = settings resp = self.__request( diff --git a/test_runner/performance/test_lfc_prewarm.py b/test_runner/performance/test_lfc_prewarm.py new file mode 100644 index 0000000000..6c0083de95 --- /dev/null +++ b/test_runner/performance/test_lfc_prewarm.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import os +import timeit +import traceback +from concurrent.futures import ThreadPoolExecutor as Exec +from pathlib import Path +from time import sleep +from typing import TYPE_CHECKING, Any, cast + +import pytest +from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult +from fixtures.log_helper import log +from fixtures.neon_api import NeonAPI, connection_parameters_to_env + +if TYPE_CHECKING: + from fixtures.compare_fixtures import NeonCompare + from fixtures.neon_fixtures import Endpoint, PgBin + from fixtures.pg_version import PgVersion + +from performance.test_perf_pgbench import utc_now_timestamp + +# These tests compare performance for a write-heavy and read-heavy workloads of an ordinary endpoint +# compared to the endpoint which saves its LFC and prewarms using it on startup. + + +def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare): + env = neon_compare.env + env.create_branch("normal") + env.create_branch("prewarmed") + pg_bin = neon_compare.pg_bin + ep_normal: Endpoint = env.endpoints.create_start("normal") + ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True) + + for ep in [ep_normal, ep_prewarmed]: + connstr: str = ep.connstr() + pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"]) + ep.safe_psql("CREATE EXTENSION neon") + client = ep.http_client() + client.offload_lfc() + ep.stop() + ep.start() + client.prewarm_lfc_wait() + + run_start_timestamp = utc_now_timestamp() + t0 = timeit.default_timer() + out = pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr]) + run_duration = timeit.default_timer() - t0 + run_end_timestamp = utc_now_timestamp() + + stdout = Path(f"{out}.stdout").read_text() + res = PgBenchRunResult.parse_from_stdout( + stdout=stdout, + run_duration=run_duration, + run_start_timestamp=run_start_timestamp, + run_end_timestamp=run_end_timestamp, + ) + name: str = cast("str", ep.branch_name) + neon_compare.zenbenchmark.record_pg_bench_result(name, res) + + +@pytest.mark.remote_cluster +@pytest.mark.timeout(2 * 60 * 60) +def test_compare_prewarmed_pgbench_perf_benchmark( + pg_bin: PgBin, + neon_api: NeonAPI, + pg_version: PgVersion, + zenbenchmark: NeonBenchmarker, +): + name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}" + project = neon_api.create_project(pg_version, name) + project_id = project["project"]["id"] + neon_api.wait_for_operation_to_finish(project_id) + err = False + try: + benchmark_impl(pg_bin, neon_api, project, zenbenchmark) + except Exception as e: + err = True + log.error(f"Caught exception: {e}") + log.error(traceback.format_exc()) + finally: + assert not err + neon_api.delete_project(project_id) + + +def benchmark_impl( + pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker +): + pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424") # 50GB + offload_secs = 20 + test_duration_min = 5 + pgbench_duration = f"-T{test_duration_min * 60}" + # prewarm API is not publicly exposed. In order to test performance of a + # fully prewarmed endpoint, wait after it restarts. + # The number here is empirical, based on manual runs on staging + prewarmed_sleep_secs = 180 + + branch_id = project["branch"]["id"] + project_id = project["project"]["id"] + normal_env = connection_parameters_to_env( + project["connection_uris"][0]["connection_parameters"] + ) + normal_id = project["endpoints"][0]["id"] + + prewarmed_branch_id = neon_api.create_branch( + project_id, "prewarmed", parent_id=branch_id, add_endpoint=False + )["branch"]["id"] + neon_api.wait_for_operation_to_finish(project_id) + + ep_prewarmed = neon_api.create_endpoint( + project_id, + prewarmed_branch_id, + endpoint_type="read_write", + settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs}, + ) + neon_api.wait_for_operation_to_finish(project_id) + + prewarmed_env = normal_env.copy() + prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"] + prewarmed_id = ep_prewarmed["endpoint"]["id"] + + def bench(endpoint_name, endpoint_id, env): + pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env) + sleep(offload_secs * 2) # ensure LFC is offloaded after pgbench finishes + neon_api.restart_endpoint(project_id, endpoint_id) + sleep(prewarmed_sleep_secs) + + run_start_timestamp = utc_now_timestamp() + t0 = timeit.default_timer() + out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env) + run_duration = timeit.default_timer() - t0 + run_end_timestamp = utc_now_timestamp() + + stdout = Path(f"{out}.stdout").read_text() + res = PgBenchRunResult.parse_from_stdout( + stdout=stdout, + run_duration=run_duration, + run_start_timestamp=run_start_timestamp, + run_end_timestamp=run_end_timestamp, + ) + zenbenchmark.record_pg_bench_result(endpoint_name, res) + + with Exec(max_workers=2) as exe: + exe.submit(bench, "normal", normal_id, normal_env) + exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env) + + +def test_compare_prewarmed_read_perf(neon_compare: NeonCompare): + env = neon_compare.env + env.create_branch("normal") + env.create_branch("prewarmed") + ep_normal: Endpoint = env.endpoints.create_start("normal") + ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True) + + sql = [ + "CREATE EXTENSION neon", + "CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')", + "INSERT INTO foo SELECT FROM generate_series(1,1000000)", + ] + for ep in [ep_normal, ep_prewarmed]: + ep.safe_psql_many(sql) + client = ep.http_client() + client.offload_lfc() + ep.stop() + ep.start() + client.prewarm_lfc_wait() + with neon_compare.record_duration(f"{ep.branch_name}_run_duration"): + ep.safe_psql("SELECT count(*) from foo") diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py index 5c43b06bc5..b106e9b729 100644 --- a/test_runner/random_ops/test_random_ops.py +++ b/test_runner/random_ops/test_random_ops.py @@ -13,7 +13,6 @@ from typing import TYPE_CHECKING, Any import pytest from fixtures.log_helper import log -from requests import HTTPError if TYPE_CHECKING: from pathlib import Path @@ -153,26 +152,11 @@ class NeonBranch: return self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"]) parent_id: str = res["branch"]["parent_id"] - # XXX Retry get parent details to work around the issue - # https://databricks.atlassian.net/browse/LKB-279 - target_time = datetime.now() + timedelta(seconds=30) - while datetime.now() < target_time: - try: - parent_def = self.neon_api.get_branch_details(self.project_id, parent_id) - except HTTPError as he: - if he.response.status_code == 404: - log.info("Branch not found, waiting...") - time.sleep(1) - else: - raise HTTPError(he) from he - else: - break - else: - raise RuntimeError(f"Branch {parent_id} not found") - # Creates an object for the parent branch # After the reset operation a new parent branch is created - parent = NeonBranch(self.project, parent_def, True) + parent = NeonBranch( + self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True + ) self.project.branches[parent_id] = parent self.parent = parent parent.children[self.id] = self diff --git a/test_runner/regress/test_wal_restore.py b/test_runner/regress/test_wal_restore.py index 0bb63308bb..573016f772 100644 --- a/test_runner/regress/test_wal_restore.py +++ b/test_runner/regress/test_wal_restore.py @@ -3,6 +3,7 @@ from __future__ import annotations import sys import tarfile import tempfile +from pathlib import Path from typing import TYPE_CHECKING import pytest @@ -198,3 +199,115 @@ def test_wal_restore_http(neon_env_builder: NeonEnvBuilder, broken_tenant: bool) # the table is back now! restored = env.endpoints.create_start("main") assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)] + + +# BEGIN_HADRON +# TODO: re-enable once CM python is integreated. +# def clear_directory(directory): +# for item in os.listdir(directory): +# item_path = os.path.join(directory, item) +# if os.path.isdir(item_path): +# log.info(f"removing SK directory: {item_path}") +# shutil.rmtree(item_path) +# else: +# log.info(f"removing SK file: {item_path}") +# os.remove(item_path) + + +# def test_sk_pull_timelines( +# neon_env_builder: NeonEnvBuilder, +# ): +# DBNAME = "regression" +# superuser_name = "databricks_superuser" +# neon_env_builder.num_safekeepers = 3 +# neon_env_builder.num_pageservers = 4 +# neon_env_builder.safekeeper_extra_opts = ["--enable-pull-timeline-on-startup"] +# neon_env_builder.enable_safekeeper_remote_storage(s3_storage()) + +# env = neon_env_builder.init_start(initial_tenant_shard_count=4) + +# env.compute_manager.start(base_port=env.compute_manager_port) + +# test_creator = "test_creator" +# test_metastore_id = uuid4() +# test_account_id = uuid4() +# test_workspace_id = 1 +# test_workspace_url = "http://test_workspace_url" +# test_metadata_version = 1 +# test_metadata = { +# "state": "INSTANCE_PROVISIONING", +# "admin_rolename": "admin", +# "admin_password_scram": "abc123456", +# } + +# test_instance_name_1 = "test_instance_1" +# test_instance_read_write_compute_pool_1 = { +# "instance_name": test_instance_name_1, +# "compute_pool_name": "compute_pool_1", +# "creator": test_creator, +# "capacity": 2.0, +# "node_count": 1, +# "metadata_version": 0, +# "metadata": { +# "state": "INSTANCE_PROVISIONING", +# }, +# } + +# test_instance_1_readable_secondaries_enabled = False + +# # Test creation +# create_instance_with_retries( +# env, +# test_instance_name_1, +# test_creator, +# test_metastore_id, +# test_account_id, +# test_workspace_id, +# test_workspace_url, +# test_instance_read_write_compute_pool_1, +# test_metadata_version, +# test_metadata, +# test_instance_1_readable_secondaries_enabled, +# ) +# instance = env.compute_manager.get_instance_by_name(test_instance_name_1, test_workspace_id) +# log.info(f"haoyu Instance created: {instance}") +# assert instance["instance_name"] == test_instance_name_1 +# test_instance_id = instance["instance_id"] +# instance_detail = env.compute_manager.describe_instance(test_instance_id) +# log.info(f"haoyu Instance detail: {instance_detail}") + +# env.initial_tenant = instance_detail[0]["tenant_id"] +# env.initial_timeline = instance_detail[0]["timeline_id"] + +# # Connect to postgres and create a database called "regression". +# endpoint = env.endpoints.create_start("main") +# endpoint.safe_psql(f"CREATE ROLE {superuser_name}") +# endpoint.safe_psql(f"CREATE DATABASE {DBNAME}") + +# endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);") +# # Write some data. ~20 MB. +# num_rows = 0 +# for _i in range(0, 20000): +# endpoint.safe_psql( +# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False +# ) +# num_rows += 1 + +# log.info(f"SKs {env.storage_controller.hcc_sk_node_list()}") + +# env.safekeepers[0].stop(immediate=True) +# clear_directory(env.safekeepers[0].data_dir) +# env.safekeepers[0].start() + +# # PG can still write data. ~20 MB. +# for _i in range(0, 20000): +# endpoint.safe_psql( +# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False +# ) +# num_rows += 1 + +# tuples = endpoint.safe_psql("SELECT COUNT(*) FROM usertable;") +# assert tuples[0][0] == num_rows +# endpoint.stop_and_destroy() + +# END_HADRON diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index fc01deb92d..c61598cdf6 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -98,7 +98,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["full", "test-util"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { version = "0.1", features = ["net", "sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }