This commit is contained in:
Heikki Linnakangas
2025-06-29 21:15:37 +03:00
parent fdc7e9c2a4
commit 8b7796cbfa

View File

@@ -1,21 +1,26 @@
//! The request tracker dispatches GetPage- and other requests to pageservers, managing a pool of
//! connections and gRPC streams.
//!
//! There is usually one global instance of ShardedRequestTracker in an application, in particular
//! in the neon extension's communicator process. The application calls the async functions in
//! ShardedRequestTracker, which routes them to the correct pageservers, taking sharding into
//! account. In the future, there can be multiple pageservers per shard, and RequestTracker manages
//! load balancing between them, but that's not implemented yet.
//
// API Visible to the spawner, just a function call that is async
//
use std::sync::Arc;
use crate::client_cache;
use pageserver_page_api::GetPageRequest;
use pageserver_page_api::GetPageResponse;
use pageserver_page_api::*;
use pageserver_page_api::proto;
use crate::client_cache;
use crate::client_cache::ConnectionPool;
use crate::client_cache::ChannelFactory;
use crate::AuthInterceptor;
use tonic::{transport::{Channel}, Request};
use crate::ClientCacheOptions;
use crate::PageserverClientAggregateMetrics;
use tokio::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::sync::Mutex;
use utils::shard::ShardIndex;
@@ -31,7 +36,7 @@ use async_trait::async_trait;
use std::time::Duration;
use client_cache::PooledItemFactory;
//use tracing::info;
//
// A mock stream pool that just returns a sending channel, and whenever a GetPageRequest
// comes in on that channel, it randomly sleeps before sending a GetPageResponse
@@ -40,7 +45,7 @@ use client_cache::PooledItemFactory;
#[derive(Clone)]
pub struct StreamReturner {
sender: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
sender_hashmap: Arc<Mutex<std::collections::HashMap<u64, tokio::sync::mpsc::Sender<Result<proto::GetPageResponse, Status>>>>>,
sender_hashmap: Arc<tokio::sync::Mutex<std::collections::HashMap<u64, tokio::sync::mpsc::Sender<Result<proto::GetPageResponse, Status>>>>>,
}
pub struct MockStreamFactory {
}
@@ -58,11 +63,10 @@ impl PooledItemFactory<StreamReturner> for MockStreamFactory {
// Create a StreamReturner that will send requests to the receiver channel
let stream_returner = StreamReturner {
sender: sender.clone(),
sender_hashmap: Arc::new(Mutex::new(std::collections::HashMap::new())),
sender_hashmap: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
};
let map : Arc<Mutex<std::collections::HashMap<u64, tokio::sync::mpsc::Sender<Result<proto::GetPageResponse, _>>>>>
= Arc::clone(&stream_returner.sender_hashmap);
let map = Arc::clone(&stream_returner.sender_hashmap);
tokio::spawn(async move {
while let Some(request) = receiver.recv().await {
@@ -157,9 +161,9 @@ impl PooledItemFactory<StreamReturner> for StreamFactory {
Ok(resp) => {
let stream_returner = StreamReturner {
sender: sender.clone(),
sender_hashmap: Arc::new(Mutex::new(std::collections::HashMap::new())),
sender_hashmap: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
};
let map : Arc<Mutex<std::collections::HashMap<u64, tokio::sync::mpsc::Sender<Result<proto::GetPageResponse, _>>>>>
let map : Arc<tokio::sync::Mutex<std::collections::HashMap<u64, tokio::sync::mpsc::Sender<Result<proto::GetPageResponse, _>>>>>
= Arc::clone(&stream_returner.sender_hashmap);
tokio::spawn(async move {