diff --git a/Cargo.lock b/Cargo.lock index d66b383ed1..80546b884d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4294,6 +4294,7 @@ dependencies = [ "humantime-serde", "pageserver_api", "pageserver_client", + "pageserver_client_grpc", "pageserver_page_api", "rand 0.8.5", "reqwest", @@ -4323,6 +4324,7 @@ dependencies = [ "pageserver_api", "postgres_ffi", "remote_storage", + "serde", "serde_json", "svg_fmt", "thiserror 1.0.69", @@ -4499,6 +4501,7 @@ name = "pageserver_client_grpc" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "bytes", "compute_api", "futures", diff --git a/Cargo.toml b/Cargo.toml index 14f2cfcb56..0d521ee4d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -262,6 +262,7 @@ neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" } pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } pageserver_client = { path = "./pageserver/client" } +pageserver_client_grpc = { path = "./pageserver/client_grpc" } pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" } pageserver_page_api = { path = "./pageserver/page_api" } postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } diff --git a/compute_tools/README.md b/compute_tools/README.md index 8d84031efc..49f1368f0e 100644 --- a/compute_tools/README.md +++ b/compute_tools/README.md @@ -46,11 +46,14 @@ stateDiagram-v2 Configuration --> Failed : Failed to configure the compute Configuration --> Running : Compute has been configured Empty --> Init : Compute spec is immediately available - Empty --> TerminationPending : Requested termination + Empty --> TerminationPendingFast : Requested termination + Empty --> TerminationPendingImmediate : Requested termination Init --> Failed : Failed to start Postgres Init --> Running : Started Postgres - Running --> TerminationPending : Requested termination - TerminationPending --> Terminated : Terminated compute + Running --> TerminationPendingFast : Requested termination + Running --> TerminationPendingImmediate : Requested termination + TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status + TerminationPendingImmediate --> Terminated : Terminated compute immediately Failed --> [*] : Compute exited Terminated --> [*] : Compute exited ``` diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 9b5a464432..f6d1d0e8f2 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -961,14 +961,20 @@ impl ComputeNode { None }; - let mut delay_exit = false; let mut state = self.state.lock().unwrap(); state.terminate_flush_lsn = lsn; - if let ComputeStatus::TerminationPending { mode } = state.status { + + let delay_exit = state.status == ComputeStatus::TerminationPendingFast; + if state.status == ComputeStatus::TerminationPendingFast + || state.status == ComputeStatus::TerminationPendingImmediate + { + info!( + "Changing compute status from {} to {}", + state.status, + ComputeStatus::Terminated + ); state.status = ComputeStatus::Terminated; self.state_changed.notify_all(); - // we were asked to terminate gracefully, don't exit to avoid restart - delay_exit = mode == compute_api::responses::TerminateMode::Fast } drop(state); @@ -1810,6 +1816,8 @@ impl ComputeNode { tls_config, )?; + self.pg_reload_conf()?; + if !spec.skip_pg_catalog_updates { let max_concurrent_connections = spec.reconfigure_concurrency; // Temporarily reset max_cluster_size in config @@ -1829,10 +1837,9 @@ impl ComputeNode { Ok(()) })?; + self.pg_reload_conf()?; } 
- self.pg_reload_conf()?; - let unknown_op = "unknown".to_string(); let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op); info!( @@ -1905,7 +1912,8 @@ impl ComputeNode { // exit loop ComputeStatus::Failed - | ComputeStatus::TerminationPending { .. } + | ComputeStatus::TerminationPendingFast + | ComputeStatus::TerminationPendingImmediate | ComputeStatus::Terminated => break 'cert_update, // wait diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs index 3f6f9a7ecc..d014a5bb72 100644 --- a/compute_tools/src/compute_prewarm.rs +++ b/compute_tools/src/compute_prewarm.rs @@ -70,7 +70,7 @@ impl ComputeNode { } }; let row = match client - .query_one("select * from get_prewarm_info()", &[]) + .query_one("select * from neon.get_prewarm_info()", &[]) .await { Ok(row) => row, @@ -146,7 +146,7 @@ impl ComputeNode { ComputeNode::get_maintenance_client(&self.tokio_conn_conf) .await .context("connecting to postgres")? - .query_one("select prewarm_local_cache($1)", &[&uncompressed]) + .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed]) .await .context("loading LFC state into postgres") .map(|_| ()) @@ -196,7 +196,7 @@ impl ComputeNode { ComputeNode::get_maintenance_client(&self.tokio_conn_conf) .await .context("connecting to postgres")? - .query_one("select get_local_cache_state()", &[]) + .query_one("select neon.get_local_cache_state()", &[]) .await .context("querying LFC state")? .try_get::(0) diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 3c58b284b3..93a357e160 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -371,9 +371,28 @@ paths: summary: Terminate Postgres and wait for it to exit description: "" operationId: terminate + parameters: + - name: mode + in: query + description: "Terminate mode: fast (wait 30s before returning) and immediate" + required: false + schema: + type: string + enum: ["fast", "immediate"] + default: fast responses: 200: description: Result + content: + application/json: + schema: + $ref: "#/components/schemas/TerminateResponse" + 201: + description: Result if compute is already terminated + content: + application/json: + schema: + $ref: "#/components/schemas/TerminateResponse" 412: description: "wrong state" content: @@ -530,11 +549,14 @@ components: type: string enum: - empty - - init - - failed - - running - configuration_pending + - init + - running - configuration + - failed + - termination_pending_fast + - termination_pending_immediate + - terminated example: running ExtensionInstallRequest: @@ -660,6 +682,17 @@ components: description: Role name. 
example: "neon" + TerminateResponse: + type: object + required: + - lsn + properties: + lsn: + type: string + nullable: true + description: "last WAL flush LSN" + example: "0/028F10D8" + SetRoleGrantsResponse: type: object required: diff --git a/compute_tools/src/http/routes/terminate.rs b/compute_tools/src/http/routes/terminate.rs index 32d90a5990..5b30b020c8 100644 --- a/compute_tools/src/http/routes/terminate.rs +++ b/compute_tools/src/http/routes/terminate.rs @@ -3,7 +3,7 @@ use crate::http::JsonResponse; use axum::extract::State; use axum::response::Response; use axum_extra::extract::OptionalQuery; -use compute_api::responses::{ComputeStatus, TerminateResponse}; +use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse}; use http::StatusCode; use serde::Deserialize; use std::sync::Arc; @@ -12,7 +12,7 @@ use tracing::info; #[derive(Deserialize, Default)] pub struct TerminateQuery { - mode: compute_api::responses::TerminateMode, + mode: TerminateMode, } /// Terminate the compute. @@ -24,16 +24,16 @@ pub(in crate::http) async fn terminate( { let mut state = compute.state.lock().unwrap(); if state.status == ComputeStatus::Terminated { - return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn); + let response = TerminateResponse { + lsn: state.terminate_flush_lsn, + }; + return JsonResponse::success(StatusCode::CREATED, response); } if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) { return JsonResponse::invalid_status(state.status); } - state.set_status( - ComputeStatus::TerminationPending { mode }, - &compute.state_changed, - ); + state.set_status(mode.into(), &compute.state_changed); } forward_termination_signal(false); diff --git a/compute_tools/src/migrations/0002-alter_roles.sql b/compute_tools/src/migrations/0002-alter_roles.sql index 6cb49f873f..8fc371eb8f 100644 --- a/compute_tools/src/migrations/0002-alter_roles.sql +++ b/compute_tools/src/migrations/0002-alter_roles.sql @@ -1,3 +1,16 @@ +-- On December 8th, 2023, an engineering escalation (INC-110) was opened after +-- it was found that BYPASSRLS was being applied to all roles. +-- +-- PR that introduced the issue: https://github.com/neondatabase/neon/pull/5657 +-- Subsequent commit on main: https://github.com/neondatabase/neon/commit/ad99fa5f0393e2679e5323df653c508ffa0ac072 +-- +-- NOBYPASSRLS and INHERIT are the defaults for a Postgres role, but because it +-- isn't easy to know if a Postgres cluster is affected by the issue, we need to +-- keep the migration around for a long time, if not indefinitely, so any +-- cluster can be fixed. +-- +-- Branching is the gift that keeps on giving... + DO $$ DECLARE role_name text; diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 2fc0f20a75..abd7d58c90 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -85,7 +85,8 @@ impl ComputeMonitor { if matches!( compute_status, ComputeStatus::Terminated - | ComputeStatus::TerminationPending { .. } + | ComputeStatus::TerminationPendingFast + | ComputeStatus::TerminationPendingImmediate | ComputeStatus::Failed ) { info!( diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index c5c63da8e5..dd933fea6c 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -923,7 +923,8 @@ impl Endpoint { ComputeStatus::Empty | ComputeStatus::ConfigurationPending | ComputeStatus::Configuration - | ComputeStatus::TerminationPending { .. 
} + | ComputeStatus::TerminationPendingFast + | ComputeStatus::TerminationPendingImmediate | ComputeStatus::Terminated => { bail!("unexpected compute status: {:?}", state.status) } diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index e10c381fb4..2fe233214a 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -121,6 +121,15 @@ pub enum TerminateMode { Immediate, } +impl From for ComputeStatus { + fn from(mode: TerminateMode) -> Self { + match mode { + TerminateMode::Fast => ComputeStatus::TerminationPendingFast, + TerminateMode::Immediate => ComputeStatus::TerminationPendingImmediate, + } + } +} + #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum ComputeStatus { @@ -141,7 +150,9 @@ pub enum ComputeStatus { // control-plane to terminate it. Failed, // Termination requested - TerminationPending { mode: TerminateMode }, + TerminationPendingFast, + // Termination requested, without waiting 30s before returning from /terminate + TerminationPendingImmediate, // Terminated Postgres Terminated, } @@ -160,7 +171,10 @@ impl Display for ComputeStatus { ComputeStatus::Running => f.write_str("running"), ComputeStatus::Configuration => f.write_str("configuration"), ComputeStatus::Failed => f.write_str("failed"), - ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"), + ComputeStatus::TerminationPendingFast => f.write_str("termination-pending-fast"), + ComputeStatus::TerminationPendingImmediate => { + f.write_str("termination-pending-immediate") + } ComputeStatus::Terminated => f.write_str("terminated"), } } diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 7c6abf252e..5f856a44d4 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -428,6 +428,12 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState { shard_number: 0, }; + let empty_wal_rate_limiter = crate::bindings::WalRateLimiter { + should_limit: crate::bindings::pg_atomic_uint32 { value: 0 }, + sent_bytes: 0, + last_recorded_time_us: 0, + }; + crate::bindings::WalproposerShmemState { propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 }, donor_name: [0; 64], @@ -441,6 +447,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState { num_shards: 0, replica_promote: false, min_ps_feedback: empty_feedback, + wal_rate_limiter: empty_wal_rate_limiter, } } diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index af4be23b9b..fe1ddc2e7d 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::error::Error as _; use std::time::Duration; @@ -251,6 +251,70 @@ impl Client { Ok(()) } + pub async fn tenant_timeline_compact( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + force_image_layer_creation: bool, + must_force_image_layer_creation: bool, + scheduled: bool, + wait_until_done: bool, + ) -> Result<()> { + let mut path = reqwest::Url::parse(&format!( + "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact", + self.mgmt_api_endpoint + )) + .expect("Cannot build URL"); + + if force_image_layer_creation { + path.query_pairs_mut() + .append_pair("force_image_layer_creation", "true"); + } + + if must_force_image_layer_creation { + path.query_pairs_mut() + 
.append_pair("must_force_image_layer_creation", "true"); + } + + if scheduled { + path.query_pairs_mut().append_pair("scheduled", "true"); + } + if wait_until_done { + path.query_pairs_mut() + .append_pair("wait_until_scheduled_compaction_done", "true"); + path.query_pairs_mut() + .append_pair("wait_until_uploaded", "true"); + } + self.request(Method::PUT, path, ()).await?; + Ok(()) + } + + /* BEGIN_HADRON */ + pub async fn tenant_timeline_describe( + &self, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + ) -> Result { + let mut path = reqwest::Url::parse(&format!( + "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}", + self.mgmt_api_endpoint + )) + .expect("Cannot build URL"); + path.query_pairs_mut() + .append_pair("include-image-consistent-lsn", "true"); + + let response: reqwest::Response = self.request(Method::GET, path, ()).await?; + let body = response.json().await.map_err(Error::ReceiveBody)?; + Ok(body) + } + + pub async fn list_tenant_visible_size(&self) -> Result> { + let uri = format!("{}/v1/list_tenant_visible_size", self.mgmt_api_endpoint); + let resp = self.get(&uri).await?; + resp.json().await.map_err(Error::ReceiveBody) + } + /* END_HADRON */ + pub async fn tenant_scan_remote_storage( &self, tenant_id: TenantId, diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index ca224900ac..e2741ad839 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -9,6 +9,7 @@ testing = ["pageserver_api/testing"] [dependencies] anyhow.workspace = true +arc-swap.workspace = true bytes.workspace = true compute_api.workspace = true futures.workspace = true diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 63852868c3..7049fbdb96 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -3,8 +3,10 @@ use std::num::NonZero; use std::sync::Arc; use anyhow::anyhow; +use arc_swap::ArcSwap; use futures::stream::FuturesUnordered; use futures::{FutureExt as _, StreamExt as _}; +use tonic::codec::CompressionEncoding; use tracing::instrument; use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; @@ -55,28 +57,85 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); /// TODO: this client does not support base backups or LSN leases, as these are only used by /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards. pub struct PageserverClient { - // TODO: support swapping out the shard map, e.g. via an ArcSwap. - shards: Shards, + /// The tenant ID. + tenant_id: TenantId, + /// The timeline ID. + timeline_id: TimelineId, + /// The JWT auth token for this tenant, if any. + auth_token: Option, + /// The compression to use, if any. + compression: Option, + /// The shards for this tenant. + shards: ArcSwap, + /// The retry configuration. retry: Retry, } impl PageserverClient { /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given - /// in the shard map, which must be complete and must use gRPC URLs. + /// in the shard spec, which must be complete and must use gRPC URLs. 
pub fn new( tenant_id: TenantId, timeline_id: TimelineId, - shard_map: HashMap, - stripe_size: ShardStripeSize, + shard_spec: ShardSpec, auth_token: Option, + compression: Option, ) -> anyhow::Result { - let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?; + let shards = Shards::new( + tenant_id, + timeline_id, + shard_spec, + auth_token.clone(), + compression, + )?; Ok(Self { - shards, + tenant_id, + timeline_id, + auth_token, + compression, + shards: ArcSwap::new(Arc::new(shards)), retry: Retry, }) } + /// Updates the shards from the given shard spec. In-flight requests will complete using the + /// existing shards, but may retry with the new shards if they fail. + /// + /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are + /// properly spun down and dropped afterwards. + pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> { + // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races + // with concurrent updates, but that involves creating a new `Shards` on every attempt, + // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere + // in the stack, and if they're violated then we already have problems elsewhere, so a + // best-effort but possibly-racy check is okay here. + let old = self.shards.load_full(); + if shard_spec.count < old.count { + return Err(anyhow!( + "can't reduce shard count from {} to {}", + old.count, + shard_spec.count + )); + } + if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size { + return Err(anyhow!( + "can't change stripe size from {} to {}", + old.stripe_size, + shard_spec.stripe_size + )); + } + + let shards = Shards::new( + self.tenant_id, + self.timeline_id, + shard_spec, + self.auth_token.clone(), + self.compression, + )?; + self.shards.store(Arc::new(shards)); + Ok(()) + } + /// Returns whether a relation exists. #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))] pub async fn check_rel_exists( @@ -84,9 +143,9 @@ impl PageserverClient { req: page_api::CheckRelExistsRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // Relation metadata is only available on shard 0. - let mut client = self.shards.get_zero().client().await?; + let mut client = self.shards.load_full().get_zero().client().await?; client.check_rel_exists(req).await }) .await @@ -99,16 +158,17 @@ impl PageserverClient { req: page_api::GetDbSizeRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // Relation metadata is only available on shard 0. - let mut client = self.shards.get_zero().client().await?; + let mut client = self.shards.load_full().get_zero().client().await?; client.get_db_size(req).await }) .await } - /// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically - /// splits requests that straddle shard boundaries, and assembles the responses. + /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the + /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle + /// shard boundaries, and assembles the responses. /// /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status` /// errors. All responses will have `GetPageStatusCode::Ok`. 
@@ -128,72 +188,96 @@ impl PageserverClient { if req.block_numbers.is_empty() { return Err(tonic::Status::invalid_argument("no block number")); } + // The request attempt must be 0. The client will increment it internally. + if req.request_id.attempt != 0 { + return Err(tonic::Status::invalid_argument("request attempt must be 0")); + } + // The shards may change while we're fetching pages. We execute the request using a stable + // view of the shards (especially important for requests that span shards), but retry the + // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary + // retries and re-splits in some cases where requests span shards, but these are expected to + // be rare. + // + // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this + // once we figure out how to handle these. + self.retry + .with(async |attempt| { + let mut req = req.clone(); + req.request_id.attempt = attempt as u32; + Self::get_page_with_shards(req, &self.shards.load_full()).await + }) + .await + } + + /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of + /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`. + async fn get_page_with_shards( + req: page_api::GetPageRequest, + shards: &Shards, + ) -> tonic::Result { // Fast path: request is for a single shard. if let Some(shard_id) = - GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size) + GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size) { - return self.get_page_for_shard(shard_id, req).await; + return Self::get_page_with_shard(req, shards.get(shard_id)?).await; } // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and // reassemble the responses. - // - // TODO: when we support shard map updates, we need to detect when it changes and re-split - // the request on errors. - let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size); + let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size); - let mut shard_requests: FuturesUnordered<_> = splitter - .drain_requests() - .map(|(shard_id, shard_req)| { - // NB: each request will retry internally. - self.get_page_for_shard(shard_id, shard_req) - .map(move |result| result.map(|resp| (shard_id, resp))) - }) - .collect(); + let mut shard_requests = FuturesUnordered::new(); + for (shard_id, shard_req) in splitter.drain_requests() { + let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?) + .map(move |result| result.map(|resp| (shard_id, resp))); + shard_requests.push(future); + } while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? { splitter.add_response(shard_id, shard_response)?; } - splitter.assemble_response() + splitter.get_response() } - /// Fetches pages that belong to the given shard. - #[instrument(skip_all, fields(shard = %shard_id))] - async fn get_page_for_shard( - &self, - shard_id: ShardIndex, + /// Fetches pages on the given shard. Does not retry internally. + async fn get_page_with_shard( req: page_api::GetPageRequest, + shard: &Shard, ) -> tonic::Result { - let resp = self - .retry - .with(async || { - let stream = self - .shards - .get(shard_id)? 
- .stream(req.request_class.is_bulk()) - .await; - let resp = stream.send(req.clone()).await?; + let stream = shard.stream(req.request_class.is_bulk()).await; + let resp = stream.send(req.clone()).await?; - // Convert per-request errors into a tonic::Status. - if resp.status_code != page_api::GetPageStatusCode::Ok { - return Err(tonic::Status::new( - resp.status_code.into(), - resp.reason.unwrap_or_else(|| String::from("unknown error")), - )); - } + // Convert per-request errors into a tonic::Status. + if resp.status_code != page_api::GetPageStatusCode::Ok { + return Err(tonic::Status::new( + resp.status_code.into(), + resp.reason.unwrap_or_else(|| String::from("unknown error")), + )); + } - Ok(resp) - }) - .await?; - - // Make sure we got the right number of pages. - // NB: check outside of the retry loop, since we don't want to retry this. - let (expected, actual) = (req.block_numbers.len(), resp.page_images.len()); - if expected != actual { + // Check that we received the expected pages. + if req.rel != resp.rel { return Err(tonic::Status::internal(format!( - "expected {expected} pages for shard {shard_id}, got {actual}", + "shard {} returned wrong relation, expected {} got {}", + shard.id, req.rel, resp.rel + ))); + } + if !req + .block_numbers + .iter() + .copied() + .eq(resp.pages.iter().map(|p| p.block_number)) + { + return Err(tonic::Status::internal(format!( + "shard {} returned wrong pages, expected {:?} got {:?}", + shard.id, + req.block_numbers, + resp.pages + .iter() + .map(|page| page.block_number) + .collect::>() ))); } @@ -207,9 +291,9 @@ impl PageserverClient { req: page_api::GetRelSizeRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // Relation metadata is only available on shard 0. - let mut client = self.shards.get_zero().client().await?; + let mut client = self.shards.load_full().get_zero().client().await?; client.get_rel_size(req).await }) .await @@ -222,50 +306,53 @@ impl PageserverClient { req: page_api::GetSlruSegmentRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // SLRU segments are only available on shard 0. - let mut client = self.shards.get_zero().client().await?; + let mut client = self.shards.load_full().get_zero().client().await?; client.get_slru_segment(req).await }) .await } } -/// Tracks the tenant's shards. -struct Shards { +/// Shard specification for a PageserverClient. +pub struct ShardSpec { + /// Maps shard indices to gRPC URLs. + /// + /// INVARIANT: every shard 0..count is present, and shard 0 is always present. + /// INVARIANT: every URL is valid and uses grpc:// scheme. + urls: HashMap, /// The shard count. /// /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention. count: ShardCount, - /// The stripe size. Only used for sharded tenants. + /// The stripe size for these shards. stripe_size: ShardStripeSize, - /// Shards by shard index. - /// - /// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`. - /// - /// INVARIANT: every shard 0..count is present. - /// INVARIANT: shard 0 is always present. - map: HashMap, } -impl Shards { - /// Creates a new set of shards based on a shard map. - fn new( - tenant_id: TenantId, - timeline_id: TimelineId, - shard_map: HashMap, - stripe_size: ShardStripeSize, - auth_token: Option, +impl ShardSpec { + /// Creates a new shard spec with the given URLs and stripe size. All shards must be given. + /// The stripe size may be omitted for unsharded tenants. 
+ pub fn new( + urls: HashMap, + stripe_size: Option, ) -> anyhow::Result { - let count = match shard_map.len() { + // Compute the shard count. + let count = match urls.len() { 0 => return Err(anyhow!("no shards provided")), 1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()` n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")), n => ShardCount::new(n as u8), }; - let mut map = HashMap::new(); - for (shard_id, url) in shard_map { + // Determine the stripe size. It doesn't matter for unsharded tenants. + if stripe_size.is_none() && !count.is_unsharded() { + return Err(anyhow!("stripe size must be given for sharded tenants")); + } + let stripe_size = stripe_size.unwrap_or_default(); + + // Validate the shard spec. + for (shard_id, url) in &urls { // The shard index must match the computed shard count, even for unsharded tenants. if shard_id.shard_count != count { return Err(anyhow!("invalid shard index {shard_id}, expected {count}")); @@ -276,21 +363,72 @@ impl Shards { } // The above conditions guarantee that we have all shards 0..count: len() matches count, // shard number < count, and numbers are unique (via hashmap). - let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?; - map.insert(shard_id, shard); + + // Validate the URL. + if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc { + return Err(anyhow!("invalid shard URL {url}: must use gRPC")); + } } Ok(Self { + urls, count, stripe_size, - map, + }) + } +} + +/// Tracks the tenant's shards. +struct Shards { + /// Shards by shard index. + /// + /// INVARIANT: every shard 0..count is present. + /// INVARIANT: shard 0 is always present. + by_index: HashMap, + /// The shard count. + /// + /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention. + count: ShardCount, + /// The stripe size. Only used for sharded tenants. + stripe_size: ShardStripeSize, +} + +impl Shards { + /// Creates a new set of shards based on a shard spec. + fn new( + tenant_id: TenantId, + timeline_id: TimelineId, + shard_spec: ShardSpec, + auth_token: Option, + compression: Option, + ) -> anyhow::Result { + // NB: the shard spec has already been validated when constructed. + let mut shards = HashMap::with_capacity(shard_spec.urls.len()); + for (shard_id, url) in shard_spec.urls { + shards.insert( + shard_id, + Shard::new( + url, + tenant_id, + timeline_id, + shard_id, + auth_token.clone(), + compression, + )?, + ); + } + + Ok(Self { + by_index: shards, + count: shard_spec.count, + stripe_size: shard_spec.stripe_size, }) } /// Looks up the given shard. #[allow(clippy::result_large_err)] // TODO: check perf impact fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> { - self.map + self.by_index .get(&shard_id) .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}"))) } @@ -312,6 +450,8 @@ impl Shards { /// * Bulk client pool: unbounded. /// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. struct Shard { + /// The shard ID. + id: ShardIndex, /// Unary gRPC client pool. client_pool: Arc, /// GetPage stream pool. @@ -328,12 +468,8 @@ impl Shard { timeline_id: TimelineId, shard_id: ShardIndex, auth_token: Option, + compression: Option, ) -> anyhow::Result { - // Sanity-check that the URL uses gRPC. - if PageserverProtocol::from_connstring(&url)? 
!= PageserverProtocol::Grpc { - return Err(anyhow!("invalid shard URL {url}: must use gRPC")); - } - // Common channel pool for unary and stream requests. Bounded by client/stream pools. let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; @@ -344,6 +480,7 @@ impl Shard { timeline_id, shard_id, auth_token.clone(), + compression, Some(MAX_UNARY_CLIENTS), ); @@ -356,6 +493,7 @@ impl Shard { timeline_id, shard_id, auth_token.clone(), + compression, None, // unbounded, limited by stream pool ), Some(MAX_STREAMS), @@ -371,6 +509,7 @@ impl Shard { timeline_id, shard_id, auth_token, + compression, None, // unbounded, limited by stream pool ), Some(MAX_BULK_STREAMS), @@ -378,6 +517,7 @@ impl Shard { ); Ok(Self { + id: shard_id, client_pool, stream_pool, bulk_stream_pool, diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 3fc7178be2..14fb3fbd5a 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -3,4 +3,4 @@ mod pool; mod retry; mod split; -pub use client::PageserverClient; +pub use client::{PageserverClient, ShardSpec}; diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 89b3bd646f..906872e091 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -40,6 +40,7 @@ use futures::StreamExt as _; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; use tokio_util::sync::CancellationToken; +use tonic::codec::CompressionEncoding; use tonic::transport::{Channel, Endpoint}; use tracing::{error, warn}; @@ -242,6 +243,8 @@ pub struct ClientPool { shard_id: ShardIndex, /// Authentication token, if any. auth_token: Option, + /// Compression to use. + compression: Option, /// Channel pool to acquire channels from. channel_pool: Arc, /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded. @@ -281,6 +284,7 @@ impl ClientPool { timeline_id: TimelineId, shard_id: ShardIndex, auth_token: Option, + compression: Option, max_clients: Option>, ) -> Arc { let pool = Arc::new(Self { @@ -288,6 +292,7 @@ impl ClientPool { timeline_id, shard_id, auth_token, + compression, channel_pool, idle: Mutex::default(), idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), @@ -331,7 +336,7 @@ impl ClientPool { self.timeline_id, self.shard_id, self.auth_token.clone(), - None, + self.compression, )?; Ok(ClientGuard { @@ -586,6 +591,10 @@ impl StreamPool { // Track caller response channels by request ID. If the task returns early, these response // channels will be dropped and the waiting callers will receive an error. + // + // NB: this will leak entries if the server doesn't respond to a request (by request ID). + // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and + // block further use. But we could consider reaping closed channels after some time. let mut callers = HashMap::new(); // Process requests and responses. @@ -690,6 +699,15 @@ impl Drop for StreamGuard { // Release the queue depth reservation on drop. This can prematurely decrement it if dropped // before the response is received, but that's okay. + // + // TODO: actually, it's probably not okay. Queue depth release should be moved into the + // stream task, such that it continues to account for the queue depth slot until the server + // responds. 
Otherwise, if a slow request times out and keeps blocking the stream, the + // server will keep waiting on it and we can pile on subsequent requests (including the + // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking + // requests on e.g. LSN waits and layer downloads, instead returning early to free up the + // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line + // blocking. TBD. let mut streams = pool.streams.lock().unwrap(); let entry = streams.get_mut(&self.id).expect("unknown stream"); assert!(entry.idle_since.is_none(), "active stream marked idle"); diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs index b0473204d7..a1e0b8636f 100644 --- a/pageserver/client_grpc/src/retry.rs +++ b/pageserver/client_grpc/src/retry.rs @@ -23,14 +23,14 @@ impl Retry { /// If true, log successful requests. For debugging. const LOG_SUCCESS: bool = false; - /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors, - /// using the current tracing span for context. + /// Runs the given async closure with timeouts and retries (exponential backoff), passing the + /// attempt number starting at 0. Logs errors, using the current tracing span for context. /// /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`]. pub async fn with(&self, mut f: F) -> tonic::Result where - F: FnMut() -> O, + F: FnMut(usize) -> O, // takes attempt number, starting at 0 O: Future>, { let started = Instant::now(); @@ -47,7 +47,7 @@ impl Retry { } let request_started = Instant::now(); - tokio::time::timeout(Self::REQUEST_TIMEOUT, f()) + tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries)) .await .map_err(|_| { tonic::Status::deadline_exceeded(format!( @@ -131,7 +131,6 @@ impl Retry { tonic::Code::Aborted => true, tonic::Code::Cancelled => true, tonic::Code::DeadlineExceeded => true, // maybe transient slowness - tonic::Code::Internal => true, // maybe transient failure? tonic::Code::ResourceExhausted => true, tonic::Code::Unavailable => true, @@ -139,6 +138,10 @@ impl Retry { tonic::Code::AlreadyExists => false, tonic::Code::DataLoss => false, tonic::Code::FailedPrecondition => false, + // NB: don't retry Internal. It is intended for serious errors such as invariant + // violations, and is also used for client-side invariant checks that would otherwise + // result in retry loops. + tonic::Code::Internal => false, tonic::Code::InvalidArgument => false, tonic::Code::NotFound => false, tonic::Code::OutOfRange => false, diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs index 5bbcaab393..b7539b900c 100644 --- a/pageserver/client_grpc/src/split.rs +++ b/pageserver/client_grpc/src/split.rs @@ -5,27 +5,24 @@ use bytes::Bytes; use pageserver_api::key::rel_block_to_key; use pageserver_api::shard::{ShardStripeSize, key_to_shard_number}; use pageserver_page_api as page_api; -use utils::shard::{ShardCount, ShardIndex}; +use utils::shard::{ShardCount, ShardIndex, ShardNumber}; /// Splits GetPageRequests that straddle shard boundaries and assembles the responses. /// TODO: add tests for this. pub struct GetPageSplitter { - /// The original request ID. Used for all shard requests. - request_id: page_api::RequestID, /// Split requests by shard index. requests: HashMap, - /// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. 
Used to assemble - /// the response pages in the same order as the original request. + /// The response being assembled. Preallocated with empty pages, to be filled in. + response: page_api::GetPageResponse, + /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used + /// to assemble the response pages in the same order as the original request. block_shards: Vec, - /// Page responses by shard index. Will be assembled into a single response. - responses: HashMap>, } impl GetPageSplitter { /// Checks if the given request only touches a single shard, and returns the shard ID. This is /// the common case, so we check first in order to avoid unnecessary allocations and overhead. - /// The caller must ensure that the request has at least one block number, or this will panic. - pub fn is_single_shard( + pub fn for_single_shard( req: &page_api::GetPageRequest, count: ShardCount, stripe_size: ShardStripeSize, @@ -35,8 +32,12 @@ impl GetPageSplitter { return Some(ShardIndex::unsharded()); } - // Find the base shard index for the first page, and compare with the rest. - let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages")); + // Find the first page's shard, for comparison. If there are no pages, just return the first + // shard (caller likely checked already, otherwise the server will reject it). + let Some(&first_page) = req.block_numbers.first() else { + return Some(ShardIndex::new(ShardNumber(0), count)); + }; + let key = rel_block_to_key(req.rel, first_page); let shard_number = key_to_shard_number(count, stripe_size, &key); req.block_numbers @@ -57,19 +58,19 @@ impl GetPageSplitter { ) -> Self { // The caller should make sure we don't split requests unnecessarily. debug_assert!( - Self::is_single_shard(&req, count, stripe_size).is_none(), + Self::for_single_shard(&req, count, stripe_size).is_none(), "unnecessary request split" ); // Split the requests by shard index. let mut requests = HashMap::with_capacity(2); // common case let mut block_shards = Vec::with_capacity(req.block_numbers.len()); - for blkno in req.block_numbers { + for &blkno in &req.block_numbers { let key = rel_block_to_key(req.rel, blkno); let shard_number = key_to_shard_number(count, stripe_size, &key); let shard_id = ShardIndex::new(shard_number, count); - let shard_req = requests + requests .entry(shard_id) .or_insert_with(|| page_api::GetPageRequest { request_id: req.request_id, @@ -77,27 +78,47 @@ impl GetPageSplitter { rel: req.rel, read_lsn: req.read_lsn, block_numbers: Vec::new(), - }); - shard_req.block_numbers.push(blkno); + }) + .block_numbers + .push(blkno); block_shards.push(shard_id); } - Self { + // Construct a response to be populated by shard responses. Preallocate empty page slots + // with the expected block numbers. + let response = page_api::GetPageResponse { request_id: req.request_id, - responses: HashMap::with_capacity(requests.len()), + status_code: page_api::GetPageStatusCode::Ok, + reason: None, + rel: req.rel, + pages: req + .block_numbers + .into_iter() + .map(|block_number| { + page_api::Page { + block_number, + image: Bytes::new(), // empty page slot to be filled in + } + }) + .collect(), + }; + + Self { requests, + response, block_shards, } } - /// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations. + /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations. 
pub fn drain_requests( &mut self, ) -> impl Iterator { self.requests.drain() } - /// Adds a response from the given shard. + /// Adds a response from the given shard. The response must match the request ID and have an OK + /// status code. A response must not already exist for the given shard ID. #[allow(clippy::result_large_err)] pub fn add_response( &mut self, @@ -105,68 +126,84 @@ impl GetPageSplitter { response: page_api::GetPageResponse, ) -> tonic::Result<()> { // The caller should already have converted status codes into tonic::Status. - assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok); - - // Make sure the response matches the request ID. - if response.request_id != self.request_id { + if response.status_code != page_api::GetPageStatusCode::Ok { return Err(tonic::Status::internal(format!( - "response ID {} does not match request ID {}", - response.request_id, self.request_id + "unexpected non-OK response for shard {shard_id}: {} {}", + response.status_code, + response.reason.unwrap_or_default() ))); } - // Add the response data to the map. - let old = self.responses.insert(shard_id, response.page_images); - - if old.is_some() { + if response.request_id != self.response.request_id { return Err(tonic::Status::internal(format!( - "duplicate response for shard {shard_id}", + "response ID mismatch for shard {shard_id}: expected {}, got {}", + self.response.request_id, response.request_id + ))); + } + + // Place the shard response pages into the assembled response, in request order. + let mut pages = response.pages.into_iter(); + + for (i, &s) in self.block_shards.iter().enumerate() { + if shard_id != s { + continue; + } + + let Some(slot) = self.response.pages.get_mut(i) else { + return Err(tonic::Status::internal(format!( + "no block_shards slot {i} for shard {shard_id}" + ))); + }; + let Some(page) = pages.next() else { + return Err(tonic::Status::internal(format!( + "missing page {} in shard {shard_id} response", + slot.block_number + ))); + }; + if page.block_number != slot.block_number { + return Err(tonic::Status::internal(format!( + "shard {shard_id} returned wrong page at index {i}, expected {} got {}", + slot.block_number, page.block_number + ))); + } + if !slot.image.is_empty() { + return Err(tonic::Status::internal(format!( + "shard {shard_id} returned duplicate page {} at index {i}", + slot.block_number + ))); + } + + *slot = page; + } + + // Make sure we've consumed all pages from the shard response. + if let Some(extra_page) = pages.next() { + return Err(tonic::Status::internal(format!( + "shard {shard_id} returned extra page: {}", + extra_page.block_number ))); } Ok(()) } - /// Assembles the shard responses into a single response. Responses must be present for all - /// relevant shards, and the total number of pages must match the original request. + /// Fetches the final, assembled response. #[allow(clippy::result_large_err)] - pub fn assemble_response(self) -> tonic::Result { - let mut response = page_api::GetPageResponse { - request_id: self.request_id, - status_code: page_api::GetPageStatusCode::Ok, - reason: None, - page_images: Vec::with_capacity(self.block_shards.len()), - }; - - // Set up per-shard page iterators we can pull from. - let mut shard_responses = HashMap::with_capacity(self.responses.len()); - for (shard_id, responses) in self.responses { - shard_responses.insert(shard_id, responses.into_iter()); - } - - // Reassemble the responses in the same order as the original request. 
- for shard_id in &self.block_shards { - let page = shard_responses - .get_mut(shard_id) - .ok_or_else(|| { - tonic::Status::internal(format!("missing response for shard {shard_id}")) - })? - .next() - .ok_or_else(|| { - tonic::Status::internal(format!("missing page from shard {shard_id}")) - })?; - response.page_images.push(page); - } - - // Make sure there are no additional pages. - for (shard_id, mut pages) in shard_responses { - if pages.next().is_some() { + pub fn get_response(self) -> tonic::Result { + // Check that the response is complete. + for (i, page) in self.response.pages.iter().enumerate() { + if page.image.is_empty() { return Err(tonic::Status::internal(format!( - "extra pages returned from shard {shard_id}" + "missing page {} for shard {}", + page.block_number, + self.block_shards + .get(i) + .map(|s| s.to_string()) + .unwrap_or_else(|| "?".to_string()) ))); } } - Ok(response) + Ok(self.response) } } diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml index 7b70f0dc87..ba34fa1f69 100644 --- a/pageserver/ctl/Cargo.toml +++ b/pageserver/ctl/Cargo.toml @@ -17,6 +17,7 @@ pageserver = { path = ".." } pageserver_api.workspace = true remote_storage = { path = "../../libs/remote_storage" } postgres_ffi.workspace = true +serde.workspace = true thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true diff --git a/pageserver/ctl/src/download_remote_object.rs b/pageserver/ctl/src/download_remote_object.rs new file mode 100644 index 0000000000..aa09774701 --- /dev/null +++ b/pageserver/ctl/src/download_remote_object.rs @@ -0,0 +1,85 @@ +use camino::Utf8PathBuf; +use clap::Parser; +use tokio_util::sync::CancellationToken; + +/// Download a specific object from remote storage to a local file. +/// +/// The remote storage configuration is supplied via the `REMOTE_STORAGE_CONFIG` environment +/// variable, in the same TOML format that the pageserver itself understands. This allows the +/// command to work with any cloud supported by the `remote_storage` crate (currently AWS S3, +/// Azure Blob Storage and local files), as long as the credentials are available via the +/// standard environment variables expected by the underlying SDKs. +/// +/// Examples for setting the environment variable: +/// +/// ```bash +/// # AWS S3 (region can also be provided via AWS_REGION) +/// export REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "my-bucket", bucket_region = "us-east-2" }' +/// +/// # Azure Blob Storage (account key picked up from AZURE_STORAGE_ACCOUNT_KEY) +/// export REMOTE_STORAGE_CONFIG='remote_storage = { container = "my-container", account = "my-account" }' +/// ``` +#[derive(Parser)] +pub(crate) struct DownloadRemoteObjectCmd { + /// Key / path of the object to download (relative to the remote storage prefix). + /// + /// Examples: + /// "wal/3aa8f.../00000001000000000000000A" + /// "pageserver/v1/tenants//timelines//layer_12345" + pub remote_path: String, + + /// Path of the local file to create. Existing file will be overwritten. 
+ /// + /// Examples: + /// "./segment" + /// "/tmp/layer_12345.parquet" + pub output_file: Utf8PathBuf, +} + +pub(crate) async fn main(cmd: &DownloadRemoteObjectCmd) -> anyhow::Result<()> { + use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath, RemoteStorageConfig}; + + // Fetch remote storage configuration from the environment + let config_str = std::env::var("REMOTE_STORAGE_CONFIG").map_err(|_| { + anyhow::anyhow!( + "'REMOTE_STORAGE_CONFIG' environment variable must be set to a valid remote storage TOML config" + ) + })?; + + let config = RemoteStorageConfig::from_toml_str(&config_str)?; + + // Initialise remote storage client + let storage = GenericRemoteStorage::from_config(&config).await?; + + // RemotePath must be relative – leading slashes confuse the parser. + let remote_path_str = cmd.remote_path.trim_start_matches('/'); + let remote_path = RemotePath::from_string(remote_path_str)?; + + let cancel = CancellationToken::new(); + + println!( + "Downloading '{remote_path}' from remote storage bucket {:?} ...", + config.storage.bucket_name() + ); + + // Start the actual download + let download = storage + .download(&remote_path, &DownloadOpts::default(), &cancel) + .await?; + + // Stream to file + let mut reader = tokio_util::io::StreamReader::new(download.download_stream); + let tmp_path = cmd.output_file.with_extension("tmp"); + let mut file = tokio::fs::File::create(&tmp_path).await?; + tokio::io::copy(&mut reader, &mut file).await?; + file.sync_all().await?; + // Atomically move into place + tokio::fs::rename(&tmp_path, &cmd.output_file).await?; + + println!( + "Downloaded to '{}'. Last modified: {:?}, etag: {}", + cmd.output_file, download.last_modified, download.etag + ); + + Ok(()) +} diff --git a/pageserver/ctl/src/index_part.rs b/pageserver/ctl/src/index_part.rs index 838d00e490..9801f3c9dc 100644 --- a/pageserver/ctl/src/index_part.rs +++ b/pageserver/ctl/src/index_part.rs @@ -1,14 +1,16 @@ use std::str::FromStr; -use anyhow::Context; +use anyhow::{Context, Ok}; use camino::Utf8PathBuf; use pageserver::tenant::{ IndexPart, layer_map::{LayerMap, SearchResult}, - remote_timeline_client::remote_layer_path, - storage_layer::{PersistentLayerDesc, ReadableLayerWeak}, + remote_timeline_client::{index::LayerFileMetadata, remote_layer_path}, + storage_layer::{LayerName, LayerVisibilityHint, PersistentLayerDesc, ReadableLayerWeak}, }; use pageserver_api::key::Key; +use serde::Serialize; +use std::collections::BTreeMap; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, @@ -33,6 +35,31 @@ pub(crate) enum IndexPartCmd { #[arg(long)] lsn: String, }, + /// List all visible delta and image layers at the latest LSN. 
+ ListVisibleLayers { + #[arg(long)] + path: Utf8PathBuf, + }, +} + +fn create_layer_map_from_index_part( + index_part: &IndexPart, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, +) -> LayerMap { + let mut layer_map = LayerMap::default(); + { + let mut updates = layer_map.batch_update(); + for (key, value) in index_part.layer_metadata.iter() { + updates.insert_historic(PersistentLayerDesc::from_filename( + tenant_shard_id, + timeline_id, + key.clone(), + value.file_size, + )); + } + } + layer_map } async fn search_layers( @@ -49,18 +76,7 @@ async fn search_layers( let bytes = tokio::fs::read(path).await?; IndexPart::from_json_bytes(&bytes).unwrap() }; - let mut layer_map = LayerMap::default(); - { - let mut updates = layer_map.batch_update(); - for (key, value) in index_json.layer_metadata.iter() { - updates.insert_historic(PersistentLayerDesc::from_filename( - tenant_shard_id, - timeline_id, - key.clone(), - value.file_size, - )); - } - } + let layer_map = create_layer_map_from_index_part(&index_json, tenant_shard_id, timeline_id); let key = Key::from_hex(key)?; let lsn = Lsn::from_str(lsn).unwrap(); @@ -98,6 +114,69 @@ async fn search_layers( Ok(()) } +#[derive(Debug, Clone, Serialize)] +struct VisibleLayers { + pub total_images: u64, + pub total_image_bytes: u64, + pub total_deltas: u64, + pub total_delta_bytes: u64, + pub layer_metadata: BTreeMap, +} + +impl VisibleLayers { + pub fn new() -> Self { + Self { + layer_metadata: BTreeMap::new(), + total_images: 0, + total_image_bytes: 0, + total_deltas: 0, + total_delta_bytes: 0, + } + } + + pub fn add_layer(&mut self, name: LayerName, layer: LayerFileMetadata) { + match name { + LayerName::Image(_) => { + self.total_images += 1; + self.total_image_bytes += layer.file_size; + } + LayerName::Delta(_) => { + self.total_deltas += 1; + self.total_delta_bytes += layer.file_size; + } + } + self.layer_metadata.insert(name, layer); + } +} + +async fn list_visible_layers(path: &Utf8PathBuf) -> anyhow::Result<()> { + let tenant_id = TenantId::generate(); + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + let timeline_id = TimelineId::generate(); + + let bytes = tokio::fs::read(path).await.context("read file")?; + let index_part = IndexPart::from_json_bytes(&bytes).context("deserialize")?; + let layer_map = create_layer_map_from_index_part(&index_part, tenant_shard_id, timeline_id); + let mut visible_layers = VisibleLayers::new(); + let (layers, _key_space) = layer_map.get_visibility(Vec::new()); + for (layer, visibility) in layers { + if visibility == LayerVisibilityHint::Visible { + visible_layers.add_layer( + layer.layer_name(), + index_part + .layer_metadata + .get(&layer.layer_name()) + .unwrap() + .clone(), + ); + } + } + let output = serde_json::to_string_pretty(&visible_layers).context("serialize output")?; + println!("{output}"); + + Ok(()) +} + pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> { match cmd { IndexPartCmd::Dump { path } => { @@ -114,5 +193,6 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> { key, lsn, } => search_layers(tenant_id, timeline_id, path, key, lsn).await, + IndexPartCmd::ListVisibleLayers { path } => list_visible_layers(path).await, } } diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 3cd4faaf2e..e84ad2c87f 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -4,6 +4,7 @@ //! //! Separate, `metadata` subcommand allows to print and update pageserver's metadata file. 
+mod download_remote_object; mod draw_timeline_dir; mod index_part; mod key; @@ -16,6 +17,7 @@ use std::time::{Duration, SystemTime}; use camino::{Utf8Path, Utf8PathBuf}; use clap::{Parser, Subcommand}; +use download_remote_object::DownloadRemoteObjectCmd; use index_part::IndexPartCmd; use layers::LayerCmd; use page_trace::PageTraceCmd; @@ -63,6 +65,7 @@ enum Commands { /// Debug print a hex key found from logs Key(key::DescribeKeyCommand), PageTrace(PageTraceCmd), + DownloadRemoteObject(DownloadRemoteObjectCmd), } /// Read and update pageserver metadata file @@ -185,6 +188,9 @@ async fn main() -> anyhow::Result<()> { } Commands::Key(dkc) => dkc.execute(), Commands::PageTrace(cmd) => page_trace::main(&cmd)?, + Commands::DownloadRemoteObject(cmd) => { + download_remote_object::main(&cmd).await?; + } }; Ok(()) } diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index fc20eef503..e39c214231 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -165,7 +165,7 @@ message GetDbSizeResponse { message GetPageRequest { // A request ID. Will be included in the response. Should be unique for // in-flight requests on the stream. - uint64 request_id = 1; + RequestID request_id = 1; // The request class. GetPageClass request_class = 2; // The LSN to read at. @@ -189,6 +189,14 @@ message GetPageRequest { repeated uint32 block_number = 5; } +// A Request ID. Should be unique for in-flight requests on a stream. Included in the response. +message RequestID { + // The base request ID. + uint64 id = 1; + // The request attempt. Starts at 0, incremented on each retry. + uint32 attempt = 2; +} + // A GetPageRequest class. Primarily intended for observability, but may also be // used for prioritization in the future. enum GetPageClass { @@ -211,13 +219,26 @@ enum GetPageClass { // the entire batch is ready, so no one can make use of the individual pages. message GetPageResponse { // The original request's ID. - uint64 request_id = 1; - // The response status code. + RequestID request_id = 1; + // The response status code. If not OK, the rel and page fields will be empty. GetPageStatusCode status_code = 2; // A string describing the status, if any. string reason = 3; - // The 8KB page images, in the same order as the request. Empty if status_code != OK. - repeated bytes page_image = 4; + // The relation that the pages belong to. + RelTag rel = 4; + // The page(s), in the same order as the request. + repeated Page page = 5; +} + +// A page. +// +// TODO: it would be slightly more efficient (but less convenient) to have separate arrays of block +// numbers and images, but given the 8KB page size it's probably negligible. Benchmark it anyway. +message Page { + // The page number. + uint32 block_number = 1; + // The materialized page image, as an 8KB byte vector. + bytes image = 2; } // A GetPageResponse status code. 
diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index b8d3a77f3e..23e6193de9 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -1,4 +1,5 @@ use anyhow::Context as _; +use futures::future::ready; use futures::{Stream, StreamExt as _, TryStreamExt as _}; use tokio::io::AsyncRead; use tokio_util::io::StreamReader; @@ -110,7 +111,7 @@ impl Client { ) -> tonic::Result> + Send + 'static> { let reqs = reqs.map(proto::GetPageRequest::from); let resps = self.inner.get_pages(reqs).await?.into_inner(); - Ok(resps.map_ok(GetPageResponse::from)) + Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into())))) } /// Returns the size of a relation, as # of blocks. diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index c089a4f1d8..46ebefcaf6 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -359,7 +359,10 @@ impl TryFrom for GetPageRequest { return Err(ProtocolError::Missing("block_number")); } Ok(Self { - request_id: pb.request_id, + request_id: pb + .request_id + .ok_or(ProtocolError::Missing("request_id"))? + .into(), request_class: pb.request_class.into(), read_lsn: pb .read_lsn @@ -374,7 +377,7 @@ impl TryFrom for GetPageRequest { impl From for proto::GetPageRequest { fn from(request: GetPageRequest) -> Self { Self { - request_id: request.request_id, + request_id: Some(request.request_id.into()), request_class: request.request_class.into(), read_lsn: Some(request.read_lsn.into()), rel: Some(request.rel.into()), @@ -383,8 +386,51 @@ impl From for proto::GetPageRequest { } } -/// A GetPage request ID. -pub type RequestID = u64; +/// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RequestID { + /// The base request ID. + pub id: u64, + // The request attempt. Starts at 0, incremented on each retry. + pub attempt: u32, +} + +impl RequestID { + /// Creates a new RequestID with the given ID and an initial attempt of 0. + pub fn new(id: u64) -> Self { + Self { id, attempt: 0 } + } +} + +impl Display for RequestID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.id, self.attempt) + } +} + +impl From for RequestID { + fn from(pb: proto::RequestId) -> Self { + Self { + id: pb.id, + attempt: pb.attempt, + } + } +} + +impl From for RequestID { + fn from(id: u64) -> Self { + Self::new(id) + } +} + +impl From for proto::RequestId { + fn from(request_id: RequestID) -> Self { + Self { + id: request_id.id, + attempt: request_id.attempt, + } + } +} /// A GetPage request class. #[derive(Clone, Copy, Debug, strum_macros::Display)] @@ -459,32 +505,41 @@ impl From for i32 { pub struct GetPageResponse { /// The original request's ID. pub request_id: RequestID, - /// The response status code. + /// The response status code. If not OK, the `rel` and `pages` fields will be empty. pub status_code: GetPageStatusCode, /// A string describing the status, if any. pub reason: Option, - /// The 8KB page images, in the same order as the request. Empty if status != OK. - pub page_images: Vec, + /// The relation that the pages belong to. + pub rel: RelTag, + // The page(s), in the same order as the request. 
+    pub pages: Vec<Page>,
 }
 
-impl From<proto::GetPageResponse> for GetPageResponse {
-    fn from(pb: proto::GetPageResponse) -> Self {
-        Self {
-            request_id: pb.request_id,
+impl TryFrom<proto::GetPageResponse> for GetPageResponse {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::GetPageResponse) -> Result<Self, Self::Error> {
+        Ok(Self {
+            request_id: pb
+                .request_id
+                .ok_or(ProtocolError::Missing("request_id"))?
+                .into(),
             status_code: pb.status_code.into(),
             reason: Some(pb.reason).filter(|r| !r.is_empty()),
-            page_images: pb.page_image,
-        }
+            rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
+            pages: pb.page.into_iter().map(Page::from).collect(),
+        })
     }
 }
 
 impl From<GetPageResponse> for proto::GetPageResponse {
     fn from(response: GetPageResponse) -> Self {
         Self {
-            request_id: response.request_id,
+            request_id: Some(response.request_id.into()),
             status_code: response.status_code.into(),
             reason: response.reason.unwrap_or_default(),
-            page_image: response.page_images,
+            rel: Some(response.rel.into()),
+            page: response.pages.into_iter().map(proto::Page::from).collect(),
         }
     }
 }
@@ -517,11 +572,39 @@ impl GetPageResponse {
             request_id,
             status_code,
             reason: Some(status.message().to_string()),
-            page_images: Vec::new(),
+            rel: RelTag::default(),
+            pages: Vec::new(),
         })
     }
 }
 
+// A page.
+#[derive(Clone, Debug)]
+pub struct Page {
+    /// The page number.
+    pub block_number: u32,
+    /// The materialized page image, as an 8KB byte vector.
+    pub image: Bytes,
+}
+
+impl From<proto::Page> for Page {
+    fn from(pb: proto::Page) -> Self {
+        Self {
+            block_number: pb.block_number,
+            image: pb.image,
+        }
+    }
+}
+
+impl From<Page> for proto::Page {
+    fn from(page: Page) -> Self {
+        Self {
+            block_number: page.block_number,
+            image: page.image,
+        }
+    }
+}
+
 /// A GetPage response status code.
 ///
 /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml
index f5dfc0db25..4086213830 100644
--- a/pageserver/pagebench/Cargo.toml
+++ b/pageserver/pagebench/Cargo.toml
@@ -27,8 +27,9 @@ tokio-util.workspace = true
 tonic.workspace = true
 url.workspace = true
 
-pageserver_client.workspace = true
 pageserver_api.workspace = true
+pageserver_client.workspace = true
+pageserver_client_grpc.workspace = true
 pageserver_page_api.workspace = true
 utils = { path = "../../libs/utils/" }
 workspace_hack = { version = "0.1", path = "../../workspace_hack" }
diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
index f14caf548c..30b30d36f6 100644
--- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
@@ -10,12 +10,14 @@ use anyhow::Context;
 use async_trait::async_trait;
 use bytes::Bytes;
 use camino::Utf8PathBuf;
+use futures::stream::FuturesUnordered;
 use futures::{Stream, StreamExt as _};
 use pageserver_api::key::Key;
 use pageserver_api::keyspace::KeySpaceAccum;
 use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
 use pageserver_api::reltag::RelTag;
 use pageserver_api::shard::TenantShardId;
+use pageserver_client_grpc::{self as client_grpc, ShardSpec};
 use pageserver_page_api as page_api;
 use rand::prelude::*;
 use tokio::task::JoinSet;
@@ -37,6 +39,10 @@ pub(crate) struct Args {
     /// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
     page_service_connstring: String,
+    /// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
+    /// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
+    #[clap(long)]
+    rich_client: bool,
     #[clap(long)]
     pageserver_jwt: Option<String>,
     #[clap(long, default_value = "1")]
@@ -332,6 +338,7 @@ async fn main_impl(
     let client: Box<dyn Client> = match scheme.as_str() {
         "postgresql" | "postgres" => {
             assert!(!args.compression, "libpq does not support compression");
+            assert!(!args.rich_client, "rich client requires grpc://");
             Box::new(
                 LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
                     .await
                     .unwrap(),
             )
         }
 
+        "grpc" if args.rich_client => Box::new(
+            RichGrpcClient::new(
+                &args.page_service_connstring,
+                worker_id.timeline,
+                args.compression,
+            )
+            .await
+            .unwrap(),
+        ),
+
         "grpc" => Box::new(
             GrpcClient::new(
                 &args.page_service_connstring,
@@ -657,7 +674,7 @@ impl Client for GrpcClient {
         blks: Vec<u32>,
     ) -> anyhow::Result<()> {
         let req = page_api::GetPageRequest {
-            request_id: req_id,
+            request_id: req_id.into(),
             request_class: page_api::GetPageClass::Normal,
             read_lsn: page_api::ReadLsn {
                 request_lsn: req_lsn,
@@ -677,6 +694,79 @@ impl Client for GrpcClient {
             "unexpected status code: {}",
             resp.status_code,
         );
-        Ok((resp.request_id, resp.page_images))
+        Ok((
+            resp.request_id.id,
+            resp.pages.into_iter().map(|p| p.image).collect(),
+        ))
+    }
+}
+
+/// A rich gRPC Pageserver client.
+struct RichGrpcClient {
+    inner: Arc<client_grpc::PageserverClient>,
+    requests: FuturesUnordered<
+        Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
+    >,
+}
+
+impl RichGrpcClient {
+    async fn new(
+        connstring: &str,
+        ttid: TenantTimelineId,
+        compression: bool,
+    ) -> anyhow::Result<Self> {
+        let inner = Arc::new(client_grpc::PageserverClient::new(
+            ttid.tenant_id,
+            ttid.timeline_id,
+            ShardSpec::new(
+                [(ShardIndex::unsharded(), connstring.to_string())].into(),
+                None,
+            )?,
+            None,
+            compression.then_some(tonic::codec::CompressionEncoding::Zstd),
+        )?);
+        Ok(Self {
+            inner,
+            requests: FuturesUnordered::new(),
+        })
+    }
+}
+
+#[async_trait]
+impl Client for RichGrpcClient {
+    async fn send_get_page(
+        &mut self,
+        req_id: u64,
+        req_lsn: Lsn,
+        mod_lsn: Lsn,
+        rel: RelTag,
+        blks: Vec<u32>,
+    ) -> anyhow::Result<()> {
+        let req = page_api::GetPageRequest {
+            request_id: req_id.into(),
+            request_class: page_api::GetPageClass::Normal,
+            read_lsn: page_api::ReadLsn {
+                request_lsn: req_lsn,
+                not_modified_since_lsn: Some(mod_lsn),
+            },
+            rel,
+            block_numbers: blks,
+        };
+        let inner = self.inner.clone();
+        self.requests.push(Box::pin(async move {
+            inner
+                .get_page(req)
+                .await
+                .map_err(|err| anyhow::anyhow!("{err}"))
+        }));
+        Ok(())
+    }
+
+    async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
+        let resp = self.requests.next().await.unwrap()?;
+        Ok((
+            resp.request_id.id,
+            resp.pages.into_iter().map(|p| p.image).collect(),
+        ))
     }
 }
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index c04c5af17f..4fa540c4c6 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -2,7 +2,9 @@
 //! Management HTTP API
 //!
use std::cmp::Reverse; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::BTreeMap; +use std::collections::BinaryHeap; +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -3217,6 +3219,30 @@ async fn get_utilization( .map_err(ApiError::InternalServerError) } +/// HADRON +async fn list_tenant_visible_size_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + check_permission(&request, None)?; + let state = get_state(&request); + + let mut map = BTreeMap::new(); + for (tenant_shard_id, slot) in state.tenant_manager.list() { + match slot { + TenantSlot::Attached(tenant) => { + let visible_size = tenant.get_visible_size(); + map.insert(tenant_shard_id, visible_size); + } + TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => { + continue; + } + } + } + + json_response(StatusCode::OK, map) +} + async fn list_aux_files( mut request: Request, _cancel: CancellationToken, @@ -4154,6 +4180,7 @@ pub fn make_router( .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler)) .put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler)) .get("/v1/utilization", |r| api_handler(r, get_utilization)) + .get("/v1/list_tenant_visible_size", |r| api_handler(r, list_tenant_visible_size_handler)) .post( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files", |r| testing_api_handler("ingest_aux_files", r, ingest_aux_files), diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 2882d2ca39..617a5201ec 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2867,6 +2867,24 @@ pub(crate) static MISROUTED_PAGESTREAM_REQUESTS: Lazy = Lazy::new(|| .expect("failed to define a metric") }); +// Global counter for PageStream request results by outcome. Outcomes are divided into 3 categories: +// - success +// - internal_error: errors that indicate bugs in the storage cluster (e.g. page reconstruction errors, misrouted requests, LSN timeout errors) +// - other_error: transient error conditions that are expected in normal operation or indicate bugs with other parts of the system (e.g. error due to pageserver shutdown, malformed requests etc.) 
+pub(crate) static PAGESTREAM_HANDLER_RESULTS_TOTAL: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_pagestream_handler_results_total", + "Number of pageserver pagestream handler results by outcome (success, internal_error, other_error)", + &["outcome"] + ) + .expect("failed to define a metric") +}); + +// Constants for pageserver_pagestream_handler_results_total's outcome labels +pub(crate) const PAGESTREAM_HANDLER_OUTCOME_SUCCESS: &str = "success"; +pub(crate) const PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR: &str = "internal_error"; +pub(crate) const PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR: &str = "other_error"; + // Metrics collected on WAL redo operations // // We collect the time spent in actual WAL redo ('redo'), and time waiting diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e404d212b8..a7f02c07b7 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -70,7 +70,7 @@ use crate::context::{ }; use crate::metrics::{ self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS, - MISROUTED_PAGESTREAM_REQUESTS, SmgrOpTimer, TimelineMetrics, + MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics, }; use crate::pgdatadir_mapping::{LsnRange, Version}; use crate::span::{ @@ -1441,20 +1441,57 @@ impl PageServerHandler { let (response_msg, ctx) = match handler_result { Err(e) => match &e.err { PageStreamError::Shutdown => { + // BEGIN HADRON + PAGESTREAM_HANDLER_RESULTS_TOTAL + .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR]) + .inc(); + // END HADRON + // If we fail to fulfil a request during shutdown, which may be _because_ of // shutdown, then do not send the error to the client. Instead just drop the // connection. span.in_scope(|| info!("dropping connection due to shutdown")); return Err(QueryError::Shutdown); } - PageStreamError::Reconnect(reason) => { - span.in_scope(|| info!("handler requested reconnect: {reason}")); + PageStreamError::Reconnect(_reason) => { + span.in_scope(|| { + // BEGIN HADRON + // We can get here because the compute node is pointing at the wrong PS. We + // already have a metric to keep track of this so suppressing this log to + // reduce log spam. The information in this log message is not going to be that + // helpful given the volume of logs that can be generated. + // info!("handler requested reconnect: {reason}") + // END HADRON + }); + // BEGIN HADRON + PAGESTREAM_HANDLER_RESULTS_TOTAL + .with_label_values(&[ + metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR, + ]) + .inc(); + // END HADRON return Err(QueryError::Reconnect); } PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) | PageStreamError::NotFound(_) | PageStreamError::BadRequest(_) => { + // BEGIN HADRON + if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err { + PAGESTREAM_HANDLER_RESULTS_TOTAL + .with_label_values(&[ + metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR, + ]) + .inc(); + } else { + PAGESTREAM_HANDLER_RESULTS_TOTAL + .with_label_values(&[ + metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR, + ]) + .inc(); + } + // END HADRON + // print the all details to the log with {:#}, but for the client the // error message is enough. Do not log if shutting down, as the anyhow::Error // here includes cancellation which is not an error. 
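As a reading aid, the outcome classification that the instrumentation above performs inline in each match arm can be summarized as follows (a hypothetical helper, not code from this patch; it assumes the `PageStreamError` variants and label constants visible in the surrounding hunks):

```rust
/// Hypothetical condensed view of the labelling added above.
fn outcome_label(result: &Result<(), PageStreamError>) -> &'static str {
    match result {
        Ok(()) => metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS,
        // Misrouted requests (Reconnect), read failures and LSN waits that time out
        // indicate storage-side bugs.
        Err(PageStreamError::Reconnect(_))
        | Err(PageStreamError::Read(_))
        | Err(PageStreamError::LsnTimeout(_)) => metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
        // Shutdown, missing keys and malformed requests are expected in normal operation
        // or point at other parts of the system.
        Err(_) => metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
    }
}
```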
@@ -1472,7 +1509,15 @@ impl PageServerHandler { ) } }, - Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)), + Ok((response_msg, _op_timer_already_observed, ctx)) => { + // BEGIN HADRON + PAGESTREAM_HANDLER_RESULTS_TOTAL + .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS]) + .inc(); + // END HADRON + + (response_msg, Some(ctx)) + } }; let ctx = ctx.map(|req_ctx| { @@ -3421,9 +3466,12 @@ impl GrpcPageServiceHandler { } /// Generates a PagestreamRequest header from a ReadLsn and request ID. - fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest { + fn make_hdr( + read_lsn: page_api::ReadLsn, + req_id: Option, + ) -> PagestreamRequest { PagestreamRequest { - reqid: req_id, + reqid: req_id.map(|r| r.id).unwrap_or_default(), request_lsn: read_lsn.request_lsn, not_modified_since: read_lsn .not_modified_since_lsn @@ -3533,7 +3581,7 @@ impl GrpcPageServiceHandler { batch.push(BatchedGetPageRequest { req: PagestreamGetPageRequest { - hdr: Self::make_hdr(req.read_lsn, req.request_id), + hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)), rel: req.rel, blkno, }, @@ -3563,12 +3611,16 @@ impl GrpcPageServiceHandler { request_id: req.request_id, status_code: page_api::GetPageStatusCode::Ok, reason: None, - page_images: Vec::with_capacity(results.len()), + rel: req.rel, + pages: Vec::with_capacity(results.len()), }; for result in results { match result { - Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page), + Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page { + block_number: r.req.blkno, + image: r.page, + }), Ok((resp, _, _)) => { return Err(tonic::Status::internal(format!( "unexpected response: {resp:?}" @@ -3611,7 +3663,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(rel=%req.rel, lsn=%req.read_lsn); let req = PagestreamExistsRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), rel: req.rel, }; @@ -3761,7 +3813,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn); let req = PagestreamDbSizeRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), dbnode: req.db_oid, }; @@ -3811,7 +3863,7 @@ impl proto::PageService for GrpcPageServiceHandler { .await? .downgrade(); while let Some(req) = reqs.message().await? { - let req_id = req.request_id; + let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default(); let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) .instrument(span.clone()) // propagate request span .await; @@ -3850,7 +3902,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(rel=%req.rel, lsn=%req.read_lsn); let req = PagestreamNblocksRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), rel: req.rel, }; @@ -3883,7 +3935,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); let req = PagestreamGetSlruSegmentRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), kind: req.kind as u8, segno: req.segno, }; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a184cb7e91..169f4b091b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5772,6 +5772,16 @@ impl TenantShard { .unwrap_or(0) } + /// HADRON + /// Return the visible size of all timelines in this tenant. 
+ pub(crate) fn get_visible_size(&self) -> u64 { + let timelines = self.timelines.lock().unwrap(); + timelines + .values() + .map(|t| t.metrics.visible_physical_size_gauge.get()) + .sum() + } + /// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant /// manifest in `Self::remote_tenant_manifest`. /// diff --git a/pageserver/src/tenant/storage_layer/layer_name.rs b/pageserver/src/tenant/storage_layer/layer_name.rs index 0f7995f87b..973852defc 100644 --- a/pageserver/src/tenant/storage_layer/layer_name.rs +++ b/pageserver/src/tenant/storage_layer/layer_name.rs @@ -225,7 +225,7 @@ impl fmt::Display for ImageLayerName { /// storage and object names in remote storage consist of the LayerName plus some extra qualifiers /// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path]) /// and [`crate::tenant::storage_layer::layer::local_layer_path`]) -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Ord, PartialOrd)] pub enum LayerName { Image(ImageLayerName), Delta(DeltaLayerName), diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 3b6c4247c3..05ba6da663 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -1410,7 +1410,7 @@ pg_init_libpagestore(void) "sharding stripe size", NULL, &stripe_size, - 32768, 1, INT_MAX, + 2048, 1, INT_MAX, PGC_SIGHUP, GUC_UNIT_BLOCKS, NULL, NULL, NULL); diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 4b223b6b18..e3a4022664 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -376,6 +376,18 @@ typedef struct PageserverFeedback uint32 shard_number; } PageserverFeedback; +/* BEGIN_HADRON */ +typedef struct WalRateLimiter +{ + /* If the value is 1, PG backends will hit backpressure. */ + pg_atomic_uint32 should_limit; + /* The number of bytes sent in the current second. */ + uint64 sent_bytes; + /* The last recorded time in microsecond. */ + TimestampTz last_recorded_time_us; +} WalRateLimiter; +/* END_HADRON */ + typedef struct WalproposerShmemState { pg_atomic_uint64 propEpochStartLsn; @@ -395,6 +407,11 @@ typedef struct WalproposerShmemState /* aggregated feedback with min LSNs across shards */ PageserverFeedback min_ps_feedback; + + /* BEGIN_HADRON */ + /* The WAL rate limiter */ + WalRateLimiter wal_rate_limiter; + /* END_HADRON */ } WalproposerShmemState; /* diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 185fc83ace..aaf8f43eeb 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -66,6 +66,9 @@ int wal_acceptor_reconnect_timeout = 1000; int wal_acceptor_connection_timeout = 10000; int safekeeper_proto_version = 3; char *safekeeper_conninfo_options = ""; +/* BEGIN_HADRON */ +int databricks_max_wal_mb_per_second = -1; +/* END_HADRON */ /* Set to true in the walproposer bgw. */ static bool am_walproposer; @@ -252,6 +255,18 @@ nwp_register_gucs(void) PGC_POSTMASTER, 0, NULL, NULL, NULL); + + /* BEGIN_HADRON */ + DefineCustomIntVariable( + "databricks.max_wal_mb_per_second", + "The maximum WAL MB per second allowed. If breached, sending WAL hit the backpressure. 
Setting to -1 disables the limit.", + NULL, + &databricks_max_wal_mb_per_second, + -1, -1, INT_MAX, + PGC_SUSET, + GUC_UNIT_MB, + NULL, NULL, NULL); + /* END_HADRON */ } @@ -393,6 +408,7 @@ assign_neon_safekeepers(const char *newval, void *extra) static uint64 backpressure_lag_impl(void) { + struct WalproposerShmemState* state = NULL; if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0) { XLogRecPtr writePtr; @@ -426,6 +442,18 @@ backpressure_lag_impl(void) return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); } } + + /* BEGIN_HADRON */ + if (databricks_max_wal_mb_per_second == -1) { + return 0; + } + + state = GetWalpropShmemState(); + if (state != NULL && pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == 1) + { + return 1; + } + /* END_HADRON */ return 0; } @@ -472,6 +500,9 @@ WalproposerShmemInit(void) pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0); pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0); + /* BEGIN_HADRON */ + pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0); + /* END_HADRON */ } LWLockRelease(AddinShmemInitLock); @@ -487,6 +518,9 @@ WalproposerShmemInit_SyncSafekeeper(void) pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0); pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0); pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); + /* BEGIN_HADRON */ + pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0); + /* END_HADRON */ } #define BACK_PRESSURE_DELAY 10000L // 0.01 sec @@ -521,7 +555,6 @@ backpressure_throttling_impl(void) if (lag == 0) return retry; - old_status = get_ps_display(&len); new_status = (char *) palloc(len + 64 + 1); memcpy(new_status, old_status, len); @@ -1458,6 +1491,8 @@ XLogBroadcastWalProposer(WalProposer *wp) { XLogRecPtr startptr; XLogRecPtr endptr; + struct WalproposerShmemState *state = NULL; + TimestampTz now = 0; /* Start from the last sent position */ startptr = sentPtr; @@ -1502,13 +1537,36 @@ XLogBroadcastWalProposer(WalProposer *wp) * that arbitrary LSN is eventually reported as written, flushed and * applied, so that it can measure the elapsed time. */ - LagTrackerWrite(endptr, GetCurrentTimestamp()); + now = GetCurrentTimestamp(); + LagTrackerWrite(endptr, now); /* Do we have any work to do? 
*/ Assert(startptr <= endptr); if (endptr <= startptr) return; + /* BEGIN_HADRON */ + state = GetWalpropShmemState(); + if (databricks_max_wal_mb_per_second != -1 && state != NULL) + { + uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024; + struct WalRateLimiter *limiter = &state->wal_rate_limiter; + + if (now - limiter->last_recorded_time_us > USECS_PER_SEC) + { + /* Reset the rate limiter */ + limiter->last_recorded_time_us = now; + limiter->sent_bytes = 0; + pg_atomic_exchange_u32(&limiter->should_limit, 0); + } + limiter->sent_bytes += (endptr - startptr); + if (limiter->sent_bytes > max_wal_bytes) + { + pg_atomic_exchange_u32(&limiter->should_limit, 1); + } + } + /* END_HADRON */ + WalProposerBroadcast(wp, startptr, endptr); sentPtr = endptr; diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 9baa80f73a..1f98651e71 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -59,6 +59,15 @@ pub static FLUSH_WAL_SECONDS: Lazy = Lazy::new(|| { .expect("Failed to register safekeeper_flush_wal_seconds histogram") }); /* BEGIN_HADRON */ +// Counter of all ProposerAcceptorMessage requests received +pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "safekeeper_proposer_acceptor_messages_total", + "Total number of ProposerAcceptorMessage requests received by the Safekeeper.", + &["outcome"] + ) + .expect("Failed to register safekeeper_proposer_acceptor_messages_total counter") +}); pub static WAL_DISK_IO_ERRORS: Lazy = Lazy::new(|| { register_int_counter!( "safekeeper_wal_disk_io_errors", diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 4d15fc9de3..09ca041e22 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -24,7 +24,7 @@ use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; use utils::pageserver_feedback::PageserverFeedback; -use crate::metrics::MISC_OPERATION_SECONDS; +use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL}; use crate::state::TimelineState; use crate::{control_file, wal_storage}; @@ -938,7 +938,7 @@ where &mut self, msg: &ProposerAcceptorMessage, ) -> Result> { - match msg { + let res = match msg { ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await, ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await, ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await, @@ -949,7 +949,20 @@ where self.handle_append_request(msg, false).await } ProposerAcceptorMessage::FlushWAL => self.handle_flush().await, - } + }; + + // BEGIN HADRON + match &res { + Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL + .with_label_values(&["success"]) + .inc(), + Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL + .with_label_values(&["error"]) + .inc(), + }; + + res + // END HADRON } /// Handle initial message from proposer: check its sanity and send my diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 8e7d957b22..23b9d1c8c9 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -333,6 +333,13 @@ class PageserverHttpClient(requests.Session, MetricsGetter): res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys") self.verbose_error(res) + def list_tenant_visible_size(self) -> dict[TenantShardId, int]: + res = self.get(f"http://localhost:{self.port}/v1/list_tenant_visible_size") + self.verbose_error(res) + res_json = res.json() + 
assert isinstance(res_json, dict) + return res_json + def tenant_list(self) -> list[dict[Any, Any]]: res = self.get(f"http://localhost:{self.port}/v1/tenant") self.verbose_error(res) diff --git a/test_runner/fixtures/port_distributor.py b/test_runner/fixtures/port_distributor.py index 6a829a9399..e51d08e16e 100644 --- a/test_runner/fixtures/port_distributor.py +++ b/test_runner/fixtures/port_distributor.py @@ -3,6 +3,7 @@ from __future__ import annotations import re import socket from contextlib import closing +from itertools import cycle from fixtures.log_helper import log @@ -34,15 +35,23 @@ def can_bind(host: str, port: int) -> bool: class PortDistributor: def __init__(self, base_port: int, port_number: int): - self.iterator = iter(range(base_port, base_port + port_number)) + self.base_port = base_port + self.port_number = port_number + self.cycle = cycle(range(base_port, base_port + port_number)) self.port_map: dict[int, int] = {} def get_port(self) -> int: - for port in self.iterator: + checked = 0 + for port in self.cycle: if can_bind("localhost", port): return port + elif checked < self.port_number: + checked += 1 + else: + break + raise RuntimeError( - "port range configured for test is exhausted, consider enlarging the range" + f"port range ({self.base_port}..{self.base_port + self.port_number}) configured for test is exhausted, consider enlarging the range" ) def replace_with_new_port(self, value: int | str) -> int | str: diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index 22e5bf576f..0f0cf4cc6d 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -40,7 +40,7 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]: def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any: if method == PrewarmMethod.POSTGRES: - cur.execute("select get_local_cache_state()") + cur.execute("select neon.get_local_cache_state()") return cur.fetchall()[0][0] if method == PrewarmMethod.AUTOPREWARM: @@ -72,7 +72,7 @@ def prewarm_endpoint( elif method == PrewarmMethod.COMPUTE_CTL: client.prewarm_lfc() elif method == PrewarmMethod.POSTGRES: - cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,)) def check_prewarmed( @@ -116,7 +116,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): pg_conn = endpoint.connect() pg_cur = pg_conn.cursor() - pg_cur.execute("create extension neon") + pg_cur.execute("create schema neon; create extension neon with schema neon") pg_cur.execute("create database lfc") lfc_conn = endpoint.connect(dbname="lfc") @@ -142,10 +142,12 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): lfc_cur = lfc_conn.cursor() prewarm_endpoint(method, client, pg_cur, lfc_state) - pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") + pg_cur.execute( + "select lfc_value from neon.neon_lfc_stats where lfc_key='file_cache_used_pages'" + ) lfc_used_pages = pg_cur.fetchall()[0][0] log.info(f"Used LFC size: {lfc_used_pages}") - pg_cur.execute("select * from get_prewarm_info()") + pg_cur.execute("select * from neon.get_prewarm_info()") total, prewarmed, skipped, _ = pg_cur.fetchall()[0] log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}") progress = (prewarmed + skipped) * 100 // total @@ -186,7 +188,7 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet pg_conn = endpoint.connect() 
pg_cur = pg_conn.cursor() - pg_cur.execute("create extension neon") + pg_cur.execute("create schema neon; create extension neon with schema neon") pg_cur.execute("CREATE DATABASE lfc") lfc_conn = endpoint.connect(dbname="lfc") diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 7f9207047e..92889e5de3 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -3,6 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING from fixtures.common_types import Lsn, TenantId, TimelineId +from fixtures.log_helper import log from fixtures.neon_fixtures import ( DEFAULT_BRANCH_NAME, NeonEnv, @@ -164,3 +165,15 @@ def test_pageserver_http_index_part_force_patch(neon_env_builder: NeonEnvBuilder {"rel_size_migration": "legacy"}, ) assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "legacy" + + +def test_pageserver_get_tenant_visible_size(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_pageservers = 1 + env = neon_env_builder.init_start() + env.create_tenant(shard_count=4) + env.create_tenant(shard_count=2) + + json = env.pageserver.http_client().list_tenant_visible_size() + log.info(f"{json}") + # initial tennat + 2 newly created tenants + assert len(json) == 7 diff --git a/test_runner/regress/test_replica_promotes.py b/test_runner/regress/test_replica_promotes.py index 1f26269f40..8d39ac123a 100644 --- a/test_runner/regress/test_replica_promotes.py +++ b/test_runner/regress/test_replica_promotes.py @@ -60,7 +60,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod): with primary.connect() as primary_conn: primary_cur = primary_conn.cursor() - primary_cur.execute("create extension neon") + primary_cur.execute("create schema neon;create extension neon with schema neon") primary_cur.execute( "create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)" ) @@ -172,7 +172,7 @@ def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv): secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") with primary.connect() as conn, conn.cursor() as cur: - cur.execute("create extension neon") + cur.execute("create schema neon;create extension neon with schema neon") cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)") cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)") cur.execute("show neon.safekeepers")
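The walproposer change above implements the WAL rate limit as a fixed one-second window in C: `sent_bytes` accumulates per window, `should_limit` flips once the configured budget is exceeded, and both reset when a new window starts. A self-contained Rust sketch of the same scheme (hypothetical names, single-threaded, not part of the patch) for reference:

```rust
use std::time::{Duration, Instant};

/// Fixed-window byte-rate limiter, mirroring the walproposer logic above.
struct WalRateLimiter {
    max_bytes_per_sec: u64,
    window_start: Instant,
    sent_bytes: u64,
    should_limit: bool,
}

impl WalRateLimiter {
    fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            window_start: Instant::now(),
            sent_bytes: 0,
            should_limit: false,
        }
    }

    /// Record `bytes` about to be broadcast; returns true if senders should back off.
    fn record(&mut self, bytes: u64) -> bool {
        if self.window_start.elapsed() > Duration::from_secs(1) {
            // New window: reset the counter and clear backpressure.
            self.window_start = Instant::now();
            self.sent_bytes = 0;
            self.should_limit = false;
        }
        self.sent_bytes += bytes;
        if self.sent_bytes > self.max_bytes_per_sec {
            self.should_limit = true;
        }
        self.should_limit
    }
}

fn main() {
    // e.g. a 16 MB/s budget
    let mut limiter = WalRateLimiter::new(16 * 1024 * 1024);
    assert!(!limiter.record(1024));
    assert!(limiter.record(32 * 1024 * 1024)); // budget exceeded within the window
}
```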