mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Remove ununused mock factory
After reading the code a few times, I didn't quite understand what it was, to be honest, or how it was going to be used. Remove it now to reduce noise, but we can resurrect it from git history if we need it in the future.
This commit is contained in:
@@ -18,7 +18,6 @@ use pageserver_page_api::GetPageResponse;
|
||||
use pageserver_page_api::proto;
|
||||
use pageserver_page_api::*;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use tonic::{Request, transport::Channel};
|
||||
|
||||
@@ -34,85 +33,10 @@ use std::time::Duration;
|
||||
|
||||
use client_cache::PooledItemFactory;
|
||||
|
||||
//
|
||||
// A mock stream pool that just returns a sending channel, and whenever a GetPageRequest
|
||||
// comes in on that channel, it randomly sleeps before sending a GetPageResponse
|
||||
//
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StreamReturner {
|
||||
sender: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
|
||||
sender_hashmap: Arc<
|
||||
tokio::sync::Mutex<
|
||||
std::collections::HashMap<
|
||||
u64,
|
||||
tokio::sync::mpsc::Sender<Result<proto::GetPageResponse, Status>>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
}
|
||||
pub struct MockStreamFactory {}
|
||||
|
||||
impl MockStreamFactory {
|
||||
pub fn new() -> Self {
|
||||
MockStreamFactory {}
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
impl PooledItemFactory<StreamReturner> for MockStreamFactory {
|
||||
async fn create(
|
||||
&self,
|
||||
_connect_timeout: Duration,
|
||||
) -> Result<Result<StreamReturner, tonic::Status>, tokio::time::error::Elapsed> {
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::channel::<proto::GetPageRequest>(1000);
|
||||
// Create a StreamReturner that will send requests to the receiver channel
|
||||
let stream_returner = StreamReturner {
|
||||
sender: sender.clone(),
|
||||
sender_hashmap: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
|
||||
};
|
||||
|
||||
let map = Arc::clone(&stream_returner.sender_hashmap);
|
||||
tokio::spawn(async move {
|
||||
while let Some(request) = receiver.recv().await {
|
||||
// Break out of the loop with 1% chance
|
||||
if rand::random::<f32>() < 0.001 {
|
||||
break;
|
||||
}
|
||||
// Generate a random number between 0 and 100
|
||||
// Simulate some processing time
|
||||
let mapclone = Arc::clone(&map);
|
||||
tokio::spawn(async move {
|
||||
let sleep_ms = rand::random::<u64>() % 100;
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(sleep_ms)).await;
|
||||
let response = proto::GetPageResponse {
|
||||
request_id: request.request_id,
|
||||
..Default::default()
|
||||
};
|
||||
// look up stream in hash map
|
||||
let mut hashmap = mapclone.lock().await;
|
||||
if let Some(sender) = hashmap.get(&request.request_id) {
|
||||
// Send the response to the original request sender
|
||||
if let Err(e) = sender.send(Ok(response.clone())).await {
|
||||
eprintln!("Failed to send response: {}", e);
|
||||
}
|
||||
hashmap.remove(&request.request_id);
|
||||
} else {
|
||||
eprintln!("No sender found for request ID: {}", request.request_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
// Close every sender stream in the hashmap
|
||||
let hashmap = map.lock().await;
|
||||
for sender in hashmap.values() {
|
||||
let error = Status::new(Code::Unknown, "Stream closed");
|
||||
if let Err(e) = sender.send(Err(error)).await {
|
||||
eprintln!("Failed to send close response: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Ok(stream_returner))
|
||||
}
|
||||
sender_hashmap: Arc<tokio::sync::Mutex<std::collections::HashMap<u64, tokio::sync::mpsc::Sender<Result<proto::GetPageResponse, Status>>>>>,
|
||||
}
|
||||
|
||||
pub struct StreamFactory {
|
||||
|
||||
Reference in New Issue
Block a user