Bunch of fixes, smarter iterator, metrics exporter

This commit is contained in:
Heikki Linnakangas
2025-05-06 14:53:36 +03:00
parent 44269fcd5e
commit 977bc09d2a
15 changed files with 401 additions and 59 deletions

View File

@@ -7,11 +7,13 @@ edition = "2024"
crate-type = ["staticlib"]
[dependencies]
axum.workspace = true
bytes.workspace = true
http.workspace = true
libc.workspace = true
nix.workspace = true
atomic_enum = "0.3.0"
prometheus.workspace = true
prost.workspace = true
tonic = { version = "0.12.0", default-features = false, features=["codegen", "prost", "transport"] }
tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] }
@@ -22,6 +24,7 @@ tracing-subscriber.workspace = true
zerocopy = "0.8.0"
zerocopy-derive = "0.8.0"
metrics.workspace = true
tokio-epoll-uring.workspace = true
uring-common.workspace = true

View File

@@ -11,11 +11,10 @@
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use tokio_epoll_uring;
use std::sync::Mutex;
use crate::BLCKSZ;
pub type CacheBlock = u64;
@@ -25,7 +24,11 @@ pub struct FileCache {
file: Arc<File>,
free_list: Mutex<FreeList>
free_list: Mutex<FreeList>,
// metrics
max_blocks_gauge: metrics::IntGauge,
num_free_blocks_gauge: metrics::IntGauge,
}
// TODO: We keep track of all free blocks in this vec. That doesn't really scale.
@@ -39,9 +42,14 @@ struct FreeList {
impl FileCache {
pub fn new(
file_cache_path: &Path,
initial_size: u64,
mut initial_size: u64,
uring_system: tokio_epoll_uring::SystemHandle,
) -> Result<FileCache, std::io::Error> {
if initial_size < 100 {
tracing::warn!("min size for file cache is 100 blocks, {} requested", initial_size);
initial_size = 100;
}
let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
@@ -49,7 +57,16 @@ impl FileCache {
.create(true)
.open(file_cache_path)?;
tracing::info!("Created cache file {file_cache_path:?}");
let max_blocks_gauge = metrics::IntGauge::new(
"file_cache_max_blocks",
"Local File Cache size in 8KiB blocks",
).unwrap();
let num_free_blocks_gauge = metrics::IntGauge::new(
"file_cache_num_free_blocks",
"Number of free 8KiB blocks in Local File Cache",
).unwrap();
tracing::info!("initialized file cache with {} blocks", initial_size);
Ok(FileCache {
file: Arc::new(file),
@@ -59,6 +76,8 @@ impl FileCache {
max_blocks: initial_size,
free_blocks: Vec::new(),
}),
max_blocks_gauge,
num_free_blocks_gauge,
})
}
@@ -112,7 +131,7 @@ impl FileCache {
}
if free_list.next_free_block < free_list.max_blocks {
let result = free_list.next_free_block;
free_list.next_free_block -= 1;
free_list.next_free_block += 1;
return Some(result);
}
None
@@ -132,3 +151,29 @@ fn map_io_uring_error(err: tokio_epoll_uring::Error<std::io::Error>) -> std::io:
}
}
}
impl metrics::core::Collector for FileCache {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.max_blocks_gauge.desc());
descs.append(&mut self.num_free_blocks_gauge.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
// Update the gauges with fresh values first
{
let free_list = self.free_list.lock().unwrap();
self.max_blocks_gauge.set(free_list.max_blocks as i64);
let total_free_blocks: i64 =
free_list.free_blocks.len() as i64
+ (free_list.max_blocks as i64 - free_list.next_free_block as i64);
self.num_free_blocks_gauge.set(total_free_blocks as i64);
}
let mut values = Vec::new();
values.append(&mut self.max_blocks_gauge.collect());
values.append(&mut self.num_free_blocks_gauge.collect());
values
}
}

View File

@@ -58,7 +58,7 @@ pub struct IntegratedCacheWriteAccess<'t> {
global_lw_lsn: AtomicU64,
file_cache: Option<FileCache>,
pub(crate) file_cache: Option<FileCache>,
// Fields for eviction
clock_hand: std::sync::Mutex<TreeIterator<TreeKey>>,
@@ -223,7 +223,7 @@ impl From<(&RelTag, u32)> for TreeKey {
}
impl neonart::Key for TreeKey {
const KEY_LEN: usize = 4 + 4 + 4 + 1 + 32;
const KEY_LEN: usize = 4 + 4 + 4 + 1 + 4;
fn as_bytes(&self) -> &[u8] {
zerocopy::IntoBytes::as_bytes(self)
@@ -268,6 +268,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
} else {
panic!("unexpected tree entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
if let Some(cache_block) = block_entry.cache_block {
self.file_cache
@@ -298,6 +299,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
panic!("unexpected tree entry type for block key");
};
// This is used for prefetch requests. Treat the probe as an 'access', to keep it
// in cache.
block_entry.referenced.store(true, Ordering::Relaxed);
if let Some(_cache_block) = block_entry.cache_block {
Ok(CacheResult::Found(()))
} else {
@@ -373,6 +378,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
} else {
panic!("unexpected tree entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
block_entry.lw_lsn = lw_lsn;
if block_entry.cache_block.is_none() {
block_entry.cache_block = reserved_cache_block.take();
@@ -389,6 +395,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
});
// If we didn't need to block we reserved, put it back to the free list
if let Some(x) = reserved_cache_block {
file_cache.dealloc_block(x);
}
@@ -422,12 +429,13 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// caller to use immediately.
pub fn try_evict_one_cache_block(&self) -> Option<CacheBlock> {
let mut clock_hand = self.clock_hand.lock().unwrap();
for _ in 0..1000 {
for _ in 0..100 {
let r = self.cache_tree.start_read();
match clock_hand.next(&r) {
None => {
// The cache is completely empty. Pretty unexpected that this function
// was called then..
break;
},
Some((_k, TreeEntry::Rel(_))) => {
// ignore rel entries for now.
@@ -512,6 +520,7 @@ impl<'e> BackendCacheReadOp<'e> {
} else {
panic!("unexpected tree entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
block_entry.cache_block
} else {

View File

@@ -390,3 +390,21 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
Ok(())
}
}
impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
if let Some(file_cache) = &self.cache.file_cache {
descs.append(&mut file_cache.desc());
}
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut values = Vec::new();
if let Some(file_cache) = &self.cache.file_cache {
values.append(&mut file_cache.collect());
}
values
}
}

View File

@@ -0,0 +1,69 @@
//! Export information about Postgres, the communicator process, file cache etc. as
//! prometheus metrics.
use axum::Router;
use axum::extract::State;
use axum::body::Body;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use metrics;
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
impl<'a> CommunicatorWorkerProcessStruct<'a> {
pub(crate) async fn launch_exporter_task(&'static self) {
use axum::routing::get;
let app = Router::new()
.route("/metrics", get(get_metrics))
.with_state(self);
// TODO: make configurable. Or listen on unix domain socket?
let listener = tokio::net::TcpListener::bind("127.0.0.1:9090").await.unwrap();
tokio::spawn(async {
tracing::info!("metrics listener spawned");
axum::serve(listener, app).await.unwrap()
});
}
}
/// Expose Prometheus metrics.
async fn get_metrics(
State(state): State<&CommunicatorWorkerProcessStruct<'static>>
) -> Response {
tracing::warn!("get_metrics called");
use metrics::core::Collector;
let metrics = state.collect();
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = metrics
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
let encoder = TextEncoder::new();
let mut buffer = vec![];
tracing::warn!("get_metrics done");
if let Err(e) = encoder.encode(&metrics, &mut buffer) {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(CONTENT_TYPE, "application/text")
.body(Body::from(e.to_string()))
.unwrap()
} else {
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
}

View File

@@ -8,4 +8,5 @@
mod callbacks;
mod logging;
mod main_loop;
mod metrics_exporter;
mod worker_interface;

View File

@@ -65,6 +65,8 @@ pub extern "C" fn communicator_worker_process_launch(
error!("error: {err:?}");
});
runtime.block_on(worker_struct.launch_exporter_task());
// keep the runtime running after we exit this function
Box::leak(Box::new(runtime));
}

View File

@@ -211,6 +211,10 @@ communicator_new_bgworker_main(Datum main_arg)
struct LoggingState *logging;
char errbuf[1000];
int elevel;
uint64 initial_file_cache_size;
/* lfc_size_limit is in MBs */
initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
@@ -231,7 +235,7 @@ communicator_new_bgworker_main(Datum main_arg)
connstrs,
num_shards,
lfc_path,
lfc_size_limit);
initial_file_cache_size);
cis = NULL;
elog(LOG, "communicator threads started");