Issue #411. Support drop database in pageserver.

Use put_unlink for FileNodeMap relishes.
Always store FileNodeMap as materialized page images (no WAL records).
This commit is contained in:
anastasia
2021-08-11 13:41:06 +03:00
committed by lubennikovaav
parent 27442c3daa
commit 78963ad104
6 changed files with 88 additions and 46 deletions

View File

@@ -179,18 +179,18 @@ impl<'a> Basebackup<'a> {
// Extract twophase state files
//
// NOTE(review): this span is a diff hunk whose +/- markers were lost in rendering.
// Lines up to the closing `}` of the if-let are the PRE-change body; the lines
// from `let img = self` onward are the POST-change replacement. They are not
// meant to coexist in one function — do not read this as runnable code.
fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
// OLD behavior: a missing/unreadable twophase page was silently skipped
// (errors swallowed by `if let Ok(...)`).
if let Ok(img) =
self.timeline
.get_page_at_lsn_nowait(RelishTag::TwoPhase { xid }, 0, self.lsn)
{
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
// CRC32C trailer appended after the state-file image, little-endian,
// matching the on-disk pg_twophase file format.
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
// File name is the zero-padded uppercase-hex transaction id.
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..])?;
}
// NEW behavior: the lookup error is propagated to the caller via `?`
// instead of being ignored; the rest of the logic is unchanged.
let img = self
.timeline
.get_page_at_lsn_nowait(RelishTag::TwoPhase { xid }, 0, self.lsn)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..])?;
Ok(())
}

View File

@@ -589,7 +589,11 @@ impl Timeline for LayeredTimeline {
let mut all_rels = HashSet::new();
let mut timeline = self;
loop {
let rels = timeline.layers.lock().unwrap().list_rels(spcnode, dbnode, lsn)?;
let rels = timeline
.layers
.lock()
.unwrap()
.list_rels(spcnode, dbnode, lsn)?;
all_rels.extend(rels.iter());

View File

@@ -213,17 +213,15 @@ impl LayerMap {
if (spcnode == 0 || reltag.spcnode == spcnode)
&& (dbnode == 0 || reltag.dbnode == dbnode)
{
// Add only if it exists at the requested LSN.
if let Some(open) = &segentry.open {
if open.get_end_lsn() > lsn {
if open.get_end_lsn() > lsn {
rels.insert(reltag);
}
}
else if let Some((_k, _v)) = segentry
.historic
.range((Included(Lsn(0)), Included(lsn)))
.next_back()
} else if let Some((_k, _v)) = segentry
.historic
.range((Included(Lsn(0)), Included(lsn)))
.next_back()
{
rels.insert(reltag);
}
@@ -239,19 +237,17 @@ impl LayerMap {
// Scan the timeline directory to get all rels in this timeline.
for (seg, segentry) in self.segs.iter() {
if let RelishTag::Relation(_) = seg.rel { }
else
{
if let RelishTag::Relation(_) = seg.rel {
} else {
// Add only if it exists at the requested LSN.
if let Some(open) = &segentry.open {
if open.get_end_lsn() > lsn {
if open.get_end_lsn() > lsn {
rels.insert(seg.rel);
}
}
else if let Some((_k, _v)) = segentry
.historic
.range((Included(Lsn(0)), Included(lsn)))
.next_back()
} else if let Some((_k, _v)) = segentry
.historic
.range((Included(Lsn(0)), Included(lsn)))
.next_back()
{
rels.insert(seg.rel);
}

View File

@@ -386,9 +386,30 @@ pub fn save_decoded_record(
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(&mut buf);
save_xlog_dbase_create(timeline, lsn, &createdb)?;
} else {
// TODO
trace!("XLOG_DBASE_DROP is not handled yet");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
let rels = timeline.list_rels(tablespace_id, dropdb.db_id, lsn)?;
for rel in rels {
timeline.put_unlink(RelishTag::Relation(rel), lsn)?;
}
trace!(
"Unlink FileNodeMap {}, {} at lsn {}",
tablespace_id,
dropdb.db_id,
lsn
);
timeline.put_unlink(
RelishTag::FileNodeMap {
spcnode: tablespace_id,
dbnode: dropdb.db_id,
},
lsn,
)?;
}
}
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
@@ -486,7 +507,7 @@ pub fn save_decoded_record(
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
save_relmap_record(timeline, lsn, &xlrec, decoded)?;
save_relmap_page(timeline, lsn, &xlrec, decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -943,22 +964,23 @@ fn save_multixact_truncate_record(
Ok(())
}
fn save_relmap_record(
fn save_relmap_page(
timeline: &dyn Timeline,
lsn: Lsn,
xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord,
) -> Result<()> {
let rec = WALRecord {
lsn,
will_init: true,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let tag = RelishTag::FileNodeMap {
spcnode: xlrec.tsid,
dbnode: xlrec.dbid,
};
timeline.put_wal_record(tag, 0, rec)?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
// skip xl_relmap_update
buf.advance(12);
timeline.put_page_image(tag, 0, lsn, Bytes::copy_from_slice(&buf[..]))?;
Ok(())
}

View File

@@ -331,6 +331,31 @@ impl XlCreateDatabase {
}
}
#[repr(C)]
#[derive(Debug)]
/// Decoded body of an XLOG_DBASE_DROP WAL record (PostgreSQL
/// `xl_dbase_drop_rec`): the database being dropped and the tablespaces
/// whose per-database directories must be removed.
pub struct XlDropDatabase {
    pub db_id: Oid,
    pub n_tablespaces: Oid, /* number of tablespace IDs */
    pub tablespace_ids: Vec<Oid>,
}

impl XlDropDatabase {
    /// Parse an XLOG_DBASE_DROP record payload from `buf`.
    ///
    /// Layout (all fields little-endian u32): db_id, n_tablespaces,
    /// then `n_tablespaces` tablespace OIDs.
    ///
    /// NOTE(review): assumes `buf` actually contains `n_tablespaces`
    /// trailing OIDs; a truncated record will panic inside `get_u32_le`.
    /// WAL input is trusted here, so no explicit length check is done.
    pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
        let db_id = buf.get_u32_le();
        let n_tablespaces = buf.get_u32_le();
        // Preallocate: the record header tells us exactly how many IDs follow,
        // so repeated grow-and-copy of the Vec is avoidable.
        let mut tablespace_ids = Vec::with_capacity(n_tablespaces as usize);
        for _ in 0..n_tablespaces {
            tablespace_ids.push(buf.get_u32_le());
        }
        XlDropDatabase {
            db_id,
            n_tablespaces,
            tablespace_ids,
        }
    }
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapInsert {

View File

@@ -418,11 +418,6 @@ impl PostgresRedoManager {
} else {
panic!();
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
// Relation map file has size 512 bytes
page.clear();
page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
assert!(page.len() == 512); // size of pg_filenode.map
}
}