fix clippy warnings

This commit is contained in:
Dmitry Rodionov
2021-08-31 15:36:20 +03:00
committed by Dmitry
parent 0e4cbe0165
commit bc709561b6
29 changed files with 200 additions and 223 deletions

View File

@@ -74,7 +74,10 @@ impl PageServerNode {
args.extend(&["--auth-type", "ZenithJWT"]);
}
create_tenant.map(|tenantid| args.extend(&["--create-tenant", tenantid]));
if let Some(tenantid) = create_tenant {
args.extend(&["--create-tenant", tenantid])
}
let status = cmd
.args(args)
.env_clear()

View File

@@ -84,10 +84,10 @@ impl<'a> Basebackup<'a> {
for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() {
if *filepath == "pg_hba.conf" {
let data = pg_constants::PG_HBA.as_bytes();
let header = new_tar_header(&filepath, data.len() as u64)?;
self.ar.append(&header, &data[..])?;
let header = new_tar_header(filepath, data.len() as u64)?;
self.ar.append(&header, data)?;
} else {
let header = new_tar_header(&filepath, 0)?;
let header = new_tar_header(filepath, 0)?;
self.ar.append(&header, &mut io::empty())?;
}
}
@@ -166,14 +166,12 @@ impl<'a> Basebackup<'a> {
self.lsn,
)?;
let path = if spcnode == pg_constants::GLOBALTABLESPACE_OID {
let dst_path = "PG_VERSION";
let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes();
let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
self.ar.append(&header, &version_bytes[..])?;
let header = new_tar_header("PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
let dst_path = format!("global/PG_VERSION");
let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
self.ar.append(&header, &version_bytes[..])?;
let header = new_tar_header("global/PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
String::from("global/pg_filenode.map") // filenode map for global tablespace
} else {
@@ -188,7 +186,7 @@ impl<'a> Basebackup<'a> {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes();
let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
self.ar.append(&header, &version_bytes[..])?;
self.ar.append(&header, version_bytes)?;
format!("base/{}/pg_filenode.map", dbnode)
};

View File

@@ -113,7 +113,7 @@ impl CfgFileParams {
.auth_type
.as_ref()
.map_or(Ok(AuthType::Trust), |auth_type| {
AuthType::from_str(&auth_type)
AuthType::from_str(auth_type)
})?;
if !pg_distrib_dir.join("bin/postgres").exists() {
@@ -273,7 +273,7 @@ fn main() -> Result<()> {
fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// Initialize logger
let (_scope_guard, log_file) = logger::init_logging(&conf, "pageserver.log")?;
let (_scope_guard, log_file) = logger::init_logging(conf, "pageserver.log")?;
let _log_guard = slog_stdlog::init()?;
// Note: this `info!(...)` macro comes from `log` crate

View File

@@ -43,7 +43,7 @@ pub struct PointInTime {
pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str>) -> Result<()> {
// Initialize logger
let (_scope_guard, _log_file) = logger::init_logging(&conf, "pageserver.log")?;
let (_scope_guard, _log_file) = logger::init_logging(conf, "pageserver.log")?;
let _log_guard = slog_stdlog::init()?;
// We don't use the real WAL redo manager, because we don't want to spawn the WAL redo
@@ -284,7 +284,7 @@ pub(crate) fn create_branch(
// FIXME: there's a race condition, if you create a branch with the same
// name concurrently.
let data = newtli.to_string();
fs::write(conf.branch_path(&branchname, tenantid), data)?;
fs::write(conf.branch_path(branchname, tenantid), data)?;
Ok(BranchInfo {
name: branchname.to_string(),
@@ -333,21 +333,21 @@ fn parse_point_in_time(
// Check if it's a tag
if lsn.is_none() {
let tagpath = conf.tag_path(name, &tenantid);
let tagpath = conf.tag_path(name, tenantid);
if tagpath.exists() {
let pointstr = fs::read_to_string(tagpath)?;
return parse_point_in_time(conf, &pointstr, &tenantid);
return parse_point_in_time(conf, &pointstr, tenantid);
}
}
// Check if it's a branch
// Check if it's branch @ LSN
let branchpath = conf.branch_path(name, &tenantid);
let branchpath = conf.branch_path(name, tenantid);
if branchpath.exists() {
let pointstr = fs::read_to_string(branchpath)?;
let mut result = parse_point_in_time(conf, &pointstr, &tenantid)?;
let mut result = parse_point_in_time(conf, &pointstr, tenantid)?;
result.lsn = lsn.unwrap_or(Lsn(0));
return Ok(result);
@@ -356,7 +356,7 @@ fn parse_point_in_time(
// Check if it's a timelineid
// Check if it's timelineid @ LSN
if let Ok(timelineid) = ZTimelineId::from_str(name) {
let tlipath = conf.timeline_path(&timelineid, &tenantid);
let tlipath = conf.timeline_path(&timelineid, tenantid);
if tlipath.exists() {
return Ok(PointInTime {
timelineid,

View File

@@ -246,8 +246,8 @@ impl LayeredRepository {
tenantid: ZTenantId,
) -> LayeredRepository {
LayeredRepository {
tenantid: tenantid,
conf: conf,
tenantid,
conf,
timelines: Mutex::new(HashMap::new()),
walredo_mgr,
}
@@ -902,7 +902,7 @@ impl LayeredTimeline {
while lsn < timeline.ancestor_lsn {
trace!("going into ancestor {} ", timeline.ancestor_lsn);
timeline = &timeline.ancestor_timeline.as_ref().unwrap();
timeline = timeline.ancestor_timeline.as_ref().unwrap();
}
// Now we have the right starting timeline for our search.
@@ -937,7 +937,7 @@ impl LayeredTimeline {
// If not, check if there's a layer on the ancestor timeline
if let Some(ancestor) = &timeline.ancestor_timeline {
lsn = timeline.ancestor_lsn;
timeline = &ancestor.as_ref();
timeline = ancestor.as_ref();
trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn);
continue;
}
@@ -1134,7 +1134,7 @@ impl LayeredTimeline {
}
// freeze it
let (new_historics, new_open) = oldest_layer.freeze(last_record_lsn, &self)?;
let (new_historics, new_open) = oldest_layer.freeze(last_record_lsn, self)?;
// replace this layer with the new layers that 'freeze' returned
layers.pop_oldest_open();
@@ -1176,7 +1176,7 @@ impl LayeredTimeline {
let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
let metadata = TimelineMetadata {
disk_consistent_lsn: disk_consistent_lsn,
disk_consistent_lsn,
prev_record_lsn: ondisk_prev_record_lsn,
ancestor_timeline: ancestor_timelineid,
ancestor_lsn: self.ancestor_lsn,
@@ -1304,18 +1304,14 @@ impl LayeredTimeline {
doomed_layer.delete()?;
layers.remove_historic(&*doomed_layer);
if doomed_layer.is_dropped() {
if doomed_layer.get_seg_tag().rel.is_relation() {
result.ondisk_relfiles_dropped += 1;
} else {
result.ondisk_nonrelfiles_dropped += 1;
}
} else {
if doomed_layer.get_seg_tag().rel.is_relation() {
result.ondisk_relfiles_removed += 1;
} else {
result.ondisk_nonrelfiles_removed += 1;
}
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
}
}

View File

@@ -130,23 +130,23 @@ pub struct DeltaLayerInner {
impl Layer for DeltaLayer {
fn get_timeline_id(&self) -> ZTimelineId {
return self.timelineid;
self.timelineid
}
fn get_seg_tag(&self) -> SegmentTag {
return self.seg;
self.seg
}
fn is_dropped(&self) -> bool {
return self.dropped;
self.dropped
}
fn get_start_lsn(&self) -> Lsn {
return self.start_lsn;
self.start_lsn
}
fn get_end_lsn(&self) -> Lsn {
return self.end_lsn;
self.end_lsn
}
fn filename(&self) -> PathBuf {
@@ -358,6 +358,7 @@ impl DeltaLayer {
/// This is used to write the in-memory layer to disk. The in-memory layer uses the same
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
#[allow(clippy::too_many_arguments)]
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
@@ -372,16 +373,16 @@ impl DeltaLayer {
) -> Result<DeltaLayer> {
let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid: timelineid,
tenantid: tenantid,
seg: seg,
start_lsn: start_lsn,
timelineid,
tenantid,
seg,
start_lsn,
end_lsn,
dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: true,
page_version_metas: BTreeMap::new(),
relsizes: relsizes,
relsizes,
}),
predecessor,
};

View File

@@ -111,8 +111,10 @@ impl DeltaFileName {
dropped,
})
}
}
fn to_string(&self) -> String {
impl fmt::Display for DeltaFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let basename = match self.seg.rel {
RelishTag::Relation(reltag) => format!(
"rel_{}_{}_{}_{}",
@@ -134,11 +136,12 @@ impl DeltaFileName {
format!("pg_filenodemap_{}_{}", spcnode, dbnode)
}
RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
RelishTag::Checkpoint => format!("pg_control_checkpoint"),
RelishTag::ControlFile => format!("pg_control"),
RelishTag::Checkpoint => "pg_control_checkpoint".to_string(),
RelishTag::ControlFile => "pg_control".to_string(),
};
format!(
write!(
f,
"{}_{}_{:016X}_{:016X}{}",
basename,
self.seg.segno,
@@ -149,12 +152,6 @@ impl DeltaFileName {
}
}
impl fmt::Display for DeltaFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_string())
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct ImageFileName {
pub seg: SegmentTag,
@@ -233,8 +230,10 @@ impl ImageFileName {
Some(ImageFileName { seg, lsn })
}
}
fn to_string(&self) -> String {
impl fmt::Display for ImageFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let basename = match self.seg.rel {
RelishTag::Relation(reltag) => format!(
"rel_{}_{}_{}_{}",
@@ -256,11 +255,12 @@ impl ImageFileName {
format!("pg_filenodemap_{}_{}", spcnode, dbnode)
}
RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
RelishTag::Checkpoint => format!("pg_control_checkpoint"),
RelishTag::ControlFile => format!("pg_control"),
RelishTag::Checkpoint => "pg_control_checkpoint".to_string(),
RelishTag::ControlFile => "pg_control".to_string(),
};
format!(
write!(
f,
"{}_{}_{:016X}",
basename,
self.seg.segno,
@@ -269,12 +269,6 @@ impl ImageFileName {
}
}
impl fmt::Display for ImageFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_string())
}
}
/// Scan timeline directory and create ImageFileName and DeltaFilename
/// structs representing all files on disk
///
@@ -302,7 +296,7 @@ pub fn list_files(
warn!("unrecognized filename in timeline dir: {}", fname);
}
}
return Ok((imgfiles, deltafiles));
Ok((imgfiles, deltafiles))
}
/// Helper enum to hold a PageServerConf, or a path

View File

@@ -97,23 +97,23 @@ impl Layer for ImageLayer {
}
fn get_timeline_id(&self) -> ZTimelineId {
return self.timelineid;
self.timelineid
}
fn get_seg_tag(&self) -> SegmentTag {
return self.seg;
self.seg
}
fn is_dropped(&self) -> bool {
return false;
false
}
fn get_start_lsn(&self) -> Lsn {
return self.lsn;
self.lsn
}
fn get_end_lsn(&self) -> Lsn {
return self.lsn;
self.lsn
}
/// Look up given page in the file
@@ -255,10 +255,10 @@ impl ImageLayer {
let layer = ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid: timelineid,
tenantid: tenantid,
seg: seg,
lsn: lsn,
timelineid,
tenantid,
seg,
lsn,
inner: Mutex::new(ImageLayerInner {
loaded: true,
image_type: image_type.clone(),

View File

@@ -14,6 +14,7 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::path::PathBuf;
@@ -93,8 +94,8 @@ impl Layer for InMemoryLayer {
let delta_filename = DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: end_lsn,
dropped: dropped,
end_lsn,
dropped,
}
.to_string();
@@ -102,15 +103,15 @@ impl Layer for InMemoryLayer {
}
fn get_timeline_id(&self) -> ZTimelineId {
return self.timelineid;
self.timelineid
}
fn get_seg_tag(&self) -> SegmentTag {
return self.seg;
self.seg
}
fn get_start_lsn(&self) -> Lsn {
return self.start_lsn;
self.start_lsn
}
fn get_end_lsn(&self) -> Lsn {
@@ -254,6 +255,10 @@ impl Layer for InMemoryLayer {
}
}
// Type alias to simplify InMemoryLayer::freeze signature
//
type SuccessorLayers = (Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>);
impl InMemoryLayer {
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
@@ -429,7 +434,7 @@ impl InMemoryLayer {
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
segsizes: segsizes,
segsizes,
}),
predecessor: Some(src),
})
@@ -454,7 +459,7 @@ impl InMemoryLayer {
cutoff_lsn: Lsn,
// This is needed just to call materialize_page()
timeline: &LayeredTimeline,
) -> Result<(Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>)> {
) -> Result<SuccessorLayers> {
info!(
"freezing in memory layer for {} on timeline {} at {}",
self.seg, self.timelineid, cutoff_lsn
@@ -494,13 +499,17 @@ impl InMemoryLayer {
before_page_versions = BTreeMap::new();
after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn == end_lsn {
// Page versions at the cutoff LSN will be stored in the
// materialized image layer.
} else if *lsn > end_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
match lsn.cmp(&end_lsn) {
Ordering::Less => {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
Ordering::Equal => {
// Page versions at the cutoff LSN will be stored in the
// materialized image layer.
}
Ordering::Greater => {
after_page_versions.insert((*blknum, *lsn), pv.clone());
}
}
}
} else {

View File

@@ -109,7 +109,7 @@ impl LayerMap {
if let Some(open) = &segentry.open {
if open.get_start_lsn() <= lsn {
let x: Arc<dyn Layer> = Arc::clone(&open) as _;
let x: Arc<dyn Layer> = Arc::clone(open) as _;
return Some(x);
}
}
@@ -119,7 +119,7 @@ impl LayerMap {
.range((Included(Lsn(0)), Included(lsn)))
.next_back()
{
let x: Arc<dyn Layer> = Arc::clone(&v) as _;
let x: Arc<dyn Layer> = Arc::clone(v) as _;
Some(x)
} else {
None
@@ -132,12 +132,7 @@ impl LayerMap {
///
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
let segentry = self.segs.get(tag)?;
if let Some(open) = &segentry.open {
Some(Arc::clone(open))
} else {
None
}
segentry.open.as_ref().map(Arc::clone)
}
///
@@ -161,7 +156,7 @@ impl LayerMap {
let opensegentry = OpenSegEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
layer: layer,
layer,
generation: self.current_generation,
};
self.open_segs.push(opensegentry);

View File

@@ -372,7 +372,7 @@ impl PageServerHandler {
.claims
.as_ref()
.expect("claims presence already checked");
Ok(auth::check_permission(claims, tenantid)?)
auth::check_permission(claims, tenantid)
}
}
@@ -389,7 +389,7 @@ impl postgres_backend::Handler for PageServerHandler {
.as_ref()
.as_ref()
.unwrap()
.decode(&str::from_utf8(jwt_response)?)?;
.decode(str::from_utf8(jwt_response)?)?;
if matches!(data.claims.scope, Scope::Tenant) {
ensure!(
@@ -425,7 +425,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.handle_controlfile(pgb)?;
} else if query_string.starts_with("pagestream ") {
let (_, params_raw) = query_string.split_at("pagestream ".len());
let params = params_raw.split(" ").collect::<Vec<_>>();
let params = params_raw.split(' ').collect::<Vec<_>>();
ensure!(
params.len() == 2,
"invalid param number for pagestream command"
@@ -484,7 +484,7 @@ impl postgres_backend::Handler for PageServerHandler {
.get_timeline(timelineid)
.context(format!("error fetching timeline {}", timelineid))?;
walreceiver::launch_wal_receiver(&self.conf, timelineid, &connstr, tenantid.to_owned());
walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned());
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("branch_create ") {
@@ -495,7 +495,7 @@ impl postgres_backend::Handler for PageServerHandler {
// TODO: escaping, to allow branch names with spaces
let re = Regex::new(r"^branch_create ([[:xdigit:]]+) (\S+) ([^\r\n\s;]+)[\r\n\s;]*;?$")
.unwrap();
let caps = re.captures(&query_string).ok_or_else(err)?;
let caps = re.captures(query_string).ok_or_else(err)?;
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let branchname = caps.get(2).ok_or_else(err)?.as_str().to_owned();
@@ -504,7 +504,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.check_permission(Some(tenantid))?;
let branch =
branches::create_branch(&self.conf, &branchname, &startpoint_str, &tenantid)?;
branches::create_branch(self.conf, &branchname, &startpoint_str, &tenantid)?;
let branch = serde_json::to_vec(&branch)?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
@@ -519,14 +519,14 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let branches = crate::branches::get_branches(&self.conf, &tenantid)?;
let branches = crate::branches::get_branches(self.conf, &tenantid)?;
let branches_buf = serde_json::to_vec(&branches)?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::DataRow(&[Some(&branches_buf)]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("tenant_list") {
let tenants = crate::branches::get_tenants(&self.conf)?;
let tenants = crate::branches::get_tenants(self.conf)?;
let tenants_buf = serde_json::to_vec(&tenants)?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
@@ -537,13 +537,13 @@ impl postgres_backend::Handler for PageServerHandler {
// tenant_create <tenantid>
let re = Regex::new(r"^tenant_create ([[:xdigit:]]+)$").unwrap();
let caps = re.captures(&query_string).ok_or_else(err)?;
let caps = re.captures(query_string).ok_or_else(err)?;
self.check_permission(None)?;
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
tenant_mgr::create_repository_for_tenant(&self.conf, tenantid)?;
tenant_mgr::create_repository_for_tenant(self.conf, tenantid)?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -597,39 +597,39 @@ impl postgres_backend::Handler for PageServerHandler {
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(&result.ondisk_relfiles_total.to_string().as_bytes()),
Some(result.ondisk_relfiles_total.to_string().as_bytes()),
Some(
&result
result
.ondisk_relfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
&result
result
.ondisk_relfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(&result.ondisk_relfiles_not_updated.to_string().as_bytes()),
Some(&result.ondisk_relfiles_removed.to_string().as_bytes()),
Some(&result.ondisk_relfiles_dropped.to_string().as_bytes()),
Some(&result.ondisk_nonrelfiles_total.to_string().as_bytes()),
Some(result.ondisk_relfiles_not_updated.to_string().as_bytes()),
Some(result.ondisk_relfiles_removed.to_string().as_bytes()),
Some(result.ondisk_relfiles_dropped.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_total.to_string().as_bytes()),
Some(
&result
result
.ondisk_nonrelfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
&result
result
.ondisk_nonrelfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(&result.ondisk_nonrelfiles_not_updated.to_string().as_bytes()),
Some(&result.ondisk_nonrelfiles_removed.to_string().as_bytes()),
Some(&result.ondisk_nonrelfiles_dropped.to_string().as_bytes()),
Some(&result.elapsed.as_millis().to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_not_updated.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_removed.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_dropped.to_string().as_bytes()),
Some(result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {

View File

@@ -125,11 +125,7 @@ impl RelishTag {
// convenience function to check if this relish is a normal relation.
pub const fn is_relation(&self) -> bool {
if let RelishTag::Relation(_) = self {
true
} else {
false
}
matches!(self, RelishTag::Relation(_))
}
}

View File

@@ -113,7 +113,7 @@ pub trait Timeline: Send + Sync {
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>>;
/// Get a list of non-relational objects
fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<HashSet<RelishTag>>;
fn list_nonrels(&self, lsn: Lsn) -> Result<HashSet<RelishTag>>;
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
@@ -201,6 +201,7 @@ impl WALRecord {
///
/// Tests that should work the same with any Repository/Timeline implementation.
///
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -29,7 +29,7 @@ use zenith_utils::lsn::Lsn;
const MAX_MBR_BLKNO: u32 =
pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
const ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
///
/// Import all relation data pages from local disk into the repository.
@@ -130,7 +130,7 @@ pub fn import_timeline_from_postgres_datadir(
}
for entry in fs::read_dir(path.join("pg_twophase"))? {
let entry = entry?;
let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?;
let xid = u32::from_str_radix(entry.path().to_str().unwrap(), 16)?;
import_nonrel_file(timeline, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
}
// TODO: Scan pg_tblspc
@@ -429,7 +429,7 @@ pub fn save_decoded_record(
},
rpageno,
lsn,
ZERO_PAGE,
ZERO_PAGE.clone(),
)?;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
@@ -486,7 +486,7 @@ pub fn save_decoded_record(
},
rpageno,
lsn,
ZERO_PAGE,
ZERO_PAGE.clone(),
)?;
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let pageno = buf.get_u32_le();
@@ -499,7 +499,7 @@ pub fn save_decoded_record(
},
rpageno,
lsn,
ZERO_PAGE,
ZERO_PAGE.clone(),
)?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
@@ -597,19 +597,16 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
// TODO This implementation is very inefficient -
// it scans all non-rels only to find FileNodeMaps
for tag in timeline.list_nonrels(req_lsn)? {
match tag {
RelishTag::FileNodeMap { spcnode, dbnode } => {
if spcnode == src_tablespace_id && dbnode == src_db_id {
let img = timeline.get_page_at_lsn_nowait(tag, 0, req_lsn)?;
let new_tag = RelishTag::FileNodeMap {
spcnode: tablespace_id,
dbnode: db_id,
};
timeline.put_page_image(new_tag, 0, lsn, img)?;
break;
}
if let RelishTag::FileNodeMap { spcnode, dbnode } = tag {
if spcnode == src_tablespace_id && dbnode == src_db_id {
let img = timeline.get_page_at_lsn_nowait(tag, 0, req_lsn)?;
let new_tag = RelishTag::FileNodeMap {
spcnode: tablespace_id,
dbnode: db_id,
};
timeline.put_page_image(new_tag, 0, lsn, img)?;
break;
}
_ => {} // do nothing
}
}
info!(
@@ -785,17 +782,14 @@ fn save_clog_truncate_record(
// instead.
let req_lsn = min(timeline.get_last_record_lsn(), lsn);
for obj in timeline.list_nonrels(req_lsn)? {
match obj {
RelishTag::Slru { slru, segno } => {
if slru == SlruKind::Clog {
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
timeline.put_unlink(RelishTag::Slru { slru, segno }, lsn)?;
trace!("unlink CLOG segment {:>04X} at lsn {}", segno, lsn);
}
if let RelishTag::Slru { slru, segno } = obj {
if slru == SlruKind::Clog {
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
timeline.put_unlink(RelishTag::Slru { slru, segno }, lsn)?;
trace!("unlink CLOG segment {:>04X} at lsn {}", segno, lsn);
}
}
_ => {}
}
}

View File

@@ -197,6 +197,7 @@ impl WalStreamDecoder {
}
#[allow(dead_code)]
#[derive(Default)]
pub struct DecodedBkpBlock {
/* Is this block ref in use? */
//in_use: bool,
@@ -229,25 +230,7 @@ pub struct DecodedBkpBlock {
impl DecodedBkpBlock {
pub fn new() -> DecodedBkpBlock {
DecodedBkpBlock {
rnode_spcnode: 0,
rnode_dbnode: 0,
rnode_relnode: 0,
forknum: 0,
blkno: 0,
flags: 0,
has_image: false,
apply_image: false,
will_init: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
bimg_info: 0,
has_data: false,
data_len: 0,
}
Default::default()
}
}

View File

@@ -457,7 +457,7 @@ fn write_wal_file(
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(&ZERO_BLOCK)?;
file.write_all(ZERO_BLOCK)?;
}
wal_file = file;
}

View File

@@ -209,7 +209,7 @@ impl WalRedoManager for PostgresRedoManager {
let process = (*process_guard).as_ref().unwrap();
self.runtime
.block_on(self.handle_apply_request(&process, &request))
.block_on(self.handle_apply_request(process, &request))
};
end_time = Instant::now();
@@ -453,7 +453,7 @@ impl PostgresRedoProcess {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = conf.tenant_path(&tenantid).join("wal-redo-datadir");
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {

View File

@@ -189,11 +189,11 @@ pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_LONG_HEADER: u16 = 0x0002;
pub const PG_MAJORVERSION: &'static str = "14";
pub const PG_MAJORVERSION: &str = "14";
// List of subdirectories inside pgdata.
// Copied from src/bin/initdb/initdb.c
pub const PGDATA_SUBDIRS: [&'static str; 22] = [
pub const PGDATA_SUBDIRS: [&str; 22] = [
"global",
"pg_wal/archive_status",
"pg_commit_ts",
@@ -218,11 +218,11 @@ pub const PGDATA_SUBDIRS: [&'static str; 22] = [
"pg_logical/mappings",
];
pub const PGDATA_SPECIAL_FILES: [&'static str; 4] = [
pub const PGDATA_SPECIAL_FILES: [&str; 4] = [
"pg_hba.conf",
"pg_ident.conf",
"postgresql.conf",
"postgresql.auto.conf",
];
pub static PG_HBA: &'static str = include_str!("../samples/pg_hba.conf");
pub static PG_HBA: &str = include_str!("../samples/pg_hba.conf");

View File

@@ -37,6 +37,7 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
pub type XLogRecPtr = u64;
@@ -173,12 +174,11 @@ fn find_end_of_wal_segment(
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
crc = !crc;
} else {
crc ^= 0xFFFFFFFFu32;
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
crc = !crc;
}
crc = !crc;
rec_offs += n;
offs += n;
contlen -= n;
@@ -465,7 +465,7 @@ mod tests {
let waldump_output = std::str::from_utf8(&waldump_output.stderr).unwrap();
println!("waldump_output = '{}'", &waldump_output);
let re = Regex::new(r"invalid record length at (.+):").unwrap();
let caps = re.captures(&waldump_output).unwrap();
let caps = re.captures(waldump_output).unwrap();
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
// 5. Rename file to partial to actually find last valid lsn

View File

@@ -56,7 +56,7 @@ impl CPlaneApi {
md5::compute([stored_hash.as_bytes(), salt].concat())
);
let received_hash = std::str::from_utf8(&md5_response)?;
let received_hash = std::str::from_utf8(md5_response)?;
println!(
"auth: {} rh={} sh={} ssh={} {:?}",

View File

@@ -143,10 +143,10 @@ fn main() -> anyhow::Result<()> {
// for each connection.
thread::Builder::new()
.name("Proxy thread".into())
.spawn(move || proxy::thread_main(&state, pageserver_listener))?,
.spawn(move || proxy::thread_main(state, pageserver_listener))?,
thread::Builder::new()
.name("Mgmt thread".into())
.spawn(move || mgmt::thread_main(&state, mgmt_listener))?,
.spawn(move || mgmt::thread_main(state, mgmt_listener))?,
];
for t in threads.into_iter() {

View File

@@ -148,7 +148,7 @@ impl ReplicationConn {
}
});
let (mut start_pos, mut stop_pos) = Self::parse_start_stop(&cmd)?;
let (mut start_pos, mut stop_pos) = Self::parse_start_stop(cmd)?;
let mut wal_seg_size: usize;
loop {

View File

@@ -95,6 +95,12 @@ impl SafeKeeperState {
}
}
impl Default for SafeKeeperState {
fn default() -> Self {
Self::new()
}
}
// protocol messages
/// Initial Proposer -> Acceptor message
@@ -205,7 +211,7 @@ impl ProposerAcceptorMessage {
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.ok_or(anyhow!("begin_lsn > end_lsn in AppendRequest"))?
.ok_or_else(|| anyhow!("begin_lsn > end_lsn in AppendRequest"))?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
@@ -217,10 +223,7 @@ impl ProposerAcceptorMessage {
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest {
h: hdr,
wal_data: wal_data,
};
let msg = AppendRequest { h: hdr, wal_data };
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
@@ -378,6 +381,7 @@ where
}
/// Handle request to append WAL.
#[allow(clippy::comparison_chain)]
fn handle_append_request(&mut self, msg: &AppendRequest) -> Result<AcceptorProposerMessage> {
// log first AppendRequest from this proposer
if self.elected_proposer_term < msg.h.term {
@@ -492,7 +496,7 @@ mod tests {
let mut vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0),
_ => assert!(false),
r => panic!("unexpected response: {:?}", r),
}
// reboot...
@@ -506,7 +510,7 @@ mod tests {
vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0),
_ => assert!(false),
r => panic!("unexpected response: {:?}", r),
}
}
@@ -540,7 +544,7 @@ mod tests {
ar_hdr.begin_lsn = Lsn(2);
ar_hdr.end_lsn = Lsn(3);
append_request = AppendRequest {
h: ar_hdr.clone(),
h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));

View File

@@ -127,7 +127,7 @@ impl SharedState {
if let CreateControlFile::False = create {
bail!("control file is empty");
}
return Ok((file, SafeKeeperState::new()));
Ok((file, SafeKeeperState::new()))
} else {
match SafeKeeperState::des_from(&mut file) {
Err(e) => {
@@ -144,7 +144,7 @@ impl SharedState {
SK_FORMAT_VERSION
);
}
return Ok((file, s));
Ok((file, s))
}
}
}
@@ -220,11 +220,8 @@ impl Timeline {
commit_lsn = min(shared_state.sk.flush_lsn, shared_state.sk.s.commit_lsn);
// if this is AppendResponse, fill in proper hot standby feedback
match rmsg {
AcceptorProposerMessage::AppendResponse(ref mut resp) => {
resp.hs_feedback = shared_state.hs_feedback.clone();
}
_ => (),
if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
resp.hs_feedback = shared_state.hs_feedback.clone();
}
}
// Ping wal sender that new data might be available.
@@ -401,7 +398,7 @@ impl Storage for FileStorage {
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(&ZERO_BLOCK)?;
file.write_all(ZERO_BLOCK)?;
}
wal_file = file;
}

View File

@@ -359,7 +359,7 @@ fn get_branch_infos(
}
fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(&env);
let pageserver = PageServerNode::from_env(env);
match tenant_match.subcommand() {
("list", Some(_)) => {
for tenant in pageserver.tenant_list()? {
@@ -381,12 +381,12 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
}
fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(&env);
let pageserver = PageServerNode::from_env(env);
if let Some(branchname) = branch_match.value_of("branchname") {
let startpoint_str = branch_match
.value_of("start-point")
.ok_or(anyhow!("Missing start-point"))?;
.ok_or_else(|| anyhow!("Missing start-point"))?;
let tenantid: ZTenantId = branch_match
.value_of("tenantid")
.map_or(Ok(env.tenantid), |value| value.parse())?;

View File

@@ -42,6 +42,7 @@ lazy_static! {
// performed by the process.
// We know the the size of the block, so we can determine the I/O bytes out of it.
// The value might be not 100% exact, but should be fine for Prometheus metrics in this case.
#[allow(clippy::unnecessary_cast)]
fn update_io_metrics() {
let mut usage = rusage {
ru_utime: timeval {

View File

@@ -8,7 +8,8 @@
use hex::{self, FromHex};
use serde::de::Error;
use serde::{self, Deserializer, Serializer};
use std::{fs, path::PathBuf};
use std::fs;
use std::path::Path;
use anyhow::{bail, Result};
use jsonwebtoken::{
@@ -43,8 +44,8 @@ where
{
let opt: Option<String> = Option::deserialize(deserializer)?;
match opt {
Some(tid) => return Ok(Some(ZTenantId::from_hex(tid).map_err(Error::custom)?)),
None => return Ok(None),
Some(tid) => Ok(Some(ZTenantId::from_hex(tid).map_err(Error::custom)?)),
None => Ok(None),
}
}
@@ -91,7 +92,7 @@ pub struct JwtAuth {
}
impl JwtAuth {
pub fn new<'a>(decoding_key: DecodingKey<'a>) -> Self {
pub fn new(decoding_key: DecodingKey<'_>) -> Self {
Self {
decoding_key: decoding_key.into_static(),
validation: Validation {
@@ -102,7 +103,7 @@ impl JwtAuth {
}
}
pub fn from_key_path(key_path: &PathBuf) -> Result<Self> {
pub fn from_key_path(key_path: &Path) -> Result<Self> {
let public_key = fs::read_to_string(key_path)?;
Ok(Self::new(DecodingKey::from_rsa_pem(public_key.as_bytes())?))
}
@@ -113,8 +114,8 @@ impl JwtAuth {
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn encode_from_key_path(claims: &Claims, key_path: &PathBuf) -> Result<String> {
pub fn encode_from_key_path(claims: &Claims, key_path: &Path) -> Result<String> {
let key_data = fs::read_to_string(key_path)?;
let key = EncodingKey::from_rsa_pem(&key_data.as_bytes())?;
let key = EncodingKey::from_rsa_pem(key_data.as_bytes())?;
Ok(encode(&Header::new(JWT_ALGORITHM), claims, &key)?)
}

View File

@@ -95,13 +95,13 @@ pub fn attach_openapi_ui(
fn parse_token(header_value: &str) -> Result<&str, ApiError> {
// header must be in form Bearer <token>
let (prefix, token) = header_value.split_once(' ').ok_or(ApiError::Unauthorized(
"malformed authorization header".to_string(),
))?;
let (prefix, token) = header_value
.split_once(' ')
.ok_or_else(|| ApiError::Unauthorized("malformed authorization header".to_string()))?;
if prefix != "Bearer" {
Err(ApiError::Unauthorized(
return Err(ApiError::Unauthorized(
"malformed authorization header".to_string(),
))?
));
}
Ok(token)
}
@@ -123,9 +123,11 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
.map_err(|_| ApiError::Unauthorized("malformed jwt token".to_string()))?;
req.set_context(data.claims);
}
None => Err(ApiError::Unauthorized(
"missing authorization header".to_string(),
))?,
None => {
return Err(ApiError::Unauthorized(
"missing authorization header".to_string(),
))
}
}
}
Ok(req)

View File

@@ -1,6 +1,8 @@
//! zenith_utils is intended to be a place to put code that is shared
//! between other crates in this repository.
#![allow(clippy::manual_range_contains)]
/// `Lsn` type implements common tasks on Log Sequence Numbers
pub mod lsn;
/// SeqWait allows waiting for a future sequence number to arrive