diff --git a/pageserver/client_grpc/src/request_tracker.rs b/pageserver/client_grpc/src/request_tracker.rs index 26fd3ce864..7a40e6ece7 100644 --- a/pageserver/client_grpc/src/request_tracker.rs +++ b/pageserver/client_grpc/src/request_tracker.rs @@ -18,7 +18,6 @@ use pageserver_page_api::GetPageResponse; use pageserver_page_api::proto; use pageserver_page_api::*; use std::sync::Arc; -use std::sync::Mutex; use std::sync::atomic::AtomicU64; use tonic::{Request, transport::Channel}; @@ -34,85 +33,10 @@ use std::time::Duration; use client_cache::PooledItemFactory; -// -// A mock stream pool that just returns a sending channel, and whenever a GetPageRequest -// comes in on that channel, it randomly sleeps before sending a GetPageResponse -// - #[derive(Clone)] pub struct StreamReturner { sender: tokio::sync::mpsc::Sender, - sender_hashmap: Arc< - tokio::sync::Mutex< - std::collections::HashMap< - u64, - tokio::sync::mpsc::Sender>, - >, - >, - >, -} -pub struct MockStreamFactory {} - -impl MockStreamFactory { - pub fn new() -> Self { - MockStreamFactory {} - } -} -#[async_trait] -impl PooledItemFactory for MockStreamFactory { - async fn create( - &self, - _connect_timeout: Duration, - ) -> Result, tokio::time::error::Elapsed> { - let (sender, mut receiver) = tokio::sync::mpsc::channel::(1000); - // Create a StreamReturner that will send requests to the receiver channel - let stream_returner = StreamReturner { - sender: sender.clone(), - sender_hashmap: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), - }; - - let map = Arc::clone(&stream_returner.sender_hashmap); - tokio::spawn(async move { - while let Some(request) = receiver.recv().await { - // Break out of the loop with 1% chance - if rand::random::() < 0.001 { - break; - } - // Generate a random number between 0 and 100 - // Simulate some processing time - let mapclone = Arc::clone(&map); - tokio::spawn(async move { - let sleep_ms = rand::random::() % 100; - tokio::time::sleep(tokio::time::Duration::from_millis(sleep_ms)).await; - let response = proto::GetPageResponse { - request_id: request.request_id, - ..Default::default() - }; - // look up stream in hash map - let mut hashmap = mapclone.lock().await; - if let Some(sender) = hashmap.get(&request.request_id) { - // Send the response to the original request sender - if let Err(e) = sender.send(Ok(response.clone())).await { - eprintln!("Failed to send response: {}", e); - } - hashmap.remove(&request.request_id); - } else { - eprintln!("No sender found for request ID: {}", request.request_id); - } - }); - } - // Close every sender stream in the hashmap - let hashmap = map.lock().await; - for sender in hashmap.values() { - let error = Status::new(Code::Unknown, "Stream closed"); - if let Err(e) = sender.send(Err(error)).await { - eprintln!("Failed to send close response: {}", e); - } - } - }); - - Ok(Ok(stream_returner)) - } + sender_hashmap: Arc>>>>, } pub struct StreamFactory {