cargo fmt

This commit is contained in:
Eric Seppanen
2021-04-24 16:03:44 -07:00
parent 968cd8f20c
commit 1cb9b5523b
2 changed files with 43 additions and 39 deletions

View File

@@ -31,6 +31,7 @@
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::Oid;
use crate::walredo::WalRedoManager;
use crate::ZTimelineId;
use crate::{zenith_repo_dir, PageServerConf};
use anyhow::{bail, Context};
@@ -47,7 +48,6 @@ use std::thread;
use std::time::{Duration, Instant};
use std::{convert::TryInto, ops::AddAssign};
use zenith_utils::seqwait::SeqWait;
use crate::walredo::WalRedoManager;
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -177,7 +177,10 @@ fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("Garbage collection thread started {}", timelineid);
let pcache = get_pagecache(conf, timelineid).unwrap();
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(pcache.do_gc(conf)).unwrap();
}
@@ -199,7 +202,6 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB
}
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
PageCache {
shared: Mutex::new(PageCacheShared {
first_valid_lsn: 0,
@@ -378,7 +380,6 @@ impl WALRecord {
}
impl PageCache {
// Public GET interface functions
///
@@ -414,8 +415,8 @@ impl PageCache {
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
// Request the WAL redo manager to apply the WAL records for us.
page_img = self.walredo_mgr.request_redo(tag, lsn).await?;
// Request the WAL redo manager to apply the WAL records for us.
page_img = self.walredo_mgr.request_redo(tag, lsn).await?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
@@ -493,9 +494,13 @@ impl PageCache {
/// Returns an old page image (if any), and a vector of WAL records to apply
/// over it.
///
pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option<Bytes>, Vec<WALRecord>) {
pub fn collect_records_for_apply(
&self,
tag: BufferTag,
lsn: u64,
) -> (Option<Bytes>, Vec<WALRecord>) {
let mut buf = BytesMut::new();
let key = CacheKey { tag, lsn };
let key = CacheKey { tag, lsn };
key.pack(&mut buf);
let mut base_img: Option<Bytes> = None;
@@ -779,7 +784,6 @@ impl PageCache {
// The caller must ensure that WAL has been received up to 'lsn'.
//
fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
//assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire));
let mut key = CacheKey {
@@ -827,7 +831,6 @@ impl PageCache {
}
async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut buf = BytesMut::new();
loop {
thread::sleep(conf.gc_period);
@@ -896,7 +899,7 @@ impl PageCache {
let key = CacheKey::unpack(&mut buf);
if key.tag == maxkey.tag {
let v = iter.value().unwrap();
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
trace!("Reconstruct horizon page {:?}", key);
self.walredo_mgr.request_redo(key.tag, key.lsn).await?;
truncated += 1;

View File

@@ -29,7 +29,7 @@ use std::time::Instant;
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::{oneshot, mpsc};
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -82,22 +82,20 @@ pub enum WalRedoError {
///
/// Public interface of WAL redo manager
///
impl WalRedoManager
{
impl WalRedoManager {
///
/// Create a new WalRedoManager.
///
/// This only initializes the struct. You need to call WalRedoManager::launch to
/// start the thread that processes the requests.
pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager {
let (tx, rx) = mpsc::unbounded_channel();
WalRedoManager {
conf: conf.clone(),
timelineid,
request_tx: tx,
request_rx: Mutex::new(Some(rx))
request_rx: Mutex::new(Some(rx)),
}
}
@@ -105,7 +103,6 @@ impl WalRedoManager
/// Launch the WAL redo thread
///
pub fn launch(&self, pcache: Arc<PageCache>) {
// Get mutable references to the values that we need to pass to the
// thread.
let request_rx = self.request_rx.lock().unwrap().take().unwrap();
@@ -117,7 +114,6 @@ impl WalRedoManager
let _walredo_thread = std::thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
@@ -143,18 +139,21 @@ impl WalRedoManager
/// of the given page version.
///
pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, WalRedoError> {
// Create a channel where to receive the response
let (tx, rx) = oneshot::channel::<Result<Bytes, WalRedoError>>();
let request = WalRedoRequest {
tag, lsn,
tag,
lsn,
response_channel: tx,
};
self.request_tx.send(request).expect("could not send WAL redo request");
self.request_tx
.send(request)
.expect("could not send WAL redo request");
rx.await.expect("could not receive response to WAL redo request")
rx.await
.expect("could not receive response to WAL redo request")
}
}
@@ -162,12 +161,10 @@ impl WalRedoManager
/// WAL redo thread
///
impl WalRedoManagerInternal {
//
// Main entry point for the WAL applicator thread.
//
async fn wal_redo_main(&mut self) {
info!("WAL redo thread started {}", self.timelineid);
// Loop forever, handling requests as they come.
@@ -213,12 +210,13 @@ impl WalRedoManagerInternal {
}
}
fn transaction_id_set_status_bit(&self,
xl_info: u8,
xl_rmid: u8,
xl_xid: u32,
record: WALRecord,
page: &mut BytesMut,
fn transaction_id_set_status_bit(
&self,
xl_info: u8,
xl_rmid: u8,
xl_xid: u32,
record: WALRecord,
page: &mut BytesMut,
) {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
@@ -242,11 +240,11 @@ impl WalRedoManagerInternal {
record.main_data_offset, record.rec.len());
let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
/ pg_constants::CLOG_XACTS_PER_BYTE) as usize;
/ pg_constants::CLOG_XACTS_PER_BYTE) as usize;
let byteptr = &mut page[byteno..byteno + 1];
let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE)
* pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
* pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
let mut curval = byteptr[0];
curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK;
@@ -269,9 +267,10 @@ impl WalRedoManagerInternal {
///
/// Process one request for WAL redo.
///
async fn handle_apply_request(&self,
process: &WalRedoProcess,
request: &WalRedoRequest,
async fn handle_apply_request(
&self,
process: &WalRedoProcess,
request: &WalRedoRequest,
) -> Result<Bytes, WalRedoError> {
let pcache = &self.pcache;
let tag = request.tag;
@@ -372,7 +371,8 @@ impl WalRedoProcess {
.args(&["-D", datadir])
.arg("-N")
.output()
.await.expect("failed to execute initdb");
.await
.expect("failed to execute initdb");
if !initdb.status.success() {
panic!(
@@ -436,7 +436,8 @@ impl WalRedoProcess {
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
async fn apply_wal_records(&self,
async fn apply_wal_records(
&self,
tag: BufferTag,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
@@ -451,13 +452,13 @@ impl WalRedoProcess {
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
)
.await??;
.await??;
if base_img.is_some() {
timeout(
TIMEOUT,
stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
)
.await??;
.await??;
}
// Send WAL records.