From 8b7796cbfab1df30fb8cc15b0f2494ad7bc83b12 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sun, 29 Jun 2025 21:15:37 +0300 Subject: [PATCH] wip --- pageserver/client_grpc/src/request_tracker.rs | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/pageserver/client_grpc/src/request_tracker.rs b/pageserver/client_grpc/src/request_tracker.rs index 5f55f5a58b..ed585660cc 100644 --- a/pageserver/client_grpc/src/request_tracker.rs +++ b/pageserver/client_grpc/src/request_tracker.rs @@ -1,21 +1,26 @@ +//! The request tracker dispatches GetPage- and other requests to pageservers, managing a pool of +//! connections and gRPC streams. +//! +//! There is usually one global instance of ShardedRequestTracker in an application, in particular +//! in the neon extension's communicator process. The application calls the async functions in +//! ShardedRequestTracker, which routes them to the correct pageservers, taking sharding into +//! account. In the future, there can be multiple pageservers per shard, and RequestTracker manages +//! load balancing between them, but that's not implemented yet. -// -// API Visible to the spawner, just a function call that is async -// use std::sync::Arc; -use crate::client_cache; use pageserver_page_api::GetPageRequest; use pageserver_page_api::GetPageResponse; use pageserver_page_api::*; use pageserver_page_api::proto; +use crate::client_cache; use crate::client_cache::ConnectionPool; use crate::client_cache::ChannelFactory; use crate::AuthInterceptor; use tonic::{transport::{Channel}, Request}; use crate::ClientCacheOptions; use crate::PageserverClientAggregateMetrics; -use tokio::sync::Mutex; use std::sync::atomic::AtomicU64; +use std::sync::Mutex; use utils::shard::ShardIndex; @@ -31,7 +36,7 @@ use async_trait::async_trait; use std::time::Duration; use client_cache::PooledItemFactory; -//use tracing::info; + // // A mock stream pool that just returns a sending channel, and whenever a GetPageRequest // comes in on that channel, it randomly sleeps before sending a GetPageResponse @@ -40,7 +45,7 @@ use client_cache::PooledItemFactory; #[derive(Clone)] pub struct StreamReturner { sender: tokio::sync::mpsc::Sender, - sender_hashmap: Arc>>>>, + sender_hashmap: Arc>>>>, } pub struct MockStreamFactory { } @@ -58,11 +63,10 @@ impl PooledItemFactory for MockStreamFactory { // Create a StreamReturner that will send requests to the receiver channel let stream_returner = StreamReturner { sender: sender.clone(), - sender_hashmap: Arc::new(Mutex::new(std::collections::HashMap::new())), + sender_hashmap: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), }; - let map : Arc>>>> - = Arc::clone(&stream_returner.sender_hashmap); + let map = Arc::clone(&stream_returner.sender_hashmap); tokio::spawn(async move { while let Some(request) = receiver.recv().await { @@ -157,9 +161,9 @@ impl PooledItemFactory for StreamFactory { Ok(resp) => { let stream_returner = StreamReturner { sender: sender.clone(), - sender_hashmap: Arc::new(Mutex::new(std::collections::HashMap::new())), + sender_hashmap: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), }; - let map : Arc>>>> + let map : Arc>>>> = Arc::clone(&stream_returner.sender_hashmap); tokio::spawn(async move {