fix all clippy complaints in this branch

This commit is contained in:
Christian Schwarz
2025-08-06 17:21:24 +02:00
parent 1877b70a35
commit 1635390a96
4 changed files with 51 additions and 53 deletions

View File

@@ -1,9 +1,9 @@
use std::{pin::Pin, str::FromStr, sync::Arc, time::Duration}; use std::{str::FromStr, sync::Arc, time::Duration};
use anyhow::Context; use anyhow::Context;
use chrono::Utc; use chrono::Utc;
use compute_api::spec::PageserverProtocol; use compute_api::spec::PageserverProtocol;
use futures::{StreamExt, stream::FuturesUnordered}; use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
use postgres::SimpleQueryMessage; use postgres::SimpleQueryMessage;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, info_span, instrument, warn}; use tracing::{Instrument, error, info, info_span, instrument, warn};
@@ -163,40 +163,34 @@ async fn attempt(lease_id: String, compute: &Arc<ComputeNode>) -> anyhow::Result
shard_id=%connect_info.tenant_shard_id.shard_slug(), shard_id=%connect_info.tenant_shard_id.shard_slug(),
timeline_id=%timeline_id, timeline_id=%timeline_id,
); );
let logging_wrapper = let logging_wrapper = async |fut| {
|fut: Pin<Box<dyn Future<Output = anyhow::Result<Option<chrono::DateTime<Utc>>>>>>| { async move {
async move { // TODO: timeout?
// TODO: timeout? match fut.await {
match fut.await { Ok(Some(v)) => {
Ok(Some(v)) => { info!("lease obtained");
info!("lease obtained"); Ok(Some(v))
Ok(Some(v)) }
} Ok(None) => {
Ok(None) => { error!("pageserver rejected our request");
error!("pageserver rejected our request"); Ok(None)
Ok(None) }
} Err(err) => {
Err(err) => { error!("communication failure: {err:?}");
error!("communication failure: {err:?}"); Err(())
Err(())
}
} }
} }
.instrument(logging_span) }
}; .instrument(logging_span)
.await
};
let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? { let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? {
PageserverProtocol::Libpq => logging_wrapper(Box::pin(attempt_one_libpq( PageserverProtocol::Libpq => logging_wrapper(
connect_info, attempt_one_libpq(connect_info, timeline_id, lease_id.clone(), lsn).boxed(),
timeline_id, ),
lease_id.clone(), PageserverProtocol::Grpc => logging_wrapper(Box::pin(
lsn, attempt_one_grpc(connect_info, timeline_id, lease_id.clone(), lsn).boxed(),
))), )),
PageserverProtocol::Grpc => logging_wrapper(Box::pin(attempt_one_grpc(
connect_info,
timeline_id,
lease_id.clone(),
lsn,
))),
}; };
futs.push(fut); futs.push(fut);
} }
@@ -280,7 +274,7 @@ async fn attempt_one_grpc(
tenant_shard_id.tenant_id, tenant_shard_id.tenant_id,
timeline_id, timeline_id,
tenant_shard_id.to_index(), tenant_shard_id.to_index(),
auth.map(String::from), auth,
None, None,
) )
.await?; .await?;

View File

@@ -15,7 +15,10 @@
//! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g. //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g.
//! stream combinators without dealing with errors, and avoids validating the same message twice. //! stream combinators without dealing with errors, and avoids validating the same message twice.
use std::{fmt::Display, time::{Duration, SystemTime, UNIX_EPOCH}}; use std::{
fmt::Display,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use bytes::Bytes; use bytes::Bytes;
use chrono::Utc; use chrono::Utc;
@@ -783,7 +786,7 @@ impl TryFrom<proto::LeaseStandbyHorizonRequest> for LeaseStandbyHorizonRequest {
if pb.lsn == 0 { if pb.lsn == 0 {
return Err(ProtocolError::Missing("lsn")); return Err(ProtocolError::Missing("lsn"));
} }
if pb.lease_id.len() == 0 { if pb.lease_id.is_empty() {
return Err(ProtocolError::Invalid("lease_id", pb.lease_id)); return Err(ProtocolError::Invalid("lease_id", pb.lease_id));
} }
Ok(Self { Ok(Self {
@@ -831,8 +834,9 @@ impl From<LeaseStandbyHorizonResponse> for proto::LeaseStandbyHorizonResponse {
fn from(response: LeaseStandbyHorizonResponse) -> Self { fn from(response: LeaseStandbyHorizonResponse) -> Self {
Self { Self {
expiration: Some(prost_types::Timestamp { expiration: Some(prost_types::Timestamp {
seconds: response.expiration.timestamp() as i64, seconds: response.expiration.timestamp(),
nanos: response.expiration.timestamp_subsec_nanos() as i32, nanos: i32::try_from(response.expiration.timestamp_subsec_nanos())
.expect("should fit in i32 max"),
}), }),
} }
} }

View File

@@ -8929,15 +8929,15 @@ mod tests {
], ],
// image layers // image layers
vec![ vec![
(Lsn(0x10), vec![(key1, test_img("metadata key 1"))]), (Lsn(0x10), vec![(key1, test_img("metadata key 1"))]),
( (
Lsn(0x30), Lsn(0x30),
vec![ vec![
(key0, test_img("metadata key 0")), (key0, test_img("metadata key 0")),
(key3, test_img("metadata key 3")), (key3, test_img("metadata key 3")),
], ],
), ),
], ],
Lsn(0x30), Lsn(0x30),
) )
.await?; .await?;
@@ -9600,7 +9600,7 @@ mod tests {
let initial_leases = [0x30, 0x50, 0x70]; let initial_leases = [0x30, 0x50, 0x70];
let mut expirations = Vec::new(); let mut expirations = Vec::new();
initial_leases.iter().enumerate().for_each(|(i, n)| { initial_leases.iter().enumerate().for_each(|(i, n)| {
let lease_id = format!("test_lease_{}", i); let lease_id = format!("test_lease_{i}");
expirations.push( expirations.push(
timeline timeline
.lease_standby_horizon(lease_id, Lsn(*n), &ctx) .lease_standby_horizon(lease_id, Lsn(*n), &ctx)
@@ -9684,9 +9684,9 @@ mod tests {
// Should be able to read at any LSN between any standby_horizon and tip // Should be able to read at any LSN between any standby_horizon and tip
let readable_lsns = (legacy.0..=end_lsn.0) let readable_lsns = (legacy.0..=end_lsn.0)
.chain(leases.iter().map(|(lsn, _)| (lsn.0..=end_lsn.0)).flatten()) .chain(leases.iter().flat_map(|(lsn, _)| (lsn.0..=end_lsn.0)))
.dedup() .dedup()
.map(|lsn| Lsn(lsn)) .map(Lsn)
.collect_vec(); .collect_vec();
for lsn in readable_lsns { for lsn in readable_lsns {
timeline timeline

View File

@@ -231,7 +231,7 @@ impl Horizons {
// Violation of this invariant would constitute a bug in gc: // Violation of this invariant would constitute a bug in gc:
// it should // it should
for (lease_id, lease) in inner.leases_by_id.iter() { for (lease_id, lease) in inner.leases_by_id.iter() {
if !(lease.lsn >= *applied_gc_cutoff_lsn) { if lease.lsn < *applied_gc_cutoff_lsn {
warn!(?lease_id, applied_gc_cutoff_lsn=%*applied_gc_cutoff_lsn, "lease is below the applied gc cutoff"); warn!(?lease_id, applied_gc_cutoff_lsn=%*applied_gc_cutoff_lsn, "lease is below the applied gc cutoff");
bug = true; bug = true;
} }
@@ -286,8 +286,8 @@ impl Horizons {
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
inner inner
.leases_by_id .leases_by_id
.iter() .values()
.map(|(_, lease)| (lease.lsn, lease.valid_until)) .map(|Lease { valid_until, lsn }| (*lsn, *valid_until))
.collect() .collect()
} }
} }