mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Extract repeated code to look up RequestTracker into a helper function
This commit is contained in:
@@ -388,7 +388,7 @@ struct ShardedRequestTrackerInner {
|
||||
trackers: std::collections::HashMap<ShardIndex, RequestTracker>,
|
||||
}
|
||||
pub struct ShardedRequestTracker {
|
||||
inner: Arc<Mutex<ShardedRequestTrackerInner>>,
|
||||
inner: Arc<std::sync::Mutex<ShardedRequestTrackerInner>>,
|
||||
tcp_client_cache_options: ClientCacheOptions,
|
||||
stream_client_cache_options: ClientCacheOptions,
|
||||
}
|
||||
@@ -425,7 +425,7 @@ impl ShardedRequestTracker {
|
||||
max_total_connections: 64, // Total allowable number of streams
|
||||
};
|
||||
ShardedRequestTracker {
|
||||
inner: Arc::new(Mutex::new(ShardedRequestTrackerInner {
|
||||
inner: Arc::new(std::sync::Mutex::new(ShardedRequestTrackerInner {
|
||||
trackers: std::collections::HashMap::new(),
|
||||
})),
|
||||
tcp_client_cache_options,
|
||||
@@ -499,7 +499,7 @@ impl ShardedRequestTracker {
|
||||
let new_tracker = RequestTracker::new(stream_pool, unary_pool, auth_interceptor, shard);
|
||||
trackers.insert(shard, new_tracker);
|
||||
}
|
||||
let mut inner = self.inner.lock().await;
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.trackers = trackers;
|
||||
}
|
||||
|
||||
@@ -508,37 +508,24 @@ impl ShardedRequestTracker {
|
||||
req: GetPageRequest,
|
||||
) -> Result<GetPageResponse, tonic::Status> {
|
||||
|
||||
// Get shard index from the request
|
||||
let shard_index = ShardIndex::unsharded();
|
||||
let inner = self.inner.lock().await;
|
||||
let mut tracker : RequestTracker;
|
||||
if let Some(t) = inner.trackers.get(&shard_index) {
|
||||
tracker = t.clone();
|
||||
} else {
|
||||
return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index)));
|
||||
}
|
||||
drop(inner);
|
||||
// Call the send_getpage_request method on the tracker
|
||||
// Get shard index from the request and look up the RequestTracker instance for that shard
|
||||
let shard_index = ShardIndex::unsharded(); // TODO!
|
||||
let mut tracker = self.lookup_tracker_for_shard(shard_index)?;
|
||||
|
||||
let response = tracker.send_getpage_request(req).await;
|
||||
match response {
|
||||
Ok(resp) => Ok(resp),
|
||||
Err(e) => Err(tonic::Status::unknown(format!("Failed to get page: {}", e))),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_get_dbsize_request(
|
||||
&self,
|
||||
request: GetDbSizeRequest,
|
||||
) -> Result<u64, tonic::Status> {
|
||||
let shard_index = ShardIndex::unsharded();
|
||||
let inner = self.inner.lock().await;
|
||||
let tracker: RequestTracker;
|
||||
if let Some(t) = inner.trackers.get(&shard_index) {
|
||||
tracker = t.clone();
|
||||
} else {
|
||||
return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index)));
|
||||
}
|
||||
drop(inner); // Release the lock before calling send_process_get_dbsize_request
|
||||
// Call the send_process_get_dbsize_request method on the tracker
|
||||
// Current sharding model assumes that all metadata is present only at shard 0.
|
||||
let tracker = self.lookup_tracker_for_shard(ShardIndex::unsharded())?;
|
||||
|
||||
let response = tracker.send_process_get_dbsize_request(request).await;
|
||||
match response {
|
||||
Ok(resp) => Ok(resp),
|
||||
@@ -550,16 +537,9 @@ impl ShardedRequestTracker {
|
||||
&self,
|
||||
request: GetRelSizeRequest,
|
||||
) -> Result<u32, tonic::Status> {
|
||||
let shard_index = ShardIndex::unsharded();
|
||||
let inner = self.inner.lock().await;
|
||||
let tracker: RequestTracker;
|
||||
if let Some(t) = inner.trackers.get(&shard_index) {
|
||||
tracker = t.clone();
|
||||
} else {
|
||||
return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index)));
|
||||
}
|
||||
drop(inner); // Release the lock before calling send_process_get_rel_size_request
|
||||
// Call the send_process_get_rel_size_request method on the tracker
|
||||
// Current sharding model assumes that all metadata is present only at shard 0.
|
||||
let tracker = self.lookup_tracker_for_shard(ShardIndex::unsharded())?;
|
||||
|
||||
let response = tracker.send_process_get_rel_size_request(request).await;
|
||||
match response {
|
||||
Ok(resp) => Ok(resp),
|
||||
@@ -571,20 +551,28 @@ impl ShardedRequestTracker {
|
||||
&self,
|
||||
request: CheckRelExistsRequest,
|
||||
) -> Result<bool, tonic::Status> {
|
||||
let shard_index = ShardIndex::unsharded();
|
||||
let inner = self.inner.lock().await;
|
||||
let tracker: RequestTracker;
|
||||
if let Some(t) = inner.trackers.get(&shard_index) {
|
||||
tracker = t.clone();
|
||||
} else {
|
||||
return Err(tonic::Status::not_found(format!("Shard {} not found", shard_index)));
|
||||
}
|
||||
drop(inner); // Release the lock before calling send_process_check_rel_exists_request
|
||||
// Call the send_process_check_rel_exists_request method on the tracker
|
||||
// Current sharding model assumes that all metadata is present only at shard 0.
|
||||
let tracker = self.lookup_tracker_for_shard(ShardIndex::unsharded())?;
|
||||
|
||||
let response = tracker.send_process_check_rel_exists_request(request).await;
|
||||
match response {
|
||||
Ok(resp) => Ok(resp),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn lookup_tracker_for_shard(
|
||||
&self,
|
||||
shard_index: ShardIndex,
|
||||
) -> Result<RequestTracker, tonic::Status> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
if let Some(t) = inner.trackers.get(&shard_index) {
|
||||
Ok(t.clone())
|
||||
} else {
|
||||
Err(tonic::Status::not_found(format!(
|
||||
"Shard {} not found",
|
||||
shard_index
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user