From b5373de208b108c47e985b214faf2717a8337dbf Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 29 Apr 2025 17:59:27 +0200 Subject: [PATCH] page_api: add `get_slru_segment()` --- pageserver/page_api/proto/page_service.proto | 47 ++++++++++++++------ pageserver/page_api/src/model.rs | 22 +++++++++ pageserver/src/compute_service_grpc.rs | 37 +++++++++++++++ 3 files changed, 93 insertions(+), 13 deletions(-) diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index d9ddf41794..295b689c38 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -1,4 +1,4 @@ -// Page service presented by pageservers, for computes +// Page service presented by pageservers, for computes. // // Each request must come with the following metadata: // - neon-tenant-id @@ -12,21 +12,32 @@ syntax = "proto3"; package page_service; service PageService { - rpc RelExists(RelExistsRequest) returns (RelExistsResponse); - - // Returns size of a relation, as # of blocks - rpc RelSize (RelSizeRequest) returns (RelSizeResponse); - - // Fetches a page. - rpc GetPage (GetPageRequest) returns (GetPageResponse); - - // Streaming GetPage protocol. - rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse); - - // Returns total size of a database, as # of bytes + // Returns the total size of a database, as # of bytes. rpc DbSize (DbSizeRequest) returns (DbSizeResponse); + // Returns whether a relation exists. + rpc RelExists(RelExistsRequest) returns (RelExistsResponse); + + // Returns the size of a relation, as # of blocks. + rpc RelSize (RelSizeRequest) returns (RelSizeResponse); + + // Fetches a base backup. rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk); + + // Fetches a page. + // TODO: remove this, use GetPages. + rpc GetPage (GetPageRequest) returns (GetPageResponse); + + // Fetches pages. + // + // This is implemented as a bidirectional streaming RPC for performance. Unary + // requests incur costs for e.g. HTTP/2 stream setup, header parsing, + // authentication, and so on -- with streaming, we only pay these costs during + // the initial stream setup. This doubles performance in benchmarks. + rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse); + + // Fetches an SLRU segment. + rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse); } message RequestCommon { @@ -86,3 +97,13 @@ message GetBaseBackupRequest { message GetBaseBackupResponseChunk { bytes chunk = 1; } + +message GetSlruSegmentRequest { + RequestCommon common = 1; + uint32 kind = 2; + uint32 segno = 3; +} + +message GetSlruSegmentResponse { + bytes segment = 1; +} \ No newline at end of file diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 30b5f0be52..1d028e5075 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -76,6 +76,13 @@ pub struct GetBaseBackupRequest { pub replica: bool, } +#[derive(Clone, Debug)] +pub struct GetSlruSegmentRequest { + pub common: RequestCommon, + pub kind: u8, // TODO: SlruKind + pub segno: u32, +} + //--- Conversions to/from the generated proto types use thiserror::Error; @@ -240,3 +247,18 @@ impl TryFrom<&proto::GetBaseBackupRequest> for GetBaseBackupRequest { }) } } + +impl TryFrom<&proto::GetSlruSegmentRequest> for GetSlruSegmentRequest { + type Error = ProtocolError; + + fn try_from(value: &proto::GetSlruSegmentRequest) -> Result { + Ok(GetSlruSegmentRequest { + common: (&value.common.ok_or(ProtocolError::Missing("common"))?).into(), + kind: value + .kind + .try_into() + .or(Err(ProtocolError::InvalidValue("kind")))?, + segno: value.segno, + }) + } +} diff --git a/pageserver/src/compute_service_grpc.rs b/pageserver/src/compute_service_grpc.rs index 76e4289c84..33087fc311 100644 --- a/pageserver/src/compute_service_grpc.rs +++ b/pageserver/src/compute_service_grpc.rs @@ -33,6 +33,7 @@ use crate::tenant::storage_layer::IoConcurrency; use crate::tenant::timeline::WaitLsnTimeout; use async_stream::try_stream; use futures::Stream; +use pageserver_api::reltag::SlruKind; use tokio::io::{AsyncWriteExt, ReadHalf, SimplexStream}; use tokio::task::JoinHandle; use tokio_util::codec::{Decoder, FramedRead}; @@ -493,6 +494,42 @@ impl PageService for PageServiceService { Ok(tonic::Response::new(response)) } + + async fn get_slru_segment( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let ttid = self.extract_ttid(request.metadata())?; + let req: model::GetSlruSegmentRequest = request.get_ref().try_into()?; + + let span = tracing::info_span!("get_slru_segment", tenant_id = %ttid.tenant_id, timeline_id = %ttid.timeline_id, kind = %req.kind, segno = %req.segno, req_lsn = %req.common.request_lsn); + + async { + let timeline = self.get_timeline(ttid, ShardSelector::Zero).await?; + let ctx = self.ctx.with_scope_timeline(&timeline); + let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); + let lsn = Self::wait_or_get_last_lsn( + &timeline, + req.common.request_lsn, + req.common.not_modified_since_lsn, + &latest_gc_cutoff_lsn, + &ctx, + ) + .await?; + + let kind = SlruKind::from_repr(req.kind) + .ok_or(tonic::Status::from_error("invalid SLRU kind".into()))?; + let segment = timeline + .get_slru_segment(kind, req.segno, lsn, &ctx) + .await?; + + Ok(tonic::Response::new(proto::GetSlruSegmentResponse { + segment, + })) + } + .instrument(span) + .await + } } /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].