diff --git a/Cargo.lock b/Cargo.lock
index 98fd2fa2f9..01424856a5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4340,6 +4340,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "tokio-stream",
  "tokio-util",
  "tracing",
  "utils",
@@ -4526,6 +4527,7 @@ name = "pageserver_client_grpc"
 version = "0.1.0"
 dependencies = [
  "bytes",
+ "futures",
  "http 1.1.0",
  "pageserver_data_api",
  "thiserror 1.0.69",
diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml
index 3a2e4150b1..65c23840e1 100644
--- a/pageserver/client_grpc/Cargo.toml
+++ b/pageserver/client_grpc/Cargo.toml
@@ -5,6 +5,7 @@ edition = "2024"
 
 [dependencies]
 bytes.workspace = true
+futures.workspace = true
 http.workspace = true
 thiserror.workspace = true
 tonic.workspace = true
diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index 3115990331..4d70241329 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -7,9 +7,8 @@
 use std::collections::HashMap;
 use std::sync::RwLock;
 
 use bytes::Bytes;
-use http;
+use futures::Stream;
 use thiserror::Error;
-use tonic;
 use tonic::metadata::AsciiMetadataValue;
 use tonic::transport::Channel;
@@ -108,6 +107,21 @@ impl PageserverClient {
         Ok(response.into_inner().page_image)
     }
 
+    pub async fn get_pages(
+        &self,
+        requests: impl Stream<Item = proto::GetPageRequest> + Send + 'static,
+    ) -> std::result::Result<
+        tonic::Response<tonic::Streaming<proto::GetPageResponse>>,
+        PageserverClientError,
+    > {
+        // FIXME: calculate the shard number correctly
+        let shard_no = 0;
+
+        let mut client = self.get_client(shard_no).await?;
+
+        Ok(client.get_pages(tonic::Request::new(requests)).await?)
+    }
+
     /// Process a request to get the size of a database.
     pub async fn process_dbsize_request(
         &self,
diff --git a/pageserver/data_api/build.rs b/pageserver/data_api/build.rs
index 8a4dfca836..4af621df1c 100644
--- a/pageserver/data_api/build.rs
+++ b/pageserver/data_api/build.rs
@@ -1,7 +1,7 @@
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Generate rust code from .proto protobuf.
     tonic_build::configure()
-        .bytes(&["."])
+        .bytes(["."])
         .compile_protos(&["proto/page_service.proto"], &["proto"])
         .unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
     Ok(())
diff --git a/pageserver/data_api/proto/page_service.proto b/pageserver/data_api/proto/page_service.proto
index 0e9116f39a..d9ddf41794 100644
--- a/pageserver/data_api/proto/page_service.proto
+++ b/pageserver/data_api/proto/page_service.proto
@@ -17,8 +17,12 @@ service PageService {
   // Returns size of a relation, as # of blocks
   rpc RelSize (RelSizeRequest) returns (RelSizeResponse);
 
+  // Fetches a page.
   rpc GetPage (GetPageRequest) returns (GetPageResponse);
 
+  // Streaming GetPage protocol.
+  rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
+
   // Returns total size of a database, as # of bytes
   rpc DbSize (DbSizeRequest) returns (DbSizeResponse);
 
diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml
index c41007f3bb..ed4b630d95 100644
--- a/pageserver/pagebench/Cargo.toml
+++ b/pageserver/pagebench/Cargo.toml
@@ -15,11 +15,12 @@
 hdrhistogram.workspace = true
 humantime.workspace = true
 humantime-serde.workspace = true
 rand.workspace = true
-reqwest.workspace=true
+reqwest.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 tracing.workspace = true
 tokio.workspace = true
+tokio-stream.workspace = true
 tokio-util.workspace = true
 pageserver_client.workspace = true
diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs
index bcd7710239..00e4c69430 100644
--- a/pageserver/pagebench/src/cmd/basebackup.rs
+++ b/pageserver/pagebench/src/cmd/basebackup.rs
@@ -9,7 +9,6 @@
 use anyhow::Context;
 use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
 use pageserver_client::page_service::BasebackupRequest;
-use pageserver_client_grpc;
 use pageserver_data_api::model::{GetBaseBackupRequest, RequestCommon};
 use rand::prelude::*;
diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
index 2b535d8507..08f4f4ef83 100644
--- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
@@ -29,6 +29,8 @@ use crate::util::{request_stats, tokio_thread_local_stats};
 pub(crate) struct Args {
     #[clap(long, default_value = "false")]
     grpc: bool,
+    #[clap(long, default_value = "false")]
+    grpc_stream: bool,
     #[clap(long, default_value = "http://localhost:9898")]
     mgmt_api_endpoint: String,
     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
@@ -299,7 +301,18 @@
             .unwrap();
 
         Box::pin(async move {
-            if args.grpc {
+            if args.grpc_stream {
+                client_grpc_stream(
+                    args,
+                    worker_id,
+                    ss,
+                    cancel,
+                    rps_period,
+                    ranges,
+                    weights,
+                )
+                .await
+            } else if args.grpc {
                 client_grpc(
                     args,
                     worker_id,
@@ -461,6 +474,7 @@ async fn client_libpq(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn client_grpc(
     args: &Args,
     worker_id: WorkerId,
@@ -557,3 +571,103 @@ async fn client_grpc(
         }
     }
 }
+
+async fn client_grpc_stream(
+    args: &Args,
+    worker_id: WorkerId,
+    shared_state: Arc<SharedState>,
+    cancel: CancellationToken,
+    rps_period: Option<Duration>,
+    ranges: Vec<KeyRange>,
+    weights: rand::distributions::weighted::WeightedIndex<i128>,
+) {
+    let shard_map = HashMap::from([(0, args.page_service_connstring.clone())]);
+    let client = pageserver_client_grpc::PageserverClient::new(
+        &worker_id.timeline.tenant_id.to_string(),
+        &worker_id.timeline.timeline_id.to_string(),
+        &None,
+        shard_map,
+    );
+
+    let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
+    let request_stream = tokio_stream::wrappers::ReceiverStream::new(request_rx);
+    let mut response_stream = client.get_pages(request_stream).await.unwrap().into_inner();
+
+    shared_state.start_work_barrier.wait().await;
+    let client_start = Instant::now();
+    let mut ticks_processed = 0;
+    let mut inflight = VecDeque::new();
+
+    while !cancel.is_cancelled() {
+        // Detect if a request took longer than the RPS rate
+        if let Some(period) = &rps_period {
+            let periods_passed_until_now =
+                usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
+
+            if periods_passed_until_now > ticks_processed {
+                shared_state.live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
+            }
+            ticks_processed = periods_passed_until_now;
+        }
+
+        // Send requests until the queue depth is reached
+        while inflight.len() < args.queue_depth.get() {
+            let start = Instant::now();
+            let req = {
+                let mut rng = rand::thread_rng();
+                let r = &ranges[weights.sample(&mut rng)];
+                let key: i128 = rng.gen_range(r.start..r.end);
+                let key = Key::from_i128(key);
+                assert!(key.is_rel_block_key());
+                let (rel_tag, block_no) = key
+                    .to_rel_block()
+                    .expect("we filter non-rel-block keys out above");
+                pageserver_data_api::model::GetPageRequest {
+                    common: pageserver_data_api::model::RequestCommon {
+                        request_lsn: if rng.gen_bool(args.req_latest_probability) {
+                            Lsn::MAX
+                        } else {
+                            r.timeline_lsn
+                        },
+                        not_modified_since_lsn: r.timeline_lsn,
+                    },
+                    rel: pageserver_data_api::model::RelTag {
+                        spc_oid: rel_tag.spcnode,
+                        db_oid: rel_tag.dbnode,
+                        rel_number: rel_tag.relnode,
+                        fork_number: rel_tag.forknum,
+                    },
+                    block_number: block_no,
+                }
+            };
+            request_tx.send((&req).into()).await.unwrap();
+            inflight.push_back(start);
+        }
+
+        // Receive responses for the inflight requests
+        if let Some(response) = response_stream.next().await {
+            response.unwrap(); // Ensure the response is successful
+            let start = inflight.pop_front().unwrap();
+            let end = Instant::now();
+            shared_state.live_stats.request_done();
+            ticks_processed += 1;
+            STATS.with(|stats| {
+                stats
+                    .borrow()
+                    .lock()
+                    .unwrap()
+                    .observe(end.duration_since(start))
+                    .unwrap();
+            });
+        }
+
+        // Enforce RPS limit if specified
+        if let Some(period) = &rps_period {
+            let next_at = client_start
+                + Duration::from_micros(
+                    (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
+                );
+            tokio::time::sleep_until(next_at.into()).await;
+        }
+    }
+}
diff --git a/pageserver/src/compute_service_grpc.rs b/pageserver/src/compute_service_grpc.rs
index 337c249187..5c5dc109f9 100644
--- a/pageserver/src/compute_service_grpc.rs
+++ b/pageserver/src/compute_service_grpc.rs
@@ -31,6 +31,8 @@ use crate::tenant::mgr::ShardResolveResult;
 use crate::tenant::mgr::ShardSelector;
 use crate::tenant::storage_layer::IoConcurrency;
 use crate::tenant::timeline::WaitLsnTimeout;
+use async_stream::try_stream;
+use futures::Stream;
 use tokio::io::{AsyncWriteExt, ReadHalf, SimplexStream};
 use tokio::task::JoinHandle;
 use tokio_util::codec::{Decoder, FramedRead};
@@ -47,10 +49,11 @@
 use bytes::BytesMut;
 use jsonwebtoken::TokenData;
 use tracing::Instrument;
 use tracing::{debug, error};
-use utils::auth::SwappableJwtAuth;
+use utils::auth::{Claims, SwappableJwtAuth};
 use utils::id::{TenantId, TenantTimelineId, TimelineId};
 use utils::lsn::Lsn;
+use utils::shard::ShardIndex;
 use utils::simple_rcu::RcuReadGuard;
 
 use crate::tenant::PageReconstructError;
@@ -144,6 +147,8 @@ fn convert_reltag(value: &model::RelTag) -> pageserver_api::reltag::RelTag {
 #[tonic::async_trait]
 impl PageService for PageServiceService {
     type GetBaseBackupStream = GetBaseBackupStream;
+    type GetPagesStream =
+        Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
 
     async fn rel_exists(
         &self,
@@ -258,14 +263,64 @@ impl PageService for PageServiceService {
                 )
                 .await?;
 
-            Ok(tonic::Response::new(proto::GetPageResponse {
-                page_image: page_image,
-            }))
+            Ok(tonic::Response::new(proto::GetPageResponse { page_image }))
         }
         .instrument(span)
         .await
     }
 
+    async fn get_pages(
+        &self,
+        request: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
+    ) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
+        // TODO: pass the shard index in the request metadata.
+        let ttid = self.extract_ttid(request.metadata())?;
+        let timeline = self
+            .get_timeline(ttid, ShardSelector::Known(ShardIndex::unsharded()))
+            .await?;
+        let ctx = self.ctx.with_scope_timeline(&timeline);
+        let conf = self.conf;
+
+        let mut request_stream = request.into_inner();
+
+        let response_stream = try_stream! {
+            while let Some(request) = request_stream.message().await? {
+                let guard = timeline
+                    .gate
+                    .enter()
+                    .or(Err(tonic::Status::unavailable("timeline is shutting down")))?;
+
+                let request: model::GetPageRequest = (&request).try_into()?;
+                let rel = convert_reltag(&request.rel);
+                let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
+                let lsn = Self::wait_or_get_last_lsn(
+                    &timeline,
+                    request.common.request_lsn,
+                    request.common.not_modified_since_lsn,
+                    &latest_gc_cutoff_lsn,
+                    &ctx,
+                )
+                .await?;
+
+                let page_image = timeline
+                    .get_rel_page_at_lsn(
+                        rel,
+                        request.block_number,
+                        Version::Lsn(lsn),
+                        &ctx,
+                        IoConcurrency::spawn_from_conf(conf, guard),
+                    )
+                    .await?;
+
+                yield proto::GetPageResponse { page_image };
+            }
+        };
+
+        Ok(tonic::Response::new(
+            Box::pin(response_stream) as Self::GetPagesStream
+        ))
+    }
+
     async fn db_size(
         &self,
         request: tonic::Request<proto::DbSizeRequest>,
@@ -641,7 +696,7 @@ impl tonic::service::Interceptor for PageServiceAuthenticator {
         let jwtdata: TokenData<Claims> = auth
             .decode(jwt)
             .map_err(|err| tonic::Status::unauthenticated(format!("invalid JWT token: {}", err)))?;
-        let claims = jwtdata.claims;
+        let claims: Claims = jwtdata.claims;
 
         if matches!(claims.scope, utils::auth::Scope::Tenant) && claims.tenant_id.is_none() {
             return Err(tonic::Status::unauthenticated(
@@ -669,7 +724,6 @@ impl tonic::service::Interceptor for PageServiceAuthenticator {
 ///
 /// The first part of the Chain chunks the tarball. The second part checks the return value
 /// of the send_basebackup_tarball Future that created the tarball.
-
 type GetBaseBackupStream = futures::stream::Chain;
 
 fn new_basebackup_response_stream(
diff --git a/pgxn/neon/communicator/build.rs b/pgxn/neon/communicator/build.rs
index 851a2d9b37..ef570c3d0a 100644
--- a/pgxn/neon/communicator/build.rs
+++ b/pgxn/neon/communicator/build.rs
@@ -1,5 +1,3 @@
-use cbindgen;
-
 use std::env;
 
 fn main() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs
index a9a06d6225..a5cc976bc5 100644
--- a/pgxn/neon/communicator/src/backend_interface.rs
+++ b/pgxn/neon/communicator/src/backend_interface.rs
@@ -120,7 +120,7 @@ pub extern "C" fn bcomm_start_get_page_v_request<'t>(
     // Tell the communicator about it
     bs.submit_request(request_idx);
 
-    return request_idx;
+    request_idx
 }
 
 /// Check if a request has completed. Returns:
diff --git a/pgxn/neon/communicator/src/worker_process/logging.rs b/pgxn/neon/communicator/src/worker_process/logging.rs
index 9eeb4340fa..756d338efa 100644
--- a/pgxn/neon/communicator/src/worker_process/logging.rs
+++ b/pgxn/neon/communicator/src/worker_process/logging.rs
@@ -116,7 +116,7 @@ struct EventBuilder<'a> {
     maker: &'a Maker,
 }
 
-impl<'a> std::io::Write for EventBuilder<'a> {
+impl std::io::Write for EventBuilder<'_> {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
         self.event.message.write(buf)
     }
@@ -126,7 +126,7 @@ impl<'a> std::io::Write for EventBuilder<'a> {
     }
 }
 
-impl<'a> Drop for EventBuilder<'a> {
+impl Drop for EventBuilder<'_> {
     fn drop(&mut self) {
         let maker = self.maker;
         let event = std::mem::take(&mut self.event);
diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
index 49c78713b4..f2967faf6b 100644
--- a/pgxn/neon/communicator/src/worker_process/worker_interface.rs
+++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
@@ -29,14 +29,7 @@ pub extern "C" fn communicator_worker_process_launch(
     // Convert the arguments into more convenient Rust types
     let tenant_id = unsafe { CStr::from_ptr(tenant_id) }.to_str().unwrap();
     let timeline_id = unsafe { CStr::from_ptr(timeline_id) }.to_str().unwrap();
-    let auth_token = {
-        if auth_token.is_null() {
-            None
-        } else {
-            let c_str = unsafe { CStr::from_ptr(auth_token) };
-            Some(c_str.to_str().unwrap().to_string())
-        }
-    };
+    let auth_token = unsafe { auth_token.as_ref() }.map(|s| s.to_string());
     let file_cache_path = {
         if file_cache_path.is_null() {
             None