mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
Rework issuing of IOs on read path
This commit is contained in:
@@ -8,6 +8,8 @@ mod layer_desc;
|
||||
mod layer_name;
|
||||
pub mod merge_iterator;
|
||||
|
||||
use tokio::sync::{self};
|
||||
use utils::bin_ser::BeSer;
|
||||
pub mod split_writer;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
@@ -16,7 +18,7 @@ use crate::walrecord::NeonWalRecord;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use std::cmp::{Ordering, Reverse};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::ops::Range;
|
||||
@@ -79,10 +81,16 @@ pub(crate) enum ValueReconstructSituation {
|
||||
}
|
||||
|
||||
/// Reconstruct data accumulated for a single key during a vectored get
|
||||
#[derive(Debug, Default, Clone)]
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct VectoredValueReconstructState {
|
||||
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub(crate) img: Option<(Lsn, Bytes)>,
|
||||
pub(crate) records: Vec<(
|
||||
Lsn,
|
||||
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
)>,
|
||||
pub(crate) img: Option<(
|
||||
Lsn,
|
||||
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
)>,
|
||||
|
||||
situation: ValueReconstructSituation,
|
||||
}
|
||||
@@ -93,16 +101,55 @@ impl VectoredValueReconstructState {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<VectoredValueReconstructState> for ValueReconstructState {
|
||||
fn from(mut state: VectoredValueReconstructState) -> Self {
|
||||
// walredo expects the records to be descending in terms of Lsn
|
||||
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
|
||||
pub(crate) async fn convert(
|
||||
from: VectoredValueReconstructState,
|
||||
) -> Result<ValueReconstructState, PageReconstructError> {
|
||||
let mut to = ValueReconstructState::default();
|
||||
|
||||
ValueReconstructState {
|
||||
records: state.records,
|
||||
img: state.img,
|
||||
for (lsn, fut) in from.records {
|
||||
match fut.await {
|
||||
Ok(res) => match res {
|
||||
Ok(bytes) => {
|
||||
let value = Value::des(&bytes)
|
||||
.map_err(|err| PageReconstructError::Other(err.into()))?;
|
||||
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
to.img = Some((lsn, img));
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
to.records.push((lsn, rec));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if to.img.is_none() {
|
||||
let (lsn, fut) = from.img.expect("Need an image");
|
||||
match fut.await {
|
||||
Ok(res) => match res {
|
||||
Ok(bytes) => {
|
||||
to.img = Some((lsn, bytes));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(to)
|
||||
}
|
||||
|
||||
/// Bag of data accumulated during a vectored get..
|
||||
@@ -200,7 +247,8 @@ impl ValuesReconstructState {
|
||||
&mut self,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
value: Value,
|
||||
completes: bool,
|
||||
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
) -> ValueReconstructSituation {
|
||||
let state = self
|
||||
.keys
|
||||
@@ -208,31 +256,14 @@ impl ValuesReconstructState {
|
||||
.or_insert(Ok(VectoredValueReconstructState::default()));
|
||||
|
||||
if let Ok(state) = state {
|
||||
let key_done = match state.situation {
|
||||
match state.situation {
|
||||
ValueReconstructSituation::Complete => unreachable!(),
|
||||
ValueReconstructSituation::Continue => match value {
|
||||
Value::Image(img) => {
|
||||
state.img = Some((lsn, img));
|
||||
true
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
debug_assert!(
|
||||
Some(lsn) > state.get_cached_lsn(),
|
||||
"Attempt to collect a record below cached LSN for walredo: {} < {}",
|
||||
lsn,
|
||||
state
|
||||
.get_cached_lsn()
|
||||
.expect("Assertion can only fire if a cached lsn is present")
|
||||
);
|
||||
ValueReconstructSituation::Continue => {
|
||||
state.records.push((lsn, value));
|
||||
}
|
||||
}
|
||||
|
||||
let will_init = rec.will_init();
|
||||
state.records.push((lsn, rec));
|
||||
will_init
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if key_done && state.situation == ValueReconstructSituation::Continue {
|
||||
if completes && state.situation == ValueReconstructSituation::Continue {
|
||||
state.situation = ValueReconstructSituation::Complete;
|
||||
self.keys_done.add_key(*key);
|
||||
}
|
||||
|
||||
@@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadCoalesceMode, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
@@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::{self, OnceCell};
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tracing::*;
|
||||
|
||||
@@ -224,7 +223,7 @@ pub struct DeltaLayerInner {
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
|
||||
file: VirtualFile,
|
||||
file: Arc<VirtualFile>,
|
||||
file_id: FileId,
|
||||
|
||||
layer_key_range: Range<Key>,
|
||||
@@ -788,9 +787,11 @@ impl DeltaLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?,
|
||||
);
|
||||
|
||||
let file_id = page_cache::next_file_id();
|
||||
|
||||
@@ -980,77 +981,58 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
let mut ignore_key_with_err = None;
|
||||
|
||||
let max_vectored_read_bytes = self
|
||||
.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into();
|
||||
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
|
||||
let mut buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
// Note that reads are processed in reverse order (from highest key+lsn).
|
||||
// This is the order that `ReconstructState` requires such that it can
|
||||
// track when a key is done.
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
|
||||
.await;
|
||||
|
||||
let blobs_buf = match res {
|
||||
Ok(blobs_buf) => blobs_buf,
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::Other(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// We have "lost" the buffer since the lower level IO api
|
||||
// doesn't return the buffer on error. Allocate a new one.
|
||||
buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
if Some(meta.meta.key) == ignore_key_with_err {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
|
||||
let value = match value {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to deserialize blob from virtual file {}",
|
||||
self.file.path,
|
||||
))),
|
||||
);
|
||||
|
||||
ignore_key_with_err = Some(meta.meta.key);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
|
||||
// state, no further updates shall be made to it. The call below will
|
||||
// panic if the invariant is violated.
|
||||
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
|
||||
let mut senders: HashMap<
|
||||
(Key, Lsn),
|
||||
sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
> = Default::default();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
let (tx, rx) = sync::oneshot::channel();
|
||||
senders.insert((blob_meta.key, blob_meta.lsn), tx);
|
||||
reconstruct_state.update_key(
|
||||
&blob_meta.key,
|
||||
blob_meta.lsn,
|
||||
blob_meta.will_init,
|
||||
rx,
|
||||
);
|
||||
}
|
||||
|
||||
buf = Some(blobs_buf.buf);
|
||||
let read_from = self.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
let buf = &blobs_buf.buf[meta.start..meta.end];
|
||||
let sender = senders
|
||||
.remove(&(meta.meta.key, meta.meta.lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
|
||||
}
|
||||
|
||||
assert!(senders.is_empty());
|
||||
}
|
||||
Err(err) => {
|
||||
for (_, sender) in senders {
|
||||
let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1190,7 +1172,14 @@ impl DeltaLayerInner {
|
||||
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
|
||||
let end_offset = offset;
|
||||
|
||||
Some((BlobMeta { key, lsn }, start_offset..end_offset))
|
||||
Some((
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
start_offset..end_offset,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
//!
|
||||
//! Every image layer file consists of three parts: "summary",
|
||||
//! "index", and "values". The summary is a fixed size header at the
|
||||
//! beginning of the file, and it contains basic information about the
|
||||
//! beginningof the file, and it contains basic information about the
|
||||
//! layer, and offsets to the other parts. The "index" is a B-tree,
|
||||
//! mapping from Key to an offset in the "values" part. The
|
||||
//! actual page images are stored in the "values" part.
|
||||
@@ -38,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
@@ -52,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
@@ -163,7 +164,7 @@ pub struct ImageLayerInner {
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
|
||||
file: VirtualFile,
|
||||
file: Arc<VirtualFile>,
|
||||
file_id: FileId,
|
||||
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
@@ -390,9 +391,11 @@ impl ImageLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?,
|
||||
);
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader
|
||||
@@ -579,8 +582,21 @@ impl ImageLayerInner {
|
||||
.0
|
||||
.into();
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
for read in reads.into_iter() {
|
||||
let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
|
||||
Default::default();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
senders.insert((blob_meta.key, blob_meta.lsn), tx);
|
||||
|
||||
reconstruct_state.update_key(
|
||||
&blob_meta.key,
|
||||
blob_meta.lsn,
|
||||
true,
|
||||
rx,
|
||||
);
|
||||
}
|
||||
|
||||
let buf_size = read.size();
|
||||
|
||||
if buf_size > max_vectored_read_bytes {
|
||||
@@ -599,36 +615,32 @@ impl ImageLayerInner {
|
||||
);
|
||||
}
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
|
||||
let read_from = self.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&*read_from);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
|
||||
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
let buf = &blobs_buf.buf[meta.start..meta.end];
|
||||
let sender = senders
|
||||
.remove(&(meta.meta.key, meta.meta.lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
|
||||
}
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(img_buf),
|
||||
);
|
||||
assert!(senders.is_empty());
|
||||
}
|
||||
Err(err) => {
|
||||
for (_, sender) in senders {
|
||||
let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::{l0_flush, page_cache};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use anyhow::{Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::CompactKey;
|
||||
@@ -36,7 +35,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{
|
||||
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
|
||||
DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
|
||||
pub(crate) mod vectored_dio_read;
|
||||
@@ -87,7 +86,7 @@ pub struct InMemoryLayerInner {
|
||||
/// The values are stored in a serialized format in this file.
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
/// PerSeg::page_versions map stores offsets into this file.
|
||||
file: EphemeralFile,
|
||||
file: Arc<tokio::sync::RwLock<EphemeralFile>>,
|
||||
|
||||
resource_units: GlobalResourceUnits,
|
||||
}
|
||||
@@ -381,7 +380,11 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
pub(crate) fn try_len(&self) -> Option<u64> {
|
||||
self.inner.try_read().map(|i| i.file.len()).ok()
|
||||
self.inner
|
||||
.try_read()
|
||||
.map(|i| i.file.try_read().map(|i| i.len()).ok())
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
pub(crate) fn assert_writable(&self) {
|
||||
@@ -432,6 +435,10 @@ impl InMemoryLayer {
|
||||
read: vectored_dio_read::LogicalRead<Vec<u8>>,
|
||||
}
|
||||
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
|
||||
let mut senders: HashMap<
|
||||
(Key, Lsn),
|
||||
tokio::sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
> = Default::default();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
for (key, vec_map) in inner
|
||||
@@ -459,6 +466,11 @@ impl InMemoryLayer {
|
||||
Vec::with_capacity(len as usize),
|
||||
),
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
senders.insert((key, *entry_lsn), tx);
|
||||
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
|
||||
|
||||
if will_init {
|
||||
break;
|
||||
}
|
||||
@@ -466,46 +478,41 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// Execute the reads.
|
||||
let read_from = inner.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let locked = read_from.read().await;
|
||||
let f = vectored_dio_read::execute(
|
||||
&*locked,
|
||||
reads
|
||||
.iter()
|
||||
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
|
||||
&read_ctx,
|
||||
);
|
||||
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
|
||||
.await;
|
||||
|
||||
let f = vectored_dio_read::execute(
|
||||
&inner.file,
|
||||
reads
|
||||
.iter()
|
||||
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
|
||||
&ctx,
|
||||
);
|
||||
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
|
||||
.await;
|
||||
|
||||
// Process results into the reconstruct state
|
||||
'next_key: for (key, value_reads) in reads {
|
||||
for ValueRead { entry_lsn, read } in value_reads {
|
||||
match read.into_result().expect("we run execute() above") {
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
|
||||
continue 'next_key;
|
||||
}
|
||||
Ok(value_buf) => {
|
||||
let value = Value::des(&value_buf);
|
||||
if let Err(e) = value {
|
||||
reconstruct_state
|
||||
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
|
||||
continue 'next_key;
|
||||
for (key, value_reads) in reads {
|
||||
for ValueRead { entry_lsn, read } in value_reads {
|
||||
let sender = senders
|
||||
.remove(&(key, entry_lsn))
|
||||
.expect("sender must exist");
|
||||
match read.into_result().expect("we run execute() above") {
|
||||
Err(e) => {
|
||||
let sender = senders
|
||||
.remove(&(key, entry_lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
|
||||
}
|
||||
|
||||
let key_situation =
|
||||
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
|
||||
if key_situation == ValueReconstructSituation::Complete {
|
||||
// TODO: metric to see if we fetched more values than necessary
|
||||
continue 'next_key;
|
||||
Ok(value_buf) => {
|
||||
let _ = sender.send(Ok(value_buf.into()));
|
||||
}
|
||||
|
||||
// process the next value in the next iteration of the loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(senders.is_empty());
|
||||
});
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
|
||||
|
||||
@@ -600,7 +607,8 @@ impl InMemoryLayer {
|
||||
/// Get layer size.
|
||||
pub async fn size(&self) -> Result<u64> {
|
||||
let inner = self.inner.read().await;
|
||||
Ok(inner.file.len())
|
||||
let locked = inner.file.try_read().expect("no contention");
|
||||
Ok(locked.len())
|
||||
}
|
||||
|
||||
/// Create a new, empty, in-memory layer
|
||||
@@ -614,9 +622,10 @@ impl InMemoryLayer {
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
|
||||
|
||||
let file =
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
|
||||
let key = InMemoryLayerFileId(file.page_cache_file_id());
|
||||
let file = Arc::new(tokio::sync::RwLock::new(
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?,
|
||||
));
|
||||
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
file_id: key,
|
||||
@@ -648,7 +657,7 @@ impl InMemoryLayer {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
let base_offset = inner.file.len();
|
||||
let base_offset = inner.file.read().await.len();
|
||||
|
||||
let SerializedBatch {
|
||||
raw,
|
||||
@@ -672,8 +681,13 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
// FIXME: can't borrow arc
|
||||
let new_size = {
|
||||
let mut locked = inner.file.write().await;
|
||||
locked.write_raw(&raw, ctx).await?;
|
||||
locked.len()
|
||||
};
|
||||
|
||||
let expected_new_len = base_offset
|
||||
.checked_add(raw.len().into_u64())
|
||||
// write_raw would error if we were to overflow u64.
|
||||
@@ -713,7 +727,7 @@ impl InMemoryLayer {
|
||||
|
||||
pub(crate) async fn tick(&self) -> Option<u64> {
|
||||
let mut inner = self.inner.write().await;
|
||||
let size = inner.file.len();
|
||||
let size = inner.file.read().await.len();
|
||||
inner.resource_units.publish_size(size)
|
||||
}
|
||||
|
||||
@@ -809,7 +823,7 @@ impl InMemoryLayer {
|
||||
|
||||
match l0_flush_global_state {
|
||||
l0_flush::Inner::Direct { .. } => {
|
||||
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
|
||||
let file_contents: Vec<u8> = inner.file.read().await.load_to_vec(ctx).await?;
|
||||
|
||||
let file_contents = Bytes::from(file_contents);
|
||||
|
||||
|
||||
@@ -107,6 +107,8 @@ async fn smoke_test() {
|
||||
.expect("tenant harness writes the control file")
|
||||
};
|
||||
|
||||
let img_before = (img_before.0, img_before.1.await.unwrap().unwrap());
|
||||
let img_after = (img_after.0, img_after.1.await.unwrap().unwrap());
|
||||
assert_eq!(img_before, img_after);
|
||||
|
||||
// evict_and_wait can timeout, but it doesn't cancel the evicting itself
|
||||
|
||||
@@ -69,7 +69,7 @@ use crate::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{
|
||||
inmemory_layer::IndexEntry, LayerId, PersistentLayerDesc, ValueReconstructSituation,
|
||||
convert, inmemory_layer::IndexEntry, PersistentLayerDesc,
|
||||
},
|
||||
},
|
||||
walredo,
|
||||
@@ -1130,11 +1130,11 @@ impl Timeline {
|
||||
|
||||
// transform reconstruct state which is per key into a map
|
||||
// layer => all reads from that layer
|
||||
struct KeyWaiter {
|
||||
img: Option<oneshot::Receiver<Bytes>>,
|
||||
values: Vec<oneshot::Receiver<Bytes>>,
|
||||
}
|
||||
let mut key_waiters: BTreeMap<Key, Result<KeyWaiter, PageReconstructError>> = todo!();
|
||||
// struct KeyWaiter {
|
||||
// img: Option<oneshot::Receiver<Bytes>>,
|
||||
// values: Vec<oneshot::Receiver<Bytes>>,
|
||||
// }
|
||||
// let mut key_waiters: BTreeMap<Key, Result<KeyWaiter, PageReconstructError>> = todo!();
|
||||
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
@@ -1148,7 +1148,9 @@ impl Timeline {
|
||||
results.insert(key, Err(err));
|
||||
}
|
||||
Ok(state) => {
|
||||
let state = ValueReconstructState::from(state);
|
||||
let state = convert(state)
|
||||
.await
|
||||
.map_err(|err| GetVectoredError::Other(err.into()))?;
|
||||
|
||||
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
|
||||
results.insert(key, reconstruct_res);
|
||||
@@ -5506,30 +5508,30 @@ impl Timeline {
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn inspect_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
_lsn: Lsn,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Key, Bytes)>> {
|
||||
let mut all_data = Vec::new();
|
||||
let guard = self.layers.read().await;
|
||||
for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
let mut reconstruct_data = ValuesReconstructState::default();
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
KeySpace::single(Key::MIN..Key::MAX),
|
||||
lsn..Lsn(lsn.0 + 1),
|
||||
&mut reconstruct_data,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (k, v) in reconstruct_data.keys {
|
||||
all_data.push((k, v?.img.unwrap().1));
|
||||
}
|
||||
}
|
||||
}
|
||||
all_data.sort();
|
||||
Ok(all_data)
|
||||
// let mut all_data = Vec::new();
|
||||
// let guard = self.layers.read().await;
|
||||
// for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
// if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
// let layer = guard.get_from_desc(&layer);
|
||||
// let mut reconstruct_data = ValuesReconstructState::default();
|
||||
// layer
|
||||
// .get_values_reconstruct_data(
|
||||
// KeySpace::single(Key::MIN..Key::MAX),
|
||||
// lsn..Lsn(lsn.0 + 1),
|
||||
// &mut reconstruct_data,
|
||||
// ctx,
|
||||
// )
|
||||
// .await?;
|
||||
// for (k, v) in reconstruct_data.keys {
|
||||
// all_data.push((k, v?.img.unwrap().1));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// all_data.sort();
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Get all historic layer descriptors in the layer map
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures::channel::oneshot;
|
||||
use pageserver_api::key::Key;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
@@ -34,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
|
||||
pub struct BlobMeta {
|
||||
pub key: Key,
|
||||
pub lsn: Lsn,
|
||||
pub will_init: bool,
|
||||
}
|
||||
|
||||
/// Blob offsets into [`VectoredBlobsBuf::buf`]
|
||||
@@ -356,7 +356,8 @@ pub enum BlobFlag {
|
||||
/// * Iterate over the collected blobs and coalesce them into reads at the end
|
||||
pub struct VectoredReadPlanner {
|
||||
// Track all the blob offsets. Start offsets must be ordered.
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
|
||||
// Note: last bool is will_init
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
|
||||
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
|
||||
prev: Option<(Key, Lsn, u64, BlobFlag)>,
|
||||
|
||||
@@ -421,12 +422,12 @@ impl VectoredReadPlanner {
|
||||
match flag {
|
||||
BlobFlag::None => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, false));
|
||||
}
|
||||
BlobFlag::ReplaceAll => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.clear();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, true));
|
||||
}
|
||||
BlobFlag::Ignore => {}
|
||||
}
|
||||
@@ -437,11 +438,17 @@ impl VectoredReadPlanner {
|
||||
let mut reads = Vec::new();
|
||||
|
||||
for (key, blobs_for_key) in self.blobs {
|
||||
for (lsn, start_offset, end_offset) in blobs_for_key {
|
||||
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
|
||||
let extended = match &mut current_read_builder {
|
||||
Some(read_builder) => {
|
||||
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
|
||||
}
|
||||
Some(read_builder) => read_builder.extend(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init,
|
||||
},
|
||||
),
|
||||
None => VectoredReadExtended::No,
|
||||
};
|
||||
|
||||
@@ -449,7 +456,11 @@ impl VectoredReadPlanner {
|
||||
let next_read_builder = VectoredReadBuilder::new(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init,
|
||||
},
|
||||
self.max_read_size,
|
||||
self.mode,
|
||||
);
|
||||
@@ -670,7 +681,15 @@ impl StreamingVectoredReadPlanner {
|
||||
) -> Option<VectoredRead> {
|
||||
match &mut self.read_builder {
|
||||
Some(read_builder) => {
|
||||
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
|
||||
let extended = read_builder.extend(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
);
|
||||
assert_eq!(extended, VectoredReadExtended::Yes);
|
||||
}
|
||||
None => {
|
||||
@@ -678,7 +697,11 @@ impl StreamingVectoredReadPlanner {
|
||||
Some(VectoredReadBuilder::new_streaming(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
self.mode,
|
||||
))
|
||||
};
|
||||
@@ -1010,6 +1033,7 @@ mod tests {
|
||||
let meta = BlobMeta {
|
||||
key: Key::MIN,
|
||||
lsn: Lsn(0),
|
||||
will_init: false,
|
||||
};
|
||||
|
||||
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
|
||||
|
||||
Reference in New Issue
Block a user