diff --git a/pageserver/client_grpc/src/request_tracker.rs b/pageserver/client_grpc/src/request_tracker.rs index 8d5b77ed5a..5f55f5a58b 100644 --- a/pageserver/client_grpc/src/request_tracker.rs +++ b/pageserver/client_grpc/src/request_tracker.rs @@ -388,7 +388,7 @@ struct ShardedRequestTrackerInner { trackers: std::collections::HashMap, } pub struct ShardedRequestTracker { - inner: Arc>, + inner: Arc>, tcp_client_cache_options: ClientCacheOptions, stream_client_cache_options: ClientCacheOptions, } @@ -425,7 +425,7 @@ impl ShardedRequestTracker { max_total_connections: 64, // Total allowable number of streams }; ShardedRequestTracker { - inner: Arc::new(Mutex::new(ShardedRequestTrackerInner { + inner: Arc::new(std::sync::Mutex::new(ShardedRequestTrackerInner { trackers: std::collections::HashMap::new(), })), tcp_client_cache_options, @@ -499,7 +499,7 @@ impl ShardedRequestTracker { let new_tracker = RequestTracker::new(stream_pool, unary_pool, auth_interceptor, shard); trackers.insert(shard, new_tracker); } - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock().unwrap(); inner.trackers = trackers; } @@ -508,37 +508,24 @@ impl ShardedRequestTracker { req: GetPageRequest, ) -> Result { - // Get shard index from the request - let shard_index = ShardIndex::unsharded(); - let inner = self.inner.lock().await; - let mut tracker : RequestTracker; - if let Some(t) = inner.trackers.get(&shard_index) { - tracker = t.clone(); - } else { - return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index))); - } - drop(inner); - // Call the send_getpage_request method on the tracker + // Get shard index from the request and look up the RequestTracker instance for that shard + let shard_index = ShardIndex::unsharded(); // TODO! + let mut tracker = self.lookup_tracker_for_shard(shard_index)?; + let response = tracker.send_getpage_request(req).await; match response { Ok(resp) => Ok(resp), Err(e) => Err(tonic::Status::unknown(format!("Failed to get page: {}", e))), } } + pub async fn process_get_dbsize_request( &self, request: GetDbSizeRequest, ) -> Result { - let shard_index = ShardIndex::unsharded(); - let inner = self.inner.lock().await; - let tracker: RequestTracker; - if let Some(t) = inner.trackers.get(&shard_index) { - tracker = t.clone(); - } else { - return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index))); - } - drop(inner); // Release the lock before calling send_process_get_dbsize_request - // Call the send_process_get_dbsize_request method on the tracker + // Current sharding model assumes that all metadata is present only at shard 0. + let tracker = self.lookup_tracker_for_shard(ShardIndex::unsharded())?; + let response = tracker.send_process_get_dbsize_request(request).await; match response { Ok(resp) => Ok(resp), @@ -550,16 +537,9 @@ impl ShardedRequestTracker { &self, request: GetRelSizeRequest, ) -> Result { - let shard_index = ShardIndex::unsharded(); - let inner = self.inner.lock().await; - let tracker: RequestTracker; - if let Some(t) = inner.trackers.get(&shard_index) { - tracker = t.clone(); - } else { - return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index))); - } - drop(inner); // Release the lock before calling send_process_get_rel_size_request - // Call the send_process_get_rel_size_request method on the tracker + // Current sharding model assumes that all metadata is present only at shard 0. + let tracker = self.lookup_tracker_for_shard(ShardIndex::unsharded())?; + let response = tracker.send_process_get_rel_size_request(request).await; match response { Ok(resp) => Ok(resp), @@ -571,20 +551,28 @@ impl ShardedRequestTracker { &self, request: CheckRelExistsRequest, ) -> Result { - let shard_index = ShardIndex::unsharded(); - let inner = self.inner.lock().await; - let tracker: RequestTracker; - if let Some(t) = inner.trackers.get(&shard_index) { - tracker = t.clone(); - } else { - return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index))); - } - drop(inner); // Release the lock before calling send_process_check_rel_exists_request - // Call the send_process_check_rel_exists_request method on the tracker + // Current sharding model assumes that all metadata is present only at shard 0. + let tracker = self.lookup_tracker_for_shard(ShardIndex::unsharded())?; + let response = tracker.send_process_check_rel_exists_request(request).await; match response { Ok(resp) => Ok(resp), Err(e) => Err(e), } } + + fn lookup_tracker_for_shard( + &self, + shard_index: ShardIndex, + ) -> Result { + let inner = self.inner.lock().unwrap(); + if let Some(t) = inner.trackers.get(&shard_index) { + Ok(t.clone()) + } else { + Err(tonic::Status::not_found(format!( + "Shard {} not found", + shard_index + ))) + } + } }