diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 2d971a9b96..76f0e8c0e0 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -798,10 +798,9 @@ pub mod virtual_file { pub enum PagestreamFeMessage { Exists(PagestreamExistsRequest), Nblocks(PagestreamNblocksRequest), - GetLatestPage(PagestreamGetLatestPageRequest), // for compatinility with old clients + GetPage(PagestreamGetPageRequest), DbSize(PagestreamDbSizeRequest), GetSlruSegment(PagestreamGetSlruSegmentRequest), - GetPage(PagestreamGetPageRequest), } // Wrapped in libpq CopyData @@ -854,14 +853,6 @@ pub struct PagestreamNblocksRequest { pub rel: RelTag, } -#[derive(Debug, PartialEq, Eq)] -pub struct PagestreamGetLatestPageRequest { - pub latest: bool, - pub lsn: Lsn, - pub rel: RelTag, - pub blkno: u32, -} - #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetPageRequest { pub horizon: Lsn, @@ -951,9 +942,9 @@ impl PagestreamFeMessage { bytes.put_u8(req.rel.forknum); } - Self::GetLatestPage(req) => { + Self::GetPage(req) => { bytes.put_u8(2); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); @@ -976,17 +967,6 @@ impl PagestreamFeMessage { bytes.put_u8(req.kind); bytes.put_u32(req.segno); } - - Self::GetPage(req) => { - bytes.put_u8(5); - bytes.put_u64(req.horizon.0); - bytes.put_u64(req.lsn.0); - bytes.put_u32(req.rel.spcnode); - bytes.put_u32(req.rel.dbnode); - bytes.put_u32(req.rel.relnode); - bytes.put_u8(req.rel.forknum); - bytes.put_u32(req.blkno); - } } bytes.into() @@ -999,11 +979,25 @@ impl PagestreamFeMessage { // // TODO: consider using protobuf or serde bincode for less error prone // serialization. - let msg_tag = body.read_u8()?; + let mut msg_tag = body.read_u8()?; + let horizon = if msg_tag >= 10 { + // new protocol + msg_tag -= 10; + Lsn::from(body.read_u64::()?) + } else { + // old_protocol + let latest = body.read_u8()? != 0; + if latest { + Lsn::MAX + } else { + Lsn::INVALID + } + }; + let lsn = Lsn::from(body.read_u64::()?); match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - horizon: Lsn::from(body.read_u64::()?), - lsn: Lsn::from(body.read_u64::()?), + horizon, + lsn, rel: RelTag { spcnode: body.read_u32::()?, dbnode: body.read_u32::()?, @@ -1012,8 +1006,8 @@ impl PagestreamFeMessage { }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - horizon: Lsn::from(body.read_u64::()?), - lsn: Lsn::from(body.read_u64::()?), + horizon, + lsn, rel: RelTag { spcnode: body.read_u32::()?, dbnode: body.read_u32::()?, @@ -1021,35 +1015,9 @@ impl PagestreamFeMessage { forknum: body.read_u8()?, }, })), - 2 => Ok(PagestreamFeMessage::GetLatestPage( - PagestreamGetLatestPageRequest { - latest: body.read_u8()? != 0, - lsn: Lsn::from(body.read_u64::()?), - rel: RelTag { - spcnode: body.read_u32::()?, - dbnode: body.read_u32::()?, - relnode: body.read_u32::()?, - forknum: body.read_u8()?, - }, - blkno: body.read_u32::()?, - }, - )), - 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - horizon: Lsn::from(body.read_u64::()?), - lsn: Lsn::from(body.read_u64::()?), - dbnode: body.read_u32::()?, - })), - 4 => Ok(PagestreamFeMessage::GetSlruSegment( - PagestreamGetSlruSegmentRequest { - horizon: Lsn::from(body.read_u64::()?), - lsn: Lsn::from(body.read_u64::()?), - kind: body.read_u8()?, - segno: body.read_u32::()?, - }, - )), - 5 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - horizon: Lsn::from(body.read_u64::()?), - lsn: Lsn::from(body.read_u64::()?), + 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + horizon, + lsn, rel: RelTag { spcnode: body.read_u32::()?, dbnode: body.read_u32::()?, @@ -1058,6 +1026,19 @@ impl PagestreamFeMessage { }, blkno: body.read_u32::()?, })), + 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { + horizon, + lsn, + dbnode: body.read_u32::()?, + })), + 4 => Ok(PagestreamFeMessage::GetSlruSegment( + PagestreamGetSlruSegmentRequest { + horizon, + lsn, + kind: body.read_u8()?, + segno: body.read_u32::()?, + }, + )), _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3d86360a20..d2751c695f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -637,25 +637,6 @@ impl PageServerHandler { span, ) } - PagestreamFeMessage::GetLatestPage(old_req) => { - let req = PagestreamGetPageRequest { - horizon: if old_req.latest { - Lsn::MAX - } else { - old_req.lsn - }, - lsn: old_req.lsn, - rel: old_req.rel, - blkno: old_req.blkno, - }; - let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); - ( - self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } PagestreamFeMessage::GetPage(req) => { // shard_id is filled in by the handler let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 35f5d73531..4e4315c433 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -31,12 +31,11 @@ typedef enum { /* pagestore_client -> pagestore */ - T_NeonExistsRequest = 0, + T_NeonExistsRequest = 10, /* new protocol message tags start from 10 */ T_NeonNblocksRequest, - T_NeonGetLatestPageRequest, /* old format of get_page command */ + T_NeonGetPageRequest, T_NeonDbSizeRequest, T_NeonGetSlruSegmentRequest, - T_NeonGetPageRequest, /* pagestore -> pagestore_client */ T_NeonExistsResponse = 100, diff --git a/trace/src/main.rs b/trace/src/main.rs index 3a57af57fa..4605c124e9 100644 --- a/trace/src/main.rs +++ b/trace/src/main.rs @@ -7,9 +7,7 @@ use std::{ io::BufReader, }; -use pageserver_api::models::{ - PagestreamFeMessage, PagestreamGetLatestPageRequest, PagestreamGetPageRequest, -}; +use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest}; use utils::id::{ConnectionId, TenantId, TimelineId}; use clap::{Parser, Subcommand}; @@ -53,11 +51,9 @@ enum Command { // - detect any prefetching anomalies by looking for negative deltas during seqscan fn analyze_trace(mut reader: R) { let mut total = 0; // Total requests traced - let mut old = 0; // Old requests traced let mut cross_rel = 0; // Requests that ask for different rel than previous request let mut deltas = HashMap::::new(); // Consecutive blkno differences let mut prev: Option = None; - let mut old_prev: Option = None; // Compute stats while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) { @@ -65,20 +61,6 @@ fn analyze_trace(mut reader: R) { PagestreamFeMessage::Exists(_) => {} PagestreamFeMessage::Nblocks(_) => {} PagestreamFeMessage::GetSlruSegment(_) => {} - PagestreamFeMessage::GetLatestPage(req) => { - total += 1; - old += 1; - - if let Some(prev) = old_prev { - if prev.rel == req.rel { - let delta = (req.blkno as i32) - (prev.blkno as i32); - deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1); - } else { - cross_rel += 1; - } - } - old_prev = Some(req); - } PagestreamFeMessage::GetPage(req) => { total += 1; @@ -101,7 +83,6 @@ fn analyze_trace(mut reader: R) { deltas.retain(|_, count| *count > 300); other -= deltas.len(); dbg!(total); - dbg!(old); dbg!(cross_rel); dbg!(other); dbg!(deltas);