diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index e0638dfc1f..17dad6a560 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::os::fd::AsRawFd; use std::os::fd::OwnedFd; use std::path::PathBuf; +use std::str::FromStr as _; use std::sync::atomic::{AtomicU64, Ordering}; use crate::backend_comms::NeonIOHandle; @@ -12,7 +13,7 @@ use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess}; use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest}; use crate::neon_request::{NeonIORequest, NeonIOResult}; use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable}; -use pageserver_client_grpc::request_tracker::ShardedRequestTracker; +use pageserver_client_grpc::client::PageserverClient; use pageserver_page_api as page_api; use metrics::{IntCounter, IntCounterVec}; @@ -20,6 +21,7 @@ use metrics::{IntCounter, IntCounterVec}; use tokio::io::AsyncReadExt; use tokio_pipe::PipeRead; use uring_common::buf::IoBuf; +use utils::id::{TenantId, TimelineId}; use super::callbacks::{get_request_lsn, notify_proc}; @@ -30,7 +32,7 @@ use utils::lsn::Lsn; pub struct CommunicatorWorkerProcessStruct<'a> { neon_request_slots: &'a [NeonIOHandle], - request_tracker: ShardedRequestTracker, + client: PageserverClient, pub(crate) cache: IntegratedCacheWriteAccess<'a>, @@ -92,16 +94,10 @@ pub(super) async fn init( .integrated_cache_init_struct .worker_process_init(last_lsn, file_cache); - let request_tracker = ShardedRequestTracker::new(); - request_tracker - .update_shard_map( - shard_map, - None, - tenant_id, - timeline_id, - auth_token.as_deref(), - ) - .await; + let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID"); + let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID"); + let client = PageserverClient::new(tenant_id, timeline_id, shard_map, auth_token) + .expect("count not create client"); let request_counters = IntCounterVec::new( metrics::core::Opts::new( @@ -152,7 +148,7 @@ pub(super) async fn init( CommunicatorWorkerProcessStruct { neon_request_slots: cis.neon_request_slots, - request_tracker, + client, cache, submission_pipe_read_fd: cis.submission_pipe_read_fd, next_request_id: AtomicU64::new(1), @@ -260,8 +256,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { }; match self - .request_tracker - .process_check_rel_exists_request(page_api::CheckRelExistsRequest { + .client + .check_rel_exists(page_api::CheckRelExistsRequest { read_lsn: self.request_lsns(not_modified_since), rel, }) @@ -293,8 +289,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { let read_lsn = self.request_lsns(not_modified_since); match self - .request_tracker - .process_get_rel_size_request(page_api::GetRelSizeRequest { read_lsn, rel }) + .client + .get_rel_size(page_api::GetRelSizeRequest { read_lsn, rel }) .await { Ok(nblocks) => { @@ -343,8 +339,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { }; match self - .request_tracker - .process_get_dbsize_request(page_api::GetDbSizeRequest { + .client + .get_db_size(page_api::GetDbSizeRequest { read_lsn: self.request_lsns(not_modified_since), db_oid: req.db_oid, }) @@ -466,7 +462,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // TODO: Use batched protocol for (blkno, _lsn, dest, _guard) in cache_misses.iter() { match self - .request_tracker + .client .get_page(page_api::GetPageRequest { request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), request_class: page_api::GetPageClass::Normal, @@ -547,7 +543,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // TODO: Use batched protocol for (blkno, _lsn, _guard) in cache_misses.iter() { match self - .request_tracker + .client .get_page(page_api::GetPageRequest { request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), request_class: page_api::GetPageClass::Prefetch,