diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 19e5d60ea3..3a4060e9a8 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -457,7 +457,7 @@ pub struct JwksSettings { } /// Protocol used to connect to a Pageserver. Parsed from the connstring scheme. -#[derive(Clone, Copy, Debug, Default)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub enum PageserverProtocol { /// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme. #[default] diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index 0e9bf9ccdf..a4ab4a9a3b 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -11,9 +11,16 @@ http.workspace = true thiserror.workspace = true tonic.workspace = true tracing.workspace = true -tokio = { version = "1.43.1", features = ["full", "macros", "net", "io-util", "rt", "rt-multi-thread"] } +tokio = { version = "1.43.1", features = [ + "full", + "macros", + "net", + "io-util", + "rt", + "rt-multi-thread", +] } uuid = { version = "1", features = ["v4"] } -tower = { version = "0.4", features = ["timeout", "util"] } +tower = { version = "0.4", features = ["timeout", "util"] } rand = "0.8" tokio-util = { version = "0.7", features = ["compat"] } hyper-util = "0.1.9" @@ -25,6 +32,7 @@ async-trait = { version = "0.1" } tokio-stream = "0.1" dashmap = "5" chrono = { version = "0.4", features = ["serde"] } +compute_api.workspace = true pageserver_page_api.workspace = true diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 3e4da72715..ad2ee18761 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::{anyhow, ensure}; +use compute_api::spec::PageserverProtocol; use pageserver_page_api as page_api; use tokio::time::Instant; use tracing::{error, info, instrument, warn}; @@ -28,7 +29,8 @@ pub struct PageserverClient { } impl PageserverClient { - /// Creates a new Pageserver client. + /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given + /// in the shard map, which must be complete and must use gRPC URLs. pub fn new( tenant_id: TenantId, timeline_id: TimelineId, @@ -334,6 +336,11 @@ impl Shard { shard_id: ShardIndex, auth_token: Option, ) -> anyhow::Result { + // Sanity-check that the URL uses gRPC. + if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc { + return Err(anyhow!("invalid shard URL {url}: must use gRPC")); + } + // Use a common channel pool for all clients, to multiplex unary and stream requests across // the same TCP connections. The channel pool is unbounded (but client pools are bounded). let channel_pool = ChannelPool::new(url)?;