Merge with main branch

This commit is contained in:
Konstantin Knizhnik
2021-04-21 16:10:05 +03:00
parent 07507274c0
commit d8fa2ec367
12 changed files with 47 additions and 98 deletions

View File

@@ -19,6 +19,7 @@ use postgres::{Client, NoTls};
use crate::local_env::LocalEnv;
use crate::storage::{PageServerNode, WalProposerNode};
use pageserver::ZTimelineId;
use pageserver::zenith_repo_dir;
//
// ComputeControlPlane
@@ -449,8 +450,8 @@ impl PostgresNode {
pub fn pg_regress(&self) {
self.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = self.env.data_dir.join("regress");
let data_dir = zenith_repo_dir();
let regress_run_path = data_dir.join("regress");
fs::create_dir_all(regress_run_path.clone()).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();

View File

@@ -16,6 +16,7 @@ use anyhow::Result;
use serde_derive::{Deserialize, Serialize};
use pageserver::ZTimelineId;
use pageserver::zenith_repo_dir;
use walkeeper::xlog_utils;
//
@@ -52,14 +53,6 @@ impl LocalEnv {
}
}
fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
//
// Initialize a new Zenith repository
//

View File

@@ -13,7 +13,6 @@ use std::time::Duration;
use postgres::{Client, NoTls};
use crate::compute::PostgresNode;
use crate::local_env::LocalEnv;
use pageserver::ZTimelineId;

View File

@@ -69,12 +69,15 @@ fn test_regress() {
// Runs pg_bench on a compute node
#[test]
fn pgbench() {
let local_env = local_env::test_env("pgbench");
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
// start postgres
let node = compute_cplane.new_test_node();
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_node(maintli);
node.start().unwrap();
node.pg_bench(10, 100);

View File

@@ -4,12 +4,11 @@
use log::*;
use std::fs;
use std::fs::{File, OpenOptions};
use std::fs::OpenOptions;
use std::io;
use std::path::PathBuf;
use std::process::exit;
use std::thread;
use std::fs::OpenOptions;
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -18,18 +17,11 @@ use daemonize::Daemonize;
use slog::Drain;
use pageserver::page_service;
use pageserver::zenith_repo_dir;
use pageserver::tui;
//use pageserver::walreceiver;
use pageserver::PageServerConf;
fn zenith_repo_dir() -> String {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => String::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
@@ -140,7 +132,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
// does this for us.
let repodir = zenith_repo_dir();
std::env::set_current_dir(&repodir)?;
info!("Changed current directory to repository in {}", &repodir);
info!("Changed current directory to repository in {:?}", &repodir);
}
let mut threads = Vec::new();
@@ -186,7 +178,7 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
if conf.interactive {
Ok(tui::init_logging())
} else if conf.daemonize {
let log = zenith_repo_dir() + "/pageserver.log";
let log = zenith_repo_dir().join("pageserver.log");
let log_file = OpenOptions::new()
.create(true)
.append(true)

View File

@@ -1,6 +1,7 @@
use std::fmt;
use std::net::SocketAddr;
use std::str::FromStr;
use std::path::PathBuf;
pub mod basebackup;
pub mod page_cache;
@@ -58,3 +59,11 @@ impl fmt::Display for ZTimelineId {
f.write_str(&hex::encode(self.0))
}
}
pub fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}

View File

@@ -8,7 +8,7 @@
use crate::restore_local_repo::restore_timeline;
use crate::ZTimelineId;
use crate::{walredo, PageServerConf};
use crate::{walredo, PageServerConf, zenith_repo_dir};
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crossbeam_channel::unbounded;
@@ -150,8 +150,8 @@ pub fn get_or_restore_pagecache(
}
}
fn open_rocksdb(conf: &PageServerConf, timelineid: u64) -> DB {
let path = conf.data_dir.join(timelineid.to_string());
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> DB {
let path = zenith_repo_dir().join(timelineid.to_string());
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_use_fsync(true);
@@ -159,7 +159,7 @@ fn open_rocksdb(conf: &PageServerConf, timelineid: u64) -> DB {
DB::open(&opts, &path).unwrap()
}
fn init_page_cache(conf: &PageServerConf, timelineid: u64) -> PageCache {
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
// Initialize the channel between the page cache and the WAL applicator
let (s, r) = unbounded();

View File

@@ -428,12 +428,8 @@ pub fn thread_main(conf: &PageServerConf) {
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
<<<<<<< HEAD
socket.set_nodelay(true).unwrap();
let mut conn_handler = Connection::new(conf.clone(), socket);
=======
let mut conn_handler = Connection::new(conf.clone(), socket, &runtime_ref);
>>>>>>> main
task::spawn(async move {
if let Err(err) = conn_handler.run().await {
@@ -788,19 +784,11 @@ impl Connection {
loop {
let message = self.read_message().await?;
<<<<<<< HEAD
/*
if let Some(m) = &message {
trace!("query({}): {:?}", sysid, m);
};
*/
=======
if let Some(m) = &message {
info!("query({:?}): {:?}", timelineid, m);
trace!("query({:?}): {:?}", timelineid, m);
};
>>>>>>> main
if message.is_none() {
// connection was closed
return Ok(());
@@ -869,41 +857,6 @@ impl Connection {
self.write_message(&msg).await?
}
<<<<<<< HEAD
=======
Some(FeMessage::ZenithCreateRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
pcache.relsize_inc(&tag, 0);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithExtendRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
pcache.relsize_inc(&tag, req.blkno + 1);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
>>>>>>> main
_ => {}
}
}

View File

@@ -27,6 +27,7 @@ use anyhow::Result;
use bytes::Bytes;
use crate::page_cache;
use crate::page_cache::RelTag;
use crate::page_cache::BufferTag;
use crate::page_cache::PageCache;
use crate::waldecoder::WalStreamDecoder;
@@ -202,11 +203,13 @@ fn restore_relfile(
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
let tag = page_cache::BufferTag {
spcnode: spcoid,
dbnode: dboid,
relnode: relnode,
forknum: forknum as u8,
let tag = BufferTag {
rel: RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode: relnode,
forknum: forknum as u8,
},
blknum: blknum,
};
pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
@@ -233,14 +236,6 @@ fn restore_relfile(
blknum += 1;
}
let tag = page_cache::RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode: relnode,
forknum: forknum as u8,
};
pcache.relsize_inc(&tag, blknum);
Ok(())
}
@@ -308,16 +303,19 @@ fn restore_wal(
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = page_cache::WALRecord {
lsn: lsn,
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
};

View File

@@ -21,6 +21,7 @@ use std::fs;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::Error;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
@@ -171,7 +172,7 @@ impl WalRedoProcess {
// Limit shared cache for wal-redo-postres
let mut config = OpenOptions::new()
.append(true)
.open(datadir.join("postgresql.conf"))?;
.open(PathBuf::from(&datadir).join("postgresql.conf"))?;
config.write(b"shared_buffers=128kB\n")?;
config.write(b"fsync=off\n")?;
}

View File

@@ -16,4 +16,4 @@ crc32c = "0.6.0"
hex = "0.4.3"
[build-dependencies]
bindgen = "0.53.1"
bindgen = "0.57"