diff --git a/compute_tools/src/ro_replica.rs b/compute_tools/src/ro_replica.rs index 113a16984c..92ff5ec8d1 100644 --- a/compute_tools/src/ro_replica.rs +++ b/compute_tools/src/ro_replica.rs @@ -1,9 +1,9 @@ -use std::{pin::Pin, str::FromStr, sync::Arc, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; use compute_api::spec::PageserverProtocol; -use futures::{StreamExt, stream::FuturesUnordered}; +use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use postgres::SimpleQueryMessage; use tokio_util::sync::CancellationToken; use tracing::{Instrument, error, info, info_span, instrument, warn}; @@ -163,40 +163,34 @@ async fn attempt(lease_id: String, compute: &Arc) -> anyhow::Result shard_id=%connect_info.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, ); - let logging_wrapper = - |fut: Pin>>>>>| { - async move { - // TODO: timeout? - match fut.await { - Ok(Some(v)) => { - info!("lease obtained"); - Ok(Some(v)) - } - Ok(None) => { - error!("pageserver rejected our request"); - Ok(None) - } - Err(err) => { - error!("communication failure: {err:?}"); - Err(()) - } + let logging_wrapper = async |fut| { + async move { + // TODO: timeout? + match fut.await { + Ok(Some(v)) => { + info!("lease obtained"); + Ok(Some(v)) + } + Ok(None) => { + error!("pageserver rejected our request"); + Ok(None) + } + Err(err) => { + error!("communication failure: {err:?}"); + Err(()) } } - .instrument(logging_span) - }; + } + .instrument(logging_span) + .await + }; let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? { - PageserverProtocol::Libpq => logging_wrapper(Box::pin(attempt_one_libpq( - connect_info, - timeline_id, - lease_id.clone(), - lsn, - ))), - PageserverProtocol::Grpc => logging_wrapper(Box::pin(attempt_one_grpc( - connect_info, - timeline_id, - lease_id.clone(), - lsn, - ))), + PageserverProtocol::Libpq => logging_wrapper( + attempt_one_libpq(connect_info, timeline_id, lease_id.clone(), lsn).boxed(), + ), + PageserverProtocol::Grpc => logging_wrapper(Box::pin( + attempt_one_grpc(connect_info, timeline_id, lease_id.clone(), lsn).boxed(), + )), }; futs.push(fut); } @@ -280,7 +274,7 @@ async fn attempt_one_grpc( tenant_shard_id.tenant_id, timeline_id, tenant_shard_id.to_index(), - auth.map(String::from), + auth, None, ) .await?; diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index e6753f1346..3c10a410d2 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -15,7 +15,10 @@ //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g. //! stream combinators without dealing with errors, and avoids validating the same message twice. -use std::{fmt::Display, time::{Duration, SystemTime, UNIX_EPOCH}}; +use std::{ + fmt::Display, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use bytes::Bytes; use chrono::Utc; @@ -783,7 +786,7 @@ impl TryFrom for LeaseStandbyHorizonRequest { if pb.lsn == 0 { return Err(ProtocolError::Missing("lsn")); } - if pb.lease_id.len() == 0 { + if pb.lease_id.is_empty() { return Err(ProtocolError::Invalid("lease_id", pb.lease_id)); } Ok(Self { @@ -831,8 +834,9 @@ impl From for proto::LeaseStandbyHorizonResponse { fn from(response: LeaseStandbyHorizonResponse) -> Self { Self { expiration: Some(prost_types::Timestamp { - seconds: response.expiration.timestamp() as i64, - nanos: response.expiration.timestamp_subsec_nanos() as i32, + seconds: response.expiration.timestamp(), + nanos: i32::try_from(response.expiration.timestamp_subsec_nanos()) + .expect("should fit in i32 max"), }), } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 78f878d144..72a3d800b9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -8929,15 +8929,15 @@ mod tests { ], // image layers vec![ -(Lsn(0x10), vec![(key1, test_img("metadata key 1"))]), -( - Lsn(0x30), - vec![ - (key0, test_img("metadata key 0")), - (key3, test_img("metadata key 3")), - ], - ), -], + (Lsn(0x10), vec![(key1, test_img("metadata key 1"))]), + ( + Lsn(0x30), + vec![ + (key0, test_img("metadata key 0")), + (key3, test_img("metadata key 3")), + ], + ), + ], Lsn(0x30), ) .await?; @@ -9600,7 +9600,7 @@ mod tests { let initial_leases = [0x30, 0x50, 0x70]; let mut expirations = Vec::new(); initial_leases.iter().enumerate().for_each(|(i, n)| { - let lease_id = format!("test_lease_{}", i); + let lease_id = format!("test_lease_{i}"); expirations.push( timeline .lease_standby_horizon(lease_id, Lsn(*n), &ctx) @@ -9684,9 +9684,9 @@ mod tests { // Should be able to read at any LSN between any standby_horizon and tip let readable_lsns = (legacy.0..=end_lsn.0) - .chain(leases.iter().map(|(lsn, _)| (lsn.0..=end_lsn.0)).flatten()) + .chain(leases.iter().flat_map(|(lsn, _)| (lsn.0..=end_lsn.0))) .dedup() - .map(|lsn| Lsn(lsn)) + .map(Lsn) .collect_vec(); for lsn in readable_lsns { timeline diff --git a/pageserver/src/tenant/timeline/standby_horizon.rs b/pageserver/src/tenant/timeline/standby_horizon.rs index 0865b13101..e2a88a802e 100644 --- a/pageserver/src/tenant/timeline/standby_horizon.rs +++ b/pageserver/src/tenant/timeline/standby_horizon.rs @@ -231,7 +231,7 @@ impl Horizons { // Violation of this invariant would constitute a bug in gc: // it should for (lease_id, lease) in inner.leases_by_id.iter() { - if !(lease.lsn >= *applied_gc_cutoff_lsn) { + if lease.lsn < *applied_gc_cutoff_lsn { warn!(?lease_id, applied_gc_cutoff_lsn=%*applied_gc_cutoff_lsn, "lease is below the applied gc cutoff"); bug = true; } @@ -286,8 +286,8 @@ impl Horizons { let inner = self.inner.lock().unwrap(); inner .leases_by_id - .iter() - .map(|(_, lease)| (lease.lsn, lease.valid_until)) + .values() + .map(|Lease { valid_until, lsn }| (*lsn, *valid_until)) .collect() } }