diff --git a/Cargo.lock b/Cargo.lock index adf2594dbb..5de1164487 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2596,6 +2596,7 @@ dependencies = [ "serde_json", "serde_with", "signal-hook", + "smallvec", "storage_broker", "strum", "strum_macros", @@ -3972,9 +3973,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" [[package]] name = "socket2" @@ -5373,6 +5374,7 @@ dependencies = [ "scopeguard", "serde", "serde_json", + "smallvec", "socket2 0.4.9", "syn 1.0.109", "syn 2.0.16", diff --git a/Cargo.toml b/Cargo.toml index 5eab28e2e5..7c916ad61d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,6 +111,7 @@ serde_json = "1" serde_with = "2.0" sha2 = "0.10.2" signal-hook = "0.3" +smallvec = "1.11" socket2 = "0.5" strum = "0.24" strum_macros = "0.24" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index d709fac5cb..bbdd8b1e99 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -51,6 +51,7 @@ serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } serde_with.workspace = true signal-hook.workspace = true +smallvec = { workspace = true, features = ["write"] } svg_fmt.workspace = true sync_wrapper.workspace = true tokio-tar.workspace = true diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 410e436e3d..0bc88afd40 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -3,7 +3,6 @@ use crate::config::PageServerConf; use crate::page_cache::{self, PAGE_SZ}; -use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockLease, BlockReader}; use crate::virtual_file::VirtualFile; use std::cmp::min; @@ -61,19 +60,8 @@ impl EphemeralFile { pub(crate) fn len(&self) -> u64 { self.len } -} -/// Does the given filename look like an ephemeral file? -pub fn is_ephemeral_file(filename: &str) -> bool { - if let Some(rest) = filename.strip_prefix("ephemeral-") { - rest.parse::().is_ok() - } else { - false - } -} - -impl BlobWriter for EphemeralFile { - fn write_blob(&mut self, srcbuf: &[u8]) -> Result { + pub(crate) async fn write_blob(&mut self, srcbuf: &[u8]) -> Result { struct Writer<'a> { ephemeral_file: &'a mut EphemeralFile, /// The block to which the next [`push_bytes`] will write. @@ -90,7 +78,7 @@ impl BlobWriter for EphemeralFile { }) } #[inline(always)] - fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> { + async fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> { let mut src_remaining = src; while !src_remaining.is_empty() { let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..]; @@ -161,15 +149,15 @@ impl BlobWriter for EphemeralFile { if srcbuf.len() < 0x80 { // short one-byte length header let len_buf = [srcbuf.len() as u8]; - writer.push_bytes(&len_buf)?; + writer.push_bytes(&len_buf).await?; } else { let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32); len_buf[0] |= 0x80; - writer.push_bytes(&len_buf)?; + writer.push_bytes(&len_buf).await?; } // Write the payload - writer.push_bytes(srcbuf)?; + writer.push_bytes(srcbuf).await?; if srcbuf.len() < 0x80 { self.len += 1; @@ -182,6 +170,15 @@ impl BlobWriter for EphemeralFile { } } +/// Does the given filename look like an ephemeral file? +pub fn is_ephemeral_file(filename: &str) -> bool { + if let Some(rest) = filename.strip_prefix("ephemeral-") { + rest.parse::().is_ok() + } else { + false + } +} + impl Drop for EphemeralFile { fn drop(&mut self) { // drop all pages from page cache @@ -251,7 +248,6 @@ impl BlockReader for EphemeralFile { #[cfg(test)] mod tests { use super::*; - use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::BlockCursor; use rand::{thread_rng, RngCore}; use std::fs; @@ -280,12 +276,12 @@ mod tests { let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?; - let pos_foo = file.write_blob(b"foo")?; + let pos_foo = file.write_blob(b"foo").await?; assert_eq!( b"foo", file.block_cursor().read_blob(pos_foo).await?.as_slice() ); - let pos_bar = file.write_blob(b"bar")?; + let pos_bar = file.write_blob(b"bar").await?; assert_eq!( b"foo", file.block_cursor().read_blob(pos_foo).await?.as_slice() @@ -298,13 +294,13 @@ mod tests { let mut blobs = Vec::new(); for i in 0..10000 { let data = Vec::from(format!("blob{}", i).as_bytes()); - let pos = file.write_blob(&data)?; + let pos = file.write_blob(&data).await?; blobs.push((pos, data)); } // also test with a large blobs for i in 0..100 { let data = format!("blob{}", i).as_bytes().repeat(100); - let pos = file.write_blob(&data)?; + let pos = file.write_blob(&data).await?; blobs.push((pos, data)); } @@ -318,7 +314,7 @@ mod tests { let mut large_data = Vec::new(); large_data.resize(20000, 0); thread_rng().fill_bytes(&mut large_data); - let pos_large = file.write_blob(&large_data)?; + let pos_large = file.write_blob(&large_data).await?; let result = file.block_cursor().read_blob(pos_large).await?; assert_eq!(result, large_data); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c0b6a88acc..35a77a7331 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -7,14 +7,12 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::{Key, Value}; -use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::BlockReader; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState}; use crate::walrecord; use anyhow::{ensure, Result}; use pageserver_api::models::InMemoryLayerInfo; -use std::cell::RefCell; use std::collections::HashMap; use std::sync::OnceLock; use tracing::*; @@ -32,12 +30,6 @@ use tokio::sync::RwLock; use super::{DeltaLayer, DeltaLayerWriter, Layer}; -thread_local! { - /// A buffer for serializing object during [`InMemoryLayer::put_value`]. - /// This buffer is reused for each serialization to avoid additional malloc calls. - static SER_BUFFER: RefCell> = RefCell::new(Vec::new()); -} - pub struct InMemoryLayer { conf: &'static PageServerConf, tenant_id: TenantId, @@ -273,17 +265,17 @@ impl InMemoryLayer { /// Adds the page version to the in-memory tree pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); - let mut inner = self.inner.write().await; + let inner: &mut _ = &mut *self.inner.write().await; self.assert_writable(); let off = { - SER_BUFFER.with(|x| -> Result<_> { - let mut buf = x.borrow_mut(); - buf.clear(); - val.ser_into(&mut (*buf))?; - let off = inner.file.write_blob(&buf)?; - Ok(off) - })? + // Avoid doing allocations for "small" values. + // In the regression test suite, the limit of 256 avoided allocations in 95% of cases: + // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061 + let mut buf = smallvec::SmallVec::<[u8; 256]>::new(); + buf.clear(); + val.ser_into(&mut buf)?; + inner.file.write_blob(&buf).await? }; let vec_map = inner.index.entry(key).or_default(); diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index d79c7a4104..dcac4677a5 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -45,6 +45,7 @@ rustls = { version = "0.20", features = ["dangerous_configuration"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive"] } serde_json = { version = "1", features = ["raw_value"] } +smallvec = { version = "1", default-features = false, features = ["write"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] } tokio-rustls = { version = "0.23" }