cargo fmt

This commit is contained in:
Eric Seppanen
2021-05-17 09:29:58 -07:00
parent 746f667311
commit 398d522d88
12 changed files with 116 additions and 80 deletions

View File

@@ -103,8 +103,7 @@ pub fn init(remote_pageserver: Option<&str>) -> Result<()> {
let conf = if let Some(addr) = remote_pageserver {
// check that addr is parsable
let _uri = Url::parse(addr)
.map_err(|e| anyhow!("{}: {}", addr, e))?;
let _uri = Url::parse(addr).map_err(|e| anyhow!("{}: {}", addr, e))?;
LocalEnv {
pageserver_connstring: format!("postgresql://{}/", addr),

View File

@@ -1,18 +1,18 @@
use std::collections::HashMap;
use std::net::{SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::thread;
use std::time::Duration;
use std::collections::HashMap;
use anyhow::{anyhow, bail, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres::{Client, NoTls};
use pageserver::branches::BranchInfo;
use crate::local_env::LocalEnv;
use crate::read_pidfile;
use pageserver::branches::BranchInfo;
//
// Control routines for pageserver.
@@ -43,10 +43,14 @@ impl PageServerNode {
pub fn init(&self) -> Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let status = cmd.args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()])
let status = cmd
.args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()])
.env_clear()
.env("RUST_BACKTRACE", "1")
.env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
.env(
"POSTGRES_DISTRIB_DIR",
self.env.pg_distrib_dir.to_str().unwrap(),
)
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
@@ -77,15 +81,23 @@ impl PageServerNode {
);
let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(&["-l", self.address().to_string().as_str(), "-D", self.repo_path().to_str().unwrap()])
.arg("-d")
.env_clear()
.env("RUST_BACKTRACE", "1")
.env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
cmd.args(&[
"-l",
self.address().to_string().as_str(),
"-D",
self.repo_path().to_str().unwrap(),
])
.arg("-d")
.env_clear()
.env("RUST_BACKTRACE", "1")
.env(
"POSTGRES_DISTRIB_DIR",
self.env.pg_distrib_dir.to_str().unwrap(),
)
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
if !cmd.status()?.success() {
bail!(
@@ -183,8 +195,13 @@ impl PageServerNode {
.flatten()
.ok_or_else(|| anyhow!("missing branch"))?;
let res: BranchInfo = serde_json::from_str(branch_json)
.map_err(|e| anyhow!("failed to parse branch_create response: {}: {}", branch_json, e))?;
let res: BranchInfo = serde_json::from_str(branch_json).map_err(|e| {
anyhow!(
"failed to parse branch_create response: {}: {}",
branch_json,
e
)
})?;
Ok(res)
}

View File

@@ -30,7 +30,8 @@ pub fn create_test_env(testname: &str) -> LocalEnv {
// Remove remnants of old test repo
let _ = fs::remove_dir_all(&base_path);
fs::create_dir_all(&base_path).expect(format!("could not create directory for {}", base_path_str).as_str());
fs::create_dir_all(&base_path)
.expect(format!("could not create directory for {}", base_path_str).as_str());
let pgdatadirs_path = base_path.join("pgdatadirs");
fs::create_dir(&pgdatadirs_path)
@@ -107,7 +108,7 @@ impl TestStorageControlPlane {
data_dir: datadir_base.join(format!("wal_acceptor_{}", i)),
systemid,
env: local_env.clone(),
pass_to_pageserver: i == 0
pass_to_pageserver: i == 0,
};
wal_acceptor.init();
wal_acceptor.start();
@@ -343,15 +344,21 @@ impl WalAcceptorNode {
[].to_vec()
};
let status = Command::new(self.env.zenith_distrib_dir.as_ref().unwrap().join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.args(&["--systemid", self.systemid.to_string().as_str()])
.args(&ps_arg)
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
let status = Command::new(
self.env
.zenith_distrib_dir
.as_ref()
.unwrap()
.join("wal_acceptor"),
)
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.args(&["--systemid", self.systemid.to_string().as_str()])
.args(&ps_arg)
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
if !status.success() {
panic!("wal_acceptor start failed");

View File

@@ -1,8 +1,8 @@
use control_plane::compute::ComputeControlPlane;
use integration_tests;
use integration_tests::TestStorageControlPlane;
use integration_tests::PostgresNodeExt;
use integration_tests::TestStorageControlPlane;
// XXX: force all redo at the end
// -- restart + seqscan won't read deleted stuff

View File

@@ -6,8 +6,8 @@ use std::{thread, time};
use control_plane::compute::ComputeControlPlane;
use integration_tests;
use integration_tests::TestStorageControlPlane;
use integration_tests::PostgresNodeExt;
use integration_tests::TestStorageControlPlane;
const DOWNTIME: u64 = 2;
@@ -101,7 +101,10 @@ fn test_many_timelines() {
for i in 1..N_TIMELINES {
let branchname = format!("experimental{}", i);
storage_cplane.pageserver.branch_create(&branchname, "main").unwrap();
storage_cplane
.pageserver
.branch_create(&branchname, "main")
.unwrap();
timelines.push(branchname);
}

View File

@@ -5,11 +5,11 @@
use log::*;
use parse_duration::parse;
use std::fs::{File, OpenOptions};
use std::{env, path::PathBuf};
use std::io;
use std::process::exit;
use std::thread;
use std::time::Duration;
use std::{env, path::PathBuf};
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -17,7 +17,7 @@ use daemonize::Daemonize;
use slog::{Drain, FnValue};
use pageserver::{page_cache, page_service, tui, PageServerConf, branches};
use pageserver::{branches, page_cache, page_service, tui, PageServerConf};
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD_SEC: u64 = 10;
@@ -199,7 +199,10 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
// change into the repository directory. In daemon mode, Daemonize
// does this for us.
std::env::set_current_dir(&conf.workdir)?;
info!("Changed current directory to repository in {:?}", &conf.workdir);
info!(
"Changed current directory to repository in {:?}",
&conf.workdir
);
}
let mut threads = Vec::new();

View File

@@ -4,19 +4,25 @@
// TODO: move all paths construction to conf impl
//
use anyhow::{Context, Result, anyhow};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use fs::File;
use fs_extra;
use postgres_ffi::xlog_utils;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr};
use fs_extra;
use fs::File;
use std::io::Read;
use std::env;
use std::io::Read;
use std::{
collections::HashMap,
fs,
path::{Path, PathBuf},
process::{Command, Stdio},
str::FromStr,
};
use zenith_utils::lsn::Lsn;
use crate::{repository::Repository, ZTimelineId, PageServerConf};
use crate::{repository::Repository, PageServerConf, ZTimelineId};
#[derive(Serialize, Deserialize, Clone)]
pub struct BranchInfo {
@@ -71,10 +77,7 @@ pub fn init_repo(conf: &PageServerConf) -> Result<()> {
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.env(
"DYLD_LIBRARY_PATH",
conf.pg_lib_dir().to_str().unwrap(),
)
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.stdout(Stdio::null())
.status()
.with_context(|| "failed to execute initdb")?;
@@ -176,7 +179,11 @@ pub(crate) fn get_system_id(conf: &PageServerConf) -> Result<u64> {
Ok(controlfile.system_identifier)
}
pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_str: &str) -> Result<BranchInfo> {
pub(crate) fn create_branch(
conf: &PageServerConf,
branchname: &str,
startpoint_str: &str,
) -> Result<BranchInfo> {
if conf.branch_path(&branchname).exists() {
anyhow::bail!("branch {} already exists", branchname);
}
@@ -208,7 +215,7 @@ pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_
&oldtimelinedir.join("wal"),
&newtimelinedir.join("wal"),
startpoint.lsn,
16 * 1024 * 1024 // FIXME: assume default WAL segment size
16 * 1024 * 1024, // FIXME: assume default WAL segment size
)?;
Ok(BranchInfo {
@@ -243,8 +250,9 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
let lsn: Option<Lsn>;
if let Some(lsnstr) = strings.next() {
lsn = Some(Lsn::from_str(lsnstr)
.with_context(|| "invalid LSN in point-in-time specification")?);
lsn = Some(
Lsn::from_str(lsnstr).with_context(|| "invalid LSN in point-in-time specification")?,
);
} else {
lsn = None
}
@@ -337,8 +345,7 @@ fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Resu
/// If the given LSN is in the middle of a segment, the last segment containing it
/// is written out as .partial, and padded with zeros.
///
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()> {
let last_segno = upto.segment_number(wal_seg_size);
let last_segoff = upto.segment_offset(wal_seg_size);
@@ -349,7 +356,7 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Res
// Check if the filename looks like an xlog file, or a .partial file.
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
continue
continue;
}
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
@@ -371,7 +378,10 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Res
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
if copylen < wal_seg_size {
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
std::io::copy(
&mut std::io::repeat(0).take(wal_seg_size - copylen),
&mut dst_file,
)?;
}
}
}

View File

@@ -32,7 +32,6 @@ pub struct PageServerConf {
}
impl PageServerConf {
//
// Repository paths, relative to workdir.
//
@@ -50,7 +49,9 @@ impl PageServerConf {
}
fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf {
std::path::Path::new("timelines").join(timelineid.to_string()).join("snapshots")
std::path::Path::new("timelines")
.join(timelineid.to_string())
.join("snapshots")
}
//
@@ -66,7 +67,6 @@ impl PageServerConf {
}
}
/// Zenith Timeline ID is a 128-bit random ID.
///
/// Zenith timeline IDs are different from PostgreSQL timeline
@@ -127,4 +127,3 @@ impl fmt::Display for ZTimelineId {
f.write_str(&hex::encode(self.0))
}
}

View File

@@ -24,13 +24,13 @@ use std::time::Duration;
use zenith_utils::lsn::Lsn;
use crate::basebackup;
use crate::branches;
use crate::page_cache;
use crate::repository::{BufferTag, RelTag};
use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
use crate::ZTimelineId;
use crate::branches;
#[derive(Debug)]
enum FeMessage {
@@ -691,7 +691,6 @@ impl Connection {
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.starts_with(b"branch_create ") {
let query_str = String::from_utf8(query_string.to_vec())?;
let err = || anyhow!("invalid branch_create: '{}'", query_str);
@@ -699,9 +698,7 @@ impl Connection {
// branch_create <branchname> <startpoint>
// TODO lazy static
let re = Regex::new(r"^branch_create (\w+) ([\w@\\]+)[\r\n\s]*;?$").unwrap();
let caps = re
.captures(&query_str)
.ok_or_else(err)?;
let caps = re.captures(&query_str).ok_or_else(err)?;
let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str());
let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str());
@@ -731,10 +728,12 @@ impl Connection {
// on connect
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.to_ascii_lowercase().starts_with(b"identify_system") {
} else if query_string
.to_ascii_lowercase()
.starts_with(b"identify_system")
{
// TODO: match postgres response formarmat for 'identify_system'
let system_id = crate::branches::get_system_id(&self.conf)?
.to_string();
let system_id = crate::branches::get_system_id(&self.conf)?.to_string();
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::DataRow(Bytes::from(system_id)))?;

View File

@@ -287,11 +287,11 @@ mod tests {
use super::*;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use std::env;
use std::fs;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::env;
fn get_test_conf() -> PageServerConf {
PageServerConf {

View File

@@ -156,10 +156,7 @@ impl CacheEntryContent {
}
impl RocksRepository {
pub fn new(
conf: &PageServerConf,
walredo_mgr: Arc<dyn WalRedoManager>,
) -> RocksRepository {
pub fn new(conf: &PageServerConf, walredo_mgr: Arc<dyn WalRedoManager>) -> RocksRepository {
RocksRepository {
conf: conf.clone(),
timelines: Mutex::new(HashMap::new()),
@@ -185,8 +182,7 @@ impl Repository for RocksRepository {
match timelines.get(&timelineid) {
Some(timeline) => Ok(timeline.clone()),
None => {
let timeline =
RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
restore_timeline(&self.conf, &timeline, timelineid)?;

View File

@@ -1,13 +1,13 @@
use anyhow::Result;
use anyhow::{anyhow, Context};
use clap::{App, Arg, ArgMatches, SubCommand};
use std::collections::HashMap;
use std::process::exit;
use anyhow::{Context, anyhow};
use anyhow::Result;
use clap::{App, Arg, ArgMatches, SubCommand};
use control_plane::local_env;
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
use control_plane::storage::PageServerNode;
use pageserver::{ZTimelineId, branches::BranchInfo};
use pageserver::{branches::BranchInfo, ZTimelineId};
use zenith_utils::lsn::Lsn;
// Main entry point for the 'zenith' CLI utility
@@ -33,7 +33,7 @@ fn main() -> Result<()> {
Arg::with_name("remote-pageserver")
.long("remote-pageserver")
.required(false)
.value_name("pageserver-url")
.value_name("pageserver-url"),
),
)
.subcommand(
@@ -66,8 +66,7 @@ fn main() -> Result<()> {
// Create config file
if let ("init", Some(sub_args)) = matches.subcommand() {
let pageserver_uri = sub_args.value_of("pageserver-url");
local_env::init(pageserver_uri)
.with_context(|| "Failed to create cofig file")?;
local_env::init(pageserver_uri).with_context(|| "Failed to create cofig file")?;
}
// all other commands would need config
@@ -91,7 +90,11 @@ fn main() -> Result<()> {
if let Some(branchname) = sub_args.value_of("branchname") {
if let Some(startpoint_str) = sub_args.value_of("start-point") {
let branch = pageserver.branch_create(branchname, startpoint_str)?;
println!("Created branch '{}' at {:?}", branch.name, branch.latest_valid_lsn.unwrap_or(Lsn(0)));
println!(
"Created branch '{}' at {:?}",
branch.name,
branch.latest_valid_lsn.unwrap_or(Lsn(0))
);
} else {
panic!("Missing start-point");
}