mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
# Problem On-demand downloads are still using `tokio::fs`, which we know is inefficient. # Changes - Add `pagebench ondemand-download-churn` to quantify on-demand download throughput - Requires dumping layer map, which required making `history_buffer` impl `Deserialize` - Implement an equivalent of `tokio::io::copy_buf` for owned buffers => `owned_buffers_io` module and children. - Make layer file download sensitive to `io_engine::get()`, using VirtualFile + above copy loop - For this, I had to move some code into the `retry_download`, e.g., `sync_all()` call. Drive-by: - fix missing escaping in `scripts/ps_ec2_setup_instance_store` - if we failed in retry_download to create a file, we'd try to remove it, encounter `NotFound`, and `abort()` the process using `on_fatal_io_error`. This PR treats `NotFound` as a success. # Testing Functional - The copy loop is generic & unit tested. Performance - Used the `ondemand-download-churn` benchmark to manually test against real S3. - Results (public Notion page): https://neondatabase.notion.site/Benchmarking-tokio-epoll-uring-on-demand-downloads-2024-04-15-newer-code-03c0fdc475c54492b44d9627b6e4e710?pvs=4 - Performance is equivalent at low concurrency. Jumpier situation at high concurrency, but, still less CPU / throughput with tokio-epoll-uring. - It’s a win. # Future Work Turn the manual performance testing described in the above results document into a performance regression test: https://github.com/neondatabase/neon/issues/7146
197 lines
5.4 KiB
Rust
197 lines
5.4 KiB
Rust
//! A heapless buffer for events of sorts.
|
|
|
|
use std::ops;
|
|
|
|
use heapless::HistoryBuffer;
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct HistoryBufferWithDropCounter<T, const L: usize> {
|
|
buffer: HistoryBuffer<T, L>,
|
|
drop_count: u64,
|
|
}
|
|
|
|
impl<T, const L: usize> HistoryBufferWithDropCounter<T, L> {
|
|
pub fn write(&mut self, data: T) {
|
|
let len_before = self.buffer.len();
|
|
self.buffer.write(data);
|
|
let len_after = self.buffer.len();
|
|
self.drop_count += u64::from(len_before == len_after);
|
|
}
|
|
pub fn drop_count(&self) -> u64 {
|
|
self.drop_count
|
|
}
|
|
pub fn map<U, F: Fn(&T) -> U>(&self, f: F) -> HistoryBufferWithDropCounter<U, L> {
|
|
let mut buffer = HistoryBuffer::new();
|
|
buffer.extend(self.buffer.oldest_ordered().map(f));
|
|
HistoryBufferWithDropCounter::<U, L> {
|
|
buffer,
|
|
drop_count: self.drop_count,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T, const L: usize> Default for HistoryBufferWithDropCounter<T, L> {
|
|
fn default() -> Self {
|
|
Self {
|
|
buffer: HistoryBuffer::default(),
|
|
drop_count: 0,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T, const L: usize> ops::Deref for HistoryBufferWithDropCounter<T, L> {
|
|
type Target = HistoryBuffer<T, L>;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.buffer
|
|
}
|
|
}
|
|
|
|
#[derive(serde::Serialize, serde::Deserialize)]
|
|
struct SerdeRepr<T> {
|
|
buffer: Vec<T>,
|
|
buffer_size: usize,
|
|
drop_count: u64,
|
|
}
|
|
|
|
impl<'a, T, const L: usize> From<&'a HistoryBufferWithDropCounter<T, L>> for SerdeRepr<T>
|
|
where
|
|
T: Clone + serde::Serialize,
|
|
{
|
|
fn from(value: &'a HistoryBufferWithDropCounter<T, L>) -> Self {
|
|
let HistoryBufferWithDropCounter { buffer, drop_count } = value;
|
|
SerdeRepr {
|
|
buffer: buffer.iter().cloned().collect(),
|
|
buffer_size: L,
|
|
drop_count: *drop_count,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T, const L: usize> serde::Serialize for HistoryBufferWithDropCounter<T, L>
|
|
where
|
|
T: Clone + serde::Serialize,
|
|
{
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: serde::Serializer,
|
|
{
|
|
SerdeRepr::from(self).serialize(serializer)
|
|
}
|
|
}
|
|
|
|
impl<'de, T, const L: usize> serde::de::Deserialize<'de> for HistoryBufferWithDropCounter<T, L>
|
|
where
|
|
T: Clone + serde::Deserialize<'de>,
|
|
{
|
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
|
where
|
|
D: serde::Deserializer<'de>,
|
|
{
|
|
let SerdeRepr {
|
|
buffer: des_buffer,
|
|
drop_count,
|
|
buffer_size,
|
|
} = SerdeRepr::<T>::deserialize(deserializer)?;
|
|
if buffer_size != L {
|
|
use serde::de::Error;
|
|
return Err(D::Error::custom(format!(
|
|
"invalid buffer_size, expecting {L} got {buffer_size}"
|
|
)));
|
|
}
|
|
let mut buffer = HistoryBuffer::new();
|
|
buffer.extend(des_buffer);
|
|
Ok(HistoryBufferWithDropCounter { buffer, drop_count })
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod test {
|
|
use super::HistoryBufferWithDropCounter;
|
|
|
|
#[test]
|
|
fn test_basics() {
|
|
let mut b = HistoryBufferWithDropCounter::<usize, 2>::default();
|
|
b.write(1);
|
|
b.write(2);
|
|
b.write(3);
|
|
assert!(b.iter().any(|e| *e == 2));
|
|
assert!(b.iter().any(|e| *e == 3));
|
|
assert!(!b.iter().any(|e| *e == 1));
|
|
|
|
// round-trip serde
|
|
let round_tripped: HistoryBufferWithDropCounter<usize, 2> =
|
|
serde_json::from_str(&serde_json::to_string(&b).unwrap()).unwrap();
|
|
assert_eq!(
|
|
round_tripped.iter().cloned().collect::<Vec<_>>(),
|
|
b.iter().cloned().collect::<Vec<_>>()
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_drop_count_works() {
|
|
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
|
|
b.write(1);
|
|
assert_eq!(b.drop_count(), 0);
|
|
b.write(2);
|
|
assert_eq!(b.drop_count(), 0);
|
|
b.write(3);
|
|
assert_eq!(b.drop_count(), 1);
|
|
b.write(4);
|
|
assert_eq!(b.drop_count(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn test_clone_works() {
|
|
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
|
|
b.write(1);
|
|
b.write(2);
|
|
b.write(3);
|
|
assert_eq!(b.drop_count(), 1);
|
|
let mut c = b.clone();
|
|
assert_eq!(c.drop_count(), 1);
|
|
assert!(c.iter().any(|e| *e == 2));
|
|
assert!(c.iter().any(|e| *e == 3));
|
|
assert!(!c.iter().any(|e| *e == 1));
|
|
|
|
c.write(4);
|
|
assert!(c.iter().any(|e| *e == 4));
|
|
assert!(!b.iter().any(|e| *e == 4));
|
|
}
|
|
|
|
#[test]
|
|
fn test_map() {
|
|
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
|
|
|
|
b.write(1);
|
|
assert_eq!(b.drop_count(), 0);
|
|
{
|
|
let c = b.map(|i| i + 10);
|
|
assert_eq!(c.oldest_ordered().cloned().collect::<Vec<_>>(), vec![11]);
|
|
assert_eq!(c.drop_count(), 0);
|
|
}
|
|
|
|
b.write(2);
|
|
assert_eq!(b.drop_count(), 0);
|
|
{
|
|
let c = b.map(|i| i + 10);
|
|
assert_eq!(
|
|
c.oldest_ordered().cloned().collect::<Vec<_>>(),
|
|
vec![11, 12]
|
|
);
|
|
assert_eq!(c.drop_count(), 0);
|
|
}
|
|
|
|
b.write(3);
|
|
assert_eq!(b.drop_count(), 1);
|
|
{
|
|
let c = b.map(|i| i + 10);
|
|
assert_eq!(
|
|
c.oldest_ordered().cloned().collect::<Vec<_>>(),
|
|
vec![12, 13]
|
|
);
|
|
assert_eq!(c.drop_count(), 1);
|
|
}
|
|
}
|
|
}
|