diff --git a/Cargo.lock b/Cargo.lock index 89351432c1..228211acc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -701,7 +701,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "itoa", "matchit", @@ -718,7 +718,7 @@ dependencies = [ "sync_wrapper 1.0.1", "tokio", "tokio-tungstenite 0.26.1", - "tower 0.5.2", + "tower", "tower-layer", "tower-service", "tracing", @@ -761,7 +761,7 @@ dependencies = [ "mime", "pin-project-lite", "serde", - "tower 0.5.2", + "tower", "tower-layer", "tower-service", ] @@ -1337,7 +1337,7 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-util", - "tower 0.5.2", + "tower", "tower-http", "tower-otel", "tracing", @@ -2066,7 +2066,7 @@ dependencies = [ "test-log", "tokio", "tokio-util", - "tower 0.5.2", + "tower", "tracing", "utils", "workspace_hack", @@ -2330,7 +2330,7 @@ dependencies = [ "futures-core", "futures-sink", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "pin-project", "rand 0.8.5", @@ -2883,9 +2883,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.8.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] name = "httpdate" @@ -2935,9 +2935,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" dependencies = [ "bytes", "futures-channel", @@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "rustls 0.22.4", "rustls-pki-types", @@ -2992,7 +2992,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -3001,20 +3001,20 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.7" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.4.1", + "hyper 1.6.0", + "libc", "pin-project-lite", "socket2", "tokio", - "tower 0.4.13", "tower-service", "tracing", ] @@ -4432,6 +4432,23 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "pageserver_client_grpc" +version = "0.1.0" +dependencies = [ + "bytes", + "futures", + "http 1.1.0", + "pageserver_page_api", + "rand 0.8.5", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tonic 0.13.1", + "tracing", + "utils", +] + [[package]] name = "pageserver_compaction" version = "0.1.0" @@ -5208,7 +5225,7 @@ dependencies = [ "humantime", "humantime-serde", "hyper 0.14.30", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "indexmap 2.9.0", "ipnet", @@ -5604,7 +5621,7 @@ dependencies = [ "http-body-util", "http-types", "humantime-serde", - "hyper 1.4.1", + "hyper 1.6.0", "itertools 0.10.5", "metrics", "once_cell", @@ -5644,7 +5661,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-rustls 0.26.0", "hyper-util", "ipnet", @@ -5701,7 +5718,7 @@ dependencies = [ "futures", "getrandom 0.2.11", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.6.0", "parking_lot 0.11.2", "reqwest", "reqwest-middleware", @@ -6642,12 +6659,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.5" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -6713,7 +6730,7 @@ dependencies = [ "http-body-util", "http-utils", "humantime", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "metrics", "once_cell", @@ -7538,11 +7555,12 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -7553,7 +7571,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.2", "tokio-stream", - "tower 0.5.2", + "tower", "tower-layer", "tower-service", "tracing", @@ -7586,21 +7604,6 @@ dependencies = [ "tonic 0.13.1", ] -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "pin-project", - "pin-project-lite", - "tokio", - "tower-layer", - "tower-service", -] - [[package]] name = "tower" version = "0.5.2" @@ -8591,7 +8594,7 @@ dependencies = [ "hex", "hmac", "hyper 0.14.30", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "indexmap 2.9.0", "itertools 0.12.1", @@ -8645,7 +8648,7 @@ dependencies = [ "tokio-stream", "tokio-util", "toml_edit", - "tower 0.5.2", + "tower", "tracing", "tracing-core", "tracing-log", diff --git a/Cargo.toml b/Cargo.toml index a040010fb7..4790497d8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "pageserver/compaction", "pageserver/ctl", "pageserver/client", + "pageserver/client_grpc", "pageserver/pagebench", "pageserver/page_api", "proxy", @@ -199,7 +200,7 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" -tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } +tonic = { version = "0.13.1", default-features = false, features = ["gzip", "channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } @@ -254,6 +255,7 @@ metrics = { version = "0.1", path = "./libs/metrics/" } pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } pageserver_client = { path = "./pageserver/client" } +pageserver_client_grpc = { path = "./pageserver/client_grpc" } pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" } pageserver_page_api = { path = "./pageserver/page_api" } postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml new file mode 100644 index 0000000000..580cd8edeb --- /dev/null +++ b/pageserver/client_grpc/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "pageserver_client_grpc" +version = "0.1.0" +edition = "2024" + +[dependencies] +bytes.workspace = true +futures.workspace = true +http.workspace = true +thiserror.workspace = true +tonic.workspace = true +tracing.workspace = true +pageserver_page_api.workspace = true +utils.workspace = true +tokio = { version = "1.43.1", features = ["full", "macros", "net", "io-util", "rt", "rt-multi-thread"] } +rand = "0.8" +tokio-util = { version = "0.7", features = ["compat"] } diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs new file mode 100644 index 0000000000..d83f1493b0 --- /dev/null +++ b/pageserver/client_grpc/src/lib.rs @@ -0,0 +1,188 @@ +// +// Pageserver gRPC client library +// +// This library provides a gRPC client for the pageserver for the +// communicator project. +// +// This library is a work in progress. +// +// TODO: This should properly use the shard map +// + +use std::collections::HashMap; +use bytes::Bytes; +use futures::{StreamExt}; +use thiserror::Error; +use tonic::metadata::AsciiMetadataValue; +use pageserver_page_api::model::*; +use pageserver_page_api::proto; +use pageserver_page_api::proto::PageServiceClient; +use utils::shard::ShardIndex; +use std::fmt::Debug; +use tracing::error; +use tokio::sync::RwLock; +use tonic::transport::{Channel, Endpoint}; + +#[derive(Error, Debug)] +pub enum PageserverClientError { + #[error("could not connect to service: {0}")] + ConnectError(#[from] tonic::transport::Error), + #[error("could not perform request: {0}`")] + RequestError(#[from] tonic::Status), + #[error("protocol error: {0}")] + ProtocolError(#[from] ProtocolError), + #[error("could not perform request: {0}`")] + InvalidUri(#[from] http::uri::InvalidUri), + #[error("could not perform request: {0}`")] + Other(String), +} + +pub struct PageserverClient { + _tenant_id: String, + _timeline_id: String, + _auth_token: Option, + shard_map: HashMap, + channels: tokio::sync::RwLock>, + auth_interceptor: AuthInterceptor, +} + +impl PageserverClient { + /// TODO: this doesn't currently react to changes in the shard map. + pub fn new( + tenant_id: &str, + timeline_id: &str, + auth_token: &Option, + shard_map: HashMap, + ) -> Self { + Self { + _tenant_id: tenant_id.to_string(), + _timeline_id: timeline_id.to_string(), + _auth_token: auth_token.clone(), + shard_map, + channels: RwLock::new(HashMap::new()), + auth_interceptor: AuthInterceptor::new(tenant_id, timeline_id, auth_token.as_deref()), + } + } + // + // TODO: This opens a new gRPC stream for every request, which is extremely inefficient + pub async fn get_page( + &self, + request: &GetPageRequest, + ) -> Result, PageserverClientError> { + // FIXME: calculate the shard number correctly + let shard = ShardIndex::unsharded(); + let chan = self.get_client(shard).await; + + let mut client = + PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); + + let request = proto::GetPageRequest::from(request); + let request_stream = futures::stream::once(std::future::ready(request)); + + let mut response_stream = client + .get_pages(tonic::Request::new(request_stream)) + .await? + .into_inner(); + + let Some(response) = response_stream.next().await else { + return Err(PageserverClientError::Other( + "no response received for getpage request".to_string(), + )); + }; + + match response { + Err(status) => { + return Err(PageserverClientError::RequestError(status)); + } + Ok(resp) => { + let response: GetPageResponse = resp.try_into().unwrap(); + return Ok(response.page_images.to_vec()); + } + } + } + + + // + // TODO: this should use a connection pool with concurrency limits, + // not a single connection to the shard. + // + async fn get_client(&self, shard: ShardIndex) -> Channel { + // Get channel from the hashmap + let mut channels = self.channels.write(); + if let Some(channel) = channels.await.get(&shard) { + return channel.clone(); + } + // Create a new channel if it doesn't exist + let shard_url = self + .shard_map + .get(&shard) + .expect("shard not found in shard map"); + + let attempt = Endpoint::from_shared(shard_url.clone()) + .expect("invalid endpoint") + .connect() + .await; + + match attempt { + Ok(channel) => { + channels = self.channels.write(); + channels.await.insert(shard, channel.clone()); + channel.clone() + } + Err(e) => { + // TODO: handle this more gracefully, e.g. with a connection pool retry + panic!("Failed to connect to shard {shard}: {e}"); + } + } + } +} + +/// Inject tenant_id, timeline_id and authentication token to all pageserver requests. +#[derive(Clone)] +struct AuthInterceptor { + tenant_id: AsciiMetadataValue, + shard_id: Option, + timeline_id: AsciiMetadataValue, + auth_header: Option, // including "Bearer " prefix +} + +impl AuthInterceptor { + fn new(tenant_id: &str, timeline_id: &str, auth_token: Option<&str>) -> Self { + Self { + tenant_id: tenant_id.parse().expect("could not parse tenant id"), + shard_id: None, + timeline_id: timeline_id.parse().expect("could not parse timeline id"), + auth_header: auth_token + .map(|t| format!("Bearer {t}")) + .map(|t| t.parse().expect("could not parse auth token")), + } + } + + fn for_shard(&self, shard_id: ShardIndex) -> Self { + let mut with_shard = self.clone(); + with_shard.shard_id = Some( + shard_id + .to_string() + .parse() + .expect("could not parse shard id"), + ); + with_shard + } +} + +impl tonic::service::Interceptor for AuthInterceptor { + fn call(&mut self, mut req: tonic::Request<()>) -> Result, tonic::Status> { + req.metadata_mut() + .insert("neon-tenant-id", self.tenant_id.clone()); + if let Some(shard_id) = &self.shard_id { + req.metadata_mut().insert("neon-shard-id", shard_id.clone()); + } + req.metadata_mut() + .insert("neon-timeline-id", self.timeline_id.clone()); + if let Some(auth_header) = &self.auth_header { + req.metadata_mut() + .insert("authorization", auth_header.clone()); + } + Ok(req) + } +} diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index f515f27f3e..1f656deb80 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -18,6 +18,6 @@ pub mod proto { pub use page_service_server::{PageService, PageServiceServer}; } -mod model; +pub mod model; pub use model::*; diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 7ab97a994e..baaf65942f 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -102,6 +102,15 @@ impl TryFrom for proto::ReadLsn { } } +impl From<&ReadLsn> for proto::ReadLsn { + fn from(value: &ReadLsn) -> proto::ReadLsn { + proto::ReadLsn { + request_lsn: value.request_lsn.into(), + not_modified_since_lsn: value.not_modified_since_lsn.unwrap_or_default().0, + } + } +} + // RelTag is defined in pageserver_api::reltag. pub type RelTag = pageserver_api::reltag::RelTag; @@ -132,6 +141,16 @@ impl From for proto::RelTag { } } +impl From<&RelTag> for proto::RelTag { + fn from(value: &RelTag) -> proto::RelTag { + proto::RelTag { + spc_oid: value.spcnode, + db_oid: value.dbnode, + rel_number: value.relnode, + fork_number: value.forknum as u32, + } + } +} /// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error. #[derive(Clone, Copy, Debug)] pub struct CheckRelExistsRequest { @@ -311,6 +330,17 @@ impl TryFrom for GetPageRequest { } } +impl From<&GetPageRequest> for proto::GetPageRequest { + fn from(request: &GetPageRequest) -> proto::GetPageRequest { + proto::GetPageRequest { + request_id: request.request_id, + request_class: request.request_class.into(), + read_lsn: Some(request.read_lsn.try_into().unwrap()), + rel: Some(request.rel.into()), + block_number: request.block_numbers.clone().into_vec(), + } + } +} impl TryFrom for proto::GetPageRequest { type Error = ProtocolError;