Move pagestream api to libs/pageserver_api (#2698)

This commit is contained in:
bojanserafimov
2022-10-26 17:32:31 -04:00
committed by GitHub
parent 259a5f356e
commit 0c54eb65fb
13 changed files with 181 additions and 166 deletions

3
Cargo.lock generated
View File

@@ -2189,7 +2189,10 @@ dependencies = [
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"const_format",
"postgres_ffi",
"serde",
"serde_with",
"utils",

View File

@@ -7,6 +7,9 @@ edition = "2021"
serde = { version = "1.0", features = ["derive"] }
serde_with = "2.0"
const_format = "0.2.21"
anyhow = { version = "1.0", features = ["backtrace"] }
bytes = "1.0.1"
utils = { path = "../utils" }
postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -2,6 +2,7 @@ use const_format::formatcp;
/// Public API types
pub mod models;
pub mod reltag;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

View File

@@ -7,6 +7,10 @@ use utils::{
lsn::Lsn,
};
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
@@ -219,3 +223,160 @@ pub struct FailpointConfig {
pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,
}
// Wrapped in libpq CopyData
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
}
// Wrapped in libpq CopyData
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
}
#[derive(Debug)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamGetPageRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub lsn: Lsn,
pub dbnode: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
pub n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
pub db_size: i64,
}
impl PagestreamFeMessage {
pub fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
impl PagestreamBeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(resp) => {
bytes.put_u8(100); /* tag from pagestore_client.h */
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(101); /* tag from pagestore_client.h */
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(103); /* tag from pagestore_client.h */
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
}
bytes.into()
}
}

View File

@@ -22,8 +22,8 @@ use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};

View File

@@ -12,10 +12,10 @@ use tracing::*;
use walkdir::WalkDir;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;

View File

@@ -8,7 +8,6 @@ pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod profiling;
pub mod reltag;
pub mod repository;
pub mod storage_sync;
pub mod task_mgr;

View File

@@ -10,8 +10,14 @@
//
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use std::io;
use std::net::TcpListener;
use std::str;
@@ -35,7 +41,6 @@ use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -45,163 +50,6 @@ use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
}
// Wrapped in libpq CopyData
enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
}
#[derive(Debug)]
struct PagestreamExistsRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamNblocksRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamGetPageRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
blkno: u32,
}
#[derive(Debug)]
struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
struct PagestreamExistsResponse {
exists: bool,
}
#[derive(Debug)]
struct PagestreamNblocksResponse {
n_blocks: u32,
}
#[derive(Debug)]
struct PagestreamGetPageResponse {
page: Bytes,
}
#[derive(Debug)]
struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
impl PagestreamBeMessage {
fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(resp) => {
bytes.put_u8(100); /* tag from pagestore_client.h */
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(101); /* tag from pagestore_client.h */
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(103); /* tag from pagestore_client.h */
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
}
bytes.into()
}
}
fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ {
async_stream::try_stream! {
loop {

View File

@@ -7,12 +7,12 @@
//! Clarify that)
//!
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::*;
use crate::tenant::Timeline;
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};

View File

@@ -37,8 +37,8 @@ use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
use pageserver_api::reltag::RelTag;
use postgres_ffi::to_pg_timestamp;
use utils::{

View File

@@ -31,10 +31,10 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walrecord::*;
use crate::ZERO_PAGE;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;

View File

@@ -43,10 +43,10 @@ use crate::metrics::{
WAL_REDO_WAIT_TIME,
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Key;
use crate::walrecord::NeonWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{