diff --git a/Cargo.lock b/Cargo.lock index b39ca6e5a7..3e67126add 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2189,7 +2189,10 @@ dependencies = [ name = "pageserver_api" version = "0.1.0" dependencies = [ + "anyhow", + "bytes", "const_format", + "postgres_ffi", "serde", "serde_with", "utils", diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 5995325a2f..9121cd4989 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -7,6 +7,9 @@ edition = "2021" serde = { version = "1.0", features = ["derive"] } serde_with = "2.0" const_format = "0.2.21" +anyhow = { version = "1.0", features = ["backtrace"] } +bytes = "1.0.1" utils = { path = "../utils" } +postgres_ffi = { path = "../postgres_ffi" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index a36c1692a9..4890d54f36 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -2,6 +2,7 @@ use const_format::formatcp; /// Public API types pub mod models; +pub mod reltag; pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index dd40ba9e1c..4360f76fd1 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -7,6 +7,10 @@ use utils::{ lsn::Lsn, }; +use crate::reltag::RelTag; +use anyhow::bail; +use bytes::{Buf, BufMut, Bytes, BytesMut}; + /// A state of a tenant in pageserver's memory. 
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum TenantState { @@ -219,3 +223,160 @@ pub struct FailpointConfig { pub struct TimelineGcRequest { pub gc_horizon: Option<u64>, } + +// Wrapped in libpq CopyData +pub enum PagestreamFeMessage { + Exists(PagestreamExistsRequest), + Nblocks(PagestreamNblocksRequest), + GetPage(PagestreamGetPageRequest), + DbSize(PagestreamDbSizeRequest), +} + +// Wrapped in libpq CopyData +pub enum PagestreamBeMessage { + Exists(PagestreamExistsResponse), + Nblocks(PagestreamNblocksResponse), + GetPage(PagestreamGetPageResponse), + Error(PagestreamErrorResponse), + DbSize(PagestreamDbSizeResponse), +} + +#[derive(Debug)] +pub struct PagestreamExistsRequest { + pub latest: bool, + pub lsn: Lsn, + pub rel: RelTag, +} + +#[derive(Debug)] +pub struct PagestreamNblocksRequest { + pub latest: bool, + pub lsn: Lsn, + pub rel: RelTag, +} + +#[derive(Debug)] +pub struct PagestreamGetPageRequest { + pub latest: bool, + pub lsn: Lsn, + pub rel: RelTag, + pub blkno: u32, +} + +#[derive(Debug)] +pub struct PagestreamDbSizeRequest { + pub latest: bool, + pub lsn: Lsn, + pub dbnode: u32, +} + +#[derive(Debug)] +pub struct PagestreamExistsResponse { + pub exists: bool, +} + +#[derive(Debug)] +pub struct PagestreamNblocksResponse { + pub n_blocks: u32, +} + +#[derive(Debug)] +pub struct PagestreamGetPageResponse { + pub page: Bytes, +} + +#[derive(Debug)] +pub struct PagestreamErrorResponse { + pub message: String, +} + +#[derive(Debug)] +pub struct PagestreamDbSizeResponse { + pub db_size: i64, +} + +impl PagestreamFeMessage { + pub fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> { + // TODO these gets can fail + + // these correspond to the NeonMessageTag enum in pagestore_client.h + // + // TODO: consider using protobuf or serde bincode for less error prone + // serialization. 
+ let msg_tag = body.get_u8(); + match msg_tag { + 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + })), + 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + })), + 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + blkno: body.get_u32(), + })), + 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + dbnode: body.get_u32(), + })), + _ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body), + } + } +} + +impl PagestreamBeMessage { + pub fn serialize(&self) -> Bytes { + let mut bytes = BytesMut::new(); + + match self { + Self::Exists(resp) => { + bytes.put_u8(100); /* tag from pagestore_client.h */ + bytes.put_u8(resp.exists as u8); + } + + Self::Nblocks(resp) => { + bytes.put_u8(101); /* tag from pagestore_client.h */ + bytes.put_u32(resp.n_blocks); + } + + Self::GetPage(resp) => { + bytes.put_u8(102); /* tag from pagestore_client.h */ + bytes.put(&resp.page[..]); + } + + Self::Error(resp) => { + bytes.put_u8(103); /* tag from pagestore_client.h */ + bytes.put(resp.message.as_bytes()); + bytes.put_u8(0); // null terminator + } + Self::DbSize(resp) => { + bytes.put_u8(104); /* tag from pagestore_client.h */ + bytes.put_i64(resp.db_size); + } + } + + bytes.into() + } +} diff --git a/pageserver/src/reltag.rs b/libs/pageserver_api/src/reltag.rs similarity index 100% rename from 
pageserver/src/reltag.rs rename to libs/pageserver_api/src/reltag.rs diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index d0a57a473b..973c3cd3a6 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -22,8 +22,8 @@ use std::time::SystemTime; use tar::{Builder, EntryType, Header}; use tracing::*; -use crate::reltag::{RelTag, SlruKind}; use crate::tenant::Timeline; +use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA}; diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index ee3dc684e3..642e41765b 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -12,10 +12,10 @@ use tracing::*; use walkdir::WalkDir; use crate::pgdatadir_mapping::*; -use crate::reltag::{RelTag, SlruKind}; use crate::tenant::Timeline; use crate::walingest::WalIngest; use crate::walrecord::DecodedWALRecord; +use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::WalStreamDecoder; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index c75f940386..52a4cb0381 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -8,7 +8,6 @@ pub mod page_cache; pub mod page_service; pub mod pgdatadir_mapping; pub mod profiling; -pub mod reltag; pub mod repository; pub mod storage_sync; pub mod task_mgr; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d61885314e..aec91bc7f1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,8 +10,14 @@ // use anyhow::{bail, ensure, Context, Result}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::Bytes; use futures::{Stream, StreamExt}; +use pageserver_api::models::{ + PagestreamBeMessage, PagestreamDbSizeRequest, 
PagestreamDbSizeResponse, + PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, + PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, + PagestreamNblocksRequest, PagestreamNblocksResponse, +}; use std::io; use std::net::TcpListener; use std::str; @@ -35,7 +41,6 @@ use crate::config::{PageServerConf, ProfilingConfig}; use crate::import_datadir::import_wal_from_tar; use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::profiling::profpoint_start; -use crate::reltag::RelTag; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant::Timeline; @@ -45,163 +50,6 @@ use crate::CheckpointConfig; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; -// Wrapped in libpq CopyData -enum PagestreamFeMessage { - Exists(PagestreamExistsRequest), - Nblocks(PagestreamNblocksRequest), - GetPage(PagestreamGetPageRequest), - DbSize(PagestreamDbSizeRequest), -} - -// Wrapped in libpq CopyData -enum PagestreamBeMessage { - Exists(PagestreamExistsResponse), - Nblocks(PagestreamNblocksResponse), - GetPage(PagestreamGetPageResponse), - Error(PagestreamErrorResponse), - DbSize(PagestreamDbSizeResponse), -} - -#[derive(Debug)] -struct PagestreamExistsRequest { - latest: bool, - lsn: Lsn, - rel: RelTag, -} - -#[derive(Debug)] -struct PagestreamNblocksRequest { - latest: bool, - lsn: Lsn, - rel: RelTag, -} - -#[derive(Debug)] -struct PagestreamGetPageRequest { - latest: bool, - lsn: Lsn, - rel: RelTag, - blkno: u32, -} - -#[derive(Debug)] -struct PagestreamDbSizeRequest { - latest: bool, - lsn: Lsn, - dbnode: u32, -} - -#[derive(Debug)] -struct PagestreamExistsResponse { - exists: bool, -} - -#[derive(Debug)] -struct PagestreamNblocksResponse { - n_blocks: u32, -} - -#[derive(Debug)] -struct PagestreamGetPageResponse { - page: Bytes, -} - -#[derive(Debug)] -struct PagestreamErrorResponse { - message: String, -} - -#[derive(Debug)] -struct PagestreamDbSizeResponse { - db_size: i64, -} - 
-impl PagestreamFeMessage { - fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> { - // TODO these gets can fail - - // these correspond to the NeonMessageTag enum in pagestore_client.h - // - // TODO: consider using protobuf or serde bincode for less error prone - // serialization. - let msg_tag = body.get_u8(); - match msg_tag { - 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), - rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), - }, - })), - 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), - rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), - }, - })), - 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), - rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), - }, - blkno: body.get_u32(), - })), - 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), - dbnode: body.get_u32(), - })), - _ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body), - } - } -} - -impl PagestreamBeMessage { - fn serialize(&self) -> Bytes { - let mut bytes = BytesMut::new(); - - match self { - Self::Exists(resp) => { - bytes.put_u8(100); /* tag from pagestore_client.h */ - bytes.put_u8(resp.exists as u8); - } - - Self::Nblocks(resp) => { - bytes.put_u8(101); /* tag from pagestore_client.h */ - bytes.put_u32(resp.n_blocks); - } - - Self::GetPage(resp) => { - bytes.put_u8(102); /* tag from pagestore_client.h */ - bytes.put(&resp.page[..]); - } - - Self::Error(resp) => { - bytes.put_u8(103); /* tag from pagestore_client.h */ - bytes.put(resp.message.as_bytes()); - bytes.put_u8(0); // null 
terminator - } - Self::DbSize(resp) => { - bytes.put_u8(104); /* tag from pagestore_client.h */ - bytes.put_i64(resp.db_size); - } - } - - bytes.into() - } -} - fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ { async_stream::try_stream! { loop { diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index ca931ed37d..0e334a63df 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -7,12 +7,12 @@ //! Clarify that) //! use crate::keyspace::{KeySpace, KeySpaceAccum}; -use crate::reltag::{RelTag, SlruKind}; use crate::repository::*; use crate::tenant::Timeline; use crate::walrecord::NeonWalRecord; use anyhow::{bail, ensure, Result}; use bytes::{Buf, Bytes}; +use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; use postgres_ffi::{Oid, TimestampTz, TransactionId}; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 194ca0d857..6a96254df4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -37,8 +37,8 @@ use crate::metrics::TimelineMetrics; use crate::pgdatadir_mapping::BlockNumber; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; -use crate::reltag::RelTag; use crate::tenant_config::TenantConfOpt; +use pageserver_api::reltag::RelTag; use postgres_ffi::to_pg_timestamp; use utils::{ diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 9a6b99d991..8c81ed824b 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -31,10 +31,10 @@ use bytes::{Buf, Bytes, BytesMut}; use tracing::*; use crate::pgdatadir_mapping::*; -use crate::reltag::{RelTag, SlruKind}; use crate::tenant::Timeline; use crate::walrecord::*; use crate::ZERO_PAGE; +use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; 
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e683c301d8..1cde11082e 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -43,10 +43,10 @@ use crate::metrics::{ WAL_REDO_WAIT_TIME, }; use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block}; -use crate::reltag::{RelTag, SlruKind}; use crate::repository::Key; use crate::walrecord::NeonWalRecord; use crate::{config::PageServerConf, TEMP_FILE_SUFFIX}; +use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::v14::nonrelfile_utils::{