run 'cargo fmt'

Heikki Linnakangas
2025-05-06 15:35:56 +03:00
parent 977bc09d2a
commit 6dbbdaae73
16 changed files with 179 additions and 129 deletions

View File

@@ -72,13 +72,13 @@ pub(crate) fn iter_next<'e, V: Value>(
         match next_recurse(key, &mut path, root_ref, epoch_pin) {
             Ok(Some(v)) => {
                 assert_eq!(path.len(), key.len());
-                break Some((path, v))
-            },
+                break Some((path, v));
+            }
             Ok(None) => break None,
             Err(ConcurrentUpdateError()) => {
                 // retry
                 continue;
-            },
+            }
         }
     }
 }
@@ -110,15 +110,15 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
             Err(ArtError::ConcurrentUpdate) => {
                 eprintln!("retrying");
                 continue; // retry
-            },
+            }
             Err(ArtError::OutOfMemory) => {
                 panic!("todo: OOM: try to GC, propagate to caller");
-            },
+            }
             Err(ArtError::GarbageQueueFull) => {
                 // FIXME: This can happen if someone is holding back the epoch. We should
                 // wait for the epoch to advance
                 panic!("todo: GC queue is full");
-            },
+            }
         }
     }
 }
@@ -194,7 +194,7 @@ fn next_recurse<'e, V: Value>(
         match rnode.find_next_child_or_value_or_restart(min_key_byte)? {
             None => {
                 return Ok(None);
-            },
+            }
             Some((key_byte, ChildOrValue::Child(child_ref))) => {
                 let path_len = path.len();
                 path.push(key_byte);
@@ -207,7 +207,7 @@ fn next_recurse<'e, V: Value>(
                 }
                 path.truncate(path_len);
                 min_key_byte = key_byte + 1;
-            },
+            }
             Some((key_byte, ChildOrValue::Value(vptr))) => {
                 path.push(key_byte);
                 assert_eq!(path.len(), min_key.len());
@@ -215,8 +215,8 @@ fn next_recurse<'e, V: Value>(
                 // and the lifetime of 'epoch_pin' enforces that the reference is only accessible
                 // as long as the epoch is pinned.
                 let v = unsafe { vptr.as_ref().unwrap() };
-                return Ok(Some(v))
-            },
+                return Ok(Some(v));
+            }
         }
     }
 }
@@ -300,7 +300,6 @@ where
         // TODO: Shrink the node
         // TODO: If the node becomes empty, unlink it from parent
         wnode.delete_value(key[0]);
-
     }
 
     wnode.write_unlock();
@@ -400,12 +399,16 @@ fn insert_split_prefix<'e, K: Key, V: Value, A: ArtAllocator<V>>(
     let common_prefix_len = common_prefix(key, old_prefix);
 
     // Allocate a node for the new value.
-    let new_value_node =
-        allocate_node_for_value(&key[common_prefix_len + 1..], value, guard.tree_writer.allocator)?;
+    let new_value_node = allocate_node_for_value(
+        &key[common_prefix_len + 1..],
+        value,
+        guard.tree_writer.allocator,
+    )?;
 
     // Allocate a new internal node with the common prefix
     // FIXME: deallocate 'new_value_node' on OOM
-    let mut prefix_node = node_ref::new_internal(&key[..common_prefix_len], guard.tree_writer.allocator)?;
+    let mut prefix_node =
+        node_ref::new_internal(&key[..common_prefix_len], guard.tree_writer.allocator)?;
 
     // Add the old node and the new nodes to the new internal node
     prefix_node.insert_old_child(old_prefix[common_prefix_len], old_node);
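Aside: the hunks above are pure rustfmt churn; the optimistic-concurrency control flow (attempt the operation, and on ConcurrentUpdateError restart from the top) is unchanged. For readers new to the pattern, a minimal self-contained sketch, with illustrative names rather than this crate's real API:

struct ConcurrentUpdateError();

fn try_search(attempt: u32) -> Result<Option<u64>, ConcurrentUpdateError> {
    // Pretend the first attempt races with a concurrent writer.
    if attempt == 0 {
        Err(ConcurrentUpdateError())
    } else {
        Ok(Some(42))
    }
}

fn search_with_retry() -> Option<u64> {
    let mut attempt = 0;
    loop {
        match try_search(attempt) {
            Ok(v) => break v,
            Err(ConcurrentUpdateError()) => {
                // Version counters changed under us: retry from the root.
                attempt += 1;
                continue;
            }
        }
    }
}

fn main() {
    assert_eq!(search_with_retry(), Some(42));
}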

View File

@@ -338,7 +338,10 @@ impl<V: Value> NodePtr<V> {
         }
     }
 
-    pub(crate) fn find_next_child_or_value(&self, key_byte: u8) -> Option<(u8, ChildOrValuePtr<V>)> {
+    pub(crate) fn find_next_child_or_value(
+        &self,
+        key_byte: u8,
+    ) -> Option<(u8, ChildOrValuePtr<V>)> {
         match self.variant() {
             NodeVariant::Internal4(n) => n
                 .find_next_child(key_byte)
@@ -366,7 +369,7 @@ impl<V: Value> NodePtr<V> {
                 .map(|(k, v)| (k, ChildOrValuePtr::Value(v))),
         }
     }
-
+
     pub(crate) fn truncate_prefix(&mut self, new_prefix_len: usize) {
         match self.variant_mut() {
             NodeVariantMut::Internal4(n) => n.truncate_prefix(new_prefix_len),
@@ -930,7 +933,10 @@ impl<V: Value> NodeLeaf4<V> {
                 assert!(self.child_values[i].is_some());
                 if i < self.num_values as usize - 1 {
                     self.child_keys[i] = self.child_keys[self.num_values as usize - 1];
-                    self.child_values[i] = std::mem::replace(&mut self.child_values[self.num_values as usize - 1], None);
+                    self.child_values[i] = std::mem::replace(
+                        &mut self.child_values[self.num_values as usize - 1],
+                        None,
+                    );
                 }
                 self.num_values -= 1;
                 return;
@@ -1031,7 +1037,10 @@ impl<V: Value> NodeLeaf16<V> {
                 assert!(self.child_values[i as usize].is_some());
                 if i < self.num_values as usize - 1 {
                     self.child_keys[i] = self.child_keys[self.num_values as usize - 1];
-                    self.child_values[i] = std::mem::replace(&mut self.child_values[self.num_values as usize - 1], None);
+                    self.child_values[i] = std::mem::replace(
+                        &mut self.child_values[self.num_values as usize - 1],
+                        None,
+                    );
                 }
                 self.num_values -= 1;
                 return;
@@ -1125,7 +1134,7 @@ impl<V: Value> NodeLeaf48<V> {
         if idx < self.num_values {
             // Move all existing values with higher indexes down one position
-            for i in idx as usize ..self.num_values as usize {
+            for i in idx as usize..self.num_values as usize {
                 self.child_values[i] = std::mem::replace(&mut self.child_values[i + 1], None);
             }

View File

@@ -104,14 +104,17 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
         match child_or_value {
             None => Ok(None),
-            Some((k, ChildOrValuePtr::Value(vptr)) )=> Ok(Some((k, ChildOrValue::Value(vptr)))),
-            Some((k, ChildOrValuePtr::Child(child_ptr))) => Ok(Some((k, ChildOrValue::Child(NodeRef {
-                ptr: child_ptr,
-                phantom: self.phantom,
-            })))),
+            Some((k, ChildOrValuePtr::Value(vptr))) => Ok(Some((k, ChildOrValue::Value(vptr)))),
+            Some((k, ChildOrValuePtr::Child(child_ptr))) => Ok(Some((
+                k,
+                ChildOrValue::Child(NodeRef {
+                    ptr: child_ptr,
+                    phantom: self.phantom,
+                }),
+            ))),
         }
     }
 
     pub(crate) fn upgrade_to_write_lock_or_restart(
         self,
     ) -> Result<WriteLockedNodeRef<'e, V>, ConcurrentUpdateError> {

View File

@@ -44,7 +44,8 @@ pub trait ArtAllocator<V: crate::Value> {
 }
 
 pub struct ArtMultiSlabAllocator<'t, V>
-where V: crate::Value
+where
+    V: crate::Value,
 {
     tree_area: spin::Mutex<Option<&'t mut MaybeUninit<Tree<V>>>>,
@@ -140,7 +141,6 @@ impl<'t, V: crate::Value> ArtAllocator<V> for ArtMultiSlabAllocator<'t, V> {
     }
 }
 
-
 impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> {
     pub fn get_statistics(&self) -> ArtTreeStatistics {
         ArtTreeStatistics {

View File

@@ -65,7 +65,7 @@ impl<'t> BlockAllocator<'t> {
     pub(crate) fn alloc_block(&self) -> &mut [MaybeUninit<u8>] {
         // FIXME: handle OOM
         let ptr: *mut MaybeUninit<u8> = self.get_block_ptr(self.alloc_block_internal()).cast();
-        unsafe { std::slice::from_raw_parts_mut( ptr, BLOCK_SIZE) }
+        unsafe { std::slice::from_raw_parts_mut(ptr, BLOCK_SIZE) }
     }
 
     fn alloc_block_internal(&self) -> u64 {
@@ -156,7 +156,7 @@ impl<'t> BlockAllocator<'t> {
     pub(crate) fn get_statistics(&self) -> BlockAllocatorStats {
         let mut num_free_blocks = 0;
-        let mut _prev_lock= None;
+        let mut _prev_lock = None;
         let head_lock = self.freelist_head.lock();
         let mut next_blk = *head_lock;
         let mut _head_lock = Some(head_lock);

View File

@@ -1,6 +1,8 @@
 use std::mem::MaybeUninit;
 
-pub fn alloc_from_slice<T>(area: &mut [MaybeUninit<u8>]) -> (&mut MaybeUninit<T>, &mut [MaybeUninit<u8>]) {
+pub fn alloc_from_slice<T>(
+    area: &mut [MaybeUninit<u8>],
+) -> (&mut MaybeUninit<T>, &mut [MaybeUninit<u8>]) {
     let layout = std::alloc::Layout::new::<T>();
     let area_start = area.as_mut_ptr();
@@ -19,7 +21,10 @@ pub fn alloc_from_slice<T>(area: &mut [MaybeUninit<u8>]) -> (&mut MaybeUninit<T>
     (result, remain)
 }
 
-pub fn alloc_array_from_slice<T>(area: &mut [MaybeUninit<u8>], len: usize) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<u8>]) {
+pub fn alloc_array_from_slice<T>(
+    area: &mut [MaybeUninit<u8>],
+    len: usize,
+) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<u8>]) {
     let layout = std::alloc::Layout::new::<T>();
     let area_start = area.as_mut_ptr();
@@ -33,7 +38,7 @@ pub fn alloc_array_from_slice<T>(area: &mut [MaybeUninit<u8>], len: usize) -> (&
     let (result_area, remain) = area.split_at_mut(layout.size() * len);
     let result_ptr: *mut MaybeUninit<T> = result_area.as_mut_ptr().cast();
-    let result = unsafe { std::slice::from_raw_parts_mut( result_ptr.as_mut().unwrap(), len) };
+    let result = unsafe { std::slice::from_raw_parts_mut(result_ptr.as_mut().unwrap(), len) };
     (result, remain)
 }
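Aside: both functions carve typed allocations out of a raw MaybeUninit<u8> arena, using Layout for size and alignment. A simplified sketch of the idea (hypothetical code; error handling and the crate's exact pointer arithmetic are omitted):

use std::mem::MaybeUninit;

// Align the cursor for T, split the byte slice, and hand back a typed slot
// plus the remainder.
fn carve<T>(area: &mut [MaybeUninit<u8>]) -> (&mut MaybeUninit<T>, &mut [MaybeUninit<u8>]) {
    let layout = std::alloc::Layout::new::<T>();
    let pad = area.as_mut_ptr().align_offset(layout.align());
    let (_, rest) = area.split_at_mut(pad);
    let (slot, remain) = rest.split_at_mut(layout.size());
    // Safety: 'slot' is aligned for T and layout.size() bytes long.
    let slot = unsafe { &mut *(slot.as_mut_ptr() as *mut MaybeUninit<T>) };
    (slot, remain)
}

fn main() {
    let mut arena = [MaybeUninit::<u8>::uninit(); 64];
    let (slot, remain) = carve::<u64>(&mut arena);
    slot.write(0xdead_beef);
    assert!(remain.len() >= 64 - 8 - 8); // at most 7 bytes of padding, 8 of payload
}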

View File

@@ -181,11 +181,7 @@ impl<V> GarbageQueue<V> {
         GarbageQueue(VecDeque::with_capacity(MAX_GARBAGE))
     }
 
-    fn remember_obsolete_node(
-        &mut self,
-        ptr: NodePtr<V>,
-        epoch: u64,
-    ) {
+    fn remember_obsolete_node(&mut self, ptr: NodePtr<V>, epoch: u64) {
         self.0.push_front((ptr, epoch));
     }
@@ -283,13 +279,14 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator<V>> TreeInitStruct<'t, K, V,
 impl<'t, K: Key + Clone, V: Value, A: ArtAllocator<V>> TreeWriteAccess<'t, K, V, A> {
     pub fn start_write<'g>(&'t self) -> TreeWriteGuard<'g, K, V, A>
-    where 't: 'g
+    where
+        't: 'g,
     {
         TreeWriteGuard {
             tree_writer: self,
             epoch_pin: self.epoch_handle.pin(),
             phantom_key: PhantomData,
-            created_garbage: false
+            created_garbage: false,
         }
     }
@@ -344,7 +341,6 @@ where
 }
 
-
 impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
     /// Get a value
     pub fn get(&'t mut self, key: &K) -> Option<&'t V> {
         algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin)
@@ -408,7 +404,8 @@ impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
 }
 
 pub struct TreeIterator<K>
-where K: Key + for<'a> From<&'a [u8]>,
+where
+    K: Key + for<'a> From<&'a [u8]>,
 {
     done: bool,
     pub next_key: Vec<u8>,
@@ -418,7 +415,8 @@ pub struct TreeIterator<K>
 }
 
 impl<K> TreeIterator<K>
-where K: Key + for<'a> From<&'a [u8]>,
+where
+    K: Key + for<'a> From<&'a [u8]>,
 {
     pub fn new_wrapping() -> TreeIterator<K> {
         let mut next_key = Vec::new();
@@ -440,13 +438,13 @@ impl<K> TreeIterator<K>
         };
         assert_eq!(result.next_key.len(), K::KEY_LEN);
         assert_eq!(result.max_key.as_ref().unwrap().len(), K::KEY_LEN);
         result
     }
 
-
     pub fn next<'g, V>(&mut self, read_guard: &'g TreeReadGuard<'g, K, V>) -> Option<(K, &'g V)>
-    where V: Value
+    where
+        V: Value,
     {
         if self.done {
             return None;
@@ -455,7 +453,11 @@ impl<K> TreeIterator<K>
         let mut wrapped_around = false;
         loop {
             assert_eq!(self.next_key.len(), K::KEY_LEN);
-            if let Some((k , v)) = algorithm::iter_next(&mut self.next_key, read_guard.tree.root, &read_guard.epoch_pin) {
+            if let Some((k, v)) = algorithm::iter_next(
+                &mut self.next_key,
+                read_guard.tree.root,
+                &read_guard.epoch_pin,
+            ) {
                 assert_eq!(k.len(), K::KEY_LEN);
                 assert_eq!(self.next_key.len(), K::KEY_LEN);
@@ -472,7 +474,7 @@ impl<K> TreeIterator<K>
                 increment_key(self.next_key.as_mut_slice());
                 let k = k.as_slice().into();
 
-                break Some((k, v))
+                break Some((k, v));
             } else {
                 if self.max_key.is_some() {
                     self.done = true;
@@ -491,7 +493,7 @@ impl<K> TreeIterator<K>
                     break None;
                 }
             }
-            break None
+            break None;
         }
     }
 }
@@ -508,7 +510,6 @@ fn increment_key(key: &mut [u8]) -> bool {
     true
 }
 
-
 // Debugging functions
 impl<'t, K: Key, V: Value + Debug> TreeReadGuard<'t, K, V> {
     pub fn dump(&mut self) {
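Aside: the iterator's wrap-around behavior leans on increment_key, whose tail is visible in the last hunk: it increments a fixed-length big-endian key in place and returns a bool, presumably false once the key space wraps. A hypothetical reimplementation for illustration (not the crate's actual body):

fn increment_key(key: &mut [u8]) -> bool {
    for byte in key.iter_mut().rev() {
        if *byte == u8::MAX {
            *byte = 0; // carry into the next more-significant byte
        } else {
            *byte += 1;
            return true;
        }
    }
    false // wrapped around: key was all 0xFF
}

fn main() {
    let mut k = [0x00, 0xFF];
    assert!(increment_key(&mut k));
    assert_eq!(k, [0x01, 0x00]);
    let mut k = [0xFF, 0xFF];
    assert!(!increment_key(&mut k));
    assert_eq!(k, [0x00, 0x00]);
}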

View File

@@ -1,16 +1,16 @@
-use std::collections::HashSet;
 use std::collections::BTreeMap;
+use std::collections::HashSet;
 
 use crate::ArtAllocator;
 use crate::ArtMultiSlabAllocator;
 use crate::TreeInitStruct;
-use crate::TreeWriteAccess;
 use crate::TreeIterator;
+use crate::TreeWriteAccess;
 use crate::{Key, Value};
 
-use rand::seq::SliceRandom;
 use rand::Rng;
+use rand::seq::SliceRandom;
 use rand_distr::Zipf;
 
 const TEST_KEY_LEN: usize = 16;
@@ -103,12 +103,14 @@ fn sparse() {
     test_inserts(&keys);
 }
 
 #[derive(Clone, Copy, Debug)]
 struct TestOp(TestKey, Option<usize>);
 
-fn apply_op<A: ArtAllocator<usize>>(op: &TestOp, tree: &TreeWriteAccess<TestKey, usize, A>, shadow: &mut BTreeMap<TestKey, usize>) {
+fn apply_op<A: ArtAllocator<usize>>(
+    op: &TestOp,
+    tree: &TreeWriteAccess<TestKey, usize, A>,
+    shadow: &mut BTreeMap<TestKey, usize>,
+) {
     eprintln!("applying op: {op:?}");
 
     // apply the change to the shadow tree first
@@ -119,14 +121,17 @@ fn apply_op<A: ArtAllocator<usize>>(op: &TestOp, tree: &TreeWriteAccess<TestKey,
     };
 
     // apply to Art tree
-    let w = tree.start_write();
+    let w = tree.start_write();
     w.update_with_fn(&op.0, |existing| {
         assert_eq!(existing, shadow_existing.as_ref());
         return op.1;
     });
 }
 
-fn test_iter<A: ArtAllocator<usize>>(tree: &TreeWriteAccess<TestKey, usize, A>, shadow: &BTreeMap<TestKey, usize>) {
+fn test_iter<A: ArtAllocator<usize>>(
+    tree: &TreeWriteAccess<TestKey, usize, A>,
+    shadow: &BTreeMap<TestKey, usize>,
+) {
     let mut shadow_iter = shadow.iter();
     let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
@@ -136,7 +141,10 @@ fn test_iter<A: ArtAllocator<usize>>(tree: &TreeWriteAccess<TestKey, usize, A>,
         let item = iter.next(&r);
         if shadow_item != item {
-            eprintln!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);
+            eprintln!(
+                "FAIL: iterator returned {:?}, expected {:?}",
+                item, shadow_item
+            );
             tree.start_read().dump();
             eprintln!("SHADOW:");
@@ -144,7 +152,10 @@ fn test_iter<A: ArtAllocator<usize>>(tree: &TreeWriteAccess<TestKey, usize, A>,
             while let Some(si) = si.next() {
                 eprintln!("key: {:?}, val: {}", si.0, si.1);
             }
-            panic!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);
+            panic!(
+                "FAIL: iterator returned {:?}, expected {:?}",
+                item, shadow_item
+            );
         }
         if item.is_none() {
             break;
@@ -169,14 +180,7 @@ fn random_ops() {
     for i in 0..100000 {
         let key: TestKey = (rng.sample(distribution) as u128).into();
-        let op = TestOp(
-            key,
-            if rng.random_bool(0.75) {
-                Some(i)
-            } else {
-                None
-            },
-        );
+        let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
 
         apply_op(&op, &tree_writer, &mut shadow);
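Aside: these tests use a shadow-model (oracle) pattern — every random operation is applied both to the ART tree and to a std BTreeMap, and iterations are compared after the fact. Reduced to a self-contained toy with a Vec standing in for the structure under test:

use std::collections::BTreeMap;

fn main() {
    let mut shadow: BTreeMap<u8, usize> = BTreeMap::new();
    let mut under_test: Vec<(u8, usize)> = Vec::new();

    let ops: [(u8, Option<usize>); 4] = [(3, Some(1)), (7, Some(2)), (3, None), (9, Some(3))];
    for (key, val) in ops {
        // apply to the oracle first
        match val {
            Some(v) => shadow.insert(key, v),
            None => shadow.remove(&key),
        };
        // then to the structure under test
        under_test.retain(|(k, _)| *k != key);
        if let Some(v) = val {
            under_test.push((key, v));
        }
        under_test.sort();
        // both must agree after every op
        assert!(shadow.iter().map(|(k, v)| (*k, *v)).eq(under_test.iter().copied()));
    }
}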

View File

@@ -1,5 +1,12 @@
-use std::{collections::HashMap, sync::Arc, time::{Duration, Instant}};
-use tokio::{sync::{Mutex, Notify, mpsc, watch}, time::sleep};
+use std::{
+    collections::HashMap,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+use tokio::{
+    sync::{Mutex, Notify, mpsc, watch},
+    time::sleep,
+};
 use tonic::transport::{Channel, Endpoint};
 use uuid;
@@ -95,7 +102,11 @@ impl ConnectionPool {
             while let Some(responder) = request_rx.recv().await {
                 // TODO: This call should time out and return an error
                 let (id, channel) = bg_pool.acquire_connection().await;
-                let client = PooledClient { channel, pool: Arc::clone(&bg_pool), id };
+                let client = PooledClient {
+                    channel,
+                    pool: Arc::clone(&bg_pool),
+                    id,
+                };
                 let _ = responder.send(client).await;
             }
         });
@@ -109,7 +120,8 @@ impl ConnectionPool {
         {
             let mut inner = self.inner.lock().await;
             // TODO: Use a heap, although the number of connections is small
-            if let Some((&id, entry)) = inner.entries
+            if let Some((&id, entry)) = inner
+                .entries
                 .iter_mut()
                 .filter(|(_, e)| e.active_consumers < self.max_consumers)
                 .filter(|(_, e)| e.consecutive_errors < self.error_threshold)
@@ -122,7 +134,6 @@ impl ConnectionPool {
             // possible that a consumer will release a connection while the new one is being created, in
             // which case we will use it right away, but the new connection will be created anyway.)
             let _ = self.cc_watch_tx.send(true);
-
         }
 
         // Wait for a new connection, or for one of the consumers to release a connection
         // TODO: Put this notify in a timeout
@@ -131,7 +142,6 @@ impl ConnectionPool {
     }
 
     async fn create_connection(&self) -> () {
-
         // Wait to be signalled to create a connection.
         let mut recv = self.cc_watch_tx.subscribe();
         if !*self.cc_watch_rx.borrow() {
@@ -172,19 +182,23 @@ impl ConnectionPool {
                     .expect("invalid endpoint")
                     .timeout(self.connect_timeout)
                     .connect(),
-            ).await;
+            )
+            .await;
 
             match attempt {
                 Ok(Ok(channel)) => {
                     {
                         let mut inner = self.inner.lock().await;
                         let id = uuid::Uuid::new_v4();
-                        inner.entries.insert(id, ConnectionEntry {
-                            channel: channel.clone(),
-                            active_consumers: 0,
-                            consecutive_successes: 0,
-                            consecutive_errors: 0,
-                        });
+                        inner.entries.insert(
+                            id,
+                            ConnectionEntry {
+                                channel: channel.clone(),
+                                active_consumers: 0,
+                                consecutive_successes: 0,
+                                consecutive_errors: 0,
+                            },
+                        );
                         self.notify.notify_one();
                         let _ = self.cc_watch_tx.send(false);
                         return;
@@ -194,15 +208,21 @@ impl ConnectionPool {
                         let mut inner = self.inner.lock().await;
                         inner.last_connect_failure = Some(Instant::now());
                     }
                 }
             }
         }
     }
 
     /// Get a client we can use to send gRPC messages.
     pub async fn get_client(&self) -> PooledClient {
         let (resp_tx, mut resp_rx) = mpsc::channel(1);
-        self.request_tx.send(resp_tx).await.expect("ConnectionPool task has shut down");
-        resp_rx.recv().await.expect("ConnectionPool task has shut down")
+        self.request_tx
+            .send(resp_tx)
+            .await
+            .expect("ConnectionPool task has shut down");
+        resp_rx
+            .recv()
+            .await
+            .expect("ConnectionPool task has shut down")
     }
 
     /// Return client to the pool, indicating success or error.
@@ -256,4 +276,3 @@ impl PooledClient {
         self.pool.return_client(self.id, result.is_ok()).await;
     }
 }
-
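Aside: the reformatted get_client chain implements a request/responder handshake with the pool's background task — callers send a reply channel over mpsc, and the task answers with a pooled connection. A toy version of that handshake (types and names illustrative, ids standing in for real channels):

use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (request_tx, mut request_rx) = mpsc::channel::<mpsc::Sender<u32>>(16);

    // Background task: hand out "connections" (here, plain ids) on request.
    tokio::spawn(async move {
        let mut next_id = 0;
        while let Some(responder) = request_rx.recv().await {
            let _ = responder.send(next_id).await;
            next_id += 1;
        }
    });

    // A caller: create a reply channel, send it, await the response.
    let (resp_tx, mut resp_rx) = mpsc::channel(1);
    request_tx.send(resp_tx).await.expect("pool task has shut down");
    let conn = resp_rx.recv().await.expect("pool task has shut down");
    assert_eq!(conn, 0);
}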

View File

@@ -4,9 +4,9 @@
 //! - Send requests to correct shards
 //!
 use std::collections::HashMap;
+use std::sync::Arc;
 use std::sync::RwLock;
 use std::time::Duration;
-use std::sync::Arc;
 
 use bytes::Bytes;
 use futures::Stream;
@@ -19,7 +19,6 @@ use pageserver_page_api::proto;
 use pageserver_page_api::proto::PageServiceClient;
 use utils::shard::ShardIndex;
 
-
 mod client_cache;
 
 #[derive(Error, Debug)]
@@ -131,7 +130,6 @@ impl PageserverClient {
             return Ok(response.page_image);
         }
     }
-
 }
 
 // TODO: this should use model::GetPageRequest and GetPageResponse
@@ -142,7 +140,6 @@ impl PageserverClient {
         tonic::Response<tonic::codec::Streaming<proto::GetPageResponse>>,
         PageserverClientError,
     > {
-
         // FIXME: calculate the shard number correctly
         let shard = ShardIndex::unsharded();
@@ -158,7 +155,6 @@ impl PageserverClient {
         // TODO: check for an error and pass it to "finish"
         pooled_client.finish(Ok(())).await;
         return Ok(client.get_pages(tonic::Request::new(requests)).await?);
-
     }
 
     /// Process a request to get the size of a database.
@@ -216,17 +212,13 @@ impl PageserverClient {
     ///
     /// Get a client from the pool for this shard, also creating the pool if it doesn't exist.
     ///
-    async fn get_client(
-        &self,
-        shard: ShardIndex,
-    ) -> client_cache::PooledClient {
+    async fn get_client(&self, shard: ShardIndex) -> client_cache::PooledClient {
         let reused_pool: Option<Arc<client_cache::ConnectionPool>> = {
             let channels = self.channels.read().unwrap();
             channels.get(&shard).cloned()
        };
 
-        let usable_pool : Arc<client_cache::ConnectionPool>;
+        let usable_pool: Arc<client_cache::ConnectionPool>;
         match reused_pool {
             Some(pool) => {
                 let pooled_client = pool.get_client().await;
@@ -235,7 +227,11 @@ impl PageserverClient {
             None => {
                 let new_pool = client_cache::ConnectionPool::new(
                     self.shard_map.get(&shard).unwrap(),
-                    5000, 5, Duration::from_millis(200), Duration::from_secs(1));
+                    5000,
+                    5,
+                    Duration::from_millis(200),
+                    Duration::from_secs(1),
+                );
                 let mut write_pool = self.channels.write().unwrap();
                 write_pool.insert(shard, new_pool.clone());
                 usable_pool = new_pool.clone();
@@ -245,7 +241,6 @@ impl PageserverClient {
         let pooled_client = usable_pool.get_client().await;
         return pooled_client;
     }
-
 }
 
 /// Inject tenant_id, timeline_id and authentication token to all pageserver requests.
@@ -287,8 +282,7 @@ impl tonic::service::Interceptor for AuthInterceptor {
         req.metadata_mut()
             .insert("neon-tenant-id", self.tenant_id.clone());
         if let Some(shard_id) = &self.shard_id {
-            req.metadata_mut()
-                .insert("neon-shard-id", shard_id.clone());
+            req.metadata_mut().insert("neon-shard-id", shard_id.clone());
         }
         req.metadata_mut()
             .insert("neon-timeline-id", self.timeline_id.clone());

View File

@@ -46,7 +46,10 @@ impl FileCache {
         uring_system: tokio_epoll_uring::SystemHandle,
     ) -> Result<FileCache, std::io::Error> {
         if initial_size < 100 {
-            tracing::warn!("min size for file cache is 100 blocks, {} requested", initial_size);
+            tracing::warn!(
+                "min size for file cache is 100 blocks, {} requested",
+                initial_size
+            );
             initial_size = 100;
         }
@@ -60,11 +63,13 @@ impl FileCache {
         let max_blocks_gauge = metrics::IntGauge::new(
             "file_cache_max_blocks",
             "Local File Cache size in 8KiB blocks",
-        ).unwrap();
+        )
+        .unwrap();
         let num_free_blocks_gauge = metrics::IntGauge::new(
             "file_cache_num_free_blocks",
             "Number of free 8KiB blocks in Local File Cache",
-        ).unwrap();
+        )
+        .unwrap();
 
         tracing::info!("initialized file cache with {} blocks", initial_size);
@@ -165,8 +170,7 @@ impl metrics::core::Collector for FileCache {
         let free_list = self.free_list.lock().unwrap();
         self.max_blocks_gauge.set(free_list.max_blocks as i64);
-        let total_free_blocks: i64 =
-            free_list.free_blocks.len() as i64
+        let total_free_blocks: i64 = free_list.free_blocks.len() as i64
             + (free_list.max_blocks as i64 - free_list.next_free_block as i64);
         self.num_free_blocks_gauge.set(total_free_blocks as i64);
     }
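Aside: FileCache computes its gauge values at scrape time, inside the Collector impl, rather than on every allocation. A sketch of that set-on-scrape pattern using the plain prometheus crate (the project routes it through its own metrics wrapper, so the names and values here are assumptions):

use prometheus::core::{Collector, Desc};
use prometheus::proto::MetricFamily;
use prometheus::IntGauge;

struct FreeBlocks {
    max_blocks_gauge: IntGauge,
}

impl Collector for FreeBlocks {
    fn desc(&self) -> Vec<&Desc> {
        self.max_blocks_gauge.desc()
    }

    fn collect(&self) -> Vec<MetricFamily> {
        // Recompute the value at scrape time, then delegate to the gauge.
        self.max_blocks_gauge.set(12345);
        self.max_blocks_gauge.collect()
    }
}

fn main() {
    let collector = FreeBlocks {
        max_blocks_gauge: IntGauge::new("file_cache_max_blocks", "LFC size in 8KiB blocks")
            .unwrap(),
    };
    let families = collector.collect();
    assert_eq!(families[0].get_metric()[0].get_gauge().get_value(), 12345.0);
}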

View File

@@ -85,9 +85,8 @@ pub extern "C" fn rcommunicator_shmem_init(
     shmem_area_ptr: *mut MaybeUninit<u8>,
     shmem_area_len: u64,
 ) -> &'static mut CommunicatorInitStruct {
-    let shmem_area: &'static mut [MaybeUninit<u8>] = unsafe {
-        std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize)
-    };
+    let shmem_area: &'static mut [MaybeUninit<u8>] =
+        unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) };
 
     // Carve out the request slots from the shmem area and initialize them
     let num_neon_request_slots_per_backend = NUM_NEON_REQUEST_SLOTS_PER_BACKEND as usize;
@@ -103,7 +102,9 @@ pub extern "C" fn rcommunicator_shmem_init(
     // 'neon_request_slots' is initialized now. (MaybeUninit::slice_assume_init_mut() is nightly-only
     // as of this writing.)
     let neon_request_slots = unsafe {
-        std::mem::transmute::<&mut [MaybeUninit<NeonIOHandle>], &mut[NeonIOHandle]>(neon_request_slots)
+        std::mem::transmute::<&mut [MaybeUninit<NeonIOHandle>], &mut [NeonIOHandle]>(
+            neon_request_slots,
+        )
     };
 
     // Give the rest of the area to the integrated cache
// Give the rest of the area to the integrated cache

View File

@@ -78,16 +78,16 @@ impl<'t> IntegratedCacheInitStruct<'t> {
     /// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which
     /// will be inherited by all processes through fork.
-    pub fn shmem_init(_max_procs: u32, shmem_area: &'t mut [MaybeUninit<u8>]) -> IntegratedCacheInitStruct<'t> {
+    pub fn shmem_init(
+        _max_procs: u32,
+        shmem_area: &'t mut [MaybeUninit<u8>],
+    ) -> IntegratedCacheInitStruct<'t> {
         let allocator = neonart::ArtMultiSlabAllocator::new(shmem_area);
         let handle = IntegratedCacheTreeInitStruct::new(allocator);
 
         // Initialize the shared memory area
-        IntegratedCacheInitStruct {
-            allocator,
-            handle,
-        }
+        IntegratedCacheInitStruct { allocator, handle }
     }
 
     pub fn worker_process_init(
@@ -188,7 +188,7 @@ fn key_range_for_rel_blocks(rel: &RelTag) -> Range<TreeKey> {
             fork_number: rel.fork_number,
             block_number: 0,
         },
-        end: TreeKey {
+        end: TreeKey {
             spc_oid: rel.spc_oid,
             db_oid: rel.db_oid,
             rel_number: rel.rel_number,
@@ -436,18 +436,20 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                     // The cache is completely empty. Pretty unexpected that this function
                     // was called then..
                     break;
-                },
+                }
                 Some((_k, TreeEntry::Rel(_))) => {
                     // ignore rel entries for now.
                     // TODO: They stick in the cache forever
-                },
+                }
                 Some((k, TreeEntry::Block(blk_entry))) => {
                     if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
                         // Evict this
                         let w = self.cache_tree.start_write();
                         let old = w.remove(&k);
                         if let Some(TreeEntry::Block(old)) = old {
-                            let _ = self.global_lw_lsn.fetch_max(old.lw_lsn.0, Ordering::Relaxed);
+                            let _ = self
+                                .global_lw_lsn
+                                .fetch_max(old.lw_lsn.0, Ordering::Relaxed);
                             if let Some(cache_block) = old.cache_block {
                                 return Some(cache_block);
                             }
@@ -455,7 +457,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                             assert!(old.is_none());
                         }
                     }
-                },
+                }
             }
         }
 
         // Give up if we didn't find anything
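Aside: the eviction scan above is a second-chance ("clock") sweep — referenced.swap(false, Relaxed) clears the reference bit and spares a recently used entry once, while an entry whose bit is already clear gets evicted. A toy version of that policy:

use std::sync::atomic::{AtomicBool, Ordering};

struct Entry {
    referenced: AtomicBool,
}

fn sweep(entries: &[Entry]) -> Option<usize> {
    for (i, e) in entries.iter().enumerate() {
        // swap(false) returns the previous value: true means "recently used".
        if !e.referenced.swap(false, Ordering::Relaxed) {
            return Some(i); // evict this one
        }
    }
    None
}

fn main() {
    let entries = [
        Entry { referenced: AtomicBool::new(true) },
        Entry { referenced: AtomicBool::new(false) },
    ];
    assert_eq!(sweep(&entries), Some(1));
    // The first entry lost its reference bit, so a second sweep evicts it.
    assert_eq!(sweep(&entries), Some(0));
}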

View File

@@ -47,7 +47,10 @@ pub(super) async fn init(
     let uring_system = tokio_epoll_uring::System::launch().await.unwrap();
 
     let file_cache = if let Some(path) = file_cache_path {
-        Some(FileCache::new(&path, file_cache_size, uring_system).expect("could not create cache file"))
+        Some(
+            FileCache::new(&path, file_cache_size, uring_system)
+                .expect("could not create cache file"),
+        )
     } else {
         // FIXME: temporarily for testing, use LFC even if disabled
         Some(
@@ -391,7 +394,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
     }
 }
 
-
 impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
     fn desc(&self) -> Vec<&metrics::core::Desc> {
         let mut descs = Vec::new();

View File

@@ -2,15 +2,15 @@
 //! prometheus metrics.
 
 use axum::Router;
-use axum::extract::State;
 use axum::body::Body;
+use axum::extract::State;
 use axum::response::Response;
 use http::StatusCode;
 use http::header::CONTENT_TYPE;
+use metrics;
 use metrics::proto::MetricFamily;
 use metrics::{Encoder, TextEncoder};
-use metrics;
 
 use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
@@ -19,10 +19,12 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
         use axum::routing::get;
         let app = Router::new()
             .route("/metrics", get(get_metrics))
-            .with_state(self);
+            .with_state(self);
 
         // TODO: make configurable. Or listen on unix domain socket?
-        let listener = tokio::net::TcpListener::bind("127.0.0.1:9090").await.unwrap();
+        let listener = tokio::net::TcpListener::bind("127.0.0.1:9090")
+            .await
+            .unwrap();
 
         tokio::spawn(async {
             tracing::info!("metrics listener spawned");
@@ -32,9 +34,7 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
 }
 
 /// Expose Prometheus metrics.
-async fn get_metrics(
-    State(state): State<&CommunicatorWorkerProcessStruct<'static>>
-) -> Response {
+async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'static>>) -> Response {
     tracing::warn!("get_metrics called");
 
     use metrics::core::Collector;
@@ -52,7 +52,7 @@ async fn get_metrics(
     let mut buffer = vec![];
     tracing::warn!("get_metrics done");
-
+
     if let Err(e) = encoder.encode(&metrics, &mut buffer) {
         Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
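Aside: the wiring being reformatted here is a plain axum metrics endpoint. A minimal standalone equivalent, assuming the axum 0.7-style serve API (the real handler encodes Prometheus metrics instead of a stub string, and carries the worker struct as state):

use axum::{routing::get, Router};

// Stub handler; the real one encodes Prometheus metrics.
async fn get_metrics() -> String {
    "# metrics go here\n".to_string()
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/metrics", get(get_metrics));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:9090")
        .await
        .unwrap();
    axum::serve(listener, app).await.unwrap();
}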

View File

@@ -72,7 +72,10 @@ pub extern "C" fn communicator_worker_process_launch(
 }
 
 /// Convert the "shard map" from an array of C strings, indexed by shard no to a rust HashMap
-fn parse_shard_map(nshards: u32, shard_map: *mut *mut c_char) -> HashMap<utils::shard::ShardIndex, String> {
+fn parse_shard_map(
+    nshards: u32,
+    shard_map: *mut *mut c_char,
+) -> HashMap<utils::shard::ShardIndex, String> {
     use utils::shard::*;
 
     assert!(nshards <= u8::MAX as u32);
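Aside: parse_shard_map's body is not shown in this diff. Converting such a C array of NUL-terminated strings into a map would typically look like the following hypothetical sketch (the real function keys by utils::shard::ShardIndex and takes *mut pointers; u8 keys and *const keep this example self-contained):

use std::collections::HashMap;
use std::ffi::{c_char, CStr, CString};

unsafe fn parse_shard_map_sketch(
    nshards: u32,
    shard_map: *const *const c_char,
) -> HashMap<u8, String> {
    let mut map = HashMap::new();
    for i in 0..nshards as usize {
        // Walk the C array one pointer at a time.
        let ptr = unsafe { *shard_map.add(i) };
        if !ptr.is_null() {
            let s = unsafe { CStr::from_ptr(ptr) }.to_string_lossy().into_owned();
            map.insert(i as u8, s);
        }
    }
    map
}

fn main() {
    // Keep the CStrings alive while the raw pointers are in use.
    let owned: Vec<CString> = vec![CString::new("ps-0:6400").unwrap()];
    let ptrs: Vec<*const c_char> = owned.iter().map(|s| s.as_ptr()).collect();
    let map = unsafe { parse_shard_map_sketch(1, ptrs.as_ptr()) };
    assert_eq!(map.get(&0).map(String::as_str), Some("ps-0:6400"));
}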