Support CREATE DATABASE command

This commit is contained in:
Konstantin Knizhnik
2021-04-23 17:03:56 +03:00
parent b64bd2a8af
commit 52ee3a2bac
5 changed files with 147 additions and 65 deletions

View File

@@ -229,11 +229,13 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
if record.level().is_at_least(slog::Level::Info) {
return true;
}
/* let's do not be too verbose
if record.level().is_at_least(slog::Level::Debug)
&& record.module().starts_with("pageserver")
{
return true;
}
*/
false
})
.fuse();

View File

@@ -5,6 +5,7 @@
//
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::Oid;
use crate::ZTimelineId;
use crate::{walredo, zenith_repo_dir, PageServerConf};
use anyhow::{bail, Context};
@@ -16,8 +17,8 @@ use log::*;
use rocksdb;
use std::cmp::min;
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU64};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
@@ -494,7 +495,18 @@ impl PageCache {
Ok(page_img)
}
async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result<u64> {
let mut lsn = req_lsn;
//When invalid LSN is requested, it means "don't wait, return latest version of the page"
//This is necessary for bootstrap.
if lsn == 0 {
lsn = self.last_valid_lsn.load(Ordering::Acquire);
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
self.last_valid_lsn.load(Ordering::Acquire),
lsn
);
}
self.seqwait_lsn
.wait_for_timeout(lsn, TIMEOUT)
.await
@@ -506,7 +518,7 @@ impl PageCache {
)
})?;
Ok(())
Ok(lsn)
}
//
@@ -517,22 +529,7 @@ impl PageCache {
pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let mut lsn = req_lsn;
//When invalid LSN is requested, it means "don't wait, return latest version of the page"
//This is necessary for bootstrap.
if lsn == 0
{
lsn = self.last_valid_lsn.load(Ordering::Acquire);
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
self.last_valid_lsn.load(Ordering::Acquire),
lsn
);
}
else
{
self.wait_lsn(lsn).await?;
}
let lsn = self.wait_lsn(req_lsn).await?;
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -555,6 +552,7 @@ impl PageCache {
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn);
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
@@ -578,7 +576,7 @@ impl PageCache {
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!(
debug!(
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
@@ -738,7 +736,6 @@ impl PageCache {
// Can't move backwards.
let oldlsn = shared.last_valid_lsn;
if lsn >= oldlsn {
shared.last_valid_lsn = lsn;
self.seqwait_lsn.advance(lsn);
@@ -809,9 +806,10 @@ impl PageCache {
shared.last_record_lsn
}
pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
pub async fn relsize_get(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result<u32> {
let mut lsn = req_lsn;
if lsn != u64::MAX {
self.wait_lsn(lsn).await?;
lsn = self.wait_lsn(lsn).await?;
}
let mut key = CacheKey {
@@ -848,18 +846,18 @@ impl PageCache {
}
}
let relsize = tag.blknum + 1;
trace!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
}
}
break;
}
trace!("Size of relation {:?} at {} is zero", rel, lsn);
debug!("Size of relation {:?} at {} is zero", rel, lsn);
Ok(0)
}
pub async fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<bool> {
self.wait_lsn(lsn).await?;
pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result<bool> {
let lsn = self.wait_lsn(req_lsn).await?;
let key = CacheKey {
tag: BufferTag {
@@ -879,11 +877,11 @@ impl PageCache {
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
trace!("Relation {:?} exists at {}", rel, lsn);
debug!("Relation {:?} exists at {}", rel, lsn);
return Ok(true);
}
}
trace!("Relation {:?} doesn't exist at {}", rel, lsn);
debug!("Relation {:?} doesn't exist at {}", rel, lsn);
Ok(false)
}
@@ -898,6 +896,56 @@ impl PageCache {
last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
}
}
pub fn create_database(
&self,
lsn: u64,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> anyhow::Result<()> {
let mut buf = BytesMut::new();
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
lsn: 0,
};
key.pack(&mut buf);
let iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Forward,
));
let mut n = 0;
for (k, v) in iter {
buf.clear();
buf.extend_from_slice(&k);
let mut key = CacheKey::unpack(&mut buf);
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
buf.clear();
key.pack(&mut buf);
self.db.put(&buf[..], v)?;
n += 1;
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
Ok(())
}
}
pub fn get_stats() -> PageCacheStats {

View File

@@ -2,8 +2,8 @@ use crate::pg_constants;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::cmp::min;
use thiserror::Error;
use std::str;
use thiserror::Error;
const XLOG_BLCKSZ: u32 = 8192;
@@ -378,17 +378,41 @@ pub struct XlSmgrTruncate {
pub flags: u32,
}
pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
let mut buf = decoded.record.clone();
buf.advance((SizeOfXLogRecord + 2) as usize);
XlSmgrTruncate {
blkno: buf.get_u32_le(),
rnode: RelFileNode {
spcnode: buf.get_u32_le(), /* tablespace */
dbnode: buf.get_u32_le(), /* database */
relnode: buf.get_u32_le(), /* relation */
},
flags: buf.get_u32_le(),
impl XlSmgrTruncate {
pub fn decode(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
let mut buf = decoded.record.clone();
buf.advance((SizeOfXLogRecord + 2) as usize);
XlSmgrTruncate {
blkno: buf.get_u32_le(),
rnode: RelFileNode {
spcnode: buf.get_u32_le(), /* tablespace */
dbnode: buf.get_u32_le(), /* database */
relnode: buf.get_u32_le(), /* relation */
},
flags: buf.get_u32_le(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlCreateDatabase {
pub db_id: Oid,
pub tablespace_id: Oid,
pub src_db_id: Oid,
pub src_tablespace_id: Oid,
}
impl XlCreateDatabase {
pub fn decode(decoded: &DecodedWALRecord) -> XlCreateDatabase {
let mut buf = decoded.record.clone();
buf.advance((SizeOfXLogRecord + 2) as usize);
XlCreateDatabase {
db_id: buf.get_u32_le(),
tablespace_id: buf.get_u32_le(),
src_db_id: buf.get_u32_le(),
src_tablespace_id: buf.get_u32_le(),
}
}
}
@@ -678,38 +702,34 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
blocks.push(blk);
//TODO parse abort record to extract subtrans entries
}
}
else if xl_rmid == pg_constants::RM_DBASE_ID
{
} else if xl_rmid == pg_constants::RM_DBASE_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::XLOG_DBASE_CREATE
{
if info == pg_constants::XLOG_DBASE_CREATE {
//buf points to main_data
let db_id = buf.get_u32_le();
let tablespace_id = buf.get_u32_le();
let src_db_id = buf.get_u32_le();
let src_tablespace_id = buf.get_u32_le();
trace!("XLOG_DBASE_CREATE db_id {} src_db_id {}", db_id, src_db_id);
let db_id = buf.get_u32_le();
let tablespace_id = buf.get_u32_le();
let src_db_id = buf.get_u32_le();
let src_tablespace_id = buf.get_u32_le();
trace!(
"XLOG_DBASE_CREATE tablespace_id/db_id {}/{} src_db_id {}/{}",
tablespace_id,
db_id,
src_tablespace_id,
src_db_id
);
// in postgres it is implemented as copydir
// we need to copy all pages in page_cache
}
else
{
} else {
trace!("XLOG_DBASE_DROP is not handled yet");
}
}
else if xl_rmid == pg_constants::RM_TBLSPC_ID
{
} else if xl_rmid == pg_constants::RM_TBLSPC_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::XLOG_TBLSPC_CREATE
{
if info == pg_constants::XLOG_TBLSPC_CREATE {
//buf points to main_data
let ts_id = buf.get_u32_le();
let ts_id = buf.get_u32_le();
let ts_path = str::from_utf8(&buf).unwrap();
trace!("XLOG_TBLSPC_CREATE ts_id {} ts_path {}", ts_id, ts_path);
}
else
{
} else {
trace!("XLOG_TBLSPC_DROP is not handled yet");
}
}

View File

@@ -248,7 +248,7 @@ async fn walreceiver_main(
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = decode_truncate_record(&decoded);
let truncate = XlSmgrTruncate::decode(&decoded);
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
let tag = BufferTag {
rel: RelTag {
@@ -268,6 +268,18 @@ async fn walreceiver_main(
};
pcache.put_rel_wal_record(tag, rec).await?;
}
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&decoded);
pcache.create_database(
lsn,
createdb.db_id,
createdb.tablespace_id,
createdb.src_db_id,
createdb.src_tablespace_id,
)?;
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN