Mirror of https://github.com/neondatabase/neon.git
Fix 1.66 Clippy warnings (#3178)
The 1.66 release speeds up compile times by over 10% according to our tests.
Its Clippy also finds plenty of old nits in our code (a small illustrative sketch of these patterns follows the list):
* useless conversions, e.g. `foo as u8` where `foo: u8` already; removed the redundant `as u8` casts and similar
* useless references and dereferences (ones the compiler adjusts automatically); removed various `&` and `*`
* bool -> u8 conversion via `if/else`; changed to `u8::from`
* map `.iter()` calls where only the values were used; changed to `.values()` instead
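For illustration, here is a minimal self-contained sketch of those mechanical patterns; the names (`latest`, `nodes`) and port numbers are made up for the example and are not taken from the diff below:

```
use std::collections::BTreeMap;
use std::process::Command;

fn main() {
    // bool -> u8: `u8::from(latest)` replaces `if latest { 1 } else { 0 }`.
    let latest = true;
    let flag: u8 = u8::from(latest);
    assert_eq!(flag, 1);

    // Map iteration: when only the values are needed, `.values()` replaces
    // `.iter().map(|(_key, value)| ...)`.
    let mut nodes: BTreeMap<String, u16> = BTreeMap::new();
    nodes.insert("node-1".to_string(), 55432);
    nodes.insert("node-2".to_string(), 55433);
    let max_port = nodes.values().copied().max().unwrap_or(55431);
    assert_eq!(max_port, 55433);

    // Useless reference: `Command::args` takes any `IntoIterator` of
    // `AsRef<OsStr>` items, so a plain array works without the extra `&`.
    let mut cmd = Command::new("echo");
    cmd.args(["-n", "hello"]);
}
```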
Lints that stand out:
* `Eq` is missing on our protoc-generated structs. Silenced; it does not seem crucial for us.
* `fn default` looks like the one from the `Default` trait, so I implemented that trait instead and replaced the `dummy_*` method in tests with a `::default()` invocation.
* Clippy detected that
```
if retry_attempt < u32::MAX {
    retry_attempt += 1;
}
```
is a saturating add and proposed replacing it. A sketch of the last two points follows below.
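A minimal sketch of those two changes, using a made-up `ExampleConf` struct rather than the real `TenantConf`, just to show the shape of the refactor:

```
use std::time::Duration;

// Implementing the `Default` trait (instead of an inherent `fn default`) lets
// callers use `ExampleConf::default()` and struct-update syntax.
struct ExampleConf {
    gc_period: Duration,
    compaction_period: Duration,
}

impl Default for ExampleConf {
    fn default() -> Self {
        Self {
            gc_period: Duration::from_secs(10),
            compaction_period: Duration::from_secs(10),
        }
    }
}

fn main() {
    // The manual overflow guard above ...
    let mut retry_attempt: u32 = u32::MAX - 1;
    if retry_attempt < u32::MAX {
        retry_attempt += 1;
    }
    // ... is exactly what `saturating_add` does, so Clippy suggests:
    retry_attempt = retry_attempt.saturating_add(1);
    assert_eq!(retry_attempt, u32::MAX);

    // Tests can now override only the fields they care about and take the
    // rest from the Default impl.
    let conf = ExampleConf {
        gc_period: Duration::ZERO,
        ..ExampleConf::default()
    };
    assert_eq!(conf.gc_period, Duration::ZERO);
    assert_eq!(conf.compaction_period, Duration::from_secs(10));
}
```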
@@ -175,7 +175,7 @@ impl ComputeNode {
 let start_time = Utc::now();

 let sync_handle = Command::new(&self.pgbin)
-    .args(&["--sync-safekeepers"])
+    .args(["--sync-safekeepers"])
     .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
     .stdout(Stdio::piped())
     .spawn()
@@ -253,7 +253,7 @@ impl ComputeNode {

 // Run postgres as a child process.
 let mut pg = Command::new(&self.pgbin)
-    .args(&["-D", &self.pgdata])
+    .args(["-D", &self.pgdata])
     .spawn()
     .expect("cannot start postgres process");

@@ -549,7 +549,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {

 table.load_preset(comfy_table::presets::NOTHING);

-table.set_header(&[
+table.set_header([
     "NODE",
     "ADDRESS",
     "TIMELINE",
@@ -584,7 +584,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     .map(|name| name.as_str())
     .unwrap_or("?");

-table.add_row(&[
+table.add_row([
     node_name.as_str(),
     &node.address.to_string(),
     &node.timeline_id.to_string(),
@@ -17,7 +17,7 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
     "storage_broker",
     &env.base_data_dir,
     &env.storage_broker_bin(),
-    &args,
+    args,
     [],
     background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)),
     || {
@@ -44,7 +44,7 @@ impl ComputeControlPlane {
 let mut nodes = BTreeMap::default();
 let pgdatadirspath = &env.pg_data_dirs_path();

-for tenant_dir in fs::read_dir(&pgdatadirspath)
+for tenant_dir in fs::read_dir(pgdatadirspath)
     .with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
 {
     let tenant_dir = tenant_dir?;
@@ -67,8 +67,8 @@ impl ComputeControlPlane {
 fn get_port(&mut self) -> u16 {
     1 + self
         .nodes
-        .iter()
-        .map(|(_name, node)| node.address.port())
+        .values()
+        .map(|node| node.address.port())
         .max()
         .unwrap_or(self.base_port)
 }
@@ -183,7 +183,7 @@ impl PostgresNode {

 fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
     let pg_path = self.env.pg_bin_dir(pg_version)?.join("postgres");
-    let mut cmd = Command::new(&pg_path);
+    let mut cmd = Command::new(pg_path);

     cmd.arg("--sync-safekeepers")
         .env_clear()
@@ -261,7 +261,7 @@ impl PostgresNode {
 }

 fn create_pgdata(&self) -> Result<()> {
-    fs::create_dir_all(&self.pgdata()).with_context(|| {
+    fs::create_dir_all(self.pgdata()).with_context(|| {
         format!(
             "could not create data directory {}",
             self.pgdata().display()
@@ -478,7 +478,7 @@ impl PostgresNode {
         postgresql_conf_path.to_str().unwrap()
     )
 })?;
-fs::remove_dir_all(&self.pgdata())?;
+fs::remove_dir_all(self.pgdata())?;
 self.create_pgdata()?;

 // 2. Bring back config files
@@ -514,7 +514,7 @@ impl PostgresNode {
     "Destroying postgres data directory '{}'",
     self.pgdata().to_str().unwrap()
 );
-fs::remove_dir_all(&self.pgdata())?;
+fs::remove_dir_all(self.pgdata())?;
 } else {
 self.pg_ctl(&["stop"], &None)?;
 }
@@ -404,7 +404,7 @@ impl LocalEnv {
     }
 }

-fs::create_dir(&base_path)?;
+fs::create_dir(base_path)?;

 // generate keys for jwt
 // openssl genrsa -out private_key.pem 2048
@@ -413,7 +413,7 @@ impl LocalEnv {
 private_key_path = base_path.join("auth_private_key.pem");
 let keygen_output = Command::new("openssl")
     .arg("genrsa")
-    .args(&["-out", private_key_path.to_str().unwrap()])
+    .args(["-out", private_key_path.to_str().unwrap()])
     .arg("2048")
     .stdout(Stdio::null())
     .output()
@@ -430,10 +430,10 @@ impl LocalEnv {
 // openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem
 let keygen_output = Command::new("openssl")
     .arg("rsa")
-    .args(&["-in", private_key_path.to_str().unwrap()])
+    .args(["-in", private_key_path.to_str().unwrap()])
     .arg("-pubout")
-    .args(&["-outform", "PEM"])
-    .args(&["-out", public_key_path.to_str().unwrap()])
+    .args(["-outform", "PEM"])
+    .args(["-out", public_key_path.to_str().unwrap()])
     .stdout(Stdio::null())
     .output()
     .context("failed to generate auth private key")?;
@@ -241,7 +241,7 @@ impl PageServerNode {
 let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
 args.push(Cow::Borrowed("--init"));

-let init_output = Command::new(&self.env.pageserver_bin())
+let init_output = Command::new(self.env.pageserver_bin())
     .args(args.iter().map(Cow::as_ref))
     .envs(self.pageserver_env_variables()?)
     .output()
@@ -323,7 +323,7 @@ impl PagestreamFeMessage {
 match self {
     Self::Exists(req) => {
         bytes.put_u8(0);
-        bytes.put_u8(if req.latest { 1 } else { 0 });
+        bytes.put_u8(u8::from(req.latest));
         bytes.put_u64(req.lsn.0);
         bytes.put_u32(req.rel.spcnode);
         bytes.put_u32(req.rel.dbnode);
@@ -333,7 +333,7 @@ impl PagestreamFeMessage {

 Self::Nblocks(req) => {
     bytes.put_u8(1);
-    bytes.put_u8(if req.latest { 1 } else { 0 });
+    bytes.put_u8(u8::from(req.latest));
     bytes.put_u64(req.lsn.0);
     bytes.put_u32(req.rel.spcnode);
     bytes.put_u32(req.rel.dbnode);
@@ -343,7 +343,7 @@ impl PagestreamFeMessage {

 Self::GetPage(req) => {
     bytes.put_u8(2);
-    bytes.put_u8(if req.latest { 1 } else { 0 });
+    bytes.put_u8(u8::from(req.latest));
     bytes.put_u64(req.lsn.0);
     bytes.put_u32(req.rel.spcnode);
     bytes.put_u32(req.rel.dbnode);
@@ -354,7 +354,7 @@ impl PagestreamFeMessage {

 Self::DbSize(req) => {
     bytes.put_u8(3);
-    bytes.put_u8(if req.latest { 1 } else { 0 });
+    bytes.put_u8(u8::from(req.latest));
     bytes.put_u64(req.lsn.0);
     bytes.put_u32(req.dbnode);
 }
@@ -14,8 +14,8 @@ pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) {
     status
 );

-let byteno: usize = ((xid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
-    / pg_constants::CLOG_XACTS_PER_BYTE) as usize;
+let byteno: usize =
+    ((xid % pg_constants::CLOG_XACTS_PER_PAGE) / pg_constants::CLOG_XACTS_PER_BYTE) as usize;

 let bshift: u8 =
     ((xid % pg_constants::CLOG_XACTS_PER_BYTE) * pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
@@ -25,13 +25,13 @@ pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) {
 }

 pub fn transaction_id_get_status(xid: u32, page: &[u8]) -> u8 {
-    let byteno: usize = ((xid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
-        / pg_constants::CLOG_XACTS_PER_BYTE) as usize;
+    let byteno: usize =
+        ((xid % pg_constants::CLOG_XACTS_PER_PAGE) / pg_constants::CLOG_XACTS_PER_BYTE) as usize;

     let bshift: u8 =
         ((xid % pg_constants::CLOG_XACTS_PER_BYTE) * pg_constants::CLOG_BITS_PER_XACT as u32) as u8;

-    ((page[byteno] >> bshift) & pg_constants::CLOG_XACT_BITMASK) as u8
+    (page[byteno] >> bshift) & pg_constants::CLOG_XACT_BITMASK
 }

 // See CLOGPagePrecedes in clog.c
@@ -333,7 +333,7 @@ impl CheckPoint {
 // We need this segment to start compute node.
 //
 pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
-    let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE as usize);
+    let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);

     let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
     let hdr = XLogLongPageHeaderData {
@@ -574,7 +574,7 @@ mod tests {

 // Rename file to partial to actually find last valid lsn, then rename it back.
 fs::rename(
-    cfg.wal_dir().join(&last_segment),
+    cfg.wal_dir().join(last_segment),
     cfg.wal_dir().join(format!("{}.partial", last_segment)),
 )
 .unwrap();
@@ -81,7 +81,7 @@ impl Conf {
     .new_pg_command("initdb")?
     .arg("-D")
     .arg(self.datadir.as_os_str())
-    .args(&["-U", "postgres", "--no-instructions", "--no-sync"])
+    .args(["-U", "postgres", "--no-instructions", "--no-sync"])
     .output()?;
 debug!("initdb output: {:?}", output);
 ensure!(
@@ -105,12 +105,12 @@ impl Conf {
 let unix_socket_dir_path = unix_socket_dir.path().to_owned();
 let server_process = self
     .new_pg_command("postgres")?
-    .args(&["-c", "listen_addresses="])
+    .args(["-c", "listen_addresses="])
     .arg("-k")
     .arg(unix_socket_dir_path.as_os_str())
     .arg("-D")
     .arg(self.datadir.as_os_str())
-    .args(&["-c", "logging_collector=on"]) // stderr will mess up with tests output
+    .args(["-c", "logging_collector=on"]) // stderr will mess up with tests output
     .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     .stderr(Stdio::from(log_file))
     .spawn()?;
@@ -142,7 +142,7 @@ impl Conf {
 );
 let output = self
     .new_pg_command("pg_waldump")?
-    .args(&[
+    .args([
         &first_segment_file.as_os_str(),
         &last_segment_file.as_os_str(),
     ])
@@ -881,7 +881,7 @@ impl<'a> BeMessage<'a> {
 buf.put_u8(b'k');
 buf.put_u64(req.sent_ptr);
 buf.put_i64(req.timestamp);
-buf.put_u8(if req.request_reply { 1 } else { 0 });
+buf.put_u8(u8::from(req.request_reply));
 });
 }
 }

@@ -157,34 +157,34 @@ mod tests {
 assert_eq!(err.kind(), io::ErrorKind::AlreadyExists);

 let invalid_dir_path = file_path.join("folder");
-create_dir_all(&invalid_dir_path).unwrap_err();
+create_dir_all(invalid_dir_path).unwrap_err();
 }

 #[test]
 fn test_path_with_suffix_extension() {
     let p = PathBuf::from("/foo/bar");
     assert_eq!(
-        &path_with_suffix_extension(&p, "temp").to_string_lossy(),
+        &path_with_suffix_extension(p, "temp").to_string_lossy(),
         "/foo/bar.temp"
     );
     let p = PathBuf::from("/foo/bar");
     assert_eq!(
-        &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(),
+        &path_with_suffix_extension(p, "temp.temp").to_string_lossy(),
         "/foo/bar.temp.temp"
     );
     let p = PathBuf::from("/foo/bar.baz");
     assert_eq!(
-        &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(),
+        &path_with_suffix_extension(p, "temp.temp").to_string_lossy(),
         "/foo/bar.baz.temp.temp"
     );
     let p = PathBuf::from("/foo/bar.baz");
     assert_eq!(
-        &path_with_suffix_extension(&p, ".temp").to_string_lossy(),
+        &path_with_suffix_extension(p, ".temp").to_string_lossy(),
         "/foo/bar.baz..temp"
     );
     let p = PathBuf::from("/foo/bar/dir/");
     assert_eq!(
-        &path_with_suffix_extension(&p, ".temp").to_string_lossy(),
+        &path_with_suffix_extension(p, ".temp").to_string_lossy(),
         "/foo/bar/dir..temp"
     );
 }

@@ -50,7 +50,7 @@ impl BufStream {

 /// Returns a reference to the underlying TcpStream.
 fn get_ref(&self) -> &TcpStream {
-    &*self.0.get_ref().0
+    &self.0.get_ref().0
 }
 }

@@ -84,7 +84,7 @@ fn add_multithreaded_walredo_requesters(

 barrier.wait();

-execute_all(input, &*manager).unwrap();
+execute_all(input, &manager).unwrap();

 barrier.wait();
 }
@@ -131,7 +131,7 @@ where

 // Create pgdata subdirs structure
 for dir in PGDATA_SUBDIRS.iter() {
-    let header = new_tar_header_dir(*dir)?;
+    let header = new_tar_header_dir(dir)?;
     self.ar.append(&header, &mut io::empty())?;
 }

@@ -126,7 +126,7 @@ fn initialize_config(
     );
 }
 // Supplement the CLI arguments with the config file
-let cfg_file_contents = std::fs::read_to_string(&cfg_file_path).with_context(|| {
+let cfg_file_contents = std::fs::read_to_string(cfg_file_path).with_context(|| {
     format!(
         "Failed to read pageserver config at '{}'",
         cfg_file_path.display()
@@ -180,7 +180,7 @@ fn initialize_config(
 if update_config {
     info!("Writing pageserver config to '{}'", cfg_file_path.display());

-    std::fs::write(&cfg_file_path, toml.to_string()).with_context(|| {
+    std::fs::write(cfg_file_path, toml.to_string()).with_context(|| {
         format!(
             "Failed to write pageserver config to '{}'",
             cfg_file_path.display()
@@ -60,7 +60,7 @@ fn main() -> anyhow::Result<()> {
 }

 fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
-    let control_file = ControlFileData::decode(&std::fs::read(&control_file_path)?)?;
+    let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
     println!("{control_file:?}");
     let control_file_initdb = Lsn(control_file.checkPoint);
     println!(
@@ -79,7 +79,7 @@ fn print_layerfile(path: &Path) -> anyhow::Result<()> {
 }

 fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
-    let metadata_bytes = std::fs::read(&path)?;
+    let metadata_bytes = std::fs::read(path)?;
     let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
     println!("Current metadata:\n{meta:?}");
     let mut update_meta = false;
@@ -110,7 +110,7 @@ fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), an

 if update_meta {
     let metadata_bytes = meta.to_bytes()?;
-    std::fs::write(&path, &metadata_bytes)?;
+    std::fs::write(path, metadata_bytes)?;
 }

 Ok(())
@@ -722,7 +722,7 @@ impl PageServerConf {
 auth_validation_public_key_path: None,
 remote_storage_config: None,
 profiling: ProfilingConfig::Disabled,
-default_tenant_conf: TenantConf::dummy_conf(),
+default_tenant_conf: TenantConf::default(),
 broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
 broker_keepalive_interval: Duration::from_secs(5000),
 log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
@@ -267,7 +267,7 @@ fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn)
 }

 let nread = file.read_to_end(&mut buf)?;
-if nread != WAL_SEGMENT_SIZE - offset as usize {
+if nread != WAL_SEGMENT_SIZE - offset {
     // Maybe allow this for .partial files?
     error!("read only {} bytes from WAL file", nread);
 }
@@ -444,9 +444,7 @@ impl PageServerHandler {
 pgb.flush().await?;
 let mut copyin_stream = Box::pin(copyin_stream(pgb));
 let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
-tokio::task::block_in_place(|| {
-    import_wal_from_tar(&*timeline, reader, start_lsn, end_lsn)
-})?;
+tokio::task::block_in_place(|| import_wal_from_tar(&timeline, reader, start_lsn, end_lsn))?;
 info!("wal import complete");

 // Drain the rest of the Copy data
@@ -658,7 +656,7 @@ impl PageServerHandler {
 tokio::task::block_in_place(|| {
     let basebackup =
         basebackup::Basebackup::new(&mut writer, &timeline, lsn, prev_lsn, full_backup)?;
-    tracing::Span::current().record("lsn", &basebackup.lsn.to_string().as_str());
+    tracing::Span::current().record("lsn", basebackup.lsn.to_string().as_str());
     basebackup.send_tarball()
 })?;
 pgb.write_message(&BeMessage::CopyDone)?;
@@ -710,14 +710,14 @@ impl<'a> DatadirModification<'a> {
 let mut dbdir = DbDirectory::des(&buf)?;

 let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
-if r == None || r == Some(false) {
+if r.is_none() || r == Some(false) {
     // The dbdir entry didn't exist, or it contained a
     // 'false'. The 'insert' call already updated it with
     // 'true', now write the updated 'dbdirs' map back.
     let buf = DbDirectory::ser(&dbdir)?;
     self.put(DBDIR_KEY, Value::Image(buf.into()));
 }
-if r == None {
+if r.is_none() {
     // Create RelDirectory
     let buf = RelDirectory::ser(&RelDirectory {
         rels: HashSet::new(),
@@ -1095,9 +1095,7 @@ impl<'a> DatadirModification<'a> {
 // work directly with Images, and we never need to read actual
 // data pages. We could handle this if we had to, by calling
 // the walredo manager, but let's keep it simple for now.
-return PageReconstructResult::from(anyhow::anyhow!(
-    "unexpected pending WAL record"
-));
+PageReconstructResult::from(anyhow::anyhow!("unexpected pending WAL record"))
 }
 } else {
     let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
@@ -1425,7 +1423,7 @@ fn twophase_key_range(xid: TransactionId) -> Range<Key> {
 field2: 0,
 field3: 0,
 field4: 0,
-field5: if overflowed { 1 } else { 0 },
+field5: u8::from(overflowed),
 field6: next_xid,
 }
 }

@@ -519,9 +519,9 @@ impl RemoteTimelineClient {
 let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
     current_remote_index_part
         .layer_metadata
-        .iter()
+        .values()
         // If we don't have the file size for the layer, don't account for it in the metric.
-        .map(|(_, ilmd)| ilmd.file_size.unwrap_or(0))
+        .map(|ilmd| ilmd.file_size.unwrap_or(0))
         .sum()
 } else {
     0
@@ -337,7 +337,7 @@ impl TimelineUninitMark {
 let uninit_mark_parent = uninit_mark_file
     .parent()
     .with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
-ignore_absent_files(|| fs::remove_file(&uninit_mark_file)).with_context(|| {
+ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
     format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
 })?;
 crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
@@ -2321,12 +2321,12 @@ impl Tenant {
 // See more for on the issue #2748 condenced out of the initial PR review.
 let mut shared_cache = self.cached_logical_sizes.lock().await;

-size::gather_inputs(self, logical_sizes_at_once, &mut *shared_cache).await
+size::gather_inputs(self, logical_sizes_at_once, &mut shared_cache).await
 }
 }

 fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {
-    fs::remove_dir_all(&timeline_dir)
+    fs::remove_dir_all(timeline_dir)
         .or_else(|e| {
             if e.kind() == std::io::ErrorKind::NotFound {
                 // we can leave the uninit mark without a timeline dir,
@@ -2342,7 +2342,7 @@ fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> a
         timeline_dir.display()
     )
 })?;
-fs::remove_file(&uninit_mark).with_context(|| {
+fs::remove_file(uninit_mark).with_context(|| {
     format!(
         "Failed to remove timeline uninit mark file {}",
         uninit_mark.display()
@@ -2442,7 +2442,7 @@ fn try_create_target_tenant_dir(
     anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
 });

-fs::rename(&temporary_tenant_dir, target_tenant_directory).with_context(|| {
+fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
     format!(
         "failed to move tenant {} temporary directory {} into the permanent one {}",
         tenant_id,
@@ -2496,9 +2496,9 @@ fn run_initdb(
 );

 let initdb_output = Command::new(&initdb_bin_path)
-    .args(&["-D", &initdb_target_dir.to_string_lossy()])
-    .args(&["-U", &conf.superuser])
-    .args(&["-E", "utf8"])
+    .args(["-D", &initdb_target_dir.to_string_lossy()])
+    .args(["-U", &conf.superuser])
+    .args(["-E", "utf8"])
     .arg("--no-instructions")
     // This is only used for a temporary installation that is deleted shortly after,
     // so no need to fsync it
@@ -2660,9 +2660,11 @@ pub mod harness {

 // Disable automatic GC and compaction to make the unit tests more deterministic.
 // The tests perform them manually if needed.
-let mut tenant_conf = TenantConf::dummy_conf();
-tenant_conf.gc_period = Duration::ZERO;
-tenant_conf.compaction_period = Duration::ZERO;
+let tenant_conf = TenantConf {
+    gc_period: Duration::ZERO,
+    compaction_period: Duration::ZERO,
+    ..TenantConf::default()
+};

 let tenant_id = TenantId::generate();
 fs::create_dir_all(conf.tenant_path(&tenant_id))?;
@@ -139,7 +139,7 @@ impl<'a, const L: usize> OnDiskNode<'a, L> {
 off += keys_len as u64;

 let values_off = off as usize;
-let values_len = num_children as usize * VALUE_SZ as usize;
+let values_len = num_children as usize * VALUE_SZ;
 //off += values_len as u64;

 let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
@@ -177,7 +177,7 @@ impl<'a, const L: usize> OnDiskNode<'a, L> {
 while low < high {
     let mid = low + size / 2;

-    let key_off = mid as usize * self.suffix_len as usize;
+    let key_off = mid * self.suffix_len as usize;
     let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     // Does this match?
     keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
@@ -328,7 +328,7 @@ where
 while idx < node.num_children as usize {
     let suffix = &node.keys[key_off..key_off + suffix_len];
     keybuf[prefix_len..].copy_from_slice(suffix);
-    let value = node.value(idx as usize);
+    let value = node.value(idx);
     #[allow(clippy::collapsible_if)]
     if node.level == 0 {
         // leaf
@@ -368,7 +368,7 @@ where
 key_off -= suffix_len;
 let suffix = &node.keys[key_off..key_off + suffix_len];
 keybuf[prefix_len..].copy_from_slice(suffix);
-let value = node.value(idx as usize);
+let value = node.value(idx);
 #[allow(clippy::collapsible_if)]
 if node.level == 0 {
     // leaf
@@ -629,7 +629,7 @@ impl<const L: usize> BuildNode<L> {
 self.keys.extend(&key[self.prefix.len()..]);
 self.values.extend(value.0);

-assert!(self.keys.len() == self.num_children as usize * self.suffix_len as usize);
+assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
 assert!(self.values.len() == self.num_children as usize * VALUE_SZ);

 self.size += self.suffix_len + VALUE_SZ;
@@ -674,7 +674,7 @@ impl<const L: usize> BuildNode<L> {
 self.size -= prefix_len * self.num_children as usize;
 self.size += prefix_len;

-assert!(self.keys.len() == self.num_children as usize * self.suffix_len as usize);
+assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
 assert!(self.values.len() == self.num_children as usize * VALUE_SZ);

 true
@@ -684,7 +684,7 @@ impl<const L: usize> BuildNode<L> {
 /// Serialize the node to on-disk format.
 ///
 fn pack(&self) -> Bytes {
-    assert!(self.keys.len() == self.num_children as usize * self.suffix_len as usize);
+    assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     assert!(self.num_children > 0);

@@ -940,7 +940,7 @@ mod tests {
 let t = -(f64::ln(u));
 let key_int = (t * 1000000.0) as u128;

-all_data.insert(key_int as u128, idx as u64);
+all_data.insert(key_int, idx as u64);
 }

 // Build a tree from it

@@ -91,7 +91,7 @@ impl EphemeralFile {
     break;
 }

-off += n as usize;
+off += n;
 }
 Ok(())
 }

@@ -569,7 +569,7 @@ impl ImageLayerWriterInner {
     lsn: self.lsn,
 },
 );
-std::fs::rename(self.path, &final_path)?;
+std::fs::rename(self.path, final_path)?;

 trace!("created image layer {}", layer.path().display());

@@ -255,8 +255,7 @@ pub fn save_metadata(
 // fsync the parent directory to ensure the directory entry is durable
 if first_save {
     let timeline_dir = File::open(
-        &path
-            .parent()
+        path.parent()
             .expect("Metadata should always have a parent dir"),
     )?;
     timeline_dir.sync_all()?;

@@ -1327,10 +1327,8 @@ impl Timeline {
     index_part.timeline_layers.len()
 );
 remote_client.init_upload_queue(index_part)?;
-let local_only_filenames = self
-    .create_remote_layers(index_part, local_layers, disk_consistent_lsn)
-    .await?;
-local_only_filenames
+self.create_remote_layers(index_part, local_layers, disk_consistent_lsn)
+    .await?
 }
 None => {
     info!("initializing upload queue as empty");
@@ -3425,9 +3423,9 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
 let mut new_path = path.to_owned();

 for i in 0u32.. {
-    new_path.set_file_name(format!("{}.{}.old", filename, i));
+    new_path.set_file_name(format!("{filename}.{i}.old"));
     if !new_path.exists() {
-        std::fs::rename(&path, &new_path)?;
+        std::fs::rename(path, &new_path)?;
         return Ok(());
     }
 }

@@ -191,11 +191,10 @@ impl TenantConfOpt {
     }
 }

-impl TenantConf {
-    pub fn default() -> TenantConf {
+impl Default for TenantConf {
+    fn default() -> Self {
         use defaults::*;

-        TenantConf {
+        Self {
             checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
             checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
                 .expect("cannot parse default checkpoint timeout"),
@@ -220,29 +219,4 @@ impl TenantConf {
             trace_read_requests: false,
         }
     }
-
-    pub fn dummy_conf() -> Self {
-        TenantConf {
-            checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
-            checkpoint_timeout: Duration::from_secs(600),
-            compaction_target_size: 4 * 1024 * 1024,
-            compaction_period: Duration::from_secs(10),
-            compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,
-            gc_horizon: defaults::DEFAULT_GC_HORIZON,
-            gc_period: Duration::from_secs(10),
-            image_creation_threshold: defaults::DEFAULT_IMAGE_CREATION_THRESHOLD,
-            pitr_interval: Duration::from_secs(60 * 60),
-            walreceiver_connect_timeout: humantime::parse_duration(
-                defaults::DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
-            )
-            .unwrap(),
-            lagging_wal_timeout: humantime::parse_duration(
-                defaults::DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT,
-            )
-            .unwrap(),
-            max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
-                .unwrap(),
-            trace_read_requests: false,
-        }
-    }
 }

@@ -317,7 +317,7 @@ impl<'a> WalIngest<'a> {
 spcnode: blk.rnode_spcnode,
 dbnode: blk.rnode_dbnode,
 relnode: blk.rnode_relnode,
-forknum: blk.forknum as u8,
+forknum: blk.forknum,
 };

 //
@@ -1131,7 +1131,7 @@ mod tests {
 async fn test_relsize() -> Result<()> {
     let tenant = TenantHarness::create("test_relsize")?.load().await;
     let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
-    let mut walingest = init_walingest_test(&*tline)?;
+    let mut walingest = init_walingest_test(&tline)?;

     let mut m = tline.begin_modification(Lsn(0x20));
     walingest.put_rel_creation(&mut m, TESTREL_A)?;
@@ -1155,7 +1155,7 @@ mod tests {
     .no_ondemand_download()?;
 m.commit()?;

-assert_current_logical_size(&*tline, Lsn(0x50));
+assert_current_logical_size(&tline, Lsn(0x50));

 // The relation was created at LSN 2, not visible at LSN 1 yet.
 assert_eq!(
@@ -1239,7 +1239,7 @@ mod tests {
 let mut m = tline.begin_modification(Lsn(0x60));
 walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
 m.commit()?;
-assert_current_logical_size(&*tline, Lsn(0x60));
+assert_current_logical_size(&tline, Lsn(0x60));

 // Check reported size and contents after truncation
 assert_eq!(
@@ -1347,7 +1347,7 @@ mod tests {
 async fn test_drop_extend() -> Result<()> {
     let tenant = TenantHarness::create("test_drop_extend")?.load().await;
     let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
-    let mut walingest = init_walingest_test(&*tline)?;
+    let mut walingest = init_walingest_test(&tline)?;

     let mut m = tline.begin_modification(Lsn(0x20));
     walingest
@@ -1416,7 +1416,7 @@ mod tests {
 async fn test_truncate_extend() -> Result<()> {
     let tenant = TenantHarness::create("test_truncate_extend")?.load().await;
     let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
-    let mut walingest = init_walingest_test(&*tline)?;
+    let mut walingest = init_walingest_test(&tline)?;

     // Create a 20 MB relation (the size is arbitrary)
     let relsize = 20 * 1024 * 1024 / 8192;
@@ -1554,7 +1554,7 @@ mod tests {
 async fn test_large_rel() -> Result<()> {
     let tenant = TenantHarness::create("test_large_rel")?.load().await;
     let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
-    let mut walingest = init_walingest_test(&*tline)?;
+    let mut walingest = init_walingest_test(&tline)?;

     let mut lsn = 0x10;
     for blknum in 0..RELSEG_SIZE + 1 {
@@ -1567,7 +1567,7 @@ mod tests {
     m.commit()?;
 }

-assert_current_logical_size(&*tline, Lsn(lsn));
+assert_current_logical_size(&tline, Lsn(lsn));

 assert_eq!(
     tline
@@ -1587,7 +1587,7 @@ mod tests {
     .no_ondemand_download()?,
     RELSEG_SIZE
 );
-assert_current_logical_size(&*tline, Lsn(lsn));
+assert_current_logical_size(&tline, Lsn(lsn));

 // Truncate another block
 lsn += 0x10;
@@ -1600,7 +1600,7 @@ mod tests {
     .no_ondemand_download()?,
     RELSEG_SIZE - 1
 );
-assert_current_logical_size(&*tline, Lsn(lsn));
+assert_current_logical_size(&tline, Lsn(lsn));

 // Truncate to 1500, and then truncate all the way down to 0, one block at a time
 // This tests the behavior at segment boundaries
@@ -1619,7 +1619,7 @@ mod tests {

     size -= 1;
 }
-assert_current_logical_size(&*tline, Lsn(lsn));
+assert_current_logical_size(&tline, Lsn(lsn));

 Ok(())
 }

@@ -805,7 +805,7 @@ fn wal_stream_connection_config(
 auth_token: Option<&str>,
 ) -> anyhow::Result<PgConnectionConfig> {
 let (host, port) =
-    parse_host_port(&listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
+    parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
 let port = port.unwrap_or(5432);
 Ok(PgConnectionConfig::new_host_port(host, port)
     .extend_options([
@@ -409,7 +409,7 @@ impl PostgresRedoManager {
     key
 );
 for &xid in xids {
-    let pageno = xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
+    let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;

@@ -459,7 +459,7 @@ impl PostgresRedoManager {
     key
 );
 for &xid in xids {
-    let pageno = xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
+    let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;

@@ -647,7 +647,7 @@ impl PostgresRedoProcess {

 info!("running initdb in {}", datadir.display());
 let initdb = Command::new(pg_bin_dir_path.join("initdb"))
-    .args(&["-D", &datadir.to_string_lossy()])
+    .args(["-D", &datadir.to_string_lossy()])
     .arg("-N")
     .env_clear()
     .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
@@ -48,7 +48,7 @@ impl ServerSecret {

 Self {
     iterations: 4096,
-    salt_base64: base64::encode(&mocked_salt),
+    salt_base64: base64::encode(mocked_salt),
     stored_key: ScramKey::default(),
     server_key: ScramKey::default(),
     doomed: true,
@@ -68,7 +68,7 @@ impl ServerSecret {

 Some(Self {
     iterations,
-    salt_base64: base64::encode(&salt),
+    salt_base64: base64::encode(salt),
     stored_key: password.client_key().sha256(),
     server_key: password.server_key(),
     doomed: false,
@@ -239,7 +239,7 @@ mod test {
 conf: &SafeKeeperConf,
 ttid: &TenantTimelineId,
 ) -> Result<(FileStorage, SafeKeeperState)> {
-    fs::create_dir_all(&conf.timeline_dir(ttid)).expect("failed to create timeline dir");
+    fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir");
     Ok((
         FileStorage::restore_new(ttid, conf)?,
         FileStorage::load_control_file_conf(conf, ttid)?,
@@ -250,7 +250,7 @@ mod test {
 conf: &SafeKeeperConf,
 ttid: &TenantTimelineId,
 ) -> Result<(FileStorage, SafeKeeperState)> {
-    fs::create_dir_all(&conf.timeline_dir(ttid)).expect("failed to create timeline dir");
+    fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir");
     let state = SafeKeeperState::empty();
     let storage = FileStorage::create_new(ttid, conf, state.clone())?;
     Ok((storage, state))
@@ -425,7 +425,7 @@ impl Collector for TimelineCollector {
     .set(tli.num_computes as i64);
 self.acceptor_term
     .with_label_values(labels)
-    .set(tli.persisted_state.acceptor_state.term as u64);
+    .set(tli.persisted_state.acceptor_state.term);
 self.written_wal_bytes
     .with_label_values(labels)
     .set(tli.wal_storage.write_wal_bytes);

@@ -346,9 +346,7 @@ impl WalBackupTask {
     backup_lsn, commit_lsn, e
 );

-if retry_attempt < u32::MAX {
-    retry_attempt += 1;
-}
+retry_attempt = retry_attempt.saturating_add(1);
 }
 }
 }
@@ -387,7 +385,7 @@ async fn backup_single_segment(
 ) -> Result<()> {
 let segment_file_path = seg.file_path(timeline_dir)?;
 let remote_segment_path = segment_file_path
-    .strip_prefix(&workspace_dir)
+    .strip_prefix(workspace_dir)
     .context("Failed to strip workspace dir prefix")
     .and_then(RemotePath::new)
     .with_context(|| {
@@ -223,7 +223,7 @@ impl PhysicalStorage {
 // Rename partial file to completed file
 let (wal_file_path, wal_file_partial_path) =
     wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
-fs::rename(&wal_file_partial_path, &wal_file_path)?;
+fs::rename(wal_file_partial_path, wal_file_path)?;
 } else {
 // otherwise, file can be reused later
 self.file = Some(file);
@@ -249,7 +249,7 @@ impl PhysicalStorage {

 while !buf.is_empty() {
     // Extract WAL location for this block
-    let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size) as usize;
+    let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
     let segno = self.write_lsn.segment_number(self.wal_seg_size);

     // If crossing a WAL boundary, only write up until we reach wal segment size.
@@ -366,7 +366,7 @@ impl Storage for PhysicalStorage {
     self.fdatasync_file(&mut unflushed_file)?;
 }

-let xlogoff = end_pos.segment_offset(self.wal_seg_size) as usize;
+let xlogoff = end_pos.segment_offset(self.wal_seg_size);
 let segno = end_pos.segment_number(self.wal_seg_size);

 // Remove all segments after the given LSN.
@@ -383,7 +383,7 @@ impl Storage for PhysicalStorage {
 // Make segment partial once again
 let (wal_file_path, wal_file_partial_path) =
     wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
-fs::rename(&wal_file_path, &wal_file_partial_path)?;
+fs::rename(wal_file_path, wal_file_partial_path)?;
 }

 // Update LSNs
@@ -416,7 +416,7 @@ fn remove_segments_from_disk(
 let mut min_removed = u64::MAX;
 let mut max_removed = u64::MIN;

-for entry in fs::read_dir(&timeline_dir)? {
+for entry in fs::read_dir(timeline_dir)? {
     let entry = entry?;
     let entry_path = entry.path();
     let fname = entry_path.file_name().unwrap();
@@ -499,7 +499,7 @@ impl WalReader {

 // How much to read and send in message? We cannot cross the WAL file
 // boundary, and we don't want send more than provided buffer.
-let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;
+let xlogoff = self.pos.segment_offset(self.wal_seg_size);
 let send_size = min(buf.len(), self.wal_seg_size - xlogoff);

 // Read some data from the file.
@@ -518,7 +518,7 @@ impl WalReader {

 /// Open WAL segment at the current position of the reader.
 async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead>>> {
-    let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;
+    let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     let segno = self.pos.segment_number(self.wal_seg_size);
     let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
     let wal_file_path = self.timeline_dir.join(wal_file_name);
@@ -160,7 +160,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 }
 for _i in 0..args.num_pubs {
     let c = None;
-    tokio::spawn(publish(c, args.num_subs as u64));
+    tokio::spawn(publish(c, args.num_subs));
 }

 h.await?;

@@ -13,6 +13,10 @@ use proto::{

 // Code generated by protobuf.
 pub mod proto {
+    // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
+    // we don't use these types for anything but broker data transmission,
+    // so it's ok to ignore this one.
+    #![allow(clippy::derive_partial_eq_without_eq)]
     tonic::include_proto!("storage_broker");
 }
