diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a3029e67a5..74b814e06a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -5,6 +5,7 @@ use std::{ }; use byteorder::{BigEndian, ReadBytesExt}; +use postgres_ffi::BLCKSZ; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use strum_macros; @@ -570,6 +571,7 @@ pub enum PagestreamFeMessage { Nblocks(PagestreamNblocksRequest), GetPage(PagestreamGetPageRequest), DbSize(PagestreamDbSizeRequest), + GetSlruSegment(PagestreamGetSlruSegmentRequest), } // Wrapped in libpq CopyData @@ -579,6 +581,7 @@ pub enum PagestreamBeMessage { GetPage(PagestreamGetPageResponse), Error(PagestreamErrorResponse), DbSize(PagestreamDbSizeResponse), + GetSlruSegment(PagestreamGetSlruSegmentResponse), } #[derive(Debug, PartialEq, Eq)] @@ -610,6 +613,14 @@ pub struct PagestreamDbSizeRequest { pub dbnode: u32, } +#[derive(Debug, PartialEq, Eq)] +pub struct PagestreamGetSlruSegmentRequest { + pub latest: bool, + pub lsn: Lsn, + pub kind: u8, + pub segno: u32, +} + #[derive(Debug)] pub struct PagestreamExistsResponse { pub exists: bool, @@ -625,6 +636,11 @@ pub struct PagestreamGetPageResponse { pub page: Bytes, } +#[derive(Debug)] +pub struct PagestreamGetSlruSegmentResponse { + pub segment: Bytes, +} + #[derive(Debug)] pub struct PagestreamErrorResponse { pub message: String, @@ -677,6 +693,14 @@ impl PagestreamFeMessage { bytes.put_u64(req.lsn.0); bytes.put_u32(req.dbnode); } + + Self::GetSlruSegment(req) => { + bytes.put_u8(4); + bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.lsn.0); + bytes.put_u8(req.kind); + bytes.put_u32(req.segno); + } } bytes.into() @@ -727,6 +751,14 @@ impl PagestreamFeMessage { lsn: Lsn::from(body.read_u64::()?), dbnode: body.read_u32::()?, })), + 4 => Ok(PagestreamFeMessage::GetSlruSegment( + PagestreamGetSlruSegmentRequest { + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), + kind: body.read_u8()?, + segno: body.read_u32::()?, + }, + )), _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } @@ -761,6 +793,12 @@ impl PagestreamBeMessage { bytes.put_u8(104); /* tag from pagestore_client.h */ bytes.put_i64(resp.db_size); } + + Self::GetSlruSegment(resp) => { + bytes.put_u8(105); /* tag from pagestore_client.h */ + bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put(&resp.segment[..]); + } } bytes.into() diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 33402ca8ba..eba36b04c6 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -108,9 +108,22 @@ impl RelTag { /// These files are divided into segments, which are divided into /// pages of the same BLCKSZ as used for relation files. /// -#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +#[derive( + Debug, + Clone, + Copy, + strum_macros::FromRepr, + Hash, + Serialize, + Deserialize, + PartialEq, + Eq, + PartialOrd, + Ord, +)] +#[repr(u8)] pub enum SlruKind { - Clog, + Clog = 0, MultiXactMembers, MultiXactOffsets, } diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index ed452eae7d..8031c22b5e 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -139,6 +139,8 @@ where async fn send_tarball(mut self) -> anyhow::Result<()> { // TODO include checksum + let on_demand_slru_download = true; // TODO: should it be feature flag, config parameter or whatever else ? + // Create pgdata subdirs structure for dir in PGDATA_SUBDIRS.iter() { let header = new_tar_header_dir(dir)?; @@ -165,19 +167,20 @@ where .context("could not add config file to basebackup tarball")?; } } - - // Gather non-relational files from object storage pages. - for kind in [ - SlruKind::Clog, - SlruKind::MultiXactOffsets, - SlruKind::MultiXactMembers, - ] { - for segno in self - .timeline - .list_slru_segments(kind, self.lsn, self.ctx) - .await? - { - self.add_slru_segment(kind, segno).await?; + if !on_demand_slru_download { + // Gather non-relational files from object storage pages. + for kind in [ + SlruKind::Clog, + SlruKind::MultiXactOffsets, + SlruKind::MultiXactMembers, + ] { + for segno in self + .timeline + .list_slru_segments(kind, self.lsn, self.ctx) + .await? + { + self.add_slru_segment(kind, segno).await?; + } } } diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 7cc0333ee5..e3aaf38b9f 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -961,6 +961,7 @@ pub enum SmgrQueryType { GetRelSize, GetPageAtLsn, GetDbSize, + GetSlruSegment, } #[derive(Debug)] @@ -1030,6 +1031,7 @@ mod smgr_query_time_tests { (GetRelSize, "get_rel_size"), (GetPageAtLsn, "get_page_at_lsn"), (GetDbSize, "get_db_size"), + (GetSlruSegment, "get_slru_segment"), ]; for (op, expect) in expect { let actual: &'static str = op.into(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d5ca7f7382..a6ec14b4b4 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -19,7 +19,8 @@ use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, - PagestreamNblocksRequest, PagestreamNblocksResponse, + PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, + PagestreamNblocksResponse, }; use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError}; use pq_proto::framed::ConnectionError; @@ -64,6 +65,7 @@ use crate::tenant::mgr::ShardSelector; use crate::tenant::Timeline; use crate::trace::Tracer; +use pageserver_api::reltag::SlruKind; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; @@ -518,6 +520,16 @@ impl PageServerHandler { span, ) } + PagestreamFeMessage::GetSlruSegment(req) => { + let _timer = metrics.start_timer(metrics::SmgrQueryType::GetSlruSegment); + let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn); + ( + self.handle_get_slru_segment_request(&timeline, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) + } }; if let Err(e) = &response { @@ -862,6 +874,25 @@ impl PageServerHandler { })) } + async fn handle_get_slru_segment_request( + &self, + timeline: &Timeline, + req: &PagestreamGetSlruSegmentRequest, + ctx: &RequestContext, + ) -> anyhow::Result { + let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); + let lsn = + Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + .await?; + + let kind = SlruKind::from_repr(req.kind).ok_or(anyhow::anyhow!("invalid SLRU kind"))?; + let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?; + + Ok(PagestreamBeMessage::GetSlruSegment( + PagestreamGetSlruSegmentResponse { segment }, + )) + } + #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))] async fn handle_basebackup_request( diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index b81037ae47..e0769671bb 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::repository::*; use crate::walrecord::NeonWalRecord; use anyhow::Context; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use pageserver_api::key::is_rel_block_key; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; @@ -314,6 +314,25 @@ impl Timeline { } } + /// Get the whole SLRU segment + pub async fn get_slru_segment( + &self, + kind: SlruKind, + segno: u32, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result { + let n_blocks = self.get_slru_segment_size(kind, segno, lsn, ctx).await?; + let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize); + for blkno in 0..n_blocks { + let block = self + .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx) + .await?; + segment.extend_from_slice(&block[..BLCKSZ as usize]); + } + Ok(segment.freeze()) + } + /// Look up given SLRU page version. pub async fn get_slru_page_at_lsn( &self, diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 6612350c90..ad98f7a4c9 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -16,6 +16,7 @@ #include "postgres.h" #include "neon_pgversioncompat.h" +#include "access/slru.h" #include "access/xlogdefs.h" #include RELFILEINFO_HDR #include "storage/block.h" @@ -37,6 +38,7 @@ typedef enum T_NeonNblocksRequest, T_NeonGetPageRequest, T_NeonDbSizeRequest, + T_NeonGetSlruSegmentRequest, /* pagestore -> pagestore_client */ T_NeonExistsResponse = 100, @@ -44,6 +46,7 @@ typedef enum T_NeonGetPageResponse, T_NeonErrorResponse, T_NeonDbSizeResponse, + T_NeonGetSlruSegmentResponse, } NeonMessageTag; /* base struct for c-style inheritance */ @@ -104,6 +107,13 @@ typedef struct BlockNumber blkno; } NeonGetPageRequest; +typedef struct +{ + NeonRequest req; + SlruKind kind; + int segno; +} NeonGetSlruSegmentRequest; + /* supertype of all the Neon*Response structs below */ typedef struct { @@ -143,6 +153,14 @@ typedef struct * message */ } NeonErrorResponse; +typedef struct +{ + NeonMessageTag tag; + int n_blocks; + char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; +} NeonGetSlruSegmentResponse; + + extern StringInfoData nm_pack_request(NeonRequest *msg); extern NeonResponse *nm_unpack_response(StringInfo s); extern char *nm_to_string(NeonMessage *msg); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 08ce24b570..b86dba6c89 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1034,12 +1034,25 @@ nm_pack_request(NeonRequest *msg) break; } + case T_NeonGetSlruSegmentRequest: + { + NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg; + + pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.lsn); + pq_sendbyte(&s, msg_req->kind); + pq_sendint32(&s, msg_req->segno); + + break; + } + /* pagestore -> pagestore_client. We never need to create these. */ case T_NeonExistsResponse: case T_NeonNblocksResponse: case T_NeonGetPageResponse: case T_NeonErrorResponse: case T_NeonDbSizeResponse: + case T_NeonGetSlruSegmentResponse: default: neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag); break; @@ -1126,6 +1139,20 @@ nm_unpack_response(StringInfo s) break; } + case T_NeonGetSlruSegmentResponse: + { + NeonGetSlruSegmentResponse *msg_resp; + int n_blocks = pq_getmsgint(s, 4); + msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse)); + msg_resp->tag = tag; + msg_resp->n_blocks = n_blocks; + memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ); + pq_getmsgend(s); + + resp = (NeonResponse *) msg_resp; + break; + } + /* * pagestore_client -> pagestore * @@ -1135,6 +1162,7 @@ nm_unpack_response(StringInfo s) case T_NeonNblocksRequest: case T_NeonGetPageRequest: case T_NeonDbSizeRequest: + case T_NeonGetSlruSegmentRequest: default: neon_log(ERROR, "unexpected neon message tag 0x%02x", tag); break; @@ -1204,7 +1232,18 @@ nm_to_string(NeonMessage *msg) appendStringInfoChar(&s, '}'); break; } + case T_NeonGetSlruSegmentRequest: + { + NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg; + appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\""); + appendStringInfo(&s, ", \"kind\": %u", msg_req->kind); + appendStringInfo(&s, ", \"segno\": %u", msg_req->segno); + appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); + appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfoChar(&s, '}'); + break; + } /* pagestore -> pagestore_client */ case T_NeonExistsResponse: { @@ -1258,6 +1297,17 @@ nm_to_string(NeonMessage *msg) msg_resp->db_size); appendStringInfoChar(&s, '}'); + break; + } + case T_NeonGetSlruSegmentResponse: + { + NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg; + + appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\""); + appendStringInfo(&s, ", \"n_blocks\": %u}", + msg_resp->n_blocks); + appendStringInfoChar(&s, '}'); + break; } @@ -2727,6 +2777,61 @@ neon_end_unlogged_build(SMgrRelation reln) unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; } +static int +neon_read_slru_segment(SMgrRelation reln, SlruKind kind, int segno, void* buffer) +{ + XLogRecPtr request_lsn; + /* TODO: any better alternative than flush LSN? Actually we to request SLRU at basebackup creation time... */ +#if PG_VERSION_NUM >= 150000 + request_lsn = GetFlushRecPtr(NULL); +#else + request_lsn = GetFlushRecPtr(); +#endif + NeonResponse *resp; + shardno_t shard_no = 0; /* SLRU are at the zero shard */ + NeonGetSlruSegmentRequest request = { + .req.tag = T_NeonGetSlruSegmentRequest, + .req.latest = false, + .req.lsn = request_lsn, + + .kind = kind, + .segno = segno + }; + int n_blocks; + + do + { + while (!page_server->send(shard_no, &request.req) || !page_server->flush(shard_no)); + consume_prefetch_responses(); + resp = page_server->receive(shard_no); + } while (resp == NULL); + + switch (resp->tag) + { + case T_NeonGetSlruSegmentResponse: + n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks; + memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ); + break; + + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X", + kind, + segno, + (uint32) (request_lsn >> 32), (uint32) request_lsn), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; + + default: + neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); + } + pfree(resp); + + return n_blocks; +} + static void AtEOXact_neon(XactEvent event, void *arg) { @@ -2785,6 +2890,8 @@ static const struct f_smgr neon_smgr = .smgr_start_unlogged_build = neon_start_unlogged_build, .smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1, .smgr_end_unlogged_build = neon_end_unlogged_build, + + .smgr_read_slru_segment = neon_read_slru_segment, }; const f_smgr * diff --git a/trace/src/main.rs b/trace/src/main.rs index ddd970e95d..4605c124e9 100644 --- a/trace/src/main.rs +++ b/trace/src/main.rs @@ -60,6 +60,7 @@ fn analyze_trace(mut reader: R) { match msg { PagestreamFeMessage::Exists(_) => {} PagestreamFeMessage::Nblocks(_) => {} + PagestreamFeMessage::GetSlruSegment(_) => {} PagestreamFeMessage::GetPage(req) => { total += 1; diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index dd067cf656..bab16da2bb 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit dd067cf656f6810a25aca6025633d32d02c5085a +Subproject commit bab16da2bb1aba792849b4e5429671122abf0732 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index bc88f53931..af6924bda1 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit bc88f539312fcc4bb292ce94ae9db09ab6656e8a +Subproject commit af6924bda1a6d3bbe19105d6879f82aaff837fc2 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index e3a22b7292..2db4813086 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit e3a22b72922055f9212eca12700190f118578362 +Subproject commit 2db481308645758214bb327d74558e9f846f43e9 diff --git a/vendor/revisions.json b/vendor/revisions.json index c4cea208ee..b2adaec500 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "postgres-v16": "e3a22b72922055f9212eca12700190f118578362", - "postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a", - "postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a" + "postgres-v16": "2db481308645758214bb327d74558e9f846f43e9", + "postgres-v15": "af6924bda1a6d3bbe19105d6879f82aaff837fc2", + "postgres-v14": "bab16da2bb1aba792849b4e5429671122abf0732" }