From 6dbbdaae73546dc046abaaace8854025394b021d Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 6 May 2025 15:35:56 +0300 Subject: [PATCH] run 'cargo fmt' --- libs/neonart/src/algorithm.rs | 31 ++++++----- libs/neonart/src/algorithm/node_ptr.rs | 19 +++++-- libs/neonart/src/algorithm/node_ref.rs | 15 +++--- libs/neonart/src/allocator.rs | 4 +- libs/neonart/src/allocator/block.rs | 4 +- libs/neonart/src/allocator/static.rs | 11 ++-- libs/neonart/src/lib.rs | 35 ++++++------ libs/neonart/src/tests.rs | 40 +++++++------- pageserver/client_grpc/src/client_cache.rs | 53 +++++++++++++------ pageserver/client_grpc/src/lib.rs | 24 ++++----- pgxn/neon/communicator/src/file_cache.rs | 14 +++-- pgxn/neon/communicator/src/init.rs | 9 ++-- .../neon/communicator/src/integrated_cache.rs | 22 ++++---- .../src/worker_process/main_loop.rs | 6 ++- .../src/worker_process/metrics_exporter.rs | 16 +++--- .../src/worker_process/worker_interface.rs | 5 +- 16 files changed, 179 insertions(+), 129 deletions(-) diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index ae05970ca4..573ef87c92 100644 --- a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -72,13 +72,13 @@ pub(crate) fn iter_next<'e, V: Value>( match next_recurse(key, &mut path, root_ref, epoch_pin) { Ok(Some(v)) => { assert_eq!(path.len(), key.len()); - break Some((path, v)) - }, + break Some((path, v)); + } Ok(None) => break None, Err(ConcurrentUpdateError()) => { // retry continue; - }, + } } } } @@ -110,15 +110,15 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( Err(ArtError::ConcurrentUpdate) => { eprintln!("retrying"); continue; // retry - }, + } Err(ArtError::OutOfMemory) => { panic!("todo: OOM: try to GC, propagate to caller"); - }, + } Err(ArtError::GarbageQueueFull) => { // FIXME: This can happen if someone is holding back the epoch. We should // wait for the epoch to advance panic!("todo: GC queue is full"); - }, + } } } } @@ -194,7 +194,7 @@ fn next_recurse<'e, V: Value>( match rnode.find_next_child_or_value_or_restart(min_key_byte)? { None => { return Ok(None); - }, + } Some((key_byte, ChildOrValue::Child(child_ref))) => { let path_len = path.len(); path.push(key_byte); @@ -207,7 +207,7 @@ fn next_recurse<'e, V: Value>( } path.truncate(path_len); min_key_byte = key_byte + 1; - }, + } Some((key_byte, ChildOrValue::Value(vptr))) => { path.push(key_byte); assert_eq!(path.len(), min_key.len()); @@ -215,8 +215,8 @@ fn next_recurse<'e, V: Value>( // and the lifetime of 'epoch_pin' enforces that the reference is only accessible // as long as the epoch is pinned. let v = unsafe { vptr.as_ref().unwrap() }; - return Ok(Some(v)) - }, + return Ok(Some(v)); + } } } } @@ -300,7 +300,6 @@ where // TODO: Shrink the node // TODO: If the node becomes empty, unlink it from parent wnode.delete_value(key[0]); - } wnode.write_unlock(); @@ -400,12 +399,16 @@ fn insert_split_prefix<'e, K: Key, V: Value, A: ArtAllocator>( let common_prefix_len = common_prefix(key, old_prefix); // Allocate a node for the new value. 
-    let new_value_node =
-        allocate_node_for_value(&key[common_prefix_len + 1..], value, guard.tree_writer.allocator)?;
+    let new_value_node = allocate_node_for_value(
+        &key[common_prefix_len + 1..],
+        value,
+        guard.tree_writer.allocator,
+    )?;
 
     // Allocate a new internal node with the common prefix
     // FIXME: deallocate 'new_value_node' on OOM
-    let mut prefix_node = node_ref::new_internal(&key[..common_prefix_len], guard.tree_writer.allocator)?;
+    let mut prefix_node =
+        node_ref::new_internal(&key[..common_prefix_len], guard.tree_writer.allocator)?;
 
     // Add the old node and the new nodes to the new internal node
     prefix_node.insert_old_child(old_prefix[common_prefix_len], old_node);
diff --git a/libs/neonart/src/algorithm/node_ptr.rs b/libs/neonart/src/algorithm/node_ptr.rs
index be1f36e560..71e2c9f347 100644
--- a/libs/neonart/src/algorithm/node_ptr.rs
+++ b/libs/neonart/src/algorithm/node_ptr.rs
@@ -338,7 +338,10 @@ impl NodePtr {
         }
     }
 
-    pub(crate) fn find_next_child_or_value(&self, key_byte: u8) -> Option<(u8, ChildOrValuePtr)> {
+    pub(crate) fn find_next_child_or_value(
+        &self,
+        key_byte: u8,
+    ) -> Option<(u8, ChildOrValuePtr)> {
         match self.variant() {
             NodeVariant::Internal4(n) => n
                 .find_next_child(key_byte)
@@ -366,7 +369,7 @@ impl NodePtr {
                 .map(|(k, v)| (k, ChildOrValuePtr::Value(v))),
         }
     }
-    
+
     pub(crate) fn truncate_prefix(&mut self, new_prefix_len: usize) {
         match self.variant_mut() {
             NodeVariantMut::Internal4(n) => n.truncate_prefix(new_prefix_len),
@@ -930,7 +933,10 @@ impl NodeLeaf4 {
             assert!(self.child_values[i].is_some());
             if i < self.num_values as usize - 1 {
                 self.child_keys[i] = self.child_keys[self.num_values as usize - 1];
-                self.child_values[i] = std::mem::replace(&mut self.child_values[self.num_values as usize - 1], None);
+                self.child_values[i] = std::mem::replace(
+                    &mut self.child_values[self.num_values as usize - 1],
+                    None,
+                );
             }
             self.num_values -= 1;
             return;
@@ -1031,7 +1037,10 @@ impl NodeLeaf16 {
             assert!(self.child_values[i as usize].is_some());
             if i < self.num_values as usize - 1 {
                 self.child_keys[i] = self.child_keys[self.num_values as usize - 1];
-                self.child_values[i] = std::mem::replace(&mut self.child_values[self.num_values as usize - 1], None);
+                self.child_values[i] = std::mem::replace(
+                    &mut self.child_values[self.num_values as usize - 1],
+                    None,
+                );
             }
             self.num_values -= 1;
             return;
@@ -1125,7 +1134,7 @@ impl NodeLeaf48 {
 
         if idx < self.num_values {
             // Move all existing values with higher indexes down one position
-            for i in idx as usize ..self.num_values as usize {
+            for i in idx as usize..self.num_values as usize {
                 self.child_values[i] = std::mem::replace(&mut self.child_values[i + 1], None);
             }
 
diff --git a/libs/neonart/src/algorithm/node_ref.rs b/libs/neonart/src/algorithm/node_ref.rs
index c896b4b147..12ab0e40db 100644
--- a/libs/neonart/src/algorithm/node_ref.rs
+++ b/libs/neonart/src/algorithm/node_ref.rs
@@ -104,14 +104,17 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
 
         match child_or_value {
             None => Ok(None),
-            Some((k, ChildOrValuePtr::Value(vptr)) )=> Ok(Some((k, ChildOrValue::Value(vptr)))),
-            Some((k, ChildOrValuePtr::Child(child_ptr))) => Ok(Some((k, ChildOrValue::Child(NodeRef {
-                ptr: child_ptr,
-                phantom: self.phantom,
-            })))),
+            Some((k, ChildOrValuePtr::Value(vptr))) => Ok(Some((k, ChildOrValue::Value(vptr)))),
+            Some((k, ChildOrValuePtr::Child(child_ptr))) => Ok(Some((
+                k,
+                ChildOrValue::Child(NodeRef {
+                    ptr: child_ptr,
+                    phantom: self.phantom,
+                }),
+            ))),
         }
     }
-    
+
     pub(crate) fn upgrade_to_write_lock_or_restart(
         self,
     ) -> Result<WriteLockedNodeRef<'e, V>, ConcurrentUpdateError> {
diff --git a/libs/neonart/src/allocator.rs b/libs/neonart/src/allocator.rs
index 008ed34194..860d024269 100644
--- a/libs/neonart/src/allocator.rs
+++ b/libs/neonart/src/allocator.rs
@@ -44,7 +44,8 @@ pub trait ArtAllocator {
 }
 
 pub struct ArtMultiSlabAllocator<'t, V>
-    where V: crate::Value
+where
+    V: crate::Value,
 {
     tree_area: spin::Mutex>>>,
 
@@ -140,7 +141,6 @@ impl<'t, V: crate::Value> ArtAllocator for ArtMultiSlabAllocator<'t, V> {
     }
 }
 
-
 impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> {
     pub fn get_statistics(&self) -> ArtTreeStatistics {
         ArtTreeStatistics {
diff --git a/libs/neonart/src/allocator/block.rs b/libs/neonart/src/allocator/block.rs
index 4e7e37adef..292a74f148 100644
--- a/libs/neonart/src/allocator/block.rs
+++ b/libs/neonart/src/allocator/block.rs
@@ -65,7 +65,7 @@ impl<'t> BlockAllocator<'t> {
     pub(crate) fn alloc_block(&self) -> &mut [MaybeUninit<u8>] {
         // FIXME: handle OOM
         let ptr: *mut MaybeUninit<u8> = self.get_block_ptr(self.alloc_block_internal()).cast();
-        unsafe { std::slice::from_raw_parts_mut( ptr, BLOCK_SIZE) }
+        unsafe { std::slice::from_raw_parts_mut(ptr, BLOCK_SIZE) }
     }
 
     fn alloc_block_internal(&self) -> u64 {
@@ -156,7 +156,7 @@ impl<'t> BlockAllocator<'t> {
     pub(crate) fn get_statistics(&self) -> BlockAllocatorStats {
         let mut num_free_blocks = 0;
 
-        let mut _prev_lock= None;
+        let mut _prev_lock = None;
         let head_lock = self.freelist_head.lock();
         let mut next_blk = *head_lock;
         let mut _head_lock = Some(head_lock);
diff --git a/libs/neonart/src/allocator/static.rs b/libs/neonart/src/allocator/static.rs
index 87b7ab9c4b..ab1683c411 100644
--- a/libs/neonart/src/allocator/static.rs
+++ b/libs/neonart/src/allocator/static.rs
@@ -1,6 +1,8 @@
 use std::mem::MaybeUninit;
 
-pub fn alloc_from_slice<T>(area: &mut [MaybeUninit<u8>]) -> (&mut MaybeUninit<T>, &mut [MaybeUninit<u8>]) {
+pub fn alloc_from_slice<T>(
+    area: &mut [MaybeUninit<u8>],
+) -> (&mut MaybeUninit<T>, &mut [MaybeUninit<u8>]) {
     let layout = std::alloc::Layout::new::<T>();
 
     let area_start = area.as_mut_ptr();
@@ -19,7 +21,10 @@ pub fn alloc_from_slice<T>(area: &mut [MaybeUninit<u8>]) -> (&mut MaybeUninit<T>
     (result, remain)
 }
 
-pub fn alloc_array_from_slice<T>(area: &mut [MaybeUninit<u8>], len: usize) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<u8>]) {
+pub fn alloc_array_from_slice<T>(
+    area: &mut [MaybeUninit<u8>],
+    len: usize,
+) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<u8>]) {
     let layout = std::alloc::Layout::new::<T>();
 
     let area_start = area.as_mut_ptr();
@@ -33,7 +38,7 @@ pub fn alloc_array_from_slice<T>(area: &mut [MaybeUninit<u8>], len: usize) -> (&
     let (result_area, remain) = area.split_at_mut(layout.size() * len);
     let result_ptr: *mut MaybeUninit<T> = result_area.as_mut_ptr().cast();
 
-    let result = unsafe { std::slice::from_raw_parts_mut( result_ptr.as_mut().unwrap(), len) };
+    let result = unsafe { std::slice::from_raw_parts_mut(result_ptr.as_mut().unwrap(), len) };
 
     (result, remain)
 }
diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs
index 9eafb8c7e6..88641379a1 100644
--- a/libs/neonart/src/lib.rs
+++ b/libs/neonart/src/lib.rs
@@ -181,11 +181,7 @@ impl GarbageQueue {
         GarbageQueue(VecDeque::with_capacity(MAX_GARBAGE))
     }
 
-    fn remember_obsolete_node(
-        &mut self,
-        ptr: NodePtr,
-        epoch: u64,
-    ) {
+    fn remember_obsolete_node(&mut self, ptr: NodePtr, epoch: u64) {
         self.0.push_front((ptr, epoch));
     }
 
@@ -283,13 +279,14 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, A>
 
 impl<'t, K: Key + Clone, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, A> {
     pub fn start_write<'g>(&'t self) -> TreeWriteGuard<'g, K, V, A>
-        where 't: 'g
+    where
+        't: 'g,
     {
         TreeWriteGuard {
             tree_writer: self,
             epoch_pin: self.epoch_handle.pin(),
             phantom_key: PhantomData,
-            created_garbage: false
+            created_garbage: false,
         }
     }
 
@@ -344,7 +341,6 @@ where
 }
 
 impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> {
-
     /// Get a value
     pub fn get(&'t mut self, key: &K) -> Option<&'t V> {
         algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin)
@@ -408,7 +404,8 @@ impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> {
 }
 
 pub struct TreeIterator<K>
-    where K: Key + for<'a> From<&'a [u8]>,
+where
+    K: Key + for<'a> From<&'a [u8]>,
 {
     done: bool,
     pub next_key: Vec<u8>,
@@ -418,7 +415,8 @@ pub struct TreeIterator<K>
 }
 
 impl<K> TreeIterator<K>
-    where K: Key + for<'a> From<&'a [u8]>,
+where
+    K: Key + for<'a> From<&'a [u8]>,
 {
     pub fn new_wrapping() -> TreeIterator<K> {
         let mut next_key = Vec::new();
@@ -440,13 +438,13 @@ impl<K> TreeIterator<K>
         };
         assert_eq!(result.next_key.len(), K::KEY_LEN);
         assert_eq!(result.max_key.as_ref().unwrap().len(), K::KEY_LEN);
-        
+
         result
     }
 
-
     pub fn next<'g, V>(&mut self, read_guard: &'g TreeReadGuard<'g, K, V>) -> Option<(K, &'g V)>
-        where V: Value
+    where
+        V: Value,
     {
         if self.done {
             return None;
@@ -455,7 +453,11 @@ impl<K> TreeIterator<K>
         let mut wrapped_around = false;
         loop {
             assert_eq!(self.next_key.len(), K::KEY_LEN);
-            if let Some((k , v)) = algorithm::iter_next(&mut self.next_key, read_guard.tree.root, &read_guard.epoch_pin) {
+            if let Some((k, v)) = algorithm::iter_next(
+                &mut self.next_key,
+                read_guard.tree.root,
+                &read_guard.epoch_pin,
+            ) {
                 assert_eq!(k.len(), K::KEY_LEN);
                 assert_eq!(self.next_key.len(), K::KEY_LEN);
 
@@ -472,7 +474,7 @@ impl<K> TreeIterator<K>
                 increment_key(self.next_key.as_mut_slice());
 
                 let k = k.as_slice().into();
-                break Some((k, v))
+                break Some((k, v));
             } else {
                 if self.max_key.is_some() {
                     self.done = true;
@@ -491,7 +493,7 @@ impl<K> TreeIterator<K>
                     break None;
                 }
             }
-            break None
+            break None;
         }
     }
 }
@@ -508,7 +510,6 @@ fn increment_key(key: &mut [u8]) -> bool {
     true
 }
 
-
 // Debugging functions
 impl<'t, K: Key, V: Value + Debug> TreeReadGuard<'t, K, V> {
     pub fn dump(&mut self) {
diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs
index 2d33ee53b0..0be971fde3 100644
--- a/libs/neonart/src/tests.rs
+++ b/libs/neonart/src/tests.rs
@@ -1,16 +1,16 @@
-use std::collections::HashSet;
 use std::collections::BTreeMap;
+use std::collections::HashSet;
 
 use crate::ArtAllocator;
 use crate::ArtMultiSlabAllocator;
 use crate::TreeInitStruct;
-use crate::TreeWriteAccess;
 use crate::TreeIterator;
+use crate::TreeWriteAccess;
 use crate::{Key, Value};
 
-use rand::seq::SliceRandom;
 use rand::Rng;
+use rand::seq::SliceRandom;
 use rand_distr::Zipf;
 
 const TEST_KEY_LEN: usize = 16;
@@ -103,12 +103,14 @@ fn sparse() {
     test_inserts(&keys);
}
 
-
-
 #[derive(Clone, Copy, Debug)]
 struct TestOp(TestKey, Option);
 
-fn apply_op>(op: &TestOp, tree: &TreeWriteAccess, shadow: &mut BTreeMap) {
+fn apply_op>(
+    op: &TestOp,
+    tree: &TreeWriteAccess,
+    shadow: &mut BTreeMap,
+) {
     eprintln!("applying op: {op:?}");
 
     // apply the change to the shadow tree first
@@ -119,14 +121,17 @@ fn apply_op>(op: &TestOp, tree: &TreeWriteAccess
     }
 }
 
-fn test_iter>(tree: &TreeWriteAccess, shadow: &BTreeMap) {
+fn test_iter>(
+    tree: &TreeWriteAccess,
+    shadow: &BTreeMap,
+) {
     let mut shadow_iter = shadow.iter();
 
     let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
 
@@ -136,7 +141,10 @@ fn test_iter>(tree: &TreeWriteAccess,
         let item = iter.next(&r);
 
         if shadow_item != item {
-            eprintln!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);
+            eprintln!(
+                "FAIL: iterator returned {:?}, expected {:?}",
+                item, shadow_item
+            );
 
             tree.start_read().dump();
 
             eprintln!("SHADOW:");
@@ -144,7 +152,10 @@ fn test_iter>(tree: &TreeWriteAccess,
             while let Some(si) = si.next() {
                 eprintln!("key: {:?}, val: {}", si.0, si.1);
             }
-            panic!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);
+            panic!(
+                "FAIL: iterator returned {:?}, expected {:?}",
+                item, shadow_item
+            );
         }
         if item.is_none() {
             break;
@@ -169,14 +180,7 @@ fn random_ops() {
     for i in 0..100000 {
         let key: TestKey = (rng.sample(distribution) as u128).into();
 
-        let op = TestOp(
-            key,
-            if rng.random_bool(0.75) {
-                Some(i)
-            } else {
-                None
-            },
-        );
+        let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
 
         apply_op(&op, &tree_writer, &mut shadow);
 
diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs
index b40886ccd5..a1a4447c6a 100644
--- a/pageserver/client_grpc/src/client_cache.rs
+++ b/pageserver/client_grpc/src/client_cache.rs
@@ -1,5 +1,12 @@
-use std::{collections::HashMap, sync::Arc, time::{Duration, Instant}};
-use tokio::{sync::{Mutex, Notify, mpsc, watch}, time::sleep};
+use std::{
+    collections::HashMap,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+use tokio::{
+    sync::{Mutex, Notify, mpsc, watch},
+    time::sleep,
+};
 use tonic::transport::{Channel, Endpoint};
 use uuid;
 
@@ -95,7 +102,11 @@ impl ConnectionPool {
             while let Some(responder) = request_rx.recv().await {
                 // TODO: This call should time out and return an error
                 let (id, channel) = bg_pool.acquire_connection().await;
-                let client = PooledClient { channel, pool: Arc::clone(&bg_pool), id };
+                let client = PooledClient {
+                    channel,
+                    pool: Arc::clone(&bg_pool),
+                    id,
+                };
                 let _ = responder.send(client).await;
             }
         });
@@ -109,7 +120,8 @@ impl ConnectionPool {
         {
             let mut inner = self.inner.lock().await;
             // TODO: Use a heap, although the number of connections is small
-            if let Some((&id, entry)) = inner.entries
+            if let Some((&id, entry)) = inner
+                .entries
                 .iter_mut()
                 .filter(|(_, e)| e.active_consumers < self.max_consumers)
                 .filter(|(_, e)| e.consecutive_errors < self.error_threshold)
@@ -122,7 +134,6 @@ impl ConnectionPool {
                 // possible that a consumer will release a connection while the new one is being created, in
                 // which case we will use it right away, but the new connection will be created anyway.)
                 let _ = self.cc_watch_tx.send(true);
-
             }
             // Wait for a new connection, or for one of the consumers to release a connection
             // TODO: Put this notify in a timeout
@@ -131,7 +142,6 @@ impl ConnectionPool {
     }
 
     async fn create_connection(&self) -> () {
-
         // Wait to be signalled to create a connection.
let mut recv = self.cc_watch_tx.subscribe(); if !*self.cc_watch_rx.borrow() { @@ -172,19 +182,23 @@ impl ConnectionPool { .expect("invalid endpoint") .timeout(self.connect_timeout) .connect(), - ).await; + ) + .await; match attempt { Ok(Ok(channel)) => { { let mut inner = self.inner.lock().await; let id = uuid::Uuid::new_v4(); - inner.entries.insert(id, ConnectionEntry { - channel: channel.clone(), - active_consumers: 0, - consecutive_successes: 0, - consecutive_errors: 0, - }); + inner.entries.insert( + id, + ConnectionEntry { + channel: channel.clone(), + active_consumers: 0, + consecutive_successes: 0, + consecutive_errors: 0, + }, + ); self.notify.notify_one(); let _ = self.cc_watch_tx.send(false); return; @@ -194,15 +208,21 @@ impl ConnectionPool { let mut inner = self.inner.lock().await; inner.last_connect_failure = Some(Instant::now()); } - } + } } } /// Get a client we can use to send gRPC messages. pub async fn get_client(&self) -> PooledClient { let (resp_tx, mut resp_rx) = mpsc::channel(1); - self.request_tx.send(resp_tx).await.expect("ConnectionPool task has shut down"); - resp_rx.recv().await.expect("ConnectionPool task has shut down") + self.request_tx + .send(resp_tx) + .await + .expect("ConnectionPool task has shut down"); + resp_rx + .recv() + .await + .expect("ConnectionPool task has shut down") } /// Return client to the pool, indicating success or error. @@ -256,4 +276,3 @@ impl PooledClient { self.pool.return_client(self.id, result.is_ok()).await; } } - diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 3374fcc2dc..48ccf00292 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -4,9 +4,9 @@ //! - Send requests to correct shards //! use std::collections::HashMap; +use std::sync::Arc; use std::sync::RwLock; use std::time::Duration; -use std::sync::Arc; use bytes::Bytes; use futures::Stream; @@ -19,7 +19,6 @@ use pageserver_page_api::proto; use pageserver_page_api::proto::PageServiceClient; use utils::shard::ShardIndex; - mod client_cache; #[derive(Error, Debug)] @@ -131,7 +130,6 @@ impl PageserverClient { return Ok(response.page_image); } } - } // TODO: this should use model::GetPageRequest and GetPageResponse @@ -142,7 +140,6 @@ impl PageserverClient { tonic::Response>, PageserverClientError, > { - // FIXME: calculate the shard number correctly let shard = ShardIndex::unsharded(); @@ -158,7 +155,6 @@ impl PageserverClient { // TODO: check for an error and pass it to "finish" pooled_client.finish(Ok(())).await; return Ok(client.get_pages(tonic::Request::new(requests)).await?); - } /// Process a request to get the size of a database. @@ -216,17 +212,13 @@ impl PageserverClient { /// /// Get a client from the pool for this shard, also creating the pool if it doesn't exist. 
     ///
-    async fn get_client(
-        &self,
-        shard: ShardIndex,
-    ) -> client_cache::PooledClient {
-
+    async fn get_client(&self, shard: ShardIndex) -> client_cache::PooledClient {
         let reused_pool: Option<Arc<client_cache::ConnectionPool>> = {
             let channels = self.channels.read().unwrap();
             channels.get(&shard).cloned()
         };
 
-        let usable_pool : Arc<client_cache::ConnectionPool>;
+        let usable_pool: Arc<client_cache::ConnectionPool>;
         match reused_pool {
             Some(pool) => {
                 let pooled_client = pool.get_client().await;
@@ -235,7 +227,11 @@ impl PageserverClient {
             None => {
                 let new_pool = client_cache::ConnectionPool::new(
                     self.shard_map.get(&shard).unwrap(),
-                    5000, 5, Duration::from_millis(200), Duration::from_secs(1));
+                    5000,
+                    5,
+                    Duration::from_millis(200),
+                    Duration::from_secs(1),
+                );
                 let mut write_pool = self.channels.write().unwrap();
                 write_pool.insert(shard, new_pool.clone());
                 usable_pool = new_pool.clone();
@@ -245,7 +241,6 @@ impl PageserverClient {
         let pooled_client = usable_pool.get_client().await;
         return pooled_client;
     }
-
 }
 
 /// Inject tenant_id, timeline_id and authentication token to all pageserver requests.
@@ -287,8 +282,7 @@ impl tonic::service::Interceptor for AuthInterceptor {
             req.metadata_mut()
                 .insert("neon-tenant-id", self.tenant_id.clone());
             if let Some(shard_id) = &self.shard_id {
-                req.metadata_mut()
-                    .insert("neon-shard-id", shard_id.clone());
+                req.metadata_mut().insert("neon-shard-id", shard_id.clone());
             }
             req.metadata_mut()
                 .insert("neon-timeline-id", self.timeline_id.clone());
diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs
index 2d0ec726b4..cc85cc2f57 100644
--- a/pgxn/neon/communicator/src/file_cache.rs
+++ b/pgxn/neon/communicator/src/file_cache.rs
@@ -46,7 +46,10 @@ impl FileCache {
         uring_system: tokio_epoll_uring::SystemHandle,
     ) -> Result {
         if initial_size < 100 {
-            tracing::warn!("min size for file cache is 100 blocks, {} requested", initial_size);
+            tracing::warn!(
+                "min size for file cache is 100 blocks, {} requested",
+                initial_size
+            );
             initial_size = 100;
         }
 
@@ -60,11 +63,13 @@ impl FileCache {
         let max_blocks_gauge = metrics::IntGauge::new(
             "file_cache_max_blocks",
             "Local File Cache size in 8KiB blocks",
-        ).unwrap();
+        )
+        .unwrap();
         let num_free_blocks_gauge = metrics::IntGauge::new(
             "file_cache_num_free_blocks",
             "Number of free 8KiB blocks in Local File Cache",
-        ).unwrap();
+        )
+        .unwrap();
 
         tracing::info!("initialized file cache with {} blocks", initial_size);
 
@@ -165,8 +170,7 @@ impl metrics::core::Collector for FileCache {
         let free_list = self.free_list.lock().unwrap();
         self.max_blocks_gauge.set(free_list.max_blocks as i64);
 
-        let total_free_blocks: i64 =
-            free_list.free_blocks.len() as i64
+        let total_free_blocks: i64 = free_list.free_blocks.len() as i64
             + (free_list.max_blocks as i64 - free_list.next_free_block as i64);
         self.num_free_blocks_gauge.set(total_free_blocks as i64);
     }
diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs
index eb38002edc..6a9b9b0b7d 100644
--- a/pgxn/neon/communicator/src/init.rs
+++ b/pgxn/neon/communicator/src/init.rs
@@ -85,9 +85,8 @@ pub extern "C" fn rcommunicator_shmem_init(
     shmem_area_ptr: *mut MaybeUninit<u8>,
     shmem_area_len: u64,
 ) -> &'static mut CommunicatorInitStruct {
-    let shmem_area: &'static mut [MaybeUninit<u8>] = unsafe {
-        std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize)
-    };
+    let shmem_area: &'static mut [MaybeUninit<u8>] =
+        unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) };
 
     // Carve out the request slots from the shmem area and initialize them
     let num_neon_request_slots_per_backend = NUM_NEON_REQUEST_SLOTS_PER_BACKEND as usize;
@@ -103,7 +102,9 @@ pub extern "C" fn rcommunicator_shmem_init(
     // 'neon_request_slots' is initialized now. (MaybeUninit::slice_assume_init_mut() is nightly-only
     // as of this writing.)
     let neon_request_slots = unsafe {
-        std::mem::transmute::<&mut [MaybeUninit<NeonIOHandle>], &mut[NeonIOHandle]>(neon_request_slots)
+        std::mem::transmute::<&mut [MaybeUninit<NeonIOHandle>], &mut [NeonIOHandle]>(
+            neon_request_slots,
+        )
     };
 
     // Give the rest of the area to the integrated cache
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index 5b7af67722..8cf0119691 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -78,16 +78,16 @@ impl<'t> IntegratedCacheInitStruct<'t> {
 
     /// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which
     /// will be inherited by all processes through fork.
-    pub fn shmem_init(_max_procs: u32, shmem_area: &'t mut [MaybeUninit<u8>]) -> IntegratedCacheInitStruct<'t> {
+    pub fn shmem_init(
+        _max_procs: u32,
+        shmem_area: &'t mut [MaybeUninit<u8>],
+    ) -> IntegratedCacheInitStruct<'t> {
         let allocator = neonart::ArtMultiSlabAllocator::new(shmem_area);
 
         let handle = IntegratedCacheTreeInitStruct::new(allocator);
 
         // Initialize the shared memory area
-        IntegratedCacheInitStruct {
-            allocator,
-            handle,
-        }
+        IntegratedCacheInitStruct { allocator, handle }
     }
 
     pub fn worker_process_init(
@@ -188,7 +188,7 @@ fn key_range_for_rel_blocks(rel: &RelTag) -> Range<TreeKey> {
             fork_number: rel.fork_number,
             block_number: 0,
         },
-        end: TreeKey {
+        end: TreeKey {
             spc_oid: rel.spc_oid,
             db_oid: rel.db_oid,
             rel_number: rel.rel_number,
@@ -436,18 +436,20 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                     // The cache is completely empty. Pretty unexpected that this function
                    // was called then..
                     break;
-                },
+                }
                 Some((_k, TreeEntry::Rel(_))) => {
                     // ignore rel entries for now.
                    // TODO: They stick in the cache forever
-                },
+                }
                 Some((k, TreeEntry::Block(blk_entry))) => {
                     if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
                         // Evict this
                         let w = self.cache_tree.start_write();
                         let old = w.remove(&k);
                         if let Some(TreeEntry::Block(old)) = old {
-                            let _ = self.global_lw_lsn.fetch_max(old.lw_lsn.0, Ordering::Relaxed);
+                            let _ = self
+                                .global_lw_lsn
+                                .fetch_max(old.lw_lsn.0, Ordering::Relaxed);
                             if let Some(cache_block) = old.cache_block {
                                 return Some(cache_block);
                             }
@@ -455,7 +457,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                             assert!(old.is_none());
                         }
                     }
-                },
+                }
             }
         }
         // Give up if we didn't find anything
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 336dcb3c1f..622acc8361 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -47,7 +47,10 @@ pub(super) async fn init(
     let uring_system = tokio_epoll_uring::System::launch().await.unwrap();
 
     let file_cache = if let Some(path) = file_cache_path {
-        Some(FileCache::new(&path, file_cache_size, uring_system).expect("could not create cache file"))
+        Some(
+            FileCache::new(&path, file_cache_size, uring_system)
+                .expect("could not create cache file"),
+        )
     } else {
         // FIXME: temporarily for testing, use LFC even if disabled
         Some(
@@ -391,7 +394,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
     }
 }
 
-
 impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
     fn desc(&self) -> Vec<&metrics::core::Desc> {
         let mut descs = Vec::new();
diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs
index e9efb92a6d..d6987978d4 100644
--- a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs
+++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs
@@ -2,15 +2,15 @@
 //! prometheus metrics.
 
 use axum::Router;
-use axum::extract::State;
 use axum::body::Body;
+use axum::extract::State;
 use axum::response::Response;
 use http::StatusCode;
 use http::header::CONTENT_TYPE;
+use metrics;
 use metrics::proto::MetricFamily;
 use metrics::{Encoder, TextEncoder};
 
-use metrics;
 
 use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
 
@@ -19,10 +19,12 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
         use axum::routing::get;
         let app = Router::new()
             .route("/metrics", get(get_metrics))
-            .with_state(self);
+            .with_state(self);
 
         // TODO: make configurable. Or listen on unix domain socket?
-        let listener = tokio::net::TcpListener::bind("127.0.0.1:9090").await.unwrap();
+        let listener = tokio::net::TcpListener::bind("127.0.0.1:9090")
+            .await
+            .unwrap();
 
         tokio::spawn(async {
             tracing::info!("metrics listener spawned");
@@ -32,9 +34,7 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
 }
 
 /// Expose Prometheus metrics.
-async fn get_metrics(
-    State(state): State<&CommunicatorWorkerProcessStruct<'static>>
-) -> Response {
+async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'static>>) -> Response {
     tracing::warn!("get_metrics called");
     use metrics::core::Collector;
 
@@ -52,7 +52,7 @@ async fn get_metrics(
     let mut buffer = vec![];
 
     tracing::warn!("get_metrics done");
-    
+
     if let Err(e) = encoder.encode(&metrics, &mut buffer) {
         Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
index 562a40fbf9..74f2711310 100644
--- a/pgxn/neon/communicator/src/worker_process/worker_interface.rs
+++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
@@ -72,7 +72,10 @@ pub extern "C" fn communicator_worker_process_launch(
 }
 
 /// Convert the "shard map" from an array of C strings, indexed by shard no to a rust HashMap
-fn parse_shard_map(nshards: u32, shard_map: *mut *mut c_char) -> HashMap<ShardIndex, String> {
+fn parse_shard_map(
+    nshards: u32,
+    shard_map: *mut *mut c_char,
+) -> HashMap<ShardIndex, String> {
     use utils::shard::*;
 
     assert!(nshards <= u8::MAX as u32);
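
The neonart hunks above are formatting-only, but the epoch-pinning protocol they reformat is easy to misread in diff form. The sketch below shows the intended calling pattern, based solely on APIs visible in this patch (`start_write`, `start_read`, `remove`, `TreeIterator::new`, `TreeIterator::next`); the surrounding function and the `TestKey`/`TestValue` types are hypothetical stand-ins, not part of the patch.

```rust
// Illustrative sketch only -- not part of this patch.
fn example<A: ArtAllocator>(tree: &TreeWriteAccess<TestKey, TestValue, A>) {
    // A write guard pins the current epoch; nodes made obsolete by this
    // write go to the garbage queue and are reclaimed only once no reader
    // still has the old epoch pinned.
    let mut w = tree.start_write();
    let _old = w.remove(&TestKey::MIN);
    drop(w); // unpin the epoch

    // Reads pin an epoch too: values returned by the iterator borrow from
    // the tree and stay valid only while `r` is alive.
    let r = tree.start_read();
    let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
    while let Some((k, v)) = iter.next(&r) {
        eprintln!("key: {k:?}, val: {v:?}");
    }
}
```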
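Likewise, the client_cache.rs changes are easiest to review with the pool's acquire/use/return cycle in mind. A minimal sketch, assuming the constructor arguments shown in `PageserverClient::get_client` and the `get_client`/`finish` calls used there; the meanings of the two integer parameters (they appear to be the per-connection consumer limit and the consecutive-error threshold) and the RPC error type are assumptions:

```rust
// Illustrative sketch only -- not part of this patch.
use std::sync::Arc;
use std::time::Duration;

async fn example(shard_url: &str) {
    // Mirrors the call in PageserverClient::get_client.
    let pool: Arc<client_cache::ConnectionPool> = client_cache::ConnectionPool::new(
        shard_url,
        5000,                        // assumed: max consumers per connection
        5,                           // assumed: consecutive-error threshold
        Duration::from_millis(200),  // connect timeout
        Duration::from_secs(1),      // retry backoff after a failed connect
    );

    // Acquire a pooled channel, run an RPC on `client.channel`, then report
    // the outcome so the pool can track consecutive successes and errors.
    let client = pool.get_client().await;
    let result: Result<(), tonic::Status> = Ok(()); // stand-in for a real RPC
    client.finish(result).await;
}
```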
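Finally, the helpers in allocator/static.rs are the bump-carving primitives that `rcommunicator_shmem_init` applies to the raw shared-memory byte area. A sketch of that carving order, under the assumption that the reconstructed signatures above (`alloc_from_slice<T>` and `alloc_array_from_slice<T>` over a `[MaybeUninit<u8>]` area) are correct; the exact call sites in init.rs are abbreviated here:

```rust
// Illustrative sketch only -- types come from this patch, call sites are assumed.
use std::mem::MaybeUninit;

fn carve(shmem_area: &mut [MaybeUninit<u8>], num_slots: usize) {
    // Take one properly aligned CommunicatorInitStruct-sized chunk...
    let (_init_struct, rest) = alloc_from_slice::<CommunicatorInitStruct>(shmem_area);
    // ...then an array of request slots; whatever remains goes to the
    // integrated cache, as the init.rs comment says.
    let (_slots, _cache_area) = alloc_array_from_slice::<NeonIOHandle>(rest, num_slots);
}
```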