implement coalescing of multiple reads onto same page
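Previously, `InMemoryLayer::get_values_reconstruct_data` issued a separate sequence of `read_page` calls for each value, so a page holding many values was read once per value. This commit splits the read path into three phases: collect the `ValueRead`s for each key (stopping at the first `will_init` value), plan which byte ranges of which pages must be appended to which value buffer, then execute exactly one `read_page` per touched page and fan its contents out to all destinations. To make the early stop possible without deserializing, `will_init` is now passed in by writers and stored in `InMemoryLayerIndexValue`.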
@@ -67,7 +67,8 @@ async fn ingest(
     let layer =
         InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;

-    let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
+    let value = Value::Image(Bytes::from(vec![0u8; put_size]));
+    let data = value.ser()?;
     let ctx = RequestContext::new(
         pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
         pageserver::context::DownloadBehavior::Download,
@@ -95,7 +96,9 @@ async fn ingest(
             }
         }

-        layer.put_value(key.to_compact(), lsn, &data, &ctx).await?;
+        layer
+            .put_value(key.to_compact(), lsn, &data, value.will_init(), &ctx)
+            .await?;
     }
     layer.freeze(lsn + 1).await;
@@ -84,6 +84,7 @@ impl EphemeralFile {
     pub(crate) async fn write_blob(
         &mut self,
         buf: &[u8],
+        will_init: bool,
         ctx: &RequestContext,
     ) -> Result<InMemoryLayerIndexValue, io::Error> {
         let pos = self.rw.bytes_written();
@@ -105,7 +106,11 @@ impl EphemeralFile {

         self.rw.write_all_borrowed(buf, ctx).await?;

-        Ok(InMemoryLayerIndexValue { pos, len })
+        Ok(InMemoryLayerIndexValue {
+            pos,
+            len,
+            will_init,
+        })
     }
 }
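`EphemeralFile::write_blob` now takes `will_init` from the caller and records it in the `InMemoryLayerIndexValue` it returns; callers determine the flag once, via `Value::will_init()`, before serializing the value (see the benchmark hunk above).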
@@ -21,8 +21,8 @@ use pageserver_api::key::CompactKey;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
-use std::collections::BTreeMap;
-use std::sync::{Arc, OnceLock};
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, Mutex, OnceLock};
 use std::time::Instant;
 use tracing::*;
 use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
@@ -93,6 +93,7 @@ pub struct InMemoryLayerInner {
 pub(crate) struct InMemoryLayerIndexValue {
     pub(crate) pos: u32,
     pub(crate) len: u32,
+    pub(crate) will_init: bool, // XXX this blows up the size, can we shrink down `len`?
 }

 impl std::fmt::Debug for InMemoryLayerInner {
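On a typical layout, adding the `bool` grows `InMemoryLayerIndexValue` from 8 to 12 bytes (and a `(Lsn, InMemoryLayerIndexValue)` index entry from 16 to 24 after 8-byte alignment), which is what the XXX comment is about. A minimal sketch of the shrink it hints at — hypothetical, not part of this commit, assuming blob lengths always fit in 31 bits:

    // Sketch: steal the top bit of `len` for `will_init`, keeping the struct
    // at 8 bytes. Assumption (not verified here): len < 2^31 always holds.
    struct PackedIndexValue {
        pos: u32,
        len_and_will_init: u32, // bit 31 = will_init, bits 0..31 = len
    }

    impl PackedIndexValue {
        fn new(pos: u32, len: u32, will_init: bool) -> Self {
            assert!(len < 1 << 31, "len must fit in 31 bits");
            PackedIndexValue {
                pos,
                len_and_will_init: len | ((will_init as u32) << 31),
            }
        }
        fn len(&self) -> u32 {
            self.len_and_will_init & !(1 << 31)
        }
        fn will_init(&self) -> bool {
            self.len_and_will_init >> 31 == 1
        }
    }

    fn main() {
        let v = PackedIndexValue::new(4096, 123, true);
        assert_eq!((v.len(), v.will_init()), (123, true));
        assert_eq!(std::mem::size_of::<PackedIndexValue>(), 8);
    }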
@@ -282,6 +283,14 @@ impl InMemoryLayer {

         let inner = self.inner.read().await;

+        // Determine ValueReads
+        let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
+        struct ValueRead {
+            entry_lsn: Lsn,
+            pos: u32,
+            len: u32,
+            value_buf: Mutex<Result<Vec<u8>, Arc<std::io::Error>>>,
+        }
         for range in keyspace.ranges.iter() {
             for (key, vec_map) in inner
                 .index
@@ -295,62 +304,126 @@ impl InMemoryLayer {

             let slice = vec_map.slice_range(lsn_range);

-                'foreach_value: for (entry_lsn, value) in slice.iter().rev() {
-                    let InMemoryLayerIndexValue { pos, len } = value;
-
-                    // TODO: coalesce multiple reads that hit the same page into one page read
-                    // Yuchen is working on a VectoredReadPlanner change to support this.
-                    // In the meantime, we prepare the way for direct IO by doing full page reads.
-                    let len = usize::try_from(*len).unwrap();
-                    let mut value_buf = Vec::with_capacity(len);
-                    let mut page_buf_storage = Some(PageBuf::from(Box::new([0u8; PAGE_SZ])));
-                    let mut page_no = *pos / (PAGE_SZ as u32);
-                    let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
-                    while value_buf.len() < len {
-                        let read_result = match inner
-                            .file
-                            .read_page(
-                                page_no,
-                                page_buf_storage
-                                    .take()
-                                    .expect("we put it back each iteration"),
-                                &ctx,
-                            )
-                            .await
-                        {
-                            Ok(page) => page,
-                            Err(e) => {
-                                reconstruct_state
-                                    .on_key_error(key, PageReconstructError::from(anyhow!(e)));
-                                break 'foreach_value;
-                            }
-                        };
-                        {
-                            let page_contents = read_result.contents();
-                            let remaining_in_page = std::cmp::min(
-                                len - value_buf.len(),
-                                page_contents.len() - offset_in_page,
-                            );
-                            value_buf.extend_from_slice(
-                                &page_contents[offset_in_page..offset_in_page + remaining_in_page],
-                            );
-                        }
-                        offset_in_page = 0;
-                        page_no += 1;
-                        page_buf_storage = Some(read_result.into_page_buf());
-                    }
-                    assert!(value_buf.len() == len);
-
-                    let value = Value::des(&value_buf);
-                    if let Err(e) = value {
-                        reconstruct_state
-                            .on_key_error(key, PageReconstructError::from(anyhow!(e)));
-                        break;
-                    }
-
-                    let key_situation =
-                        reconstruct_state.update_key(&key, *entry_lsn, value.unwrap());
-                    if key_situation == ValueReconstructSituation::Complete {
-                        break;
-                    }
-                }
-            }
-        }
+                for (entry_lsn, index_value) in slice.iter().rev() {
+                    reads.entry(key).or_default().push(ValueRead {
+                        entry_lsn: *entry_lsn,
+                        pos: index_value.pos,
+                        len: index_value.len,
+                        value_buf: Mutex::new(Ok(Vec::with_capacity(index_value.len as usize))),
+                    });
+                    if index_value.will_init {
+                        break;
+                    }
+                }
+            }
+        }
+
+        // Plan which parts of which pages need to be appended to which value_buf
+        struct PageReadDestination<'a> {
+            value_read: &'a ValueRead,
+            offset_in_page: u32,
+            len: u32,
+        }
+        // use of BTreeMap's sorted iterator is critical to ensure value_buf is filled in order
+        let mut page_reads: BTreeMap<u32, Vec<PageReadDestination>> = BTreeMap::new();
+        for value_read in reads.iter().flat_map(|(_, v)| v.iter()) {
+            let ValueRead { pos, len, .. } = value_read;
+            let mut remaining = usize::try_from(*len).unwrap();
+            let mut page_no = *pos / (PAGE_SZ as u32);
+            let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
+            while remaining > 0 {
+                let remaining_in_page = std::cmp::min(remaining, PAGE_SZ - offset_in_page);
+                page_reads
+                    .entry(page_no)
+                    .or_default()
+                    .push(PageReadDestination {
+                        value_read,
+                        offset_in_page: offset_in_page as u32,
+                        len: remaining_in_page as u32,
+                    });
+                offset_in_page = 0;
+                page_no += 1;
+                remaining -= remaining_in_page;
+            }
+        }
+
+        // Execute reads and fill the destination
+        // TODO: prefetch
+        let mut page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ]));
+        for (page_no, dsts) in page_reads.into_iter() {
+            let all_done = dsts.iter().all(|PageReadDestination { value_read, .. }| {
+                let value_buf = value_read.value_buf.lock().unwrap();
+                let Ok(buf) = &*value_buf else {
+                    return true; // on Err() there's no need to read more
+                };
+                buf.len() == value_read.len as usize
+            });
+            if all_done {
+                continue;
+            }
+            let read_result = match inner.file.read_page(page_no, page_buf, &ctx).await {
+                Ok(read_result) => read_result,
+                Err(e) => {
+                    let e = Arc::new(e);
+                    for PageReadDestination { value_read, .. } in dsts {
+                        *value_read.value_buf.lock().unwrap() = Err(Arc::clone(&e));
+                        // this will make later reads short-circuit, see top of loop body
+                    }
+                    page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ])); // TODO: change read_page API to return the buffer
+                    continue;
+                }
+            };
+            let page_contents = read_result.contents();
+            for PageReadDestination {
+                value_read,
+                offset_in_page,
+                len,
+            } in dsts
+            {
+                if let Ok(buf) = &mut *value_read.value_buf.lock().unwrap() {
+                    buf.extend_from_slice(
+                        &page_contents[offset_in_page as usize..(offset_in_page + len) as usize],
+                    );
+                }
+            }
+            page_buf = read_result.into_page_buf();
+        }
+        drop(page_buf);
+
+        // Process results into the reconstruct state
+        'next_key: for (key, value_reads) in reads {
+            for ValueRead {
+                entry_lsn,
+                value_buf,
+                len,
+                ..
+            } in value_reads
+            {
+                let value_buf = value_buf.into_inner().unwrap();
+                match value_buf {
+                    Err(e) => {
+                        reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
+                        continue 'next_key;
+                    }
+                    Ok(value_buf) => {
+                        assert_eq!(
+                            value_buf.len(),
+                            len as usize,
+                            "bug in this function's planning logic"
+                        );
+                        let value = Value::des(&value_buf);
+                        if let Err(e) = value {
+                            reconstruct_state
+                                .on_key_error(key, PageReconstructError::from(anyhow!(e)));
+                            continue 'next_key;
+                        }
+
+                        let key_situation =
+                            reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
+                        if key_situation == ValueReconstructSituation::Complete {
+                            // TODO: metric to see if we fetched more values than necessary
+                            continue 'next_key;
+                        }
+                    }
+                }
+            }
+        }
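The planning step above is the heart of the coalescing: every `ValueRead` is cut into per-page segments keyed by page number, so a page holding several values (or several delta records for one value) is read once and its bytes fanned out to all destination buffers, and the BTreeMap's ascending iteration fills each buffer front to back. A self-contained sketch of that planning logic, with hypothetical names and no file I/O:

    // Sketch of the coalescing plan; names (LogicalRead, plan) are illustrative,
    // not the commit's API. The real code fans page bytes out into per-value
    // buffers guarded by a Mutex.
    use std::collections::BTreeMap;

    const PAGE_SZ: usize = 8192;

    /// A logical read: `len` bytes starting at absolute file offset `pos`.
    struct LogicalRead {
        pos: u32,
        len: u32,
    }

    /// One slice of one page destined for one logical read's buffer.
    struct PageReadDestination {
        read_idx: usize,
        offset_in_page: u32,
        len: u32,
    }

    /// Map each logical read onto the pages it touches. Reads hitting the
    /// same page land in the same Vec, so that page is read only once.
    fn plan(reads: &[LogicalRead]) -> BTreeMap<u32, Vec<PageReadDestination>> {
        let mut page_reads: BTreeMap<u32, Vec<PageReadDestination>> = BTreeMap::new();
        for (read_idx, read) in reads.iter().enumerate() {
            let mut remaining = read.len as usize;
            let mut page_no = read.pos / (PAGE_SZ as u32);
            let mut offset_in_page = (read.pos % (PAGE_SZ as u32)) as usize;
            while remaining > 0 {
                let remaining_in_page = std::cmp::min(remaining, PAGE_SZ - offset_in_page);
                page_reads
                    .entry(page_no)
                    .or_default()
                    .push(PageReadDestination {
                        read_idx,
                        offset_in_page: offset_in_page as u32,
                        len: remaining_in_page as u32,
                    });
                offset_in_page = 0;
                page_no += 1;
                remaining -= remaining_in_page;
            }
        }
        page_reads
    }

    fn main() {
        // Two values on page 0, plus one value straddling the page boundary.
        let reads = [
            LogicalRead { pos: 0, len: 100 },
            LogicalRead { pos: 100, len: 200 },
            LogicalRead { pos: 8000, len: 400 },
        ];
        let plan = plan(&reads);
        // Page 0 is read once and serves three destinations; page 1 serves
        // only the tail of the straddling read.
        assert_eq!(plan[&0].len(), 3);
        assert_eq!(plan[&1].len(), 1);
        assert_eq!(plan[&1][0].len, 8400 - PAGE_SZ as u32);
        println!("pages to read: {:?}", plan.keys().collect::<Vec<_>>());
    }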
@@ -431,11 +504,13 @@ impl InMemoryLayer {
         key: CompactKey,
         lsn: Lsn,
         buf: &[u8],
+        will_init: bool,
         ctx: &RequestContext,
     ) -> Result<()> {
         let mut inner = self.inner.write().await;
         self.assert_writable();
-        self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
+        self.put_value_locked(&mut inner, key, lsn, buf, will_init, ctx)
+            .await
     }

     async fn put_value_locked(
@@ -444,6 +519,7 @@ impl InMemoryLayer {
         key: CompactKey,
         lsn: Lsn,
         buf: &[u8],
+        will_init: bool,
         ctx: &RequestContext,
     ) -> Result<()> {
         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
@@ -452,6 +528,7 @@ impl InMemoryLayer {
             .file
             .write_blob(
                 buf,
+                will_init,
                 &RequestContextBuilder::extend(ctx)
                     .page_content_kind(PageContentKind::InMemoryLayer)
                     .build(),
@@ -593,15 +670,18 @@ impl InMemoryLayer {
         for (key, vec_map) in inner.index.iter() {
             // Write all page versions
             for (lsn, entry) in vec_map.as_slice() {
-                let InMemoryLayerIndexValue { pos, len } = entry;
+                let InMemoryLayerIndexValue {
+                    pos,
+                    len,
+                    will_init,
+                } = entry;
                 let buf = file_contents.slice(*pos as usize..(*pos + *len) as usize);
-                let will_init = Value::des(&buf)?.will_init();
                 let (_buf, res) = delta_layer_writer
                     .put_value_bytes(
                         Key::from_compact(*key),
                         *lsn,
                         buf.slice_len(),
-                        will_init,
+                        *will_init,
                         ctx,
                     )
                     .await;
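With `will_init` recorded in the index, freezing an in-memory layer into a delta layer no longer deserializes every value (`Value::des(&buf)?.will_init()`) just to learn whether it initializes the page; the flag is copied straight from the index entry.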
@@ -5577,7 +5577,9 @@ impl<'a> TimelineWriter<'a> {

         let action = self.get_open_layer_action(lsn, buf_size);
         let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
-        let res = layer.put_value(key.to_compact(), lsn, &buf, ctx).await;
+        let res = layer
+            .put_value(key.to_compact(), lsn, &buf, value.will_init(), ctx)
+            .await;

         if res.is_ok() {
             // Update the current size only when the entire write was ok.