mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Address issues from Eric's review
This commit is contained in:
@@ -17,8 +17,7 @@ use postgres::{Client, NoTls};
|
||||
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::storage::{PageServerNode, WalProposerNode};
|
||||
use pageserver::zenith_repo_dir;
|
||||
use pageserver::ZTimelineId;
|
||||
use pageserver::{zenith_repo_dir, ZTimelineId};
|
||||
|
||||
//
|
||||
// ComputeControlPlane
|
||||
@@ -450,7 +449,7 @@ impl PostgresNode {
|
||||
self.safe_psql("postgres", "CREATE DATABASE regression");
|
||||
let data_dir = zenith_repo_dir();
|
||||
let regress_run_path = data_dir.join("regress");
|
||||
fs::create_dir_all(regress_run_path.clone()).unwrap();
|
||||
fs::create_dir_all(®ress_run_path).unwrap();
|
||||
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
|
||||
std::env::set_current_dir(regress_run_path).unwrap();
|
||||
|
||||
|
||||
@@ -111,7 +111,7 @@ impl TestStorageControlPlane {
|
||||
|
||||
pub fn stop(&self) {
|
||||
for wa in self.wal_acceptors.iter() {
|
||||
let _unused = wa.stop();
|
||||
let _ = wa.stop();
|
||||
}
|
||||
self.test_done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
|
||||
use log::*;
|
||||
use parse_duration::parse;
|
||||
use std::fs;
|
||||
use std::fs::OpenOptions;
|
||||
use std::fs::{self, OpenOptions};
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
@@ -18,11 +17,7 @@ use daemonize::Daemonize;
|
||||
|
||||
use slog::Drain;
|
||||
|
||||
use pageserver::page_service;
|
||||
use pageserver::tui;
|
||||
use pageserver::zenith_repo_dir;
|
||||
//use pageserver::walreceiver;
|
||||
use pageserver::PageServerConf;
|
||||
use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf};
|
||||
|
||||
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ use crossbeam_channel::unbounded;
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use rocksdb::*;
|
||||
use rocksdb;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::Ordering;
|
||||
@@ -33,7 +33,7 @@ pub struct PageCache {
|
||||
shared: Mutex<PageCacheShared>,
|
||||
|
||||
// RocksDB handle
|
||||
db: DB,
|
||||
db: rocksdb::DB,
|
||||
|
||||
// Channel for communicating with the WAL redo process here.
|
||||
pub walredo_sender: Sender<Arc<CacheEntry>>,
|
||||
@@ -168,13 +168,13 @@ fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
|
||||
pcache.do_gc(conf).unwrap();
|
||||
}
|
||||
|
||||
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> DB {
|
||||
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
|
||||
let path = zenith_repo_dir().join(timelineid.to_string());
|
||||
let mut opts = Options::default();
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_use_fsync(true);
|
||||
opts.set_compression_type(DBCompressionType::Lz4);
|
||||
DB::open(&opts, &path).unwrap()
|
||||
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
||||
rocksdb::DB::open(&opts, &path).unwrap()
|
||||
}
|
||||
|
||||
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
|
||||
@@ -309,7 +309,7 @@ impl RelTag {
|
||||
buf.put_u32(self.spcnode);
|
||||
buf.put_u32(self.dbnode);
|
||||
buf.put_u32(self.relnode);
|
||||
buf.put_u32(self.forknum as u32);
|
||||
buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> RelTag {
|
||||
RelTag {
|
||||
@@ -381,10 +381,11 @@ impl WALRecord {
|
||||
// Public interface functions
|
||||
|
||||
impl PageCache {
|
||||
|
||||
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
|
||||
let mut minbuf = BytesMut::new();
|
||||
let mut maxbuf = BytesMut::new();
|
||||
let cf = self.db.cf_handle(DEFAULT_COLUMN_FAMILY_NAME).unwrap();
|
||||
let cf = self.db.cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME).unwrap();
|
||||
loop {
|
||||
thread::sleep(conf.gc_period);
|
||||
let last_lsn = self.get_last_valid_lsn();
|
||||
@@ -407,7 +408,7 @@ impl PageCache {
|
||||
maxkey.pack(&mut maxbuf);
|
||||
let mut iter = self
|
||||
.db
|
||||
.iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse));
|
||||
.iterator(rocksdb::IteratorMode::From(&maxbuf[..], rocksdb::Direction::Reverse));
|
||||
if let Some((k, v)) = iter.next() {
|
||||
minbuf.clear();
|
||||
minbuf.extend_from_slice(&v);
|
||||
@@ -437,7 +438,7 @@ impl PageCache {
|
||||
// locate most recent record before horizon
|
||||
let mut iter = self
|
||||
.db
|
||||
.iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse));
|
||||
.iterator(rocksdb::IteratorMode::From(&maxbuf[..], rocksdb::Direction::Reverse));
|
||||
if let Some((k, v)) = iter.next() {
|
||||
minbuf.clear();
|
||||
minbuf.extend_from_slice(&v);
|
||||
@@ -538,18 +539,18 @@ impl PageCache {
|
||||
let mut buf = BytesMut::new();
|
||||
minkey.pack(&mut buf);
|
||||
|
||||
let mut readopts = ReadOptions::default();
|
||||
let mut readopts = rocksdb::ReadOptions::default();
|
||||
readopts.set_iterate_lower_bound(buf.to_vec());
|
||||
|
||||
buf.clear();
|
||||
maxkey.pack(&mut buf);
|
||||
let mut iter = self
|
||||
.db
|
||||
.iterator_opt(IteratorMode::From(&buf[..], Direction::Reverse), readopts);
|
||||
.iterator_opt(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), readopts);
|
||||
let entry_opt = iter.next();
|
||||
|
||||
if entry_opt.is_none() {
|
||||
static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192];
|
||||
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
return Ok(Bytes::from_static(&ZERO_PAGE));
|
||||
/* return Err("could not find page image")?; */
|
||||
}
|
||||
@@ -606,14 +607,14 @@ impl PageCache {
|
||||
let mut buf = BytesMut::new();
|
||||
minkey.pack(&mut buf);
|
||||
|
||||
let mut readopts = ReadOptions::default();
|
||||
let mut readopts = rocksdb::ReadOptions::default();
|
||||
readopts.set_iterate_lower_bound(buf.to_vec());
|
||||
|
||||
buf.clear();
|
||||
entry.key.pack(&mut buf);
|
||||
let iter = self
|
||||
.db
|
||||
.iterator_opt(IteratorMode::From(&buf[..], Direction::Reverse), readopts);
|
||||
.iterator_opt(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), readopts);
|
||||
|
||||
let mut base_img: Option<Bytes> = None;
|
||||
let mut records: Vec<WALRecord> = Vec::new();
|
||||
@@ -826,7 +827,7 @@ impl PageCache {
|
||||
key.pack(&mut buf);
|
||||
let mut iter = self
|
||||
.db
|
||||
.iterator(IteratorMode::From(&buf[..], Direction::Reverse));
|
||||
.iterator(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse));
|
||||
if let Some((k, v)) = iter.next() {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
@@ -867,7 +868,7 @@ impl PageCache {
|
||||
key.pack(&mut buf);
|
||||
let mut iter = self
|
||||
.db
|
||||
.iterator(IteratorMode::From(&buf[..], Direction::Reverse));
|
||||
.iterator(rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse));
|
||||
if let Some((k, _v)) = iter.next() {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
|
||||
Reference in New Issue
Block a user