From 7136d0a1923b49ebbe36448094cece3bf2023ff9 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 16 Oct 2024 18:15:17 +0200 Subject: [PATCH] pageserver: refactor dbase records --- pageserver/src/walingest.rs | 248 ++++++++++++++++++++++++------------ 1 file changed, 169 insertions(+), 79 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index c396751b68..05f689137d 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -135,6 +135,23 @@ struct SmgrTruncate { to: BlockNumber, } +enum DbaseRecord { + Create(DbaseCreate), + Drop(DbaseDrop), +} + +struct DbaseCreate { + db_id: u32, + tablespace_id: u32, + src_db_id: u32, + src_tablespace_id: u32, +} + +struct DbaseDrop { + db_id: u32, + tablespace_ids: Vec, +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -243,83 +260,17 @@ impl WalIngest { } } pg_constants::RM_DBASE_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - debug!(%info, %pg_version, "handle RM_DBASE_ID"); + let maybe_dbase_record = + Self::decode_dbase_record(&mut buf, &decoded, pg_version).unwrap(); - if pg_version == 14 { - if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { - let createdb = XlCreateDatabase::decode(&mut buf); - debug!("XLOG_DBASE_CREATE v14"); - - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) + if let Some(dbase_record) = maybe_dbase_record { + match dbase_record { + DbaseRecord::Create(create) => { + self.ingest_xlog_dbase_create(create, modification, ctx) .await?; } - } - } else if pg_version == 15 { - if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if pg_version == 16 { - if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if pg_version == 17 { - if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; + DbaseRecord::Drop(drop) => { + self.ingest_xlog_dbase_drop(drop, modification, ctx).await?; } } } @@ -1334,14 +1285,16 @@ impl WalIngest { /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. async fn ingest_xlog_dbase_create( &mut self, + create: DbaseCreate, modification: &mut DatadirModification<'_>, - rec: &XlCreateDatabase, ctx: &RequestContext, ) -> anyhow::Result<()> { - let db_id = rec.db_id; - let tablespace_id = rec.tablespace_id; - let src_db_id = rec.src_db_id; - let src_tablespace_id = rec.src_tablespace_id; + let DbaseCreate { + db_id, + tablespace_id, + src_db_id, + src_tablespace_id, + } = create; let rels = modification .tline @@ -1427,6 +1380,143 @@ impl WalIngest { Ok(()) } + async fn ingest_xlog_dbase_drop( + &mut self, + dbase_drop: DbaseDrop, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let DbaseDrop { + db_id, + tablespace_ids, + } = dbase_drop; + for tablespace_id in tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, db_id); + modification.drop_dbdir(tablespace_id, db_id, ctx).await?; + } + + Ok(()) + } + + fn decode_dbase_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + // TODO: Refactor this to avoid the duplication between postgres versions. + + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + debug!(%info, %pg_version, "handle RM_DBASE_ID"); + + if pg_version == 14 { + if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { + let createdb = XlCreateDatabase::decode(buf); + debug!("XLOG_DBASE_CREATE v14"); + + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 15 { + if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 16 { + if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 17 { + if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } + + Ok(None) + } + async fn ingest_xlog_smgr_create( &mut self, create: SmgrCreate,