Use new PageserverClient

This commit is contained in:
Erik Grinaker
2025-07-02 11:27:56 +02:00
parent 6f0af96a54
commit 8ab8fc11a3

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::os::fd::AsRawFd;
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use std::str::FromStr as _;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::backend_comms::NeonIOHandle;
@@ -12,7 +13,7 @@ use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
use crate::neon_request::{NeonIORequest, NeonIOResult};
use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable};
use pageserver_client_grpc::request_tracker::ShardedRequestTracker;
use pageserver_client_grpc::client::PageserverClient;
use pageserver_page_api as page_api;
use metrics::{IntCounter, IntCounterVec};
@@ -20,6 +21,7 @@ use metrics::{IntCounter, IntCounterVec};
use tokio::io::AsyncReadExt;
use tokio_pipe::PipeRead;
use uring_common::buf::IoBuf;
use utils::id::{TenantId, TimelineId};
use super::callbacks::{get_request_lsn, notify_proc};
@@ -30,7 +32,7 @@ use utils::lsn::Lsn;
pub struct CommunicatorWorkerProcessStruct<'a> {
neon_request_slots: &'a [NeonIOHandle],
request_tracker: ShardedRequestTracker,
client: PageserverClient,
pub(crate) cache: IntegratedCacheWriteAccess<'a>,
@@ -92,16 +94,10 @@ pub(super) async fn init(
.integrated_cache_init_struct
.worker_process_init(last_lsn, file_cache);
let request_tracker = ShardedRequestTracker::new();
request_tracker
.update_shard_map(
shard_map,
None,
tenant_id,
timeline_id,
auth_token.as_deref(),
)
.await;
let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID");
let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID");
let client = PageserverClient::new(tenant_id, timeline_id, shard_map, auth_token)
.expect("count not create client");
let request_counters = IntCounterVec::new(
metrics::core::Opts::new(
@@ -152,7 +148,7 @@ pub(super) async fn init(
CommunicatorWorkerProcessStruct {
neon_request_slots: cis.neon_request_slots,
request_tracker,
client,
cache,
submission_pipe_read_fd: cis.submission_pipe_read_fd,
next_request_id: AtomicU64::new(1),
@@ -260,8 +256,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
};
match self
.request_tracker
.process_check_rel_exists_request(page_api::CheckRelExistsRequest {
.client
.check_rel_exists(page_api::CheckRelExistsRequest {
read_lsn: self.request_lsns(not_modified_since),
rel,
})
@@ -293,8 +289,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let read_lsn = self.request_lsns(not_modified_since);
match self
.request_tracker
.process_get_rel_size_request(page_api::GetRelSizeRequest { read_lsn, rel })
.client
.get_rel_size(page_api::GetRelSizeRequest { read_lsn, rel })
.await
{
Ok(nblocks) => {
@@ -343,8 +339,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
};
match self
.request_tracker
.process_get_dbsize_request(page_api::GetDbSizeRequest {
.client
.get_db_size(page_api::GetDbSizeRequest {
read_lsn: self.request_lsns(not_modified_since),
db_oid: req.db_oid,
})
@@ -466,7 +462,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
// TODO: Use batched protocol
for (blkno, _lsn, dest, _guard) in cache_misses.iter() {
match self
.request_tracker
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_class: page_api::GetPageClass::Normal,
@@ -547,7 +543,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
// TODO: Use batched protocol
for (blkno, _lsn, _guard) in cache_misses.iter() {
match self
.request_tracker
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_class: page_api::GetPageClass::Prefetch,