From 2dbbb8c59b05313991453ccddc4258f5c7e5ebbe Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 22 Apr 2021 10:12:22 +0300 Subject: [PATCH] Address issues from Eric's review --- control_plane/src/compute.rs | 5 ++--- control_plane/src/storage.rs | 2 +- pageserver/src/bin/pageserver.rs | 9 ++------ pageserver/src/page_cache.rs | 35 ++++++++++++++++---------------- 4 files changed, 23 insertions(+), 28 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 9807756232..1279d941e7 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -17,8 +17,7 @@ use postgres::{Client, NoTls}; use crate::local_env::LocalEnv; use crate::storage::{PageServerNode, WalProposerNode}; -use pageserver::zenith_repo_dir; -use pageserver::ZTimelineId; +use pageserver::{zenith_repo_dir, ZTimelineId}; // // ComputeControlPlane @@ -450,7 +449,7 @@ impl PostgresNode { self.safe_psql("postgres", "CREATE DATABASE regression"); let data_dir = zenith_repo_dir(); let regress_run_path = data_dir.join("regress"); - fs::create_dir_all(regress_run_path.clone()).unwrap(); + fs::create_dir_all(®ress_run_path).unwrap(); fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); std::env::set_current_dir(regress_run_path).unwrap(); diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 28198a3008..1a85de68f9 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -111,7 +111,7 @@ impl TestStorageControlPlane { pub fn stop(&self) { for wa in self.wal_acceptors.iter() { - let _unused = wa.stop(); + let _ = wa.stop(); } self.test_done.store(true, Ordering::Relaxed); } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 8801e5de14..062fc4eb21 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -4,8 +4,7 @@ use log::*; use parse_duration::parse; -use std::fs; -use std::fs::OpenOptions; +use std::fs::{self, OpenOptions}; use std::io; use std::path::PathBuf; use std::process::exit; @@ -18,11 +17,7 @@ use daemonize::Daemonize; use slog::Drain; -use pageserver::page_service; -use pageserver::tui; -use pageserver::zenith_repo_dir; -//use pageserver::walreceiver; -use pageserver::PageServerConf; +use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf}; const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index c4723315f5..0777230ac3 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -15,7 +15,7 @@ use crossbeam_channel::unbounded; use crossbeam_channel::{Receiver, Sender}; use lazy_static::lazy_static; use log::*; -use rocksdb::*; +use rocksdb; use std::cmp::min; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -33,7 +33,7 @@ pub struct PageCache { shared: Mutex, // RocksDB handle - db: DB, + db: rocksdb::DB, // Channel for communicating with the WAL redo process here. pub walredo_sender: Sender>, @@ -168,13 +168,13 @@ fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { pcache.do_gc(conf).unwrap(); } -fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> DB { +fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB { let path = zenith_repo_dir().join(timelineid.to_string()); - let mut opts = Options::default(); + let mut opts = rocksdb::Options::default(); opts.create_if_missing(true); opts.set_use_fsync(true); - opts.set_compression_type(DBCompressionType::Lz4); - DB::open(&opts, &path).unwrap() + opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + rocksdb::DB::open(&opts, &path).unwrap() } fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { @@ -309,7 +309,7 @@ impl RelTag { buf.put_u32(self.spcnode); buf.put_u32(self.dbnode); buf.put_u32(self.relnode); - buf.put_u32(self.forknum as u32); + buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres } pub fn unpack(buf: &mut BytesMut) -> RelTag { RelTag { @@ -381,10 +381,11 @@ impl WALRecord { // Public interface functions impl PageCache { + fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { let mut minbuf = BytesMut::new(); let mut maxbuf = BytesMut::new(); - let cf = self.db.cf_handle(DEFAULT_COLUMN_FAMILY_NAME).unwrap(); + let cf = self.db.cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME).unwrap(); loop { thread::sleep(conf.gc_period); let last_lsn = self.get_last_valid_lsn(); @@ -407,7 +408,7 @@ impl PageCache { maxkey.pack(&mut maxbuf); let mut iter = self .db - .iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse)); + .iterator(rocksdb::IteratorMode::From(&maxbuf[..], rocksdb::Direction::Reverse)); if let Some((k, v)) = iter.next() { minbuf.clear(); minbuf.extend_from_slice(&v); @@ -437,7 +438,7 @@ impl PageCache { // locate most recent record before horizon let mut iter = self .db - .iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse)); + .iterator(rocksdb::IteratorMode::From(&maxbuf[..], rocksdb::Direction::Reverse)); if let Some((k, v)) = iter.next() { minbuf.clear(); minbuf.extend_from_slice(&v); @@ -538,18 +539,18 @@ impl PageCache { let mut buf = BytesMut::new(); minkey.pack(&mut buf); - let mut readopts = ReadOptions::default(); + let mut readopts = rocksdb::ReadOptions::default(); readopts.set_iterate_lower_bound(buf.to_vec()); buf.clear(); maxkey.pack(&mut buf); let mut iter = self .db - .iterator_opt(IteratorMode::From(&buf[..], Direction::Reverse), readopts); + .iterator_opt(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), readopts); let entry_opt = iter.next(); if entry_opt.is_none() { - static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192]; + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; return Ok(Bytes::from_static(&ZERO_PAGE)); /* return Err("could not find page image")?; */ } @@ -606,14 +607,14 @@ impl PageCache { let mut buf = BytesMut::new(); minkey.pack(&mut buf); - let mut readopts = ReadOptions::default(); + let mut readopts = rocksdb::ReadOptions::default(); readopts.set_iterate_lower_bound(buf.to_vec()); buf.clear(); entry.key.pack(&mut buf); let iter = self .db - .iterator_opt(IteratorMode::From(&buf[..], Direction::Reverse), readopts); + .iterator_opt(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), readopts); let mut base_img: Option = None; let mut records: Vec = Vec::new(); @@ -826,7 +827,7 @@ impl PageCache { key.pack(&mut buf); let mut iter = self .db - .iterator(IteratorMode::From(&buf[..], Direction::Reverse)); + .iterator(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse)); if let Some((k, v)) = iter.next() { buf.clear(); buf.extend_from_slice(&k); @@ -867,7 +868,7 @@ impl PageCache { key.pack(&mut buf); let mut iter = self .db - .iterator(IteratorMode::From(&buf[..], Direction::Reverse)); + .iterator(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse)); if let Some((k, _v)) = iter.next() { buf.clear(); buf.extend_from_slice(&k);