mirror of https://github.com/neondatabase/neon.git
synced 2026-02-05 19:50:36 +00:00

Compare commits: conrad/par ... erik/grpc- (4 commits)

Commits:
- 2d2d29bf38
- e8ebb8e433
- 232591e457
- 8daf272561
Changed file: Cargo.lock (generated), 4 changes
@@ -4236,6 +4236,7 @@ name = "pagebench"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "camino",
  "clap",
  "futures",
@@ -4244,12 +4245,15 @@ dependencies = [
  "humantime-serde",
  "pageserver_api",
  "pageserver_client",
+ "pageserver_page_api",
  "rand 0.8.5",
  "reqwest",
  "serde",
  "serde_json",
  "tokio",
+ "tokio-stream",
  "tokio-util",
+ "tonic 0.13.1",
  "tracing",
  "utils",
  "workspace_hack",
@@ -1931,7 +1931,7 @@ pub enum PagestreamFeMessage {
 }

 // Wrapped in libpq CopyData
-#[derive(strum_macros::EnumProperty)]
+#[derive(Debug, strum_macros::EnumProperty)]
 pub enum PagestreamBeMessage {
     Exists(PagestreamExistsResponse),
     Nblocks(PagestreamNblocksResponse),
@@ -2042,7 +2042,7 @@ pub enum PagestreamProtocolVersion {

 pub type RequestId = u64;

-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
 pub struct PagestreamRequest {
     pub reqid: RequestId,
     pub request_lsn: Lsn,
@@ -2061,7 +2061,7 @@ pub struct PagestreamNblocksRequest {
     pub rel: RelTag,
 }

-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
 pub struct PagestreamGetPageRequest {
     pub hdr: PagestreamRequest,
     pub rel: RelTag,
@@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};
 // FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
 // Then we could replace the custom Ord and PartialOrd implementations below with
 // deriving them. This will require changes in walredoproc.c.
-#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
 pub struct RelTag {
     pub forknum: u8,
     pub spcnode: Oid,
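Note on the new `Default` derives above: they exist so the gRPC benchmark client further down can fabricate a placeholder request when synthesizing a `PagestreamGetPageResponse` (see `req: PagestreamGetPageRequest::default(), // dummy` in pagebench). A minimal sketch of the pattern, using simplified stand-in types rather than the real neon structs:

```rust
// Sketch with simplified stand-in types (not the real neon structs): once
// every field type implements Default, a dummy request can be zero-constructed.
#[derive(Debug, Default, Clone, Copy)]
struct RelTag {
    forknum: u8,
    spcnode: u32,
}

#[derive(Debug, Default, Clone, Copy)]
struct PagestreamGetPageRequest {
    rel: RelTag,
    blkno: u32,
}

#[derive(Debug)]
struct PagestreamGetPageResponse {
    req: PagestreamGetPageRequest, // gRPC responses don't echo the request,
    page: Vec<u8>,                 // so a Default dummy fills this slot
}

fn synthesize(page: Vec<u8>) -> PagestreamGetPageResponse {
    PagestreamGetPageResponse {
        req: PagestreamGetPageRequest::default(),
        page,
    }
}
```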
@@ -584,6 +584,7 @@ impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
     type Error = ProtocolError;

     fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
+        // TODO: can a segment legitimately be empty?
         if segment.is_empty() {
             return Err(ProtocolError::Missing("segment"));
         }
@@ -8,6 +8,7 @@ license.workspace = true

 [dependencies]
 anyhow.workspace = true
+async-trait.workspace = true
 camino.workspace = true
 clap.workspace = true
 futures.workspace = true
@@ -15,14 +16,17 @@ hdrhistogram.workspace = true
 humantime.workspace = true
 humantime-serde.workspace = true
 rand.workspace = true
-reqwest.workspace=true
+reqwest.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 tracing.workspace = true
 tokio.workspace = true
+tokio-stream.workspace = true
 tokio-util.workspace = true
+tonic.workspace = true

 pageserver_client.workspace = true
 pageserver_api.workspace = true
+pageserver_page_api.workspace = true
 utils = { path = "../../libs/utils/" }
 workspace_hack = { version = "0.1", path = "../../workspace_hack" }
@@ -7,11 +7,15 @@ use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};

 use anyhow::Context;
+use async_trait::async_trait;
 use camino::Utf8PathBuf;
 use pageserver_api::key::Key;
 use pageserver_api::keyspace::KeySpaceAccum;
-use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
+use pageserver_api::models::{
+    PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest,
+};
 use pageserver_api::shard::TenantShardId;
+use pageserver_page_api::proto;
 use rand::prelude::*;
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
@@ -22,6 +26,12 @@ use utils::lsn::Lsn;
 use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
 use crate::util::{request_stats, tokio_thread_local_stats};

+#[derive(clap::ValueEnum, Clone, Debug)]
+enum Protocol {
+    Libpq,
+    Grpc,
+}
+
 /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
 #[derive(clap::Parser)]
 pub(crate) struct Args {
@@ -35,6 +45,8 @@ pub(crate) struct Args {
     num_clients: NonZeroUsize,
     #[clap(long)]
     runtime: Option<humantime::Duration>,
+    #[clap(long, value_enum, default_value = "libpq")]
+    protocol: Protocol,
     /// Each client sends requests at the given rate.
     ///
     /// If a request takes too long and we should be issuing a new request already,
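For reference, the new `--protocol` flag rests on clap's `ValueEnum` derive. A self-contained sketch of just that wiring (the real `Args` carries many more fields; this toy only demonstrates how the flag parses):

```rust
use clap::Parser;

// clap lowercases the variant names, so the accepted values are
// "libpq" (the default) and "grpc".
#[derive(clap::ValueEnum, Clone, Debug)]
enum Protocol {
    Libpq,
    Grpc,
}

#[derive(Parser)]
struct Args {
    #[clap(long, value_enum, default_value = "libpq")]
    protocol: Protocol,
}

fn main() {
    let args = Args::parse_from(["pagebench", "--protocol", "grpc"]);
    println!("{:?}", args.protocol); // prints: Grpc
}
```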
@@ -303,7 +315,20 @@ async fn main_impl(
         .unwrap();

     Box::pin(async move {
-        client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
+        let client: Box<dyn Client> = match args.protocol {
+            Protocol::Libpq => Box::new(
+                LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline)
+                    .await
+                    .unwrap(),
+            ),
+            Protocol::Grpc => Box::new(
+                GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline)
+                    .await
+                    .unwrap(),
+            ),
+        };
+        run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
     })
 };
@@ -355,23 +380,15 @@ async fn main_impl(
     anyhow::Ok(())
 }

-async fn client_libpq(
+async fn run_worker(
     args: &Args,
-    worker_id: WorkerId,
+    mut client: Box<dyn Client>,
     shared_state: Arc<SharedState>,
     cancel: CancellationToken,
     rps_period: Option<Duration>,
     ranges: Vec<KeyRange>,
     weights: rand::distributions::weighted::WeightedIndex<i128>,
 ) {
-    let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
-        .await
-        .unwrap();
-    let mut client = client
-        .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
-        .await
-        .unwrap();
-
     shared_state.start_work_barrier.wait().await;
     let client_start = Instant::now();
     let mut ticks_processed = 0;
@@ -415,12 +432,12 @@ async fn client_libpq(
                 blkno: block_no,
             }
         };
-        client.getpage_send(req).await.unwrap();
+        client.send_get_page(req).await.unwrap();
         inflight.push_back(start);
     }

     let start = inflight.pop_front().unwrap();
-    client.getpage_recv().await.unwrap();
+    client.recv_get_page().await.unwrap();
     let end = Instant::now();
     shared_state.live_stats.request_done();
     ticks_processed += 1;
@@ -442,3 +459,101 @@ async fn client_libpq(
         }
     }
 }
+
+/// A benchmark client, to allow switching out the transport protocol.
+///
+/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could
+/// return a future that resolves when the response is received, but we don't really need it.
+#[async_trait]
+trait Client: Send {
+    /// Sends an asynchronous GetPage request to the pageserver.
+    async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>;
+
+    /// Receives the next GetPage response from the pageserver.
+    async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse>;
+}
+
+/// A libpq-based Pageserver client.
+struct LibpqClient {
+    inner: pageserver_client::page_service::PagestreamClient,
+}
+
+impl LibpqClient {
+    async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
+        let inner = pageserver_client::page_service::Client::new(connstring)
+            .await?
+            .pagestream(ttid.tenant_id, ttid.timeline_id)
+            .await?;
+        Ok(Self { inner })
+    }
+}
+
+#[async_trait]
+impl Client for LibpqClient {
+    async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
+        self.inner.getpage_send(req).await
+    }
+
+    async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
+        self.inner.getpage_recv().await
+    }
+}
+
+/// A gRPC client using the raw, no-frills gRPC client.
+struct GrpcClient {
+    req_tx: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
+    resp_rx: tonic::Streaming<proto::GetPageResponse>,
+}
+
+impl GrpcClient {
+    async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
+        let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?;
+
+        let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
+        let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
+        let mut req = tonic::Request::new(req_stream);
+        let metadata = req.metadata_mut();
+        metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?);
+        metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?);
+        metadata.insert("neon-shard-id", "0000".try_into()?);
+
+        let resp = client.get_pages(req).await?;
+        let resp_stream = resp.into_inner();
+
+        Ok(Self {
+            req_tx,
+            resp_rx: resp_stream,
+        })
+    }
+}
+
+#[async_trait]
+impl Client for GrpcClient {
+    async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
+        let req = proto::GetPageRequest {
+            request_id: 0,
+            request_class: proto::GetPageClass::Normal as i32,
+            read_lsn: Some(proto::ReadLsn {
+                request_lsn: req.hdr.request_lsn.0,
+                not_modified_since_lsn: req.hdr.not_modified_since.0,
+            }),
+            rel: Some(req.rel.into()),
+            block_number: vec![req.blkno],
+        };
+        self.req_tx.send(req).await?;
+        Ok(())
+    }
+
+    async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
+        let resp = self.resp_rx.message().await?.unwrap();
+        anyhow::ensure!(
+            resp.status_code == proto::GetPageStatusCode::Ok as i32,
+            "unexpected status code: {}",
+            resp.status_code
+        );
+        Ok(PagestreamGetPageResponse {
+            page: resp.page_image[0].clone(),
+            req: PagestreamGetPageRequest::default(), // dummy
+        })
+    }
+}
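As a usage sketch: the trait makes the transport interchangeable behind a `Box<dyn Client>`. The shape of the pipelined loop in `run_worker` is roughly as follows (a simplified illustration, without the pacing, stats, and cancellation handling of the real loop; `depth` is an assumed parameter):

```rust
// Simplified sketch, not the benchmark's actual loop: keep up to `depth`
// GetPage requests in flight over any Client implementation, then drain.
async fn pump(
    mut client: Box<dyn Client>,
    reqs: Vec<PagestreamGetPageRequest>,
    depth: usize,
) -> anyhow::Result<()> {
    let mut inflight = 0usize;
    for req in reqs {
        if inflight >= depth {
            client.recv_get_page().await?; // drain one before sending more
            inflight -= 1;
        }
        client.send_get_page(req).await?;
        inflight += 1;
    }
    while inflight > 0 {
        client.recv_get_page().await?;
        inflight -= 1;
    }
    Ok(())
}
```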
@@ -804,7 +804,7 @@ fn start_pageserver(
         } else {
             None
         },
-        basebackup_cache.clone(),
+        basebackup_cache,
     );

     // Spawn a Pageserver gRPC server task. It will spawn separate tasks for
@@ -816,12 +816,10 @@ fn start_pageserver(
     let mut page_service_grpc = None;
     if let Some(grpc_listener) = grpc_listener {
         page_service_grpc = Some(page_service::spawn_grpc(
             conf,
             tenant_manager.clone(),
             grpc_auth,
             otel_guard.as_ref().map(|g| g.dispatch.clone()),
             grpc_listener,
-            basebackup_cache,
         )?);
     }

@@ -1,6 +1,7 @@
 //! The Page Service listens for client connections and serves their GetPage@LSN
 //! requests.

+use std::any::Any;
 use std::borrow::Cow;
 use std::num::NonZeroUsize;
 use std::os::fd::AsRawFd;
@@ -31,6 +32,7 @@ use pageserver_api::models::{
 };
 use pageserver_api::reltag::SlruKind;
 use pageserver_api::shard::TenantShardId;
 use pageserver_page_api as page_api;
+use pageserver_page_api::proto;
 use postgres_backend::{
     AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
@@ -39,6 +41,7 @@ use postgres_ffi::BLCKSZ;
 use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
 use pq_proto::framed::ConnectionError;
 use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
+use smallvec::{SmallVec, smallvec};
 use strum_macros::IntoStaticStr;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
 use tokio::task::JoinHandle;
@@ -76,6 +79,7 @@ use crate::tenant::mgr::{
     GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
 };
 use crate::tenant::storage_layer::IoConcurrency;
+use crate::tenant::timeline::handle::{HandleUpgradeError, WeakHandle};
 use crate::tenant::timeline::{self, WaitLsnError};
 use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
 use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
@@ -165,15 +169,14 @@ pub fn spawn(

 /// Spawns a gRPC server for the page service.
 ///
+/// TODO: move this onto GrpcPageServiceHandler::spawn().
 /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
 /// need to reimplement the TCP+TLS accept loop ourselves.
 pub fn spawn_grpc(
     conf: &'static PageServerConf,
     tenant_manager: Arc<TenantManager>,
     auth: Option<Arc<SwappableJwtAuth>>,
     perf_trace_dispatch: Option<Dispatch>,
     listener: std::net::TcpListener,
-    basebackup_cache: Arc<BasebackupCache>,
 ) -> anyhow::Result<CancellableTask> {
     let cancel = CancellationToken::new();
     let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
@@ -202,21 +205,16 @@ pub fn spawn_grpc(
         .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));

     // Main page service.
-    let page_service_handler = PageServerHandler::new(
+    let page_service_handler = GrpcPageServiceHandler {
         tenant_manager,
-        auth.clone(),
-        PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
-        conf.get_vectored_concurrent_io,
-        ConnectionPerfSpanFields::default(),
-        basebackup_cache,
         ctx,
-        cancel.clone(),
-        gate.enter().expect("just created"),
-    );
+    };

+    let mut received_at_interceptor = ReceivedAtInterceptor;
     let mut tenant_interceptor = TenantMetadataInterceptor;
     let mut auth_interceptor = TenantAuthInterceptor::new(auth);
     let interceptors = move |mut req: tonic::Request<()>| {
+        req = received_at_interceptor.call(req)?;
        req = tenant_interceptor.call(req)?;
        req = auth_interceptor.call(req)?;
        Ok(req)
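A note on the interceptor closure above: tonic interceptors are essentially `FnMut(Request<()>) -> Result<Request<()>, Status>`, so chaining them is plain function composition. The same idea expressed as a generic helper (illustrative only, not part of this diff):

```rust
use tonic::service::Interceptor;

// Compose two tonic interceptors into one closure: run `a`, then feed its
// output into `b`, short-circuiting on the first error.
fn chain(
    mut a: impl Interceptor,
    mut b: impl Interceptor,
) -> impl FnMut(tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
    move |req| b.call(a.call(req)?)
}
```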
@@ -709,6 +707,89 @@ enum PageStreamError {
     BadRequest(Cow<'static, str>),
 }

+impl PageStreamError {
+    /// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
+    /// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
+    /// convenience method for use from a get_pages gRPC stream.
+    #[allow(clippy::result_large_err)]
+    fn into_get_page_response(
+        self,
+        request_id: page_api::RequestID,
+    ) -> Result<proto::GetPageResponse, tonic::Status> {
+        use page_api::GetPageStatusCode;
+        use tonic::Code;
+
+        // We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
+        let status: tonic::Status = self.into();
+        let status_code = match status.code() {
+            // We shouldn't see an OK status here, because we're emitting an error.
+            Code::Ok => {
+                debug_assert_ne!(status.code(), Code::Ok);
+                return Err(tonic::Status::internal(format!(
+                    "unexpected OK status: {status:?}",
+                )));
+            }
+
+            // These are per-request errors, returned as GetPageResponses.
+            Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
+            Code::DataLoss => GetPageStatusCode::InternalError,
+            Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
+            Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
+            Code::Internal => GetPageStatusCode::InternalError,
+            Code::NotFound => GetPageStatusCode::NotFound,
+            Code::OutOfRange => GetPageStatusCode::InvalidRequest,
+            Code::ResourceExhausted => GetPageStatusCode::SlowDown,
+
+            // These should terminate the stream.
+            Code::Aborted => return Err(status),
+            Code::Cancelled => return Err(status),
+            Code::DeadlineExceeded => return Err(status),
+            Code::PermissionDenied => return Err(status),
+            Code::Unauthenticated => return Err(status),
+            Code::Unavailable => return Err(status),
+            Code::Unimplemented => return Err(status),
+            Code::Unknown => return Err(status),
+        };
+
+        Ok(page_api::GetPageResponse {
+            request_id,
+            status_code,
+            reason: Some(status.message().to_string()),
+            page_images: SmallVec::new(),
+        }
+        .into())
+    }
+}
+
+impl From<PageStreamError> for tonic::Status {
+    fn from(err: PageStreamError) -> Self {
+        use tonic::Code;
+        let code = match &err {
+            PageStreamError::Reconnect(_) => Code::Unavailable,
+            PageStreamError::Shutdown => Code::Unavailable,
+            PageStreamError::Read(err) => match err {
+                PageReconstructError::Cancelled => Code::Unavailable,
+                PageReconstructError::MissingKey(_) => Code::NotFound,
+                PageReconstructError::AncestorLsnTimeout(err) => match err {
+                    WaitLsnError::Timeout(_) => Code::Internal,
+                    WaitLsnError::BadState(_) => Code::Internal,
+                    WaitLsnError::Shutdown => Code::Unavailable,
+                },
+                PageReconstructError::Other(_) => Code::Internal,
+                PageReconstructError::WalRedo(_) => Code::Internal,
+            },
+            PageStreamError::LsnTimeout(err) => match err {
+                WaitLsnError::Timeout(_) => Code::Internal,
+                WaitLsnError::BadState(_) => Code::Internal,
+                WaitLsnError::Shutdown => Code::Unavailable,
+            },
+            PageStreamError::NotFound(_) => Code::NotFound,
+            PageStreamError::BadRequest(_) => Code::InvalidArgument,
+        };
+        tonic::Status::new(code, format!("{err}"))
+    }
+}
+
 impl From<PageReconstructError> for PageStreamError {
     fn from(value: PageReconstructError) -> Self {
         match value {
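To make the error split concrete: per-request failures are folded back into the response stream as `GetPageResponse`s, while fatal codes propagate as `tonic::Status` and tear down the stream. A hedged usage sketch, mirroring the `Err(err) => return err.err.into_get_page_response(...)` call sites later in this diff:

```rust
// Illustrative wrapper (assumed helper, not in the diff): Ok responses pass
// through, recoverable errors become GetPageResponses with an error status
// code, and fatal errors surface as tonic::Status, ending the stream.
fn respond(
    result: Result<proto::GetPageResponse, PageStreamError>,
    request_id: page_api::RequestID,
) -> Result<proto::GetPageResponse, tonic::Status> {
    match result {
        Ok(resp) => Ok(resp),
        Err(err) => err.into_get_page_response(request_id),
    }
}
```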
@@ -789,37 +870,37 @@ enum BatchedFeMessage {
     Exists {
         span: Span,
         timer: SmgrOpTimer,
-        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
+        shard: WeakHandle<TenantManagerTypes>,
         req: models::PagestreamExistsRequest,
     },
     Nblocks {
         span: Span,
         timer: SmgrOpTimer,
-        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
+        shard: WeakHandle<TenantManagerTypes>,
         req: models::PagestreamNblocksRequest,
     },
     GetPage {
         span: Span,
-        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
-        pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
+        shard: WeakHandle<TenantManagerTypes>,
+        pages: SmallVec<[BatchedGetPageRequest; 1]>,
         batch_break_reason: GetPageBatchBreakReason,
     },
     DbSize {
         span: Span,
         timer: SmgrOpTimer,
-        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
+        shard: WeakHandle<TenantManagerTypes>,
         req: models::PagestreamDbSizeRequest,
     },
     GetSlruSegment {
         span: Span,
         timer: SmgrOpTimer,
-        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
+        shard: WeakHandle<TenantManagerTypes>,
         req: models::PagestreamGetSlruSegmentRequest,
     },
     #[cfg(feature = "testing")]
     Test {
         span: Span,
-        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
+        shard: WeakHandle<TenantManagerTypes>,
         requests: Vec<BatchedTestRequest>,
     },
     RespondError {
@@ -1068,26 +1149,6 @@ impl PageServerHandler {
         let neon_fe_msg =
             PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;

-        // TODO: turn in to async closure once available to avoid repeating received_at
-        async fn record_op_start_and_throttle(
-            shard: &timeline::handle::Handle<TenantManagerTypes>,
-            op: metrics::SmgrQueryType,
-            received_at: Instant,
-        ) -> Result<SmgrOpTimer, QueryError> {
-            // It's important to start the smgr op metric recorder as early as possible
-            // so that the _started counters are incremented before we do
-            // any serious waiting, e.g., for throttle, batching, or actual request handling.
-            let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
-            let now = Instant::now();
-            timer.observe_throttle_start(now);
-            let throttled = tokio::select! {
-                res = shard.pagestream_throttle.throttle(1, now) => res,
-                _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
-            };
-            timer.observe_throttle_done(throttled);
-            Ok(timer)
-        }
-
         let batched_msg = match neon_fe_msg {
             PagestreamFeMessage::Exists(req) => {
                 let shard = timeline_handles
@@ -1095,7 +1156,7 @@ impl PageServerHandler {
                     .await?;
                 debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
                 let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
-                let timer = record_op_start_and_throttle(
+                let timer = Self::record_op_start_and_throttle(
                     &shard,
                     metrics::SmgrQueryType::GetRelExists,
                     received_at,
@@ -1113,7 +1174,7 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .await?;
                 let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
-                let timer = record_op_start_and_throttle(
+                let timer = Self::record_op_start_and_throttle(
                     &shard,
                     metrics::SmgrQueryType::GetRelSize,
                     received_at,
@@ -1131,7 +1192,7 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .await?;
                 let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
-                let timer = record_op_start_and_throttle(
+                let timer = Self::record_op_start_and_throttle(
                     &shard,
                     metrics::SmgrQueryType::GetDbSize,
                     received_at,
@@ -1149,7 +1210,7 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .await?;
                 let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
-                let timer = record_op_start_and_throttle(
+                let timer = Self::record_op_start_and_throttle(
                     &shard,
                     metrics::SmgrQueryType::GetSlruSegment,
                     received_at,
@@ -1274,7 +1335,7 @@ impl PageServerHandler {
                 // request handler log messages contain the request-specific fields.
                 let span = mkspan!(shard.tenant_shard_id.shard_slug());

-                let timer = record_op_start_and_throttle(
+                let timer = Self::record_op_start_and_throttle(
                     &shard,
                     metrics::SmgrQueryType::GetPageAtLsn,
                     received_at,
@@ -1321,7 +1382,7 @@ impl PageServerHandler {
                 BatchedFeMessage::GetPage {
                     span,
                     shard: shard.downgrade(),
-                    pages: smallvec::smallvec![BatchedGetPageRequest {
+                    pages: smallvec![BatchedGetPageRequest {
                         req,
                         timer,
                         lsn_range: LsnRange {
@@ -1343,9 +1404,12 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .await?;
                 let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
-                let timer =
-                    record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
-                        .await?;
+                let timer = Self::record_op_start_and_throttle(
+                    &shard,
+                    metrics::SmgrQueryType::Test,
+                    received_at,
+                )
+                .await?;
                 BatchedFeMessage::Test {
                     span,
                     shard: shard.downgrade(),
@@ -1356,6 +1420,26 @@ impl PageServerHandler {
         Ok(Some(batched_msg))
     }

+    /// Starts a SmgrOpTimer at received_at and throttles the request.
+    async fn record_op_start_and_throttle(
+        shard: &timeline::handle::Handle<TenantManagerTypes>,
+        op: metrics::SmgrQueryType,
+        received_at: Instant,
+    ) -> Result<SmgrOpTimer, QueryError> {
+        // It's important to start the smgr op metric recorder as early as possible
+        // so that the _started counters are incremented before we do
+        // any serious waiting, e.g., for throttle, batching, or actual request handling.
+        let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
+        let now = Instant::now();
+        timer.observe_throttle_start(now);
+        let throttled = tokio::select! {
+            res = shard.pagestream_throttle.throttle(1, now) => res,
+            _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
+        };
+        timer.observe_throttle_done(throttled);
+        Ok(timer)
+    }
+
     /// Post-condition: `batch` is Some()
     #[instrument(skip_all, level = tracing::Level::TRACE)]
     #[allow(clippy::boxed_local)]
@@ -1453,8 +1537,11 @@ impl PageServerHandler {
         let (mut handler_results, span) = {
             // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
            // won't fit on the stack.
-            let mut boxpinned =
-                Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
+            let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message(
+                batch,
+                io_concurrency,
+                ctx,
+            ));
             log_slow(
                 log_slow_name,
                 LOG_SLOW_GETPAGE_THRESHOLD,
@@ -1610,7 +1697,6 @@ impl PageServerHandler {
     /// Helper which dispatches a batched message to the appropriate handler.
     /// Returns a vec of results, along with the extracted trace span.
     async fn pagestream_dispatch_batched_message(
-        &mut self,
         batch: BatchedFeMessage,
         io_concurrency: IoConcurrency,
         ctx: &RequestContext,
@@ -1640,10 +1726,10 @@ impl PageServerHandler {
                 let (shard, ctx) = upgrade_handle_and_set_context!(shard);
                 (
                     vec![
-                        self.handle_get_rel_exists_request(&shard, &req, &ctx)
+                        Self::handle_get_rel_exists_request(&shard, &req, &ctx)
                             .instrument(span.clone())
                             .await
-                            .map(|msg| (msg, timer, ctx))
+                            .map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx))
                             .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
                     ],
                     span,
@@ -1659,10 +1745,10 @@ impl PageServerHandler {
                 let (shard, ctx) = upgrade_handle_and_set_context!(shard);
                 (
                     vec![
-                        self.handle_get_nblocks_request(&shard, &req, &ctx)
+                        Self::handle_get_nblocks_request(&shard, &req, &ctx)
                             .instrument(span.clone())
                             .await
-                            .map(|msg| (msg, timer, ctx))
+                            .map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
                             .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
                     ],
                     span,
@@ -1680,16 +1766,15 @@ impl PageServerHandler {
                 {
                     let npages = pages.len();
                     trace!(npages, "handling getpage request");
-                    let res = self
-                        .handle_get_page_at_lsn_request_batched(
-                            &shard,
-                            pages,
-                            io_concurrency,
-                            batch_break_reason,
-                            &ctx,
-                        )
-                        .instrument(span.clone())
-                        .await;
+                    let res = Self::handle_get_page_at_lsn_request_batched(
+                        &shard,
+                        pages,
+                        io_concurrency,
+                        batch_break_reason,
+                        &ctx,
+                    )
+                    .instrument(span.clone())
+                    .await;
                     assert_eq!(res.len(), npages);
                     res
                 },
@@ -1706,10 +1791,10 @@ impl PageServerHandler {
                 let (shard, ctx) = upgrade_handle_and_set_context!(shard);
                 (
                     vec![
-                        self.handle_db_size_request(&shard, &req, &ctx)
+                        Self::handle_db_size_request(&shard, &req, &ctx)
                             .instrument(span.clone())
                             .await
-                            .map(|msg| (msg, timer, ctx))
+                            .map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx))
                             .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
                     ],
                     span,
@@ -1725,10 +1810,10 @@ impl PageServerHandler {
                 let (shard, ctx) = upgrade_handle_and_set_context!(shard);
                 (
                     vec![
-                        self.handle_get_slru_segment_request(&shard, &req, &ctx)
+                        Self::handle_get_slru_segment_request(&shard, &req, &ctx)
                             .instrument(span.clone())
                             .await
-                            .map(|msg| (msg, timer, ctx))
+                            .map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx))
                             .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
                     ],
                     span,
@@ -1746,8 +1831,7 @@ impl PageServerHandler {
                 {
                     let npages = requests.len();
                     trace!(npages, "handling getpage request");
-                    let res = self
-                        .handle_test_request_batch(&shard, requests, &ctx)
+                    let res = Self::handle_test_request_batch(&shard, requests, &ctx)
                         .instrument(span.clone())
                         .await;
                     assert_eq!(res.len(), npages);
@@ -2301,11 +2385,10 @@ impl PageServerHandler {

     #[instrument(skip_all, fields(shard_id))]
     async fn handle_get_rel_exists_request(
-        &mut self,
         timeline: &Timeline,
         req: &PagestreamExistsRequest,
         ctx: &RequestContext,
-    ) -> Result<PagestreamBeMessage, PageStreamError> {
+    ) -> Result<PagestreamExistsResponse, PageStreamError> {
         let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
             timeline,
@@ -2327,19 +2410,15 @@ impl PageServerHandler {
         )
         .await?;

-        Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
-            req: *req,
-            exists,
-        }))
+        Ok(PagestreamExistsResponse { req: *req, exists })
     }

     #[instrument(skip_all, fields(shard_id))]
     async fn handle_get_nblocks_request(
-        &mut self,
         timeline: &Timeline,
         req: &PagestreamNblocksRequest,
         ctx: &RequestContext,
-    ) -> Result<PagestreamBeMessage, PageStreamError> {
+    ) -> Result<PagestreamNblocksResponse, PageStreamError> {
         let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
             timeline,
@@ -2361,19 +2440,18 @@ impl PageServerHandler {
         )
         .await?;

-        Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
+        Ok(PagestreamNblocksResponse {
             req: *req,
             n_blocks,
-        }))
+        })
     }

     #[instrument(skip_all, fields(shard_id))]
     async fn handle_db_size_request(
-        &mut self,
         timeline: &Timeline,
         req: &PagestreamDbSizeRequest,
         ctx: &RequestContext,
-    ) -> Result<PagestreamBeMessage, PageStreamError> {
+    ) -> Result<PagestreamDbSizeResponse, PageStreamError> {
         let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
             timeline,
@@ -2397,23 +2475,19 @@ impl PageServerHandler {
         .await?;
         let db_size = total_blocks as i64 * BLCKSZ as i64;

-        Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
-            req: *req,
-            db_size,
-        }))
+        Ok(PagestreamDbSizeResponse { req: *req, db_size })
     }

     #[instrument(skip_all)]
     async fn handle_get_page_at_lsn_request_batched(
-        &mut self,
         timeline: &Timeline,
-        requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
+        requests: SmallVec<[BatchedGetPageRequest; 1]>,
         io_concurrency: IoConcurrency,
         batch_break_reason: GetPageBatchBreakReason,
         ctx: &RequestContext,
     ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
     {
-        debug_assert_current_span_has_tenant_and_timeline_id();
+        //debug_assert_current_span_has_tenant_and_timeline_id();

         timeline
             .query_metrics
@@ -2532,11 +2606,10 @@ impl PageServerHandler {

     #[instrument(skip_all, fields(shard_id))]
     async fn handle_get_slru_segment_request(
-        &mut self,
         timeline: &Timeline,
         req: &PagestreamGetSlruSegmentRequest,
         ctx: &RequestContext,
-    ) -> Result<PagestreamBeMessage, PageStreamError> {
+    ) -> Result<PagestreamGetSlruSegmentResponse, PageStreamError> {
         let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
             timeline,
@@ -2551,16 +2624,13 @@ impl PageServerHandler {
             .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
         let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;

-        Ok(PagestreamBeMessage::GetSlruSegment(
-            PagestreamGetSlruSegmentResponse { req: *req, segment },
-        ))
+        Ok(PagestreamGetSlruSegmentResponse { req: *req, segment })
     }

     // NB: this impl mimics what we do for batched getpage requests.
     #[cfg(feature = "testing")]
     #[instrument(skip_all, fields(shard_id))]
     async fn handle_test_request_batch(
-        &mut self,
         timeline: &Timeline,
         requests: Vec<BatchedTestRequest>,
         _ctx: &RequestContext,
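Taken together, these signature changes drop `&mut self` and narrow the return types from the `PagestreamBeMessage` envelope to the concrete `Pagestream*Response` structs: the libpq dispatch path now wraps responses itself (`PagestreamBeMessage::Exists(msg)` and friends, as seen earlier), while the gRPC handlers below read the typed fields (`resp.exists`, `resp.n_blocks`, `resp.segment`) directly without going through the libpq wire envelope.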
@@ -3300,57 +3370,342 @@ where
     }
 }

-/// Implements the page service over gRPC.
+/// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing.
 ///
-/// TODO: not yet implemented, all methods return unimplemented.
+/// TODO: add trace spans, interceptors, and sampling.
+/// TODO: rename to PageServiceHandler when libpq impl is removed.
+pub struct GrpcPageServiceHandler {
+    tenant_manager: Arc<TenantManager>,
+    ctx: RequestContext,
+}
+
+impl GrpcPageServiceHandler {
+    /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
+    /// relations and their sizes, as well as SLRU segments and other data.
+    #[allow(clippy::result_large_err)]
+    fn ensure_shard_zero(req: &tonic::Request<impl Any>) -> Result<(), tonic::Status> {
+        match Self::extract::<ShardIndex>(req).shard_number.0 {
+            0 => Ok(()),
+            shard => Err(tonic::Status::invalid_argument(format!(
+                "request must execute on shard zero (is shard {shard})",
+            ))),
+        }
+    }
+
+    /// Extracts the given type from the request extensions. It must have been set by an
+    /// interceptor.
+    fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
+        req.extensions()
+            .get::<T>()
+            .expect("extension should be set by interceptor")
+    }
+
+    /// Generates a PagestreamRequest header from a ReadLsn and request ID.
+    fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest {
+        PagestreamRequest {
+            reqid: req_id,
+            request_lsn: read_lsn.request_lsn,
+            not_modified_since: read_lsn.not_modified_since_lsn.unwrap_or_default(),
+        }
+    }
+
+    /// Acquires a timeline handle for the given request. The request must have been decorated by
+    /// TenantMetadataInterceptor first.
+    async fn get_request_timeline(
+        &self,
+        req: &tonic::Request<impl Any>,
+    ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
+        let ttid = *Self::extract::<TenantTimelineId>(req);
+        let shard_index = *Self::extract::<ShardIndex>(req);
+        let shard_selector = ShardSelector::Known(shard_index);
+
+        // TODO: untangle this from TenantManagerWrapper::resolve() and Cache::get(), to avoid the
+        // unnecessary overhead.
+        TimelineHandles::new(self.tenant_manager.clone())
+            .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
+            .await
+    }
+
+    /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start.
+    /// Only errors if the timeline is shutting down.
+    ///
+    /// TODO: revamp request timers -- in particular,
+    /// TODO: consider moving throttling out and returning SlowDown errors.
+    async fn record_op_start_and_throttle(
+        timeline: &timeline::handle::Handle<TenantManagerTypes>,
+        op: metrics::SmgrQueryType,
+        received_at: Instant,
+    ) -> Result<SmgrOpTimer, tonic::Status> {
+        let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at)
+            .await
+            .map_err(|err| match err {
+                // record_op_start_and_throttle() only returns Shutdown.
+                QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")),
+                err => tonic::Status::internal(format!("unexpected error: {err}")),
+            })?;
+        timer.observe_execution_start(Instant::now());
+        Ok(timer)
+    }
+
+    /// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
+    ///
+    /// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
+    /// with an appropriate status code instead.
+    async fn handle_get_page_request(
+        ctx: &RequestContext,
+        timeline: &WeakHandle<TenantManagerTypes>,
+        req: proto::GetPageRequest,
+        io_concurrency: IoConcurrency,
+    ) -> Result<proto::GetPageResponse, tonic::Status> {
+        let received_at = Instant::now();
+        let timeline = timeline.upgrade().map_err(|err| match err {
+            HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
+        })?;
+        let ctx = ctx.with_scope_page_service_pagestream(&timeline);
+
+        // Validate the request and convert it to a Pagestream request.
+        let req: page_api::GetPageRequest = req.try_into()?;
+
+        let effective_lsn = match PageServerHandler::effective_request_lsn(
+            &timeline,
+            timeline.get_last_record_lsn(),
+            req.read_lsn.request_lsn,
+            req.read_lsn.not_modified_since_lsn.unwrap_or_default(),
+            &timeline.get_applied_gc_cutoff_lsn(),
+        ) {
+            Ok(lsn) => lsn,
+            Err(err) => return err.into_get_page_response(req.request_id),
+        };
+
+        let mut batch = SmallVec::with_capacity(req.block_numbers.len());
+        for blkno in req.block_numbers {
+            // TODO: this creates one timer per page and throttles it. We should have a timer for
+            // the entire batch, and throttle only the batch, but this is equivalent to what
+            // PageServerHandler does already so we keep it for now.
+            let timer = Self::record_op_start_and_throttle(
+                &timeline,
+                metrics::SmgrQueryType::GetPageAtLsn,
+                received_at,
+            )
+            .await?;
+
+            batch.push(BatchedGetPageRequest {
+                req: PagestreamGetPageRequest {
+                    hdr: Self::make_hdr(req.read_lsn, req.request_id),
+                    rel: req.rel,
+                    blkno,
+                },
+                lsn_range: LsnRange {
+                    effective_lsn,
+                    request_lsn: req.read_lsn.request_lsn,
+                },
+                timer,
+                ctx: ctx.attached_child(),
+                batch_wait_ctx: None, // TODO: add tracing
+            });
+        }
+
+        let results = PageServerHandler::handle_get_page_at_lsn_request_batched(
+            &timeline,
+            batch,
+            io_concurrency,
+            GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches
+            &ctx,
+        )
+        .await;
+
+        let mut resp = page_api::GetPageResponse {
+            request_id: req.request_id,
+            status_code: page_api::GetPageStatusCode::Ok,
+            reason: None,
+            page_images: SmallVec::with_capacity(results.len()),
+        };
+
+        for result in results {
+            match result {
+                Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page),
+                Ok((resp, _, _)) => {
+                    return Err(tonic::Status::internal(format!(
+                        "unexpected response: {resp:?}"
+                    )));
+                }
+                Err(err) => return err.err.into_get_page_response(req.request_id),
+            };
+        }
+
+        Ok(resp.into())
+    }
+}
+
+/// Implements the gRPC page service.
+///
+/// TODO: when the libpq impl is removed, simplify this:
+/// * Add Tower middleware for timeline handle, rate limiting, and timing.
+/// * Remove the intermediate Pagestream types.
+/// * Inline the handler code.
 #[tonic::async_trait]
-impl proto::PageService for PageServerHandler {
+impl proto::PageService for GrpcPageServiceHandler {
     type GetBaseBackupStream = Pin<
         Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
     >;

     type GetPagesStream =
         Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;

     async fn check_rel_exists(
         &self,
-        _: tonic::Request<proto::CheckRelExistsRequest>,
+        req: tonic::Request<proto::CheckRelExistsRequest>,
     ) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
-        Err(tonic::Status::unimplemented("not implemented"))
+        let received_at = Self::extract::<ReceivedAt>(&req).0;
+        let timeline = self.get_request_timeline(&req).await?;
+        let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
+
+        // Validate the request and convert it to a Pagestream request.
+        Self::ensure_shard_zero(&req)?;
+        let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
+        let req = PagestreamExistsRequest {
+            hdr: Self::make_hdr(req.read_lsn, 0),
+            rel: req.rel,
+        };
+
+        // Execute the request and convert the response.
+        let _timer = Self::record_op_start_and_throttle(
+            &timeline,
+            metrics::SmgrQueryType::GetRelExists,
+            received_at,
+        )
+        .await?;
+
+        let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?;
+        let resp: page_api::CheckRelExistsResponse = resp.exists;
+        Ok(tonic::Response::new(resp.into()))
     }

     async fn get_base_backup(
         &self,
         _: tonic::Request<proto::GetBaseBackupRequest>,
     ) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
-        Err(tonic::Status::unimplemented("not implemented"))
+        Err(tonic::Status::unimplemented("not implemented")) // TODO
     }

     async fn get_db_size(
         &self,
-        _: tonic::Request<proto::GetDbSizeRequest>,
+        req: tonic::Request<proto::GetDbSizeRequest>,
     ) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
-        Err(tonic::Status::unimplemented("not implemented"))
+        let received_at = Self::extract::<ReceivedAt>(&req).0;
+        let timeline = self.get_request_timeline(&req).await?;
+        let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
+
+        // Validate the request and convert it to a Pagestream request.
+        Self::ensure_shard_zero(&req)?;
+        let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?;
+        let req = PagestreamDbSizeRequest {
+            hdr: Self::make_hdr(req.read_lsn, 0),
+            dbnode: req.db_oid,
+        };
+
+        // Execute the request and convert the response.
+        let _timer = Self::record_op_start_and_throttle(
+            &timeline,
+            metrics::SmgrQueryType::GetDbSize,
+            received_at,
+        )
+        .await?;
+
+        let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?;
+        let resp = resp.db_size as page_api::GetDbSizeResponse;
+        Ok(tonic::Response::new(resp.into()))
     }

     async fn get_pages(
         &self,
-        _: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
+        req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
     ) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
-        Err(tonic::Status::unimplemented("not implemented"))
+        // Extract the timeline from the request and check that it exists.
+        let ttid = *Self::extract::<TenantTimelineId>(&req);
+        let shard_index = *Self::extract::<ShardIndex>(&req);
+        let shard_selector = ShardSelector::Known(shard_index);
+
+        let mut handles = TimelineHandles::new(self.tenant_manager.clone());
+        handles
+            .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
+            .await?;
+
+        let ctx = self.ctx.attached_child();
+        let mut reqs = req.into_inner();
+
+        let resps = async_stream::try_stream! {
+            let timeline = handles
+                .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
+                .await?
+                .downgrade();
+            while let Some(req) = reqs.message().await? {
+                // TODO: implement IoConcurrency sidecar.
+                yield Self::handle_get_page_request(&ctx, &timeline, req, IoConcurrency::Sequential).await?
+            }
+        };
+
+        Ok(tonic::Response::new(Box::pin(resps)))
     }

     async fn get_rel_size(
         &self,
-        _: tonic::Request<proto::GetRelSizeRequest>,
+        req: tonic::Request<proto::GetRelSizeRequest>,
     ) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
-        Err(tonic::Status::unimplemented("not implemented"))
+        let received_at = Self::extract::<ReceivedAt>(&req).0;
+        let timeline = self.get_request_timeline(&req).await?;
+        let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
+
+        // Validate the request and convert it to a Pagestream request.
+        Self::ensure_shard_zero(&req)?;
+        let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
+        let req = PagestreamNblocksRequest {
+            hdr: Self::make_hdr(req.read_lsn, 0),
+            rel: req.rel,
+        };
+
+        // Execute the request and convert the response.
+        let _timer = Self::record_op_start_and_throttle(
+            &timeline,
+            metrics::SmgrQueryType::GetRelSize,
+            received_at,
+        )
+        .await?;
+
+        let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?;
+        let resp: page_api::GetRelSizeResponse = resp.n_blocks;
+        Ok(tonic::Response::new(resp.into()))
     }

     async fn get_slru_segment(
         &self,
-        _: tonic::Request<proto::GetSlruSegmentRequest>,
+        req: tonic::Request<proto::GetSlruSegmentRequest>,
     ) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
-        Err(tonic::Status::unimplemented("not implemented"))
+        let received_at = Self::extract::<ReceivedAt>(&req).0;
+        let timeline = self.get_request_timeline(&req).await?;
+        let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
+
+        // Validate the request and convert it to a Pagestream request.
+        Self::ensure_shard_zero(&req)?;
+        let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;
+        let req = PagestreamGetSlruSegmentRequest {
+            hdr: Self::make_hdr(req.read_lsn, 0),
+            kind: req.kind as u8,
+            segno: req.segno,
+        };
+
+        // Execute the request and convert the response.
+        let _timer = Self::record_op_start_and_throttle(
+            &timeline,
+            metrics::SmgrQueryType::GetSlruSegment,
+            received_at,
+        )
+        .await?;
+
+        let resp =
+            PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
+        let resp: page_api::GetSlruSegmentResponse = resp.segment;
+        Ok(tonic::Response::new(resp.try_into()?))
     }
 }
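The `get_pages` body above leans on `async_stream::try_stream!`, where each inbound message yields one response and any `?` failure terminates the response stream. A standalone toy of the same pattern (not pageserver code; a channel stands in for the inbound gRPC request stream):

```rust
use futures::Stream;
use tokio::sync::mpsc::Receiver;

// Toy illustration of the try_stream! pattern: pull requests from an inbound
// channel, yield one response per request, and let errors propagate via `?`
// to end the stream. The u64 payloads are placeholders for real messages.
fn echo_stream(mut reqs: Receiver<u64>) -> impl Stream<Item = Result<u64, tonic::Status>> {
    async_stream::try_stream! {
        while let Some(n) = reqs.recv().await {
            yield n + 1;
        }
    }
}
```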
@@ -3370,10 +3725,24 @@ impl From<GetActiveTenantError> for QueryError {
     }
 }

+/// gRPC interceptor that records the start time of request processing as a ReceivedAt extension.
+///
+/// TODO: generalize this for other observability information.
+#[derive(Clone)]
+struct ReceivedAtInterceptor;
+
+#[derive(Clone)]
+struct ReceivedAt(Instant);
+
+impl tonic::service::Interceptor for ReceivedAtInterceptor {
+    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
+        req.extensions_mut().insert(ReceivedAt(Instant::now()));
+        Ok(req)
+    }
+}
+
 /// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
 /// TenantTimelineId and ShardIndex.
 ///
 /// TODO: consider looking up the timeline handle here and storing it.
 #[derive(Clone)]
 struct TenantMetadataInterceptor;
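The body of `TenantMetadataInterceptor` is not included in this diff. A hedged sketch of what it plausibly does, given that `GrpcClient` sets the `neon-tenant-id`, `neon-timeline-id`, and `neon-shard-id` metadata keys and the handlers later extract `TenantTimelineId` and `ShardIndex` extensions (raw strings are stored here to keep the sketch self-contained; the real interceptor would parse into the typed IDs):

```rust
// Hypothetical stand-in, not the actual implementation: read the metadata
// keys set by the client and stash them as a request extension for later
// retrieval via GrpcPageServiceHandler::extract().
#[derive(Clone)]
struct TenantMetadataInterceptorSketch;

impl tonic::service::Interceptor for TenantMetadataInterceptorSketch {
    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
        fn required(req: &tonic::Request<()>, key: &str) -> Result<String, tonic::Status> {
            req.metadata()
                .get(key)
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned)
                .ok_or_else(|| tonic::Status::invalid_argument(format!("missing metadata key {key}")))
        }
        let tenant = required(&req, "neon-tenant-id")?;
        let timeline = required(&req, "neon-timeline-id")?;
        let shard = required(&req, "neon-shard-id")?;
        req.extensions_mut().insert((tenant, timeline, shard));
        Ok(req)
    }
}
```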
@@ -3486,14 +3855,36 @@ impl From<GetActiveTimelineError> for QueryError {
     }
 }

-impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
-    fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
+impl From<HandleUpgradeError> for QueryError {
+    fn from(e: HandleUpgradeError) -> Self {
         match e {
-            crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
+            HandleUpgradeError::ShutDown => QueryError::Shutdown,
         }
     }
 }

+impl From<GetActiveTimelineError> for tonic::Status {
+    fn from(err: GetActiveTimelineError) -> Self {
+        use tonic::Code;
+        let code = match &err {
+            GetActiveTimelineError::Tenant(err) => match err {
+                GetActiveTenantError::Broken(_) => Code::Internal,
+                GetActiveTenantError::Cancelled => Code::Unavailable,
+                GetActiveTenantError::NotFound(_) => Code::NotFound,
+                GetActiveTenantError::SwitchedTenant => Code::Unavailable,
+                GetActiveTenantError::WaitForActiveTimeout { .. } => Code::Unavailable,
+                GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable,
+            },
+            GetActiveTimelineError::Timeline(err) => match err {
+                GetTimelineError::NotFound { .. } => Code::NotFound,
+                GetTimelineError::NotActive { .. } => Code::Unavailable,
+                GetTimelineError::ShuttingDown => Code::Unavailable,
+            },
+        };
+        tonic::Status::new(code, format!("{err}"))
+    }
+}
+
 fn set_tracing_field_shard_id(timeline: &Timeline) {
     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     tracing::Span::current().record(
@@ -274,7 +274,7 @@ impl Timeline {
     io_concurrency: IoConcurrency,
     ctx: &RequestContext,
 ) -> Vec<Result<Bytes, PageReconstructError>> {
-    debug_assert_current_span_has_tenant_and_timeline_id();
+    //debug_assert_current_span_has_tenant_and_timeline_id();

     let mut slots_filled = 0;
     let page_count = pages.len();