Implement pg_database_size().

In this implementation dbsize equals sum of all relation sizes, excluding shared ones.
This commit is contained in:
Anastasia Lubennikova
2022-05-03 13:23:18 +03:00
committed by Anastasia Lubennikova
parent b68e3b03ed
commit e2cf77441d
3 changed files with 67 additions and 2 deletions

View File

@@ -44,11 +44,14 @@ use crate::CheckpointConfig;
use metrics::{register_histogram_vec, HistogramVec};
use postgres_ffi::xlog_utils::to_pg_timestamp;
use postgres_ffi::pg_constants;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
}
// Wrapped in libpq CopyData
@@ -57,6 +60,7 @@ enum PagestreamBeMessage {
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
}
#[derive(Debug)]
@@ -81,6 +85,13 @@ struct PagestreamGetPageRequest {
blkno: u32,
}
#[derive(Debug)]
struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
struct PagestreamExistsResponse {
exists: bool,
@@ -101,6 +112,11 @@ struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
@@ -142,6 +158,11 @@ impl PagestreamFeMessage {
},
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
@@ -172,6 +193,10 @@ impl PagestreamBeMessage {
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
}
bytes.into()
@@ -367,6 +392,11 @@ impl PageServerHandler {
.observe_closure_duration(|| {
self.handle_get_page_at_lsn_request(timeline.as_ref(), &req)
}),
PagestreamFeMessage::DbSize(req) => SMGR_QUERY_TIME
.with_label_values(&["get_db_size", &tenant_id, &timeline_id])
.observe_closure_duration(|| {
self.handle_db_size_request(timeline.as_ref(), &req)
}),
};
let response = response.unwrap_or_else(|e| {
@@ -487,6 +517,32 @@ impl PageServerHandler {
}))
}
fn handle_db_size_request<R: Repository>(
&self,
timeline: &DatadirTimeline<R>,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_db_size", dbnode = %req.dbnode, req_lsn = %req.lsn).entered();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let all_rels = timeline.list_rels(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let mut total_blocks: i64 = 0;
for rel in all_rels {
if rel.forknum == 0 {
let n_blocks = timeline.get_rel_size(rel, lsn).unwrap_or(0);
total_blocks += n_blocks as i64;
}
}
let db_size = total_blocks * pg_constants::BLCKSZ as i64;
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
db_size,
}))
}
fn handle_get_page_at_lsn_request<R: Repository>(
&self,
timeline: &DatadirTimeline<R>,

View File

@@ -32,7 +32,16 @@ def test_createdb(zenith_simple_env: ZenithEnv):
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
db.connect(dbname='foodb').close()
with closing(db.connect(dbname='foodb')) as conn:
with conn.cursor() as cur:
# Check database size in both branches
cur.execute(
'select pg_size_pretty(pg_database_size(%s)), pg_size_pretty(sum(pg_relation_size(oid))) from pg_class where relisshared is false;',
('foodb', ))
res = cur.fetchone()
# check that dbsize equals sum of all relation sizes, excluding shared ones
# This is how we define dbsize in zenith for now
assert res[0] == res[1]
#