Merge with main branch

This commit is contained in:
Konstantin Knizhnik
2021-04-19 17:00:30 +03:00
parent 8aa3013ec2
commit 95160dee6d
8 changed files with 88 additions and 87 deletions

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{collections::BTreeMap, path::PathBuf};
use std::{io::Write, net::SocketAddr};
use std::path::Path;
use lazy_static::lazy_static;
use postgres::{Client, NoTls};
@@ -246,7 +247,9 @@ impl PostgresNode {
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
fsync = off\n\
max_connections = 100\n\
wal_sender_timeout = 0\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
@@ -415,8 +418,69 @@ impl PostgresNode {
}
}
// TODO
pub fn pg_bench() {}
// Run the PostgreSQL regression test suite (pg_regress) against this
// already-running node. Blocks until the suite's child process exits;
// panics if the process could not be spawned.
pub fn pg_regress(&self) {
// The regression tests expect a database named "regression" to exist.
self.safe_psql("postgres", "CREATE DATABASE regression");
// Run from a scratch directory under the node's data dir, including the
// "testtablespace" subdirectory that the tablespace tests require.
let regress_run_path = self.env.data_dir.join("regress");
fs::create_dir_all(regress_run_path.clone()).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
// NOTE(review): this changes the working directory of the whole process,
// which is visible to every thread — confirm that is acceptable here.
std::env::set_current_dir(regress_run_path).unwrap();
// The pg_regress binary and loadable test modules come from the build
// tree; the schedule and input files come from the vendored sources.
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
// NOTE(review): this "--bindir=''" appears to be immediately
// overridden by the real --bindir three lines below (getopt-style
// parsing keeps the last occurrence) — looks like leftover from an
// earlier version; confirm and drop one of the two.
"--bindir=''",
"--use-existing",
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
// Start from an empty environment so host PG* variables cannot leak
// in; pass only what pg_regress needs to reach this node.
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGPORT", self.address.port().to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
}
/// Run pgbench against this node: first initialize the standard pgbench
/// tables (`-i`), then execute a timed run with `clients` concurrent
/// connections for `seconds` seconds, using prepared statements and
/// printing progress once per second. Panics if pgbench cannot be spawned.
pub fn pg_bench(&self, clients: u32, seconds: u32) {
    let pgbench = self.env.pg_bin_dir().join("pgbench");
    let lib_dir = self.env.pg_lib_dir();
    let port = self.address.port().to_string();

    // Step 1: populate the pgbench tables ("-i" = initialize).
    let _pg_bench_init = Command::new(&pgbench)
        .args(&["-i", "-p", port.as_str(), "postgres"])
        .env("LD_LIBRARY_PATH", lib_dir.to_str().unwrap())
        .status()
        .expect("pgbench -i");

    // Step 2: the timed benchmark run itself.
    let client_count = clients.to_string();
    let duration = seconds.to_string();
    let run_args = [
        "-p", port.as_str(),          // connect to this node
        "-T", duration.as_str(),      // run for a fixed wall-clock time
        "-P", "1",                    // report progress every second
        "-c", client_count.as_str(),  // number of concurrent clients
        "-M", "prepared",             // use prepared statements
        "postgres",
    ];
    let _pg_bench_run = Command::new(&pgbench)
        .args(&run_args)
        .env("LD_LIBRARY_PATH", lib_dir.to_str().unwrap())
        .status()
        .expect("pgbench run");
}
}
impl Drop for PostgresNode {

View File

@@ -12,7 +12,6 @@ use std::time::Duration;
use postgres::{Client, NoTls};
use crate::compute::PostgresNode;
use crate::local_env::{self, LocalEnv};
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
@@ -104,6 +103,9 @@ impl TestStorageControlPlane {
}
/// Tear down the test control plane: ask every WAL acceptor to shut down
/// (ignoring per-acceptor failures, so one stuck process cannot abort the
/// teardown), then raise the `test_done` flag so background test threads
/// notice and exit.
pub fn stop(&self) {
    self.wal_acceptors.iter().for_each(|wa| {
        // Best-effort shutdown; the result is deliberately discarded.
        let _ = wa.stop();
    });
    self.test_done.store(true, Ordering::Relaxed);
}
@@ -350,42 +352,6 @@ impl Drop for WalProposerNode {
}
}
///////////////////////////////////////////////////////////////////////////////
// Run the PostgreSQL regression suite against the given node.
// NOTE(review): this free function duplicates the `PostgresNode::pg_regress`
// method elsewhere in this commit (same pg_regress invocation, different run
// directory) — it appears to be the old copy slated for removal; confirm
// callers have migrated before keeping both.
pub fn regress_check(pg: &PostgresNode) {
// The regression tests expect a database named "regression" to exist.
pg.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress");
fs::create_dir_all(regress_run_path.clone()).unwrap();
// NOTE(review): process-wide working-directory change — affects all threads.
std::env::set_current_dir(regress_run_path).unwrap();
// Binary and loadable modules from the build tree; schedule/input files
// from the vendored postgres sources.
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
// NOTE(review): "--bindir=''" is overridden by the real --bindir
// below (last occurrence wins) — likely leftover; confirm and remove.
"--bindir=''",
"--use-existing",
format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
// Empty environment so host PG* settings cannot leak into the tests.
.env_clear()
.env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
.env("PGHOST", pg.address.ip().to_string())
.env("PGPORT", pg.address.port().to_string())
.env("PGUSER", pg.whoami())
.status()
.expect("pg_regress failed");
}
/// Read a PID file
///
/// This should contain an unsigned integer, but we return it as a String

View File

@@ -60,7 +60,6 @@ fn test_regress() {
let node = compute_cplane.new_test_node();
node.start().unwrap();
<<<<<<< HEAD
node.pg_regress();
}
@@ -68,17 +67,14 @@ fn test_regress() {
#[test]
fn pgbench() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// start postgres
let node = compute_cplane.new_node();
node.start(&storage_cplane);
let node = compute_cplane.new_test_node();
node.start().unwrap();
node.pg_bench(10, 100);
=======
control_plane::storage::regress_check(&node);
>>>>>>> main
}
// Run two postgres instances on one pageserver

View File

@@ -29,9 +29,10 @@ daemonize = "0.4.1"
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", rev="7f15a24ec7daa0a5d9516da706212745f9042818", features = ["no-verify-ssl"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.4" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb.git" }
anyhow = "1.0"
crc32c = "0.6.0"

View File

@@ -224,11 +224,11 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(log)
.unwrap_or_else(|_| {
.open(&log).map_err(|err| {
// We failed to initialize logging, so we can't log this message with error!
eprintln!("Could not create log file {:?}: {}", log, err);
err
})?;
})?;
let decorator = slog_term::PlainSyncDecorator::new(log_file);
let drain = slog_term::CompactFormat::new(decorator).build();

View File

@@ -9,20 +9,17 @@
use crate::{walredo, PageServerConf};
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use core::ops::Bound::Included;
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use std::{convert::TryInto, ops::AddAssign};
use lazy_static::lazy_static;
use rocksdb::*;
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -253,19 +250,6 @@ impl CacheEntryContent {
}
}
impl CacheEntry {
fn new(key: CacheKey) -> CacheEntry {
CacheEntry {
key,
content: Mutex::new(CacheEntryContent {
page_image: None,
wal_record: None,
apply_pending: false,
}
}
}
}
impl CacheEntry {
fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
CacheEntry {
@@ -404,18 +388,6 @@ impl PageCache {
lsn & 0xffff_ffff
);
}
let pagecache = &shared.pagecache;
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let entry_opt = entries.next_back();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
}
let mut buf = BytesMut::new();
minkey.pack(&mut buf);

View File

@@ -324,10 +324,12 @@ async fn slurp_base_file(
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
rel: page_cache::RelTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
},
blknum: blknum,
};

View File

@@ -8,7 +8,7 @@
use crate::page_cache;
use crate::page_cache::{BufferTag, RelTag};
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::waldecoder::*;
use crate::PageServerConf;
use anyhow::Error;
use log::*;