diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index c336792b83..a3f14f858d 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -19,6 +19,7 @@ use postgres::{Client, NoTls}; use crate::local_env::LocalEnv; use crate::storage::{PageServerNode, WalProposerNode}; use pageserver::ZTimelineId; +use pageserver::zenith_repo_dir; // // ComputeControlPlane @@ -449,8 +450,8 @@ impl PostgresNode { pub fn pg_regress(&self) { self.safe_psql("postgres", "CREATE DATABASE regression"); - - let regress_run_path = self.env.data_dir.join("regress"); + let data_dir = zenith_repo_dir(); + let regress_run_path = data_dir.join("regress"); fs::create_dir_all(regress_run_path.clone()).unwrap(); fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); std::env::set_current_dir(regress_run_path).unwrap(); diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index adf5d6164c..db71721e21 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -16,6 +16,7 @@ use anyhow::Result; use serde_derive::{Deserialize, Serialize}; use pageserver::ZTimelineId; +use pageserver::zenith_repo_dir; use walkeeper::xlog_utils; // @@ -52,14 +53,6 @@ impl LocalEnv { } } -fn zenith_repo_dir() -> PathBuf { - // Find repository path - match std::env::var_os("ZENITH_REPO_DIR") { - Some(val) => PathBuf::from(val.to_str().unwrap()), - None => ".zenith".into(), - } -} - // // Initialize a new Zenith repository // diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 914cbbf578..3674307fd3 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -13,7 +13,6 @@ use std::time::Duration; use postgres::{Client, NoTls}; -use crate::compute::PostgresNode; use crate::local_env::LocalEnv; use pageserver::ZTimelineId; diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index 67df31ef65..a50040d358 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -69,12 +69,15 @@ fn test_regress() { // Runs pg_bench on a compute node #[test] fn pgbench() { + let local_env = local_env::test_env("pgbench"); + // Start pageserver that reads WAL directly from that postgres - let storage_cplane = TestStorageControlPlane::one_page_server(String::new()); - let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); + let storage_cplane = TestStorageControlPlane::one_page_server(&local_env); + let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); // start postgres - let node = compute_cplane.new_test_node(); + let maintli = storage_cplane.get_branch_timeline("main"); + let node = compute_cplane.new_test_node(maintli); node.start().unwrap(); node.pg_bench(10, 100); diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 12db5180af..340894d55b 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -4,12 +4,11 @@ use log::*; use std::fs; -use std::fs::{File, OpenOptions}; +use std::fs::OpenOptions; use std::io; use std::path::PathBuf; use std::process::exit; use std::thread; -use std::fs::OpenOptions; use anyhow::{Context, Result}; use clap::{App, Arg}; @@ -18,18 +17,11 @@ use daemonize::Daemonize; use slog::Drain; use pageserver::page_service; +use pageserver::zenith_repo_dir; use pageserver::tui; //use pageserver::walreceiver; use pageserver::PageServerConf; -fn zenith_repo_dir() -> String { - // Find repository path - match std::env::var_os("ZENITH_REPO_DIR") { - Some(val) => String::from(val.to_str().unwrap()), - None => ".zenith".into(), - } -} - fn main() -> Result<()> { let arg_matches = App::new("Zenith page server") .about("Materializes WAL stream to pages and serves them to the postgres") @@ -140,7 +132,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { // does this for us. let repodir = zenith_repo_dir(); std::env::set_current_dir(&repodir)?; - info!("Changed current directory to repository in {}", &repodir); + info!("Changed current directory to repository in {:?}", &repodir); } let mut threads = Vec::new(); @@ -186,7 +178,7 @@ fn init_logging(conf: &PageServerConf) -> Result PathBuf { + // Find repository path + match std::env::var_os("ZENITH_REPO_DIR") { + Some(val) => PathBuf::from(val.to_str().unwrap()), + None => ".zenith".into(), + } +} diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 9ed0a422c5..79ea7f072b 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -8,7 +8,7 @@ use crate::restore_local_repo::restore_timeline; use crate::ZTimelineId; -use crate::{walredo, PageServerConf}; +use crate::{walredo, PageServerConf, zenith_repo_dir}; use anyhow::bail; use bytes::{Buf, BufMut, Bytes, BytesMut}; use crossbeam_channel::unbounded; @@ -150,8 +150,8 @@ pub fn get_or_restore_pagecache( } } -fn open_rocksdb(conf: &PageServerConf, timelineid: u64) -> DB { - let path = conf.data_dir.join(timelineid.to_string()); +fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> DB { + let path = zenith_repo_dir().join(timelineid.to_string()); let mut opts = Options::default(); opts.create_if_missing(true); opts.set_use_fsync(true); @@ -159,7 +159,7 @@ fn open_rocksdb(conf: &PageServerConf, timelineid: u64) -> DB { DB::open(&opts, &path).unwrap() } -fn init_page_cache(conf: &PageServerConf, timelineid: u64) -> PageCache { +fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { // Initialize the channel between the page cache and the WAL applicator let (s, r) = unbounded(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1ab7ee4eb4..239b89e306 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -428,12 +428,8 @@ pub fn thread_main(conf: &PageServerConf) { loop { let (socket, peer_addr) = listener.accept().await.unwrap(); debug!("accepted connection from {}", peer_addr); -<<<<<<< HEAD socket.set_nodelay(true).unwrap(); - let mut conn_handler = Connection::new(conf.clone(), socket); -======= let mut conn_handler = Connection::new(conf.clone(), socket, &runtime_ref); ->>>>>>> main task::spawn(async move { if let Err(err) = conn_handler.run().await { @@ -788,19 +784,11 @@ impl Connection { loop { let message = self.read_message().await?; -<<<<<<< HEAD - /* - if let Some(m) = &message { - trace!("query({}): {:?}", sysid, m); - }; - */ -======= if let Some(m) = &message { - info!("query({:?}): {:?}", timelineid, m); + trace!("query({:?}): {:?}", timelineid, m); }; ->>>>>>> main if message.is_none() { // connection was closed return Ok(()); @@ -869,41 +857,6 @@ impl Connection { self.write_message(&msg).await? } -<<<<<<< HEAD -======= - Some(FeMessage::ZenithCreateRequest(req)) => { - let tag = page_cache::RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - - pcache.relsize_inc(&tag, 0); - - self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { - ok: true, - n_blocks: 0, - })) - .await? - } - Some(FeMessage::ZenithExtendRequest(req)) => { - let tag = page_cache::RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - - pcache.relsize_inc(&tag, req.blkno + 1); - - self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { - ok: true, - n_blocks: 0, - })) - .await? - } ->>>>>>> main _ => {} } } diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 262479a556..4308fd66a9 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -27,6 +27,7 @@ use anyhow::Result; use bytes::Bytes; use crate::page_cache; +use crate::page_cache::RelTag; use crate::page_cache::BufferTag; use crate::page_cache::PageCache; use crate::waldecoder::WalStreamDecoder; @@ -202,11 +203,13 @@ fn restore_relfile( let r = file.read_exact(&mut buf); match r { Ok(_) => { - let tag = page_cache::BufferTag { - spcnode: spcoid, - dbnode: dboid, - relnode: relnode, - forknum: forknum as u8, + let tag = BufferTag { + rel: RelTag { + spcnode: spcoid, + dbnode: dboid, + relnode: relnode, + forknum: forknum as u8, + }, blknum: blknum, }; pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf)); @@ -233,14 +236,6 @@ fn restore_relfile( blknum += 1; } - let tag = page_cache::RelTag { - spcnode: spcoid, - dbnode: dboid, - relnode: relnode, - forknum: forknum as u8, - }; - pcache.relsize_inc(&tag, blknum); - Ok(()) } @@ -308,16 +303,19 @@ fn restore_wal( // so having multiple copies of it doesn't cost that much) for blk in decoded.blocks.iter() { let tag = BufferTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, + rel: RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }, blknum: blk.blkno, }; let rec = page_cache::WALRecord { lsn: lsn, will_init: blk.will_init || blk.apply_image, + truncate: false, rec: recdata.clone(), }; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 400c8c59da..f20b7935c2 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -21,6 +21,7 @@ use std::fs; use std::fs::OpenOptions; use std::io::prelude::*; use std::io::Error; +use std::path::PathBuf; use std::process::Stdio; use std::sync::Arc; use std::time::Duration; @@ -171,7 +172,7 @@ impl WalRedoProcess { // Limit shared cache for wal-redo-postres let mut config = OpenOptions::new() .append(true) - .open(datadir.join("postgresql.conf"))?; + .open(PathBuf::from(&datadir).join("postgresql.conf"))?; config.write(b"shared_buffers=128kB\n")?; config.write(b"fsync=off\n")?; } diff --git a/postgres_ffi/Cargo.toml b/postgres_ffi/Cargo.toml index 77cc5cf028..9ca97154c3 100644 --- a/postgres_ffi/Cargo.toml +++ b/postgres_ffi/Cargo.toml @@ -16,4 +16,4 @@ crc32c = "0.6.0" hex = "0.4.3" [build-dependencies] -bindgen = "0.53.1" +bindgen = "0.57" diff --git a/vendor/postgres b/vendor/postgres index b898ad7e3b..9f9aa9c300 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit b898ad7e3b9acce72b64bf064257e392f979a659 +Subproject commit 9f9aa9c300c9bbac296e2c126b3f96701d4e683d