mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
page_api: add get_slru_segment()
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Page service presented by pageservers, for computes
|
||||
// Page service presented by pageservers, for computes.
|
||||
//
|
||||
// Each request must come with the following metadata:
|
||||
// - neon-tenant-id
|
||||
@@ -12,21 +12,32 @@ syntax = "proto3";
|
||||
package page_service;
|
||||
|
||||
service PageService {
|
||||
rpc RelExists(RelExistsRequest) returns (RelExistsResponse);
|
||||
|
||||
// Returns size of a relation, as # of blocks
|
||||
rpc RelSize (RelSizeRequest) returns (RelSizeResponse);
|
||||
|
||||
// Fetches a page.
|
||||
rpc GetPage (GetPageRequest) returns (GetPageResponse);
|
||||
|
||||
// Streaming GetPage protocol.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns total size of a database, as # of bytes
|
||||
// Returns the total size of a database, as # of bytes.
|
||||
rpc DbSize (DbSizeRequest) returns (DbSizeResponse);
|
||||
|
||||
// Returns whether a relation exists.
|
||||
rpc RelExists(RelExistsRequest) returns (RelExistsResponse);
|
||||
|
||||
// Returns the size of a relation, as # of blocks.
|
||||
rpc RelSize (RelSizeRequest) returns (RelSizeResponse);
|
||||
|
||||
// Fetches a base backup.
|
||||
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
|
||||
|
||||
// Fetches a page.
|
||||
// TODO: remove this, use GetPages.
|
||||
rpc GetPage (GetPageRequest) returns (GetPageResponse);
|
||||
|
||||
// Fetches pages.
|
||||
//
|
||||
// This is implemented as a bidirectional streaming RPC for performance. Unary
|
||||
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
|
||||
// authentication, and so on -- with streaming, we only pay these costs during
|
||||
// the initial stream setup. This doubles performance in benchmarks.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Fetches an SLRU segment.
|
||||
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
|
||||
}
|
||||
|
||||
message RequestCommon {
|
||||
@@ -86,3 +97,13 @@ message GetBaseBackupRequest {
|
||||
message GetBaseBackupResponseChunk {
|
||||
bytes chunk = 1;
|
||||
}
|
||||
|
||||
message GetSlruSegmentRequest {
|
||||
RequestCommon common = 1;
|
||||
uint32 kind = 2;
|
||||
uint32 segno = 3;
|
||||
}
|
||||
|
||||
message GetSlruSegmentResponse {
|
||||
bytes segment = 1;
|
||||
}
|
||||
@@ -76,6 +76,13 @@ pub struct GetBaseBackupRequest {
|
||||
pub replica: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GetSlruSegmentRequest {
|
||||
pub common: RequestCommon,
|
||||
pub kind: u8, // TODO: SlruKind
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
//--- Conversions to/from the generated proto types
|
||||
|
||||
use thiserror::Error;
|
||||
@@ -240,3 +247,18 @@ impl TryFrom<&proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(value: &proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
|
||||
Ok(GetSlruSegmentRequest {
|
||||
common: (&value.common.ok_or(ProtocolError::Missing("common"))?).into(),
|
||||
kind: value
|
||||
.kind
|
||||
.try_into()
|
||||
.or(Err(ProtocolError::InvalidValue("kind")))?,
|
||||
segno: value.segno,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::tenant::storage_layer::IoConcurrency;
|
||||
use crate::tenant::timeline::WaitLsnTimeout;
|
||||
use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use tokio::io::{AsyncWriteExt, ReadHalf, SimplexStream};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::codec::{Decoder, FramedRead};
|
||||
@@ -493,6 +494,42 @@ impl PageService for PageServiceService {
|
||||
|
||||
Ok(tonic::Response::new(response))
|
||||
}
|
||||
|
||||
async fn get_slru_segment(
|
||||
&self,
|
||||
request: tonic::Request<proto::GetSlruSegmentRequest>,
|
||||
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
|
||||
let ttid = self.extract_ttid(request.metadata())?;
|
||||
let req: model::GetSlruSegmentRequest = request.get_ref().try_into()?;
|
||||
|
||||
let span = tracing::info_span!("get_slru_segment", tenant_id = %ttid.tenant_id, timeline_id = %ttid.timeline_id, kind = %req.kind, segno = %req.segno, req_lsn = %req.common.request_lsn);
|
||||
|
||||
async {
|
||||
let timeline = self.get_timeline(ttid, ShardSelector::Zero).await?;
|
||||
let ctx = self.ctx.with_scope_timeline(&timeline);
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
&timeline,
|
||||
req.common.request_lsn,
|
||||
req.common.not_modified_since_lsn,
|
||||
&latest_gc_cutoff_lsn,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let kind = SlruKind::from_repr(req.kind)
|
||||
.ok_or(tonic::Status::from_error("invalid SLRU kind".into()))?;
|
||||
let segment = timeline
|
||||
.get_slru_segment(kind, req.segno, lsn, &ctx)
|
||||
.await?;
|
||||
|
||||
Ok(tonic::Response::new(proto::GetSlruSegmentResponse {
|
||||
segment,
|
||||
}))
|
||||
}
|
||||
.instrument(span)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
|
||||
|
||||
Reference in New Issue
Block a user