pageserver: initial gRPC page service implementation

This commit is contained in:
Erik Grinaker
2025-05-26 12:39:32 +02:00
parent 67ddf1de28
commit 8daf272561
4 changed files with 501 additions and 114 deletions

View File

@@ -1931,7 +1931,7 @@ pub enum PagestreamFeMessage {
}
// Wrapped in libpq CopyData
#[derive(strum_macros::EnumProperty)]
#[derive(Debug, strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),

View File

@@ -584,6 +584,7 @@ impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
type Error = ProtocolError;
fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
// TODO: can a segment legitimately be empty?
if segment.is_empty() {
return Err(ProtocolError::Missing("segment"));
}

View File

@@ -804,7 +804,7 @@ fn start_pageserver(
} else {
None
},
basebackup_cache.clone(),
basebackup_cache,
);
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
@@ -816,12 +816,10 @@ fn start_pageserver(
let mut page_service_grpc = None;
if let Some(grpc_listener) = grpc_listener {
page_service_grpc = Some(page_service::spawn_grpc(
conf,
tenant_manager.clone(),
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),
grpc_listener,
basebackup_cache,
)?);
}

View File

@@ -1,6 +1,7 @@
//! The Page Service listens for client connections and serves their GetPage@LSN
//! requests.
use std::any::Any;
use std::borrow::Cow;
use std::num::NonZeroUsize;
use std::os::fd::AsRawFd;
@@ -31,6 +32,7 @@ use pageserver_api::models::{
};
use pageserver_api::reltag::SlruKind;
use pageserver_api::shard::TenantShardId;
use pageserver_page_api as page_api;
use pageserver_page_api::proto;
use postgres_backend::{
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
@@ -39,6 +41,7 @@ use postgres_ffi::BLCKSZ;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use pq_proto::framed::ConnectionError;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use smallvec::{SmallVec, smallvec};
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
@@ -76,6 +79,7 @@ use crate::tenant::mgr::{
GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::handle::{HandleUpgradeError, WeakHandle};
use crate::tenant::timeline::{self, WaitLsnError};
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
@@ -165,15 +169,14 @@ pub fn spawn(
/// Spawns a gRPC server for the page service.
///
/// TODO: move this onto GrpcPageServiceHandler::spawn().
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn_grpc(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: std::net::TcpListener,
basebackup_cache: Arc<BasebackupCache>,
) -> anyhow::Result<CancellableTask> {
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
@@ -202,21 +205,16 @@ pub fn spawn_grpc(
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service.
let page_service_handler = PageServerHandler::new(
let page_service_handler = GrpcPageServiceHandler {
tenant_manager,
auth.clone(),
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
conf.get_vectored_concurrent_io,
ConnectionPerfSpanFields::default(),
basebackup_cache,
ctx,
cancel.clone(),
gate.enter().expect("just created"),
);
};
let mut received_at_interceptor = ReceivedAtInterceptor;
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let interceptors = move |mut req: tonic::Request<()>| {
req = received_at_interceptor.call(req)?;
req = tenant_interceptor.call(req)?;
req = auth_interceptor.call(req)?;
Ok(req)
@@ -709,6 +707,89 @@ enum PageStreamError {
BadRequest(Cow<'static, str>),
}
impl PageStreamError {
/// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
/// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
/// convenience method for use from a get_pages gRPC stream.
#[allow(clippy::result_large_err)]
fn into_get_page_response(
self,
request_id: page_api::RequestID,
) -> Result<proto::GetPageResponse, tonic::Status> {
use page_api::GetPageStatusCode;
use tonic::Code;
// We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
let status: tonic::Status = self.into();
let status_code = match status.code() {
// We shouldn't see an OK status here, because we're emitting an error.
Code::Ok => {
debug_assert_ne!(status.code(), Code::Ok);
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// These are per-request errors, returned as GetPageResponses.
Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
Code::DataLoss => GetPageStatusCode::InternalError,
Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
Code::Internal => GetPageStatusCode::InternalError,
Code::NotFound => GetPageStatusCode::NotFound,
Code::OutOfRange => GetPageStatusCode::InvalidRequest,
Code::ResourceExhausted => GetPageStatusCode::SlowDown,
// These should terminate the stream.
Code::Aborted => return Err(status),
Code::Cancelled => return Err(status),
Code::DeadlineExceeded => return Err(status),
Code::PermissionDenied => return Err(status),
Code::Unauthenticated => return Err(status),
Code::Unavailable => return Err(status),
Code::Unimplemented => return Err(status),
Code::Unknown => return Err(status),
};
Ok(page_api::GetPageResponse {
request_id,
status_code,
reason: Some(status.message().to_string()),
page_images: SmallVec::new(),
}
.into())
}
}
impl From<PageStreamError> for tonic::Status {
fn from(err: PageStreamError) -> Self {
use tonic::Code;
let code = match &err {
PageStreamError::Reconnect(_) => Code::Unavailable,
PageStreamError::Shutdown => Code::Unavailable,
PageStreamError::Read(err) => match err {
PageReconstructError::Cancelled => Code::Unavailable,
PageReconstructError::MissingKey(_) => Code::NotFound,
PageReconstructError::AncestorLsnTimeout(err) => match err {
WaitLsnError::Timeout(_) => Code::Internal,
WaitLsnError::BadState(_) => Code::Internal,
WaitLsnError::Shutdown => Code::Unavailable,
},
PageReconstructError::Other(_) => Code::Internal,
PageReconstructError::WalRedo(_) => Code::Internal,
},
PageStreamError::LsnTimeout(err) => match err {
WaitLsnError::Timeout(_) => Code::Internal,
WaitLsnError::BadState(_) => Code::Internal,
WaitLsnError::Shutdown => Code::Unavailable,
},
PageStreamError::NotFound(_) => Code::NotFound,
PageStreamError::BadRequest(_) => Code::InvalidArgument,
};
tonic::Status::new(code, format!("{err}"))
}
}
impl From<PageReconstructError> for PageStreamError {
fn from(value: PageReconstructError) -> Self {
match value {
@@ -789,37 +870,37 @@ enum BatchedFeMessage {
Exists {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: WeakHandle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: WeakHandle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
GetPage {
span: Span,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
shard: WeakHandle<TenantManagerTypes>,
pages: SmallVec<[BatchedGetPageRequest; 1]>,
batch_break_reason: GetPageBatchBreakReason,
},
DbSize {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: WeakHandle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: WeakHandle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
#[cfg(feature = "testing")]
Test {
span: Span,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: WeakHandle<TenantManagerTypes>,
requests: Vec<BatchedTestRequest>,
},
RespondError {
@@ -1068,26 +1149,6 @@ impl PageServerHandler {
let neon_fe_msg =
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
// TODO: turn in to async closure once available to avoid repeating received_at
async fn record_op_start_and_throttle(
shard: &timeline::handle::Handle<TenantManagerTypes>,
op: metrics::SmgrQueryType,
received_at: Instant,
) -> Result<SmgrOpTimer, QueryError> {
// It's important to start the smgr op metric recorder as early as possible
// so that the _started counters are incremented before we do
// any serious waiting, e.g., for throttle, batching, or actual request handling.
let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
let now = Instant::now();
timer.observe_throttle_start(now);
let throttled = tokio::select! {
res = shard.pagestream_throttle.throttle(1, now) => res,
_ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
};
timer.observe_throttle_done(throttled);
Ok(timer)
}
let batched_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let shard = timeline_handles
@@ -1095,7 +1156,7 @@ impl PageServerHandler {
.await?;
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
let timer = Self::record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelExists,
received_at,
@@ -1113,7 +1174,7 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
let timer = Self::record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelSize,
received_at,
@@ -1131,7 +1192,7 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
let timer = Self::record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetDbSize,
received_at,
@@ -1149,7 +1210,7 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
let timer = Self::record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetSlruSegment,
received_at,
@@ -1274,7 +1335,7 @@ impl PageServerHandler {
// request handler log messages contain the request-specific fields.
let span = mkspan!(shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
let timer = Self::record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
@@ -1321,7 +1382,7 @@ impl PageServerHandler {
BatchedFeMessage::GetPage {
span,
shard: shard.downgrade(),
pages: smallvec::smallvec![BatchedGetPageRequest {
pages: smallvec![BatchedGetPageRequest {
req,
timer,
lsn_range: LsnRange {
@@ -1356,6 +1417,26 @@ impl PageServerHandler {
Ok(Some(batched_msg))
}
/// Starts a SmgrOpTimer at received_at and throttles the request.
async fn record_op_start_and_throttle(
shard: &timeline::handle::Handle<TenantManagerTypes>,
op: metrics::SmgrQueryType,
received_at: Instant,
) -> Result<SmgrOpTimer, QueryError> {
// It's important to start the smgr op metric recorder as early as possible
// so that the _started counters are incremented before we do
// any serious waiting, e.g., for throttle, batching, or actual request handling.
let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
let now = Instant::now();
timer.observe_throttle_start(now);
let throttled = tokio::select! {
res = shard.pagestream_throttle.throttle(1, now) => res,
_ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
};
timer.observe_throttle_done(throttled);
Ok(timer)
}
/// Post-condition: `batch` is Some()
#[instrument(skip_all, level = tracing::Level::TRACE)]
#[allow(clippy::boxed_local)]
@@ -1453,8 +1534,11 @@ impl PageServerHandler {
let (mut handler_results, span) = {
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
// won't fit on the stack.
let mut boxpinned =
Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message(
batch,
io_concurrency,
ctx,
));
log_slow(
log_slow_name,
LOG_SLOW_GETPAGE_THRESHOLD,
@@ -1610,7 +1694,6 @@ impl PageServerHandler {
/// Helper which dispatches a batched message to the appropriate handler.
/// Returns a vec of results, along with the extracted trace span.
async fn pagestream_dispatch_batched_message(
&mut self,
batch: BatchedFeMessage,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
@@ -1640,10 +1723,10 @@ impl PageServerHandler {
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
vec![
self.handle_get_rel_exists_request(&shard, &req, &ctx)
Self::handle_get_rel_exists_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer, ctx))
.map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1659,10 +1742,10 @@ impl PageServerHandler {
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
vec![
self.handle_get_nblocks_request(&shard, &req, &ctx)
Self::handle_get_nblocks_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer, ctx))
.map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1680,16 +1763,15 @@ impl PageServerHandler {
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
pages,
io_concurrency,
batch_break_reason,
&ctx,
)
.instrument(span.clone())
.await;
let res = Self::handle_get_page_at_lsn_request_batched(
&shard,
pages,
io_concurrency,
batch_break_reason,
&ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
@@ -1706,10 +1788,10 @@ impl PageServerHandler {
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
vec![
self.handle_db_size_request(&shard, &req, &ctx)
Self::handle_db_size_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer, ctx))
.map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1725,10 +1807,10 @@ impl PageServerHandler {
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
vec![
self.handle_get_slru_segment_request(&shard, &req, &ctx)
Self::handle_get_slru_segment_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer, ctx))
.map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1746,8 +1828,7 @@ impl PageServerHandler {
{
let npages = requests.len();
trace!(npages, "handling getpage request");
let res = self
.handle_test_request_batch(&shard, requests, &ctx)
let res = Self::handle_test_request_batch(&shard, requests, &ctx)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
@@ -2301,11 +2382,10 @@ impl PageServerHandler {
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_rel_exists_request(
&mut self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> Result<PagestreamExistsResponse, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -2327,19 +2407,15 @@ impl PageServerHandler {
)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
req: *req,
exists,
}))
Ok(PagestreamExistsResponse { req: *req, exists })
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_nblocks_request(
&mut self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> Result<PagestreamNblocksResponse, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -2361,19 +2437,18 @@ impl PageServerHandler {
)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
Ok(PagestreamNblocksResponse {
req: *req,
n_blocks,
}))
})
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_db_size_request(
&mut self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> Result<PagestreamDbSizeResponse, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -2397,17 +2472,13 @@ impl PageServerHandler {
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
req: *req,
db_size,
}))
Ok(PagestreamDbSizeResponse { req: *req, db_size })
}
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request_batched(
&mut self,
timeline: &Timeline,
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
requests: SmallVec<[BatchedGetPageRequest; 1]>,
io_concurrency: IoConcurrency,
batch_break_reason: GetPageBatchBreakReason,
ctx: &RequestContext,
@@ -2532,11 +2603,10 @@ impl PageServerHandler {
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_slru_segment_request(
&mut self,
timeline: &Timeline,
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> Result<PagestreamGetSlruSegmentResponse, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -2551,16 +2621,13 @@ impl PageServerHandler {
.ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
Ok(PagestreamBeMessage::GetSlruSegment(
PagestreamGetSlruSegmentResponse { req: *req, segment },
))
Ok(PagestreamGetSlruSegmentResponse { req: *req, segment })
}
// NB: this impl mimics what we do for batched getpage requests.
#[cfg(feature = "testing")]
#[instrument(skip_all, fields(shard_id))]
async fn handle_test_request_batch(
&mut self,
timeline: &Timeline,
requests: Vec<BatchedTestRequest>,
_ctx: &RequestContext,
@@ -3300,57 +3367,342 @@ where
}
}
/// Implements the page service over gRPC.
/// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing.
///
/// TODO: not yet implemented, all methods return unimplemented.
/// TODO: add trace spans, interceptors, and sampling.
/// TODO: rename to PageServiceHandler when libpq impl is removed.
pub struct GrpcPageServiceHandler {
tenant_manager: Arc<TenantManager>,
ctx: RequestContext,
}
impl GrpcPageServiceHandler {
/// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
/// relations and their sizes, as well as SLRU segments and other data.
#[allow(clippy::result_large_err)]
fn ensure_shard_zero(req: &tonic::Request<impl Any>) -> Result<(), tonic::Status> {
match Self::extract::<ShardIndex>(req).shard_number.0 {
0 => Ok(()),
shard => Err(tonic::Status::invalid_argument(format!(
"request must execute on shard zero (is shard {shard})",
))),
}
}
/// Extracts the given type from the request extensions. It must have been set by an
/// interceptor.
fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
req.extensions()
.get::<T>()
.expect("extension should be set by interceptor")
}
/// Generates a PagestreamRequest header from a ReadLsn and request ID.
fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest {
PagestreamRequest {
reqid: req_id,
request_lsn: read_lsn.request_lsn,
not_modified_since: read_lsn.not_modified_since_lsn.unwrap_or_default(),
}
}
/// Acquires a timeline handle for the given request. The request must have been decorated by
/// TenantMetadataInterceptor first.
async fn get_request_timeline(
&self,
req: &tonic::Request<impl Any>,
) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
let ttid = *Self::extract::<TenantTimelineId>(req);
let shard_index = *Self::extract::<ShardIndex>(req);
let shard_selector = ShardSelector::Known(shard_index);
// TODO: untangle this from TenantManagerWrapper::resolve() and Cache::get(), to avoid the
// unnecessary overhead.
TimelineHandles::new(self.tenant_manager.clone())
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
.await
}
/// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start.
/// Only errors if the timeline is shutting down.
///
/// TODO: revamp request timers -- in particular,
/// TODO: consider moving throttling out and returning SlowDown errors.
async fn record_op_start_and_throttle(
timeline: &timeline::handle::Handle<TenantManagerTypes>,
op: metrics::SmgrQueryType,
received_at: Instant,
) -> Result<SmgrOpTimer, tonic::Status> {
let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at)
.await
.map_err(|err| match err {
// record_op_start_and_throttle() only returns Shutdown.
QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")),
err => tonic::Status::internal(format!("unexpected error: {err}")),
})?;
timer.observe_execution_start(Instant::now());
Ok(timer)
}
/// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
///
/// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
/// with an appropriate status code instead.
async fn handle_get_page_request(
ctx: &RequestContext,
timeline: &WeakHandle<TenantManagerTypes>,
req: proto::GetPageRequest,
io_concurrency: IoConcurrency,
) -> Result<proto::GetPageResponse, tonic::Status> {
let received_at = Instant::now();
let timeline = timeline.upgrade().map_err(|err| match err {
HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
})?;
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request and convert it to a Pagestream request.
let req: page_api::GetPageRequest = req.try_into()?;
let effective_lsn = match PageServerHandler::effective_request_lsn(
&timeline,
timeline.get_last_record_lsn(),
req.read_lsn.request_lsn,
req.read_lsn.not_modified_since_lsn.unwrap_or_default(),
&timeline.get_applied_gc_cutoff_lsn(),
) {
Ok(lsn) => lsn,
Err(err) => return err.into_get_page_response(req.request_id),
};
let mut batch = SmallVec::with_capacity(req.block_numbers.len());
for blkno in req.block_numbers {
// TODO: this creates one timer per page and throttles it. We should have a timer for
// the entire batch, and throttle only the batch, but this is equivalent to what
// PageServerHandler does already so we keep it for now.
let timer = Self::record_op_start_and_throttle(
&timeline,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
)
.await?;
batch.push(BatchedGetPageRequest {
req: PagestreamGetPageRequest {
hdr: Self::make_hdr(req.read_lsn, req.request_id),
rel: req.rel,
blkno,
},
lsn_range: LsnRange {
effective_lsn,
request_lsn: req.read_lsn.request_lsn,
},
timer,
ctx: ctx.attached_child(),
batch_wait_ctx: None, // TODO: add tracing
});
}
let results = PageServerHandler::handle_get_page_at_lsn_request_batched(
&timeline,
batch,
io_concurrency,
GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches
&ctx,
)
.await;
let mut resp = page_api::GetPageResponse {
request_id: req.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
page_images: SmallVec::with_capacity(results.len()),
};
for result in results {
match result {
Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page),
Ok((resp, _, _)) => {
return Err(tonic::Status::internal(format!(
"unexpected response: {resp:?}"
)));
}
Err(err) => return err.err.into_get_page_response(req.request_id),
};
}
Ok(resp.into())
}
}
/// Implements the gRPC page service.
///
/// TODO: when the libpq impl is removed, simplify this:
/// * Add Tower middleware for timeline handle, rate limiting, and timing.
/// * Remove the intermediate Pagestream types.
/// * Inline the handler code.
#[tonic::async_trait]
impl proto::PageService for PageServerHandler {
impl proto::PageService for GrpcPageServiceHandler {
type GetBaseBackupStream = Pin<
Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
>;
type GetPagesStream =
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
async fn check_rel_exists(
&self,
_: tonic::Request<proto::CheckRelExistsRequest>,
req: tonic::Request<proto::CheckRelExistsRequest>,
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
let received_at = Self::extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request and convert it to a Pagestream request.
Self::ensure_shard_zero(&req)?;
let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
let req = PagestreamExistsRequest {
hdr: Self::make_hdr(req.read_lsn, 0),
rel: req.rel,
};
// Execute the request and convert the response.
let _timer = Self::record_op_start_and_throttle(
&timeline,
metrics::SmgrQueryType::GetRelExists,
received_at,
)
.await?;
let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?;
let resp: page_api::CheckRelExistsResponse = resp.exists;
Ok(tonic::Response::new(resp.into()))
}
async fn get_base_backup(
&self,
_: tonic::Request<proto::GetBaseBackupRequest>,
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
Err(tonic::Status::unimplemented("not implemented")) // TODO
}
async fn get_db_size(
&self,
_: tonic::Request<proto::GetDbSizeRequest>,
req: tonic::Request<proto::GetDbSizeRequest>,
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
let received_at = Self::extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request and convert it to a Pagestream request.
Self::ensure_shard_zero(&req)?;
let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?;
let req = PagestreamDbSizeRequest {
hdr: Self::make_hdr(req.read_lsn, 0),
dbnode: req.db_oid,
};
// Execute the request and convert the response.
let _timer = Self::record_op_start_and_throttle(
&timeline,
metrics::SmgrQueryType::GetDbSize,
received_at,
)
.await?;
let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?;
let resp = resp.db_size as page_api::GetDbSizeResponse;
Ok(tonic::Response::new(resp.into()))
}
async fn get_pages(
&self,
_: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
// Extract the timeline from the request and check that it exists.
let ttid = *Self::extract::<TenantTimelineId>(&req);
let shard_index = *Self::extract::<ShardIndex>(&req);
let shard_selector = ShardSelector::Known(shard_index);
let mut handles = TimelineHandles::new(self.tenant_manager.clone());
handles
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
.await?;
let ctx = self.ctx.attached_child();
let mut reqs = req.into_inner();
let resps = async_stream::try_stream! {
let timeline = handles
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
// TODO: implement IoConcurrency sidecar.
yield Self::handle_get_page_request(&ctx, &timeline, req, IoConcurrency::Sequential).await?
}
};
Ok(tonic::Response::new(Box::pin(resps)))
}
async fn get_rel_size(
&self,
_: tonic::Request<proto::GetRelSizeRequest>,
req: tonic::Request<proto::GetRelSizeRequest>,
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
let received_at = Self::extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request and convert it to a Pagestream request.
Self::ensure_shard_zero(&req)?;
let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
let req = PagestreamNblocksRequest {
hdr: Self::make_hdr(req.read_lsn, 0),
rel: req.rel,
};
// Execute the request and convert the response.
let _timer = Self::record_op_start_and_throttle(
&timeline,
metrics::SmgrQueryType::GetRelSize,
received_at,
)
.await?;
let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?;
let resp: page_api::GetRelSizeResponse = resp.n_blocks;
Ok(tonic::Response::new(resp.into()))
}
async fn get_slru_segment(
&self,
_: tonic::Request<proto::GetSlruSegmentRequest>,
req: tonic::Request<proto::GetSlruSegmentRequest>,
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
let received_at = Self::extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request and convert it to a Pagestream request.
Self::ensure_shard_zero(&req)?;
let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;
let req = PagestreamGetSlruSegmentRequest {
hdr: Self::make_hdr(req.read_lsn, 0),
kind: req.kind as u8,
segno: req.segno,
};
// Execute the request and convert the response.
let _timer = Self::record_op_start_and_throttle(
&timeline,
metrics::SmgrQueryType::GetSlruSegment,
received_at,
)
.await?;
let resp =
PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
let resp: page_api::GetSlruSegmentResponse = resp.segment;
Ok(tonic::Response::new(resp.try_into()?))
}
}
@@ -3370,10 +3722,24 @@ impl From<GetActiveTenantError> for QueryError {
}
}
/// gRPC interceptor that records the start time of request processing as a ReceivedAt extension.
///
/// TODO: generalize this for other observability information.
#[derive(Clone)]
struct ReceivedAtInterceptor;
#[derive(Clone)]
struct ReceivedAt(Instant);
impl tonic::service::Interceptor for ReceivedAtInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.extensions_mut().insert(ReceivedAt(Instant::now()));
Ok(req)
}
}
/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
/// TenantTimelineId and ShardIndex.
///
/// TODO: consider looking up the timeline handle here and storing it.
#[derive(Clone)]
struct TenantMetadataInterceptor;
@@ -3486,14 +3852,36 @@ impl From<GetActiveTimelineError> for QueryError {
}
}
impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
impl From<HandleUpgradeError> for QueryError {
fn from(e: HandleUpgradeError) -> Self {
match e {
crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
HandleUpgradeError::ShutDown => QueryError::Shutdown,
}
}
}
impl From<GetActiveTimelineError> for tonic::Status {
fn from(err: GetActiveTimelineError) -> Self {
use tonic::Code;
let code = match &err {
GetActiveTimelineError::Tenant(err) => match err {
GetActiveTenantError::Broken(_) => Code::Internal,
GetActiveTenantError::Cancelled => Code::Unavailable,
GetActiveTenantError::NotFound(_) => Code::NotFound,
GetActiveTenantError::SwitchedTenant => Code::Unavailable,
GetActiveTenantError::WaitForActiveTimeout { .. } => Code::Unavailable,
GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable,
},
GetActiveTimelineError::Timeline(err) => match err {
GetTimelineError::NotFound { .. } => Code::NotFound,
GetTimelineError::NotActive { .. } => Code::Unavailable,
GetTimelineError::ShuttingDown => Code::Unavailable,
},
};
tonic::Status::new(code, format!("{err}"))
}
}
fn set_tracing_field_shard_id(timeline: &Timeline) {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
tracing::Span::current().record(