From 1cb9b5523be75be37f60bb4252887a8278d1ae2e Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sat, 24 Apr 2021 16:03:44 -0700 Subject: [PATCH] cargo fmt --- pageserver/src/page_cache.rs | 25 +++++++++------- pageserver/src/walredo.rs | 57 ++++++++++++++++++------------------ 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 4490829753..983221092e 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -31,6 +31,7 @@ use crate::restore_local_repo::restore_timeline; use crate::waldecoder::Oid; +use crate::walredo::WalRedoManager; use crate::ZTimelineId; use crate::{zenith_repo_dir, PageServerConf}; use anyhow::{bail, Context}; @@ -47,7 +48,6 @@ use std::thread; use std::time::{Duration, Instant}; use std::{convert::TryInto, ops::AddAssign}; use zenith_utils::seqwait::SeqWait; -use crate::walredo::WalRedoManager; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -177,7 +177,10 @@ fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { info!("Garbage collection thread started {}", timelineid); let pcache = get_pagecache(conf, timelineid).unwrap(); - let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); runtime.block_on(pcache.do_gc(conf)).unwrap(); } @@ -199,7 +202,6 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB } fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { - PageCache { shared: Mutex::new(PageCacheShared { first_valid_lsn: 0, @@ -378,7 +380,6 @@ impl WALRecord { } impl PageCache { - // Public GET interface functions /// @@ -414,8 +415,8 @@ impl PageCache { if let Some(img) = &content.page_image { page_img = img.clone(); } else if content.wal_record.is_some() { - // Request the WAL redo manager to apply the WAL records for us. - page_img = self.walredo_mgr.request_redo(tag, lsn).await?; + // Request the WAL redo manager to apply the WAL records for us. + page_img = self.walredo_mgr.request_redo(tag, lsn).await?; } else { // No base image, and no WAL record. Huh? bail!("no page image or WAL record for requested page"); @@ -493,9 +494,13 @@ impl PageCache { /// Returns an old page image (if any), and a vector of WAL records to apply /// over it. /// - pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { + pub fn collect_records_for_apply( + &self, + tag: BufferTag, + lsn: u64, + ) -> (Option, Vec) { let mut buf = BytesMut::new(); - let key = CacheKey { tag, lsn }; + let key = CacheKey { tag, lsn }; key.pack(&mut buf); let mut base_img: Option = None; @@ -779,7 +784,6 @@ impl PageCache { // The caller must ensure that WAL has been received up to 'lsn'. // fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - //assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); let mut key = CacheKey { @@ -827,7 +831,6 @@ impl PageCache { } async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { - let mut buf = BytesMut::new(); loop { thread::sleep(conf.gc_period); @@ -896,7 +899,7 @@ impl PageCache { let key = CacheKey::unpack(&mut buf); if key.tag == maxkey.tag { let v = iter.value().unwrap(); - if (v[0] & PAGE_IMAGE_FLAG) == 0 { + if (v[0] & PAGE_IMAGE_FLAG) == 0 { trace!("Reconstruct horizon page {:?}", key); self.walredo_mgr.request_redo(key.tag, key.lsn).await?; truncated += 1; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 4110ceef43..4fca14f5e6 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -29,7 +29,7 @@ use std::time::Instant; use tokio::io::AsyncBufReadExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; -use tokio::sync::{oneshot, mpsc}; +use tokio::sync::{mpsc, oneshot}; use tokio::time::timeout; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -82,22 +82,20 @@ pub enum WalRedoError { /// /// Public interface of WAL redo manager /// -impl WalRedoManager -{ +impl WalRedoManager { /// /// Create a new WalRedoManager. /// /// This only initializes the struct. You need to call WalRedoManager::launch to /// start the thread that processes the requests. pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager { - let (tx, rx) = mpsc::unbounded_channel(); WalRedoManager { conf: conf.clone(), timelineid, request_tx: tx, - request_rx: Mutex::new(Some(rx)) + request_rx: Mutex::new(Some(rx)), } } @@ -105,7 +103,6 @@ impl WalRedoManager /// Launch the WAL redo thread /// pub fn launch(&self, pcache: Arc) { - // Get mutable references to the values that we need to pass to the // thread. let request_rx = self.request_rx.lock().unwrap().take().unwrap(); @@ -117,7 +114,6 @@ impl WalRedoManager let _walredo_thread = std::thread::Builder::new() .name("WAL redo thread".into()) .spawn(move || { - // We block on waiting for requests on the walredo request channel, but // use async I/O to communicate with the child process. Initialize the // runtime for the async part. @@ -143,18 +139,21 @@ impl WalRedoManager /// of the given page version. /// pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result { - // Create a channel where to receive the response let (tx, rx) = oneshot::channel::>(); let request = WalRedoRequest { - tag, lsn, + tag, + lsn, response_channel: tx, }; - self.request_tx.send(request).expect("could not send WAL redo request"); + self.request_tx + .send(request) + .expect("could not send WAL redo request"); - rx.await.expect("could not receive response to WAL redo request") + rx.await + .expect("could not receive response to WAL redo request") } } @@ -162,12 +161,10 @@ impl WalRedoManager /// WAL redo thread /// impl WalRedoManagerInternal { - // // Main entry point for the WAL applicator thread. // async fn wal_redo_main(&mut self) { - info!("WAL redo thread started {}", self.timelineid); // Loop forever, handling requests as they come. @@ -213,12 +210,13 @@ impl WalRedoManagerInternal { } } - fn transaction_id_set_status_bit(&self, - xl_info: u8, - xl_rmid: u8, - xl_xid: u32, - record: WALRecord, - page: &mut BytesMut, + fn transaction_id_set_status_bit( + &self, + xl_info: u8, + xl_rmid: u8, + xl_xid: u32, + record: WALRecord, + page: &mut BytesMut, ) { let info = xl_info & pg_constants::XLOG_XACT_OPMASK; let mut status = 0; @@ -242,11 +240,11 @@ impl WalRedoManagerInternal { record.main_data_offset, record.rec.len()); let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) - / pg_constants::CLOG_XACTS_PER_BYTE) as usize; + / pg_constants::CLOG_XACTS_PER_BYTE) as usize; let byteptr = &mut page[byteno..byteno + 1]; let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) - * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; + * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; let mut curval = byteptr[0]; curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK; @@ -269,9 +267,10 @@ impl WalRedoManagerInternal { /// /// Process one request for WAL redo. /// - async fn handle_apply_request(&self, - process: &WalRedoProcess, - request: &WalRedoRequest, + async fn handle_apply_request( + &self, + process: &WalRedoProcess, + request: &WalRedoRequest, ) -> Result { let pcache = &self.pcache; let tag = request.tag; @@ -372,7 +371,8 @@ impl WalRedoProcess { .args(&["-D", datadir]) .arg("-N") .output() - .await.expect("failed to execute initdb"); + .await + .expect("failed to execute initdb"); if !initdb.status.success() { panic!( @@ -436,7 +436,8 @@ impl WalRedoProcess { // Apply given WAL records ('records') over an old page image. Returns // new page image. // - async fn apply_wal_records(&self, + async fn apply_wal_records( + &self, tag: BufferTag, base_img: Option, records: Vec, @@ -451,13 +452,13 @@ impl WalRedoProcess { TIMEOUT, stdin.write_all(&build_begin_redo_for_block_msg(tag)), ) - .await??; + .await??; if base_img.is_some() { timeout( TIMEOUT, stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())), ) - .await??; + .await??; } // Send WAL records.