diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index ce19563679..270d6020d3 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -271,7 +271,6 @@ fn extract_pageserver_conninfo_from_guc( PageserverConnectionInfo { shards: pageserver_connstring_guc .split(',') - .into_iter() .enumerate() .map(|(i, connstr)| { ( diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index 0e800145dc..f265a5d955 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -83,13 +83,13 @@ fn acquire_lsn_lease_with_retry( spec.pageserver_conninfo .shards - .iter() - .map(|(_shardno, conninfo)| { + .values() + .map(|conninfo| { // FIXME: for now, this requires a libpq connection, the grpc API doesn't // have a "lease" method. let connstr = conninfo.libpq_url.as_ref().expect("missing libpq URL"); - let mut config = postgres::Config::from_str(&connstr).expect("Invalid connstr"); + let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr"); if let Some(storage_auth_token) = &spec.storage_auth_token { config.password(storage_auth_token.clone()); } diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index 2485fb6e79..b00c8a206c 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -121,7 +121,7 @@ where } HashMapInit { - shmem_handle: shmem_handle, + shmem_handle, shared_ptr, } } @@ -152,7 +152,7 @@ where let mut success = None; self.update_with_fn(key, |existing| { - if let Some(_) = existing { + if existing.is_some() { success = Some(false); UpdateAction::Nothing } else { @@ -294,7 +294,7 @@ where bucket_ptr.write(core::Bucket { hash: 0, next: if i < num_buckets { - i as u32 + 1 + i + 1 } else { inner.free_head }, @@ -317,8 +317,8 @@ where buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize); dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size); } - for i in 0..dictionary.len() { - dictionary[i] = core::INVALID_POS; + for item in dictionary.iter_mut() { + *item = core::INVALID_POS; } for i in 0..old_num_buckets as usize { diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs index 8efbd4b36a..4f0032b158 100644 --- a/libs/neon-shmem/src/hash/core.rs +++ b/libs/neon-shmem/src/hash/core.rs @@ -90,8 +90,8 @@ where let dictionary = unsafe { std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size as usize) }; - for i in 0..dictionary.len() { - dictionary[i].write(INVALID_POS); + for item in dictionary.iter_mut() { + item.write(INVALID_POS); } // TODO: use std::slice::assume_init_mut() once it stabilizes unsafe { @@ -121,7 +121,7 @@ where let bucket = &self.buckets[next as usize]; let (bucket_key, bucket_value) = bucket.inner.as_ref().expect("entry is in use"); if bucket_key == key { - return Some(&bucket_value); + return Some(bucket_value); } next = bucket.next; } @@ -228,6 +228,6 @@ where bucket.next = INVALID_POS; bucket.inner = Some((key, value)); - return Ok(pos); + Ok(pos) } } diff --git a/libs/neon-shmem/src/hash/tests.rs b/libs/neon-shmem/src/hash/tests.rs index 073aea5220..425c935efd 100644 --- a/libs/neon-shmem/src/hash/tests.rs +++ b/libs/neon-shmem/src/hash/tests.rs @@ -81,7 +81,7 @@ fn sparse() { for _ in 0..10000 { loop { let key = rand::random::(); - if used_keys.get(&key).is_some() { + if used_keys.contains(&key) { continue; } used_keys.insert(key); diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index 4056dc5031..720604b1b3 100644 --- 
a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -163,7 +163,7 @@ fn next_recurse<'e, V: Value>( ) -> Result, ConcurrentUpdateError> { let rnode = node.read_lock_or_restart()?; let prefix = rnode.get_prefix(); - if prefix.len() != 0 { + if !prefix.is_empty() { path.extend_from_slice(prefix); } @@ -213,13 +213,14 @@ fn next_recurse<'e, V: Value>( } // This corresponds to the 'insertOpt' function in the paper -pub(crate) fn update_recurse<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( +#[allow(clippy::too_many_arguments)] +pub(crate) fn update_recurse<'e, K: Key, V: Value, A: ArtAllocator, F>( key: &[u8], value_fn: F, node: NodeRef<'e, V>, rparent: Option<(ReadLockedNodeRef, u8)>, rgrandparent: Option<(ReadLockedNodeRef, u8)>, - guard: &'g mut TreeWriteGuard<'e, K, V, A>, + guard: &'_ mut TreeWriteGuard<'e, K, V, A>, level: usize, orig_key: &[u8], ) -> Result<(), ArtError> @@ -248,8 +249,8 @@ where return Ok(()); } let prefix_match_len = prefix_match_len.unwrap(); - let key = &key[prefix_match_len as usize..]; - let level = level + prefix_match_len as usize; + let key = &key[prefix_match_len..]; + let level = level + prefix_match_len; if rnode.is_leaf() { assert_eq!(key.len(), 0); @@ -321,7 +322,7 @@ where }; wnode.write_unlock(); } - return Ok(()); + Ok(()) } else { let next_child = next_node.unwrap(); // checked above it's not None if let Some((ref rparent, _)) = rparent { @@ -357,14 +358,14 @@ impl std::fmt::Debug for PathElement { } } -pub(crate) fn dump_tree<'e, V: Value + std::fmt::Debug>( +pub(crate) fn dump_tree( root: RootPtr, - epoch_pin: &'e EpochPin, + epoch_pin: &'_ EpochPin, dst: &mut dyn std::io::Write, ) { let root_ref = NodeRef::from_root_ptr(root); - let _ = dump_recurse(&[], root_ref, &epoch_pin, 0, dst); + let _ = dump_recurse(&[], root_ref, epoch_pin, 0, dst); } // TODO: return an Err if writeln!() returns error, instead of unwrapping @@ -380,7 +381,7 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>( let rnode = node.read_lock_or_restart()?; let mut path = Vec::from(path); let prefix = rnode.get_prefix(); - if prefix.len() != 0 { + if !prefix.is_empty() { path.push(PathElement::Prefix(Vec::from(prefix))); } @@ -426,13 +427,13 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>( /// [foo]b -> [a]r -> value /// e -> [ls]e -> value ///``` -fn insert_split_prefix<'e, K: Key, V: Value, A: ArtAllocator>( +fn insert_split_prefix>( key: &[u8], value: V, node: &mut WriteLockedNodeRef, parent: &mut WriteLockedNodeRef, parent_key: u8, - guard: &'e TreeWriteGuard, + guard: &'_ TreeWriteGuard, ) -> Result<(), OutOfMemoryError> { let old_node = node; let old_prefix = old_node.get_prefix(); @@ -463,11 +464,11 @@ fn insert_split_prefix<'e, K: Key, V: Value, A: ArtAllocator>( Ok(()) } -fn insert_to_node<'e, K: Key, V: Value, A: ArtAllocator>( +fn insert_to_node>( wnode: &mut WriteLockedNodeRef, key: &[u8], value: V, - guard: &'e TreeWriteGuard, + guard: &'_ TreeWriteGuard, ) -> Result<(), OutOfMemoryError> { let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?; wnode.insert_child(key[0], value_child.into_ptr()); diff --git a/libs/neonart/src/algorithm/lock_and_version.rs b/libs/neonart/src/algorithm/lock_and_version.rs index ad3636a7d7..025897864c 100644 --- a/libs/neonart/src/algorithm/lock_and_version.rs +++ b/libs/neonart/src/algorithm/lock_and_version.rs @@ -105,13 +105,13 @@ impl AtomicLockAndVersion { } fn set_locked_bit(version: u64) -> u64 { - return version + 2; + version + 2 } fn is_obsolete(version: u64) -> bool { - 
return (version & 1) == 1; + (version & 1) == 1 } fn is_locked(version: u64) -> bool { - return (version & 2) == 2; + (version & 2) == 2 } diff --git a/libs/neonart/src/algorithm/node_ptr.rs b/libs/neonart/src/algorithm/node_ptr.rs index e97e5a7c63..a1b5a788de 100644 --- a/libs/neonart/src/algorithm/node_ptr.rs +++ b/libs/neonart/src/algorithm/node_ptr.rs @@ -305,14 +305,13 @@ impl NodePtr { &self, allocator: &impl ArtAllocator, ) -> Result, OutOfMemoryError> { - let bigger = match self.variant() { + match self.variant() { NodeVariant::Internal4(n) => n.grow(allocator), NodeVariant::Internal16(n) => n.grow(allocator), NodeVariant::Internal48(n) => n.grow(allocator), NodeVariant::Internal256(_) => panic!("cannot grow Internal256 node"), NodeVariant::Leaf(_) => panic!("cannot grow Leaf node"), - }; - bigger + } } pub(crate) fn insert_child(&mut self, key_byte: u8, child: NodePtr) { @@ -464,7 +463,7 @@ impl NodeInternal4 { new.extend_from_slice(prefix); new.push(prefix_byte); new.extend_from_slice(&self.prefix[0..self.prefix_len as usize]); - (&mut self.prefix[0..new.len()]).copy_from_slice(&new); + self.prefix[0..new.len()].copy_from_slice(&new); self.prefix_len = new.len() as u8; } @@ -558,7 +557,7 @@ impl NodeInternal4 { tag: NodeTag::Internal16, lock_and_version: AtomicLockAndVersion::new(), - prefix: self.prefix.clone(), + prefix: self.prefix, prefix_len: self.prefix_len, num_children: self.num_children, @@ -585,7 +584,7 @@ impl NodeInternal16 { new.extend_from_slice(prefix); new.push(prefix_byte); new.extend_from_slice(&self.prefix[0..self.prefix_len as usize]); - (&mut self.prefix[0..new.len()]).copy_from_slice(&new); + self.prefix[0..new.len()].copy_from_slice(&new); self.prefix_len = new.len() as u8; } @@ -679,7 +678,7 @@ impl NodeInternal16 { tag: NodeTag::Internal48, lock_and_version: AtomicLockAndVersion::new(), - prefix: self.prefix.clone(), + prefix: self.prefix, prefix_len: self.prefix_len, num_children: self.num_children, @@ -706,7 +705,7 @@ impl NodeInternal16 { tag: NodeTag::Internal4, lock_and_version: AtomicLockAndVersion::new(), - prefix: self.prefix.clone(), + prefix: self.prefix, prefix_len: self.prefix_len, num_children: self.num_children, @@ -736,7 +735,7 @@ impl NodeInternal48 { idx, self.num_children ); - assert!(shadow_indexes.get(&idx).is_none()); + assert!(!shadow_indexes.contains(&idx)); shadow_indexes.insert(idx); count += 1; } @@ -750,7 +749,7 @@ impl NodeInternal48 { new.extend_from_slice(prefix); new.push(prefix_byte); new.extend_from_slice(&self.prefix[0..self.prefix_len as usize]); - (&mut self.prefix[0..new.len()]).copy_from_slice(&new); + self.prefix[0..new.len()].copy_from_slice(&new); self.prefix_len = new.len() as u8; } @@ -853,7 +852,7 @@ impl NodeInternal48 { tag: NodeTag::Internal256, lock_and_version: AtomicLockAndVersion::new(), - prefix: self.prefix.clone(), + prefix: self.prefix, prefix_len: self.prefix_len, num_children: self.num_children as u16, @@ -879,7 +878,7 @@ impl NodeInternal48 { tag: NodeTag::Internal16, lock_and_version: AtomicLockAndVersion::new(), - prefix: self.prefix.clone(), + prefix: self.prefix, prefix_len: self.prefix_len, num_children: self.num_children, @@ -912,7 +911,7 @@ impl NodeInternal256 { new.extend_from_slice(prefix); new.push(prefix_byte); new.extend_from_slice(&self.prefix[0..self.prefix_len as usize]); - (&mut self.prefix[0..new.len()]).copy_from_slice(&new); + self.prefix[0..new.len()].copy_from_slice(&new); self.prefix_len = new.len() as u8; } @@ -987,7 +986,7 @@ impl NodeInternal256 { tag: NodeTag::Internal48, 
lock_and_version: AtomicLockAndVersion::new(), - prefix: self.prefix.clone(), + prefix: self.prefix, prefix_len: self.prefix_len, num_children: self.num_children as u8, @@ -1019,7 +1018,7 @@ impl NodeLeaf { new.extend_from_slice(prefix); new.push(prefix_byte); new.extend_from_slice(&self.prefix[0..self.prefix_len as usize]); - (&mut self.prefix[0..new.len()]).copy_from_slice(&new); + self.prefix[0..new.len()].copy_from_slice(&new); self.prefix_len = new.len() as u8; } diff --git a/libs/neonart/src/allocator.rs b/libs/neonart/src/allocator.rs index 8568357a2f..f95e251458 100644 --- a/libs/neonart/src/allocator.rs +++ b/libs/neonart/src/allocator.rs @@ -61,13 +61,11 @@ impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> { let (allocator_area, remain) = alloc_from_slice::>(area); let (tree_area, remain) = alloc_from_slice::>(remain); - let allocator = allocator_area.write(ArtMultiSlabAllocator { + allocator_area.write(ArtMultiSlabAllocator { tree_area: spin::Mutex::new(Some(tree_area)), inner: MultiSlabAllocator::new(remain, &Self::LAYOUTS), phantom_val: PhantomData, - }); - - allocator + }) } } diff --git a/libs/neonart/src/allocator/block.rs b/libs/neonart/src/allocator/block.rs index 5aa7d45188..9c1bb6e176 100644 --- a/libs/neonart/src/allocator/block.rs +++ b/libs/neonart/src/allocator/block.rs @@ -119,7 +119,7 @@ impl<'t> BlockAllocator<'t> { } // out of blocks - return INVALID_BLOCK; + INVALID_BLOCK } // TODO: this is currently unused. The slab allocator never releases blocks diff --git a/libs/neonart/src/allocator/slab.rs b/libs/neonart/src/allocator/slab.rs index aabbf6696d..d66e52d879 100644 --- a/libs/neonart/src/allocator/slab.rs +++ b/libs/neonart/src/allocator/slab.rs @@ -374,11 +374,11 @@ mod tests { assert!(unsafe { (*all[i]).val == i }); } - let distribution = Zipf::new(10 as f64, 1.1).unwrap(); + let distribution = Zipf::new(10.0, 1.1).unwrap(); let mut rng = rand::rng(); for _ in 0..100000 { slab.0.dump(); - let idx = (rng.sample(distribution) as usize).into(); + let idx = rng.sample(distribution) as usize; let ptr: *mut TestObject = all[idx]; if !ptr.is_null() { assert_eq!(unsafe { (*ptr).val }, idx); diff --git a/libs/neonart/src/epoch.rs b/libs/neonart/src/epoch.rs index eb4952ce67..a1a112bd17 100644 --- a/libs/neonart/src/epoch.rs +++ b/libs/neonart/src/epoch.rs @@ -3,7 +3,6 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use crossbeam_utils::CachePadded; -use spin; const NUM_SLOTS: usize = 1000; @@ -62,10 +61,8 @@ impl EpochShared { pub(crate) fn advance(&self) -> u64 { // Advance the global epoch let old_epoch = self.global_epoch.fetch_add(2, Ordering::Relaxed); - let new_epoch = old_epoch + 2; - // Anyone that release their pin after this will update their slot. 
- new_epoch + old_epoch + 2 } pub(crate) fn broadcast(&self) { @@ -99,10 +96,8 @@ impl EpochShared { let delta = now.wrapping_sub(this_epoch); if delta > u64::MAX / 2 { // this is very recent - } else { - if delta > now.wrapping_sub(oldest) { - oldest = this_epoch; - } + } else if delta > now.wrapping_sub(oldest) { + oldest = this_epoch; } } oldest diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index ea3527071c..5c1c36a91e 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -239,7 +239,7 @@ where phantom_key: PhantomData, } -impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, A> { +impl<'t, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, A> { pub fn new(allocator: &'t A) -> TreeInitStruct<'t, K, V, A> { let tree_ptr = allocator.alloc_tree(); let tree_ptr = NonNull::new(tree_ptr).expect("out of memory"); @@ -295,7 +295,7 @@ impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, A> { pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> { TreeReadGuard { - tree: &self.tree, + tree: self.tree, epoch_pin: self.epoch_handle.pin(), phantom_key: PhantomData, } @@ -305,7 +305,7 @@ impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, A> { impl<'t, K: Key, V: Value> TreeReadAccess<'t, K, V> { pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> { TreeReadGuard { - tree: &self.tree, + tree: self.tree, epoch_pin: self.epoch_handle.pin(), phantom_key: PhantomData, } @@ -360,7 +360,7 @@ impl<'e, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { let mut success = None; self.update_with_fn(key, |existing| { - if let Some(_) = existing { + if existing.is_some() { success = Some(false); UpdateAction::Nothing } else { @@ -461,11 +461,9 @@ where K: Key + for<'a> From<&'a [u8]>, { pub fn new_wrapping() -> TreeIterator { - let mut next_key = Vec::new(); - next_key.resize(K::KEY_LEN, 0); TreeIterator { done: false, - next_key, + next_key: vec![0; K::KEY_LEN], max_key: None, phantom_key: PhantomData, } @@ -495,11 +493,9 @@ where let mut wrapped_around = false; loop { assert_eq!(self.next_key.len(), K::KEY_LEN); - if let Some((k, v)) = algorithm::iter_next( - &mut self.next_key, - read_guard.tree.root, - &read_guard.epoch_pin, - ) { + if let Some((k, v)) = + algorithm::iter_next(&self.next_key, read_guard.tree.root, &read_guard.epoch_pin) + { assert_eq!(k.len(), K::KEY_LEN); assert_eq!(self.next_key.len(), K::KEY_LEN); diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs index db674597f7..41f09051b1 100644 --- a/libs/neonart/src/tests.rs +++ b/libs/neonart/src/tests.rs @@ -102,7 +102,7 @@ fn sparse() { for _ in 0..10000 { loop { let key = rand::random::(); - if used_keys.get(&key).is_some() { + if used_keys.contains(&key) { continue; } used_keys.insert(key); @@ -182,7 +182,7 @@ fn test_iter>( let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX)); loop { - let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v.clone())); + let shadow_item = shadow_iter.next().map(|(k, v)| (*k, *v)); let r = tree.start_read(); let item = iter.next(&r); @@ -194,8 +194,7 @@ fn test_iter>( tree.start_read().dump(&mut std::io::stderr()); eprintln!("SHADOW:"); - let mut si = shadow.iter(); - while let Some(si) = si.next() { + for si in shadow { eprintln!("key: {:?}, val: {}", si.0, si.1); } panic!( diff --git a/pageserver/client_grpc/examples/load_test.rs b/pageserver/client_grpc/examples/load_test.rs index b189daa5ea..5838fccb21 100644 --- a/pageserver/client_grpc/examples/load_test.rs 
+++ b/pageserver/client_grpc/examples/load_test.rs @@ -48,18 +48,11 @@ impl Drop for MockConnection { } } +#[derive(Default)] pub struct MockConnectionFactory { counter: AtomicU64, } -impl MockConnectionFactory { - pub fn new() -> Self { - MockConnectionFactory { - counter: AtomicU64::new(1), - } - } -} - #[async_trait::async_trait] impl PooledItemFactory for MockConnectionFactory { /// The trait on ConnectionPool expects: @@ -171,7 +164,7 @@ async fn main() { // -------------------------------------- // 1. Create factory and shared instrumentation // -------------------------------------- - let factory = Arc::new(MockConnectionFactory::new()); + let factory = Arc::new(MockConnectionFactory::default()); // Shared map: connection ID → Arc let usage_map: Arc>>> = diff --git a/pageserver/client_grpc/examples/request_tracker_load_test.rs b/pageserver/client_grpc/examples/request_tracker_load_test.rs deleted file mode 100644 index 5741b289a5..0000000000 --- a/pageserver/client_grpc/examples/request_tracker_load_test.rs +++ /dev/null @@ -1,137 +0,0 @@ -// examples/request_tracker_load_test.rs - -use pageserver_client_grpc::AuthInterceptor; -use pageserver_client_grpc::ClientCacheOptions; -use pageserver_client_grpc::PageserverClientAggregateMetrics; -use pageserver_client_grpc::client_cache::ConnectionPool; -use pageserver_client_grpc::client_cache::PooledItemFactory; -use pageserver_client_grpc::request_tracker::MockStreamFactory; -use pageserver_client_grpc::request_tracker::RequestTracker; -use pageserver_client_grpc::request_tracker::StreamReturner; -use std::{sync::Arc, time::Duration}; -use tokio; - -use pageserver_client_grpc::client_cache::ChannelFactory; - -use tonic::transport::Channel; - -use rand::prelude::*; - -use pageserver_api::key::Key; - -use utils::lsn::Lsn; -use utils::shard::ShardIndex; - -use futures::StreamExt; -use futures::stream::FuturesOrdered; - -use pageserver_page_api::proto; - -#[tokio::main] -async fn main() { - // 1) configure the client‐pool behavior - let client_cache_options = ClientCacheOptions { - max_delay_ms: 0, - drop_rate: 0.0, - hang_rate: 0.0, - connect_timeout: Duration::from_secs(10), - connect_backoff: Duration::from_millis(200), - max_consumers: 64, - error_threshold: 10, - max_idle_duration: Duration::from_secs(60), - max_total_connections: 12, - }; - - // 2) metrics collector (we assume Default is implemented) - let metrics = Arc::new(PageserverClientAggregateMetrics::new()); - let pool = ConnectionPool::::new( - Arc::new(MockStreamFactory::new()), - client_cache_options.connect_timeout, - client_cache_options.connect_backoff, - client_cache_options.max_consumers, - client_cache_options.error_threshold, - client_cache_options.max_idle_duration, - client_cache_options.max_total_connections, - Some(Arc::clone(&metrics)), - ); - - // ----------- - // There is no mock for the unary connection pool, so for now just - // don't use this pool - // - let channel_fact: Arc + Send + Sync> = - Arc::new(ChannelFactory::new( - "".to_string(), - client_cache_options.max_delay_ms, - client_cache_options.drop_rate, - client_cache_options.hang_rate, - )); - let unary_pool: Arc> = ConnectionPool::new( - Arc::clone(&channel_fact), - client_cache_options.connect_timeout, - client_cache_options.connect_backoff, - client_cache_options.max_consumers, - client_cache_options.error_threshold, - client_cache_options.max_idle_duration, - client_cache_options.max_total_connections, - Some(Arc::clone(&metrics)), - ); - - // ----------- - // Dummy auth interceptor. 
This is not used in this test. - let auth_interceptor = AuthInterceptor::new("dummy_tenant_id", "dummy_timeline_id", None); - let tracker = RequestTracker::new(pool, unary_pool, auth_interceptor, ShardIndex::unsharded()); - - // 4) fire off 10 000 requests in parallel - let mut handles = FuturesOrdered::new(); - for _i in 0..500000 { - let mut rng = rand::thread_rng(); - let r = 0..=1000000i128; - let key: i128 = rng.gen_range(r.clone()); - let key = Key::from_i128(key); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - - let req2 = proto::GetPageRequest { - request_id: 0, - request_class: proto::GetPageClass::Normal as i32, - read_lsn: Some(proto::ReadLsn { - request_lsn: if rng.gen_bool(0.5) { - u64::from(Lsn::MAX) - } else { - 10000 - }, - not_modified_since_lsn: 10000, - }), - rel: Some(rel_tag.into()), - block_number: vec![block_no], - }; - let req_model = pageserver_page_api::GetPageRequest::try_from(req2.clone()); - - // RequestTracker is Clone, so we can share it - let mut tr = tracker.clone(); - let fut = async move { - let resp = tr.send_getpage_request(req_model.unwrap()).await.unwrap(); - // sanity‐check: the mock echo returns the same request_id - assert!(resp.request_id > 0); - }; - handles.push_back(fut); - - // empty future - let fut = async move {}; - fut.await; - } - - // print timestamp - println!("Starting 5000000 requests at: {}", chrono::Utc::now()); - // 5) wait for them all - for _i in 0..500000 { - handles.next().await.expect("Failed to get next handle"); - } - - // print timestamp - println!("Finished 5000000 requests at: {}", chrono::Utc::now()); - - println!("✅ All 100000 requests completed successfully"); -} diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs index b366ad0878..6784c632ba 100644 --- a/pageserver/client_grpc/src/client_cache.rs +++ b/pageserver/client_grpc/src/client_cache.rs @@ -217,10 +217,7 @@ impl PooledItemFactory for ChannelFactory { // Random drop (connect error) if drop_rate > 0.0 && rng.gen_bool(drop_rate) { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "simulated connect drop", - )); + return Err(std::io::Error::other("simulated connect drop")); } // Otherwise perform real TCP connect @@ -309,6 +306,7 @@ pub struct PooledClient { } impl ConnectionPool { + #[allow(clippy::too_many_arguments)] pub fn new( fact: Arc + Send + Sync>, connect_timeout: Duration, @@ -391,14 +389,11 @@ impl ConnectionPool { && now.duration_since(entry.last_used) > self.max_idle_duration { // metric - match self.aggregate_metrics { - Some(ref metrics) => { - metrics - .retry_counters - .with_label_values(&["connection_swept"]) - .inc(); - } - None => {} + if let Some(ref metrics) = self.aggregate_metrics { + metrics + .retry_counters + .with_label_values(&["connection_swept"]) + .inc(); } ids_to_remove.push(*id); return false; // remove this entry @@ -436,7 +431,7 @@ impl ConnectionPool { pool: Arc::clone(&self), is_ok: true, id, - permit: permit, + permit, }; // re‐insert with updated priority @@ -444,7 +439,7 @@ impl ConnectionPool { if active_consumers < self.max_consumers { inner.pq.push(id, active_consumers as usize); } - return Some(client); + Some(client) } else { // If there is no connection to take, it is because permits for a connection // need to drain. This can happen if a connection is removed because it has @@ -453,7 +448,7 @@ impl ConnectionPool { // // Just forget the permit and retry. 
permit.forget(); - return None; + None } } @@ -485,14 +480,11 @@ impl ConnectionPool { } } Err(_) => { - match self_clone.aggregate_metrics { - Some(ref metrics) => { - metrics - .retry_counters - .with_label_values(&["sema_acquire_failed"]) - .inc(); - } - None => {} + if let Some(ref metrics) = self_clone.aggregate_metrics { + metrics + .retry_counters + .with_label_values(&["sema_acquire_failed"]) + .inc(); } { @@ -504,16 +496,15 @@ impl ConnectionPool { // let mut inner = self_clone.inner.lock().await; inner.waiters += 1; - if inner.waiters > (inner.in_progress * self_clone.max_consumers) { - if (inner.entries.len() + inner.in_progress) + if inner.waiters > (inner.in_progress * self_clone.max_consumers) + && (inner.entries.len() + inner.in_progress) < self_clone.max_total_connections - { - let self_clone_spawn = Arc::clone(&self_clone); - tokio::task::spawn(async move { - self_clone_spawn.create_connection().await; - }); - inner.in_progress += 1; - } + { + let self_clone_spawn = Arc::clone(&self_clone); + tokio::task::spawn(async move { + self_clone_spawn.create_connection().await; + }); + inner.in_progress += 1; } } // Wait for a connection to become available, either because it @@ -541,7 +532,7 @@ impl ConnectionPool { } } - async fn create_connection(&self) -> () { + async fn create_connection(&self) { // Generate a random backoff to add some jitter so that connections // don't all retry at the same time. let mut backoff_delay = Duration::from_millis( @@ -558,17 +549,13 @@ impl ConnectionPool { // until the failure stopped for at least one backoff period. Backoff // period includes some jitter, so that if multiple connections are // failing, they don't all retry at the same time. - loop { - if let Some(delay) = { - let inner = self.inner.lock().await; - inner.last_connect_failure.and_then(|at| { - (at.elapsed() < backoff_delay).then(|| backoff_delay - at.elapsed()) - }) - } { - sleep(delay).await; - } else { - break; // No delay, so we can create a connection - } + while let Some(delay) = { + let inner = self.inner.lock().await; + inner.last_connect_failure.and_then(|at| { + (at.elapsed() < backoff_delay).then(|| backoff_delay - at.elapsed()) + }) + } { + sleep(delay).await; + } // @@ -578,14 +565,11 @@ impl ConnectionPool { // on this connection. (Requests made later on this channel will time out // with the same timeout.)
// - match self.aggregate_metrics { - Some(ref metrics) => { - metrics - .retry_counters - .with_label_values(&["connection_attempt"]) - .inc(); - } - None => {} + if let Some(ref metrics) = self.aggregate_metrics { + metrics + .retry_counters + .with_label_values(&["connection_attempt"]) + .inc(); } let attempt = self.fact.create(self.connect_timeout).await; @@ -594,14 +578,11 @@ impl ConnectionPool { // Connection succeeded Ok(Ok(channel)) => { { - match self.aggregate_metrics { - Some(ref metrics) => { - metrics - .retry_counters - .with_label_values(&["connection_success"]) - .inc(); - } - None => {} + if let Some(ref metrics) = self.aggregate_metrics { + metrics + .retry_counters + .with_label_values(&["connection_success"]) + .inc(); } let mut inner = self.inner.lock().await; let id = uuid::Uuid::new_v4(); @@ -622,14 +603,11 @@ impl ConnectionPool { } // Connection failed, back off and retry Ok(Err(_)) | Err(_) => { - match self.aggregate_metrics { - Some(ref metrics) => { - metrics - .retry_counters - .with_label_values(&["connect_failed"]) - .inc(); - } - None => {} + if let Some(ref metrics) = self.aggregate_metrics { + metrics + .retry_counters + .with_label_values(&["connect_failed"]) + .inc(); } let mut inner = self.inner.lock().await; inner.last_connect_failure = Some(Instant::now()); @@ -653,10 +631,10 @@ impl ConnectionPool { let mut inner = self.inner.lock().await; if let Some(entry) = inner.entries.get_mut(&id) { entry.last_used = Instant::now(); - if entry.active_consumers <= 0 { + if entry.active_consumers == 0 { panic!("A consumer completed when active_consumers was zero!") } - entry.active_consumers = entry.active_consumers - 1; + entry.active_consumers -= 1; if success { if entry.consecutive_errors < self.error_threshold { entry.consecutive_errors = 0; @@ -664,14 +642,11 @@ impl ConnectionPool { } else { entry.consecutive_errors += 1; if entry.consecutive_errors == self.error_threshold { - match self.aggregate_metrics { - Some(ref metrics) => { - metrics - .retry_counters - .with_label_values(&["connection_dropped"]) - .inc(); - } - None => {} + if let Some(ref metrics) = self.aggregate_metrics { + metrics + .retry_counters + .with_label_values(&["connection_dropped"]) + .inc(); } } } @@ -719,7 +694,7 @@ impl ConnectionPool { impl PooledClient { pub fn channel(&self) -> T { - return self.channel.clone(); + self.channel.clone() } pub async fn finish(mut self, result: Result<(), tonic::Status>) { self.is_ok = result.is_ok(); diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index cde5753fc2..7276a27215 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -47,6 +47,13 @@ pub struct PageserverClientAggregateMetrics { pub request_counters: IntCounterVec, pub retry_counters: IntCounterVec, } + +impl Default for PageserverClientAggregateMetrics { + fn default() -> Self { + Self::new() + } +} + impl PageserverClientAggregateMetrics { pub fn new() -> Self { let request_counters = IntCounterVec::new( @@ -167,11 +174,11 @@ impl PageserverClient { match response { Err(status) => { pooled_client.finish(Err(status.clone())).await; // Pass error to finish - return Err(PageserverClientError::RequestError(status)); + Err(PageserverClientError::RequestError(status)) } Ok(resp) => { pooled_client.finish(Ok(())).await; // Pass success to finish - return Ok(resp.get_ref().exists); + Ok(resp.get_ref().exists) } } } @@ -194,11 +201,11 @@ impl PageserverClient { match response { Err(status) => { 
pooled_client.finish(Err(status.clone())).await; // Pass error to finish - return Err(PageserverClientError::RequestError(status)); + Err(PageserverClientError::RequestError(status)) } Ok(resp) => { pooled_client.finish(Ok(())).await; // Pass success to finish - return Ok(resp.get_ref().num_blocks); + Ok(resp.get_ref().num_blocks) } } } @@ -233,25 +240,22 @@ impl PageserverClient { )); }; - match self.aggregate_metrics { - Some(ref metrics) => { - metrics - .request_counters - .with_label_values(&["get_page"]) - .inc(); - } - None => {} + if let Some(ref metrics) = self.aggregate_metrics { + metrics + .request_counters + .with_label_values(&["get_page"]) + .inc(); } match response { Err(status) => { pooled_client.finish(Err(status.clone())).await; // Pass error to finish - return Err(PageserverClientError::RequestError(status)); + Err(PageserverClientError::RequestError(status)) } Ok(resp) => { pooled_client.finish(Ok(())).await; // Pass success to finish let response: GetPageResponse = resp.into(); - return Ok(response.page_images.to_vec()); + Ok(response.page_images.to_vec()) } } } @@ -280,11 +284,9 @@ impl PageserverClient { match response { Err(status) => { pooled_client.finish(Err(status.clone())).await; // Pass error to finish - return Err(PageserverClientError::RequestError(status)); - } - Ok(resp) => { - return Ok(resp); + Err(PageserverClientError::RequestError(status)) } + Ok(resp) => Ok(resp), } } @@ -307,11 +309,11 @@ impl PageserverClient { match response { Err(status) => { pooled_client.finish(Err(status.clone())).await; // Pass error to finish - return Err(PageserverClientError::RequestError(status)); + Err(PageserverClientError::RequestError(status)) } Ok(resp) => { pooled_client.finish(Ok(())).await; // Pass success to finish - return Ok(resp.get_ref().num_bytes); + Ok(resp.get_ref().num_bytes) } } } @@ -342,11 +344,11 @@ impl PageserverClient { match response { Err(status) => { pooled_client.finish(Err(status.clone())).await; // Pass error to finish - return Err(PageserverClientError::RequestError(status)); + Err(PageserverClientError::RequestError(status)) } Ok(resp) => { pooled_client.finish(Ok(())).await; // Pass success to finish - return Ok(resp); + Ok(resp) } } } @@ -360,8 +362,7 @@ impl PageserverClient { channels.get(&shard).cloned() }; - let usable_pool: Arc>; - match reused_pool { + let usable_pool = match reused_pool { Some(pool) => { let pooled_client = pool.get_client().await.unwrap(); return pooled_client; @@ -370,14 +371,13 @@ impl PageserverClient { // Create a new pool using client_cache_options // declare new_pool - let new_pool: Arc>; let channel_fact = Arc::new(client_cache::ChannelFactory::new( self.shard_map.get(&shard).unwrap().clone(), self.client_cache_options.max_delay_ms, self.client_cache_options.drop_rate, self.client_cache_options.hang_rate, )); - new_pool = client_cache::ConnectionPool::new( + let new_pool = client_cache::ConnectionPool::new( channel_fact, self.client_cache_options.connect_timeout, self.client_cache_options.connect_backoff, @@ -389,12 +389,11 @@ impl PageserverClient { ); let mut write_pool = self.channels.write().unwrap(); write_pool.insert(shard, new_pool.clone()); - usable_pool = new_pool.clone(); + new_pool.clone() } - } + }; - let pooled_client = usable_pool.get_client().await.unwrap(); - return pooled_client; + usable_pool.get_client().await.unwrap() } } diff --git a/pageserver/client_grpc/src/request_tracker.rs b/pageserver/client_grpc/src/request_tracker.rs index 5f5e767c49..eb5ad2927b 100644 --- 
a/pageserver/client_grpc/src/request_tracker.rs +++ b/pageserver/client_grpc/src/request_tracker.rs @@ -41,12 +41,15 @@ use client_cache::PooledItemFactory; #[derive(Clone)] pub struct StreamReturner { sender: tokio::sync::mpsc::Sender, + #[allow(clippy::type_complexity)] sender_hashmap: Arc< - tokio::sync::Mutex>, - >>, + tokio::sync::Mutex< + Option< + std::collections::HashMap< + u64, + tokio::sync::mpsc::Sender>, + >, + >, >, >, } @@ -101,9 +104,9 @@ impl PooledItemFactory for StreamFactory { Ok(resp) => { let stream_returner = StreamReturner { sender: sender.clone(), - sender_hashmap: Arc::new(tokio::sync::Mutex::new( - Some(std::collections::HashMap::new()), - )), + sender_hashmap: Arc::new(tokio::sync::Mutex::new(Some( + std::collections::HashMap::new(), + ))), }; let map = Arc::clone(&stream_returner.sender_hashmap); @@ -122,7 +125,8 @@ impl PooledItemFactory for StreamFactory { Ok(Some(response)) => { // look up stream in hash map let mut hashmap = map_clone.lock().await; - let hashmap = hashmap.as_mut().expect("no other task clears the hashmap"); + let hashmap = + hashmap.as_mut().expect("no other task clears the hashmap"); if let Some(sender) = hashmap.get(&response.request_id) { // Send the response to the original request sender if let Err(e) = sender.send(Ok(response.clone())).await { @@ -130,7 +134,10 @@ impl PooledItemFactory for StreamFactory { } hashmap.remove(&response.request_id); } else { - eprintln!("No sender found for request ID: {}", response.request_id); + eprintln!( + "No sender found for request ID: {}", + response.request_id + ); } } } @@ -139,7 +146,9 @@ impl PooledItemFactory for StreamFactory { // Close every sender stream in the hashmap let mut hashmap_opt = map_clone.lock().await; - let hashmap = hashmap_opt.as_mut().expect("no other task clears the hashmap"); + let hashmap = hashmap_opt + .as_mut() + .expect("no other task clears the hashmap"); for sender in hashmap.values() { let error = Status::new(Code::Unknown, "Stream closed"); if let Err(e) = sender.send(Err(error)).await { @@ -175,10 +184,10 @@ impl RequestTracker { RequestTracker { _cur_id: cur_id.clone(), - stream_pool: stream_pool, - unary_pool: unary_pool, - auth_interceptor: auth_interceptor, - shard: shard.clone(), + stream_pool, + unary_pool, + auth_interceptor, + shard, } } @@ -194,7 +203,7 @@ impl RequestTracker { channel, self.auth_interceptor.for_shard(self.shard), ); - let request = proto::CheckRelExistsRequest::from(req.clone()); + let request = proto::CheckRelExistsRequest::from(req); let response = ps_client .check_rel_exists(tonic::Request::new(request)) .await; @@ -226,7 +235,7 @@ impl RequestTracker { self.auth_interceptor.for_shard(self.shard), ); - let request = proto::GetRelSizeRequest::from(req.clone()); + let request = proto::GetRelSizeRequest::from(req); let response = ps_client.get_rel_size(tonic::Request::new(request)).await; match response { @@ -256,7 +265,7 @@ impl RequestTracker { self.auth_interceptor.for_shard(self.shard), ); - let request = proto::GetDbSizeRequest::from(req.clone()); + let request = proto::GetDbSizeRequest::from(req); let response = ps_client.get_db_size(tonic::Request::new(request)).await; match response { @@ -335,8 +344,7 @@ impl RequestTracker { continue; } - let response: Option>; - response = response_receiver.recv().await; + let response = response_receiver.recv().await; match response { Some(resp) => { match resp { @@ -382,6 +390,13 @@ pub struct ShardedRequestTracker { // TODO: Functions in the ShardedRequestTracker should be able to timeout 
and // cancel a reqeust. The request should return an error if it is cancelled. // + +impl Default for ShardedRequestTracker { + fn default() -> Self { + ShardedRequestTracker::new() + } +} + impl ShardedRequestTracker { pub fn new() -> Self { // @@ -438,8 +453,7 @@ impl ShardedRequestTracker { self.tcp_client_cache_options.drop_rate, self.tcp_client_cache_options.hang_rate, )); - let new_pool: Arc>; - new_pool = ConnectionPool::new( + let new_pool = ConnectionPool::new( Arc::clone(&channel_fact), self.tcp_client_cache_options.connect_timeout, self.tcp_client_cache_options.connect_backoff, @@ -472,8 +486,7 @@ impl ShardedRequestTracker { // Create a client pool for unary requests // - let unary_pool: Arc>; - unary_pool = ConnectionPool::new( + let unary_pool = ConnectionPool::new( Arc::clone(&channel_fact), self.tcp_client_cache_options.connect_timeout, self.tcp_client_cache_options.connect_backoff, @@ -547,6 +560,7 @@ impl ShardedRequestTracker { } } + #[allow(clippy::result_large_err)] fn lookup_tracker_for_shard( &self, shard_index: ShardIndex, diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 719bbef5d9..fbb79a3a08 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -34,7 +34,6 @@ use axum::response::Response; use http::StatusCode; use http::header::CONTENT_TYPE; -use metrics; use metrics::proto::MetricFamily; use metrics::{Encoder, TextEncoder}; diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index a348852f14..3a6755d5d8 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -60,8 +60,8 @@ pub extern "C" fn rcommunicator_backend_init( /// /// Safety: The C caller must ensure that the references are valid. 
#[unsafe(no_mangle)] -pub extern "C" fn bcomm_start_io_request<'t>( - bs: &'t mut CommunicatorBackendStruct, +pub extern "C" fn bcomm_start_io_request( + bs: &'_ mut CommunicatorBackendStruct, request: &NeonIORequest, immediate_result_ptr: &mut NeonIOResult, ) -> i32 { @@ -81,12 +81,12 @@ pub extern "C" fn bcomm_start_io_request<'t>( // Tell the communicator about it bs.submit_request(request_idx); - return request_idx; + request_idx } #[unsafe(no_mangle)] -pub extern "C" fn bcomm_start_get_page_v_request<'t>( - bs: &'t mut CommunicatorBackendStruct, +pub extern "C" fn bcomm_start_get_page_v_request( + bs: &mut CommunicatorBackendStruct, request: &NeonIORequest, immediate_result_ptr: &mut CCachedGetPageVResult, ) -> i32 { @@ -104,7 +104,7 @@ pub extern "C" fn bcomm_start_get_page_v_request<'t>( &get_pagev_request.reltag(), get_pagev_request.block_number + i as u32, ) { - (*immediate_result_ptr).cache_block_numbers[i as usize] = cache_block; + immediate_result_ptr.cache_block_numbers[i as usize] = cache_block; } else { // not found in cache all_cached = false; @@ -194,6 +194,6 @@ impl<'t> CommunicatorBackendStruct<'t> { self.neon_request_slots[idx as usize].fill_request(request, my_proc_number); - return idx as i32; + idx as i32 } } diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs index 05bbe1a57e..1f60c97f2c 100644 --- a/pgxn/neon/communicator/src/file_cache.rs +++ b/pgxn/neon/communicator/src/file_cache.rs @@ -96,8 +96,7 @@ impl FileCache { let dst_ref = unsafe { std::slice::from_raw_parts_mut(dst.stable_mut_ptr(), BLCKSZ) }; - spawn_blocking(move || file.read_exact_at(dst_ref, cache_block as u64 * BLCKSZ as u64)) - .await??; + spawn_blocking(move || file.read_exact_at(dst_ref, cache_block * BLCKSZ as u64)).await??; Ok(()) } @@ -111,8 +110,7 @@ impl FileCache { let src_ref = unsafe { std::slice::from_raw_parts(src.stable_ptr(), BLCKSZ) }; - spawn_blocking(move || file.write_all_at(src_ref, cache_block as u64 * BLCKSZ as u64)) - .await??; + spawn_blocking(move || file.write_all_at(src_ref, cache_block * BLCKSZ as u64)).await??; Ok(()) } @@ -151,7 +149,7 @@ impl metrics::core::Collector for FileCache { let total_free_blocks: i64 = free_list.free_blocks.len() as i64 + (free_list.max_blocks as i64 - free_list.next_free_block as i64); - self.num_free_blocks_gauge.set(total_free_blocks as i64); + self.num_free_blocks_gauge.set(total_free_blocks); } let mut values = Vec::new(); diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs index db926a944c..0053016e55 100644 --- a/pgxn/neon/communicator/src/init.rs +++ b/pgxn/neon/communicator/src/init.rs @@ -96,8 +96,8 @@ pub extern "C" fn rcommunicator_shmem_init( let (neon_request_slots, remaining_area) = alloc_array_from_slice::(shmem_area, num_neon_request_slots); - for i in 0..num_neon_request_slots { - neon_request_slots[i].write(NeonIOHandle::default()); + for slot in neon_request_slots.iter_mut() { + slot.write(NeonIOHandle::default()); } // 'neon_request_slots' is initialized now. 
(MaybeUninit::slice_assume_init_mut() is nightly-only diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 4c65af6bdb..86d6c52df3 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -221,7 +221,7 @@ struct RelKey(RelTag); impl From<&RelTag> for RelKey { fn from(val: &RelTag) -> RelKey { - RelKey(val.clone()) + RelKey(*val) } } @@ -234,7 +234,7 @@ struct BlockKey { impl From<(&RelTag, u32)> for BlockKey { fn from(val: (&RelTag, u32)) -> BlockKey { BlockKey { - rel: val.0.clone(), + rel: *val.0, block_number: val.1, } } @@ -707,7 +707,7 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { /// /// This is in a separate function so that it can be shared by /// IntegratedCacheReadAccess::get_rel_size() and IntegratedCacheWriteAccess::get_rel_size() -fn get_rel_size<'t>( +fn get_rel_size( r: &neon_shmem::hash::HashMapAccess, rel: &RelTag, ) -> Option { diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index 519e83ac24..12dc308f9c 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -6,6 +6,7 @@ pub const MAX_GETPAGEV_PAGES: usize = 32; use pageserver_page_api as page_api; +#[allow(clippy::large_enum_variant)] #[repr(C)] #[derive(Copy, Clone, Debug)] pub enum NeonIORequest { @@ -98,7 +99,7 @@ unsafe impl uring_common::buf::IoBufMut for ShmemBuf { } unsafe fn set_init(&mut self, pos: usize) { - if pos > crate::BLCKSZ as usize { + if pos > crate::BLCKSZ { panic!( "set_init called past end of buffer, pos {}, buffer size {}", pos, diff --git a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs index 845479fcda..f2c738b2be 100644 --- a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs +++ b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs @@ -75,8 +75,8 @@ where } MutexHashSetGuard { - key: key, - set: &self, + key, + set: self, mutex: my_mutex, _guard: my_guard, } diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 5d46154eae..0f6e410d9e 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -186,7 +186,7 @@ pub(super) async fn init( impl<'t> CommunicatorWorkerProcessStruct<'t> { /// Main loop of the worker process. Receive requests from the backends and process them. 
- pub(super) async fn run(self: &'static Self) { + pub(super) async fn run(&'static self) { let mut idxbuf: [u8; 4] = [0; 4]; let mut submission_pipe_read = @@ -241,7 +241,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } } - async fn handle_request<'x>(self: &'static Self, req: &'x NeonIORequest) -> NeonIOResult { + async fn handle_request(&'static self, req: &'_ NeonIORequest) -> NeonIOResult { match req { NeonIORequest::Empty => { error!("unexpected Empty IO request"); @@ -251,9 +251,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { self.request_rel_exists_counter.inc(); let rel = req.reltag(); - let _in_progress_guard = self - .in_progress_table - .lock(RequestInProgressKey::Rel(rel.clone())); + let _in_progress_guard = + self.in_progress_table.lock(RequestInProgressKey::Rel(rel)); let not_modified_since = match self.cache.get_rel_exists(&rel) { CacheResult::Found(exists) => return NeonIOResult::RelExists(exists), @@ -280,9 +279,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { self.request_rel_size_counter.inc(); let rel = req.reltag(); - let _in_progress_guard = self - .in_progress_table - .lock(RequestInProgressKey::Rel(rel.clone())); + let _in_progress_guard = + self.in_progress_table.lock(RequestInProgressKey::Rel(rel)); // Check the cache first let not_modified_since = match self.cache.get_rel_size(&rel) { @@ -296,10 +294,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { let read_lsn = self.request_lsns(not_modified_since); match self .request_tracker - .process_get_rel_size_request(page_api::GetRelSizeRequest { - read_lsn, - rel: rel.clone(), - }) + .process_get_rel_size_request(page_api::GetRelSizeRequest { read_lsn, rel }) .await { Ok(nblocks) => { @@ -371,7 +366,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { let rel = req.reltag(); let _in_progress_guard = self .in_progress_table - .lock(RequestInProgressKey::Block(rel.clone(), req.block_number)); + .lock(RequestInProgressKey::Block(rel, req.block_number)); self.cache .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true) .await; @@ -439,7 +434,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // because they're always acquired in the same order. let in_progress_guard = self .in_progress_table - .lock(RequestInProgressKey::Block(rel.clone(), blkno)) + .lock(RequestInProgressKey::Block(rel, blkno)) .await; let dest = req.dest[i as usize]; @@ -476,7 +471,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), request_class: page_api::GetPageClass::Normal, read_lsn: self.request_lsns(not_modified_since), - rel: rel.clone(), + rel, block_numbers: vec![*blkno], }) .await @@ -485,13 +480,15 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // Write the received page image directly to the shared memory location // that the backend requested. 
if resp.page_images.len() != 1 { - error!("received unexpected response with {} page images received from pageserver for a request for one page", - resp.page_images.len()); + error!( + "received unexpected response with {} page images received from pageserver for a request for one page", + resp.page_images.len() + ); return Err(-1); } let page_image = resp.page_images[0].clone(); let src: &[u8] = page_image.as_ref(); - let len = std::cmp::min(src.len(), dest.bytes_total() as usize); + let len = std::cmp::min(src.len(), dest.bytes_total()); unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len); }; @@ -510,10 +507,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(()) } - async fn handle_prefetchv_request( - self: &'static Self, - req: &CPrefetchVRequest, - ) -> Result<(), i32> { + async fn handle_prefetchv_request(&'static self, req: &CPrefetchVRequest) -> Result<(), i32> { let rel = req.reltag(); // Check the cache first @@ -525,7 +519,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // because they're always acquired in the same order. let in_progress_guard = self .in_progress_table - .lock(RequestInProgressKey::Block(rel.clone(), blkno)) + .lock(RequestInProgressKey::Block(rel, blkno)) .await; let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await { @@ -558,7 +552,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), request_class: page_api::GetPageClass::Prefetch, read_lsn: self.request_lsns(not_modified_since), - rel: rel.clone(), + rel, block_numbers: vec![*blkno], }) .await @@ -569,8 +563,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { *blkno, rel ); if resp.page_images.len() != 1 { - error!("received unexpected response with {} page images received from pageserver for a request for one page", - resp.page_images.len()); + error!( + "received unexpected response with {} page images received from pageserver for a request for one page", + resp.page_images.len() + ); return Err(-1); } let page_image = resp.page_images[0].clone(); diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs index ff7aa20810..9b0891b5aa 100644 --- a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -8,7 +8,6 @@ use axum::response::Response; use http::StatusCode; use http::header::CONTENT_TYPE; -use metrics; use metrics::proto::MetricFamily; use metrics::{Encoder, TextEncoder};