we can avoid adding the Arc<Mutex<>> around EphemeralLayer if we instead extend the lifetime of the InMemoryLayer for the spawned IO future; plus it's semantically more similar to what we now do for Delta and Image layers

This commit is contained in:
Christian Schwarz
2025-01-17 18:16:17 +01:00
parent c43400389f
commit 40ab9c2c5e

View File

@@ -87,7 +87,7 @@ pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: Arc<tokio::sync::RwLock<EphemeralFile>>,
file: EphemeralFile,
resource_units: GlobalResourceUnits,
}
@@ -381,11 +381,7 @@ impl InMemoryLayer {
}
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner
.try_read()
.map(|i| i.file.try_read().map(|i| i.len()).ok())
.ok()
.flatten()
self.inner.try_read().map(|i| i.file.len()).ok()
}
pub(crate) fn assert_writable(&self) {
@@ -417,7 +413,7 @@ impl InMemoryLayer {
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub(crate) async fn get_values_reconstruct_data(
&self,
self: &Arc<InMemoryLayer>,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
@@ -475,13 +471,13 @@ impl InMemoryLayer {
}
}
let read_from = inner.file.clone();
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
.spawn_io(async move {
let locked = read_from.read().await;
let inner = read_from.inner.read().await;
let f = vectored_dio_read::execute(
&*locked,
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
@@ -511,6 +507,13 @@ impl InMemoryLayer {
}
assert!(senders.is_empty());
// Keep layer existent until this IO is done;
// This is kinda forced for InMemoryLayer because we need to inner.read() anyway,
// but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
// drop for consistency among all three layer types.
drop(inner);
drop(read_from);
})
.await;
@@ -543,8 +546,7 @@ impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
let locked = inner.file.try_read().expect("no contention");
Ok(locked.len())
Ok(inner.file.len())
}
pub fn estimated_in_mem_size(&self) -> u64 {
@@ -562,10 +564,8 @@ impl InMemoryLayer {
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file = Arc::new(tokio::sync::RwLock::new(
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?,
));
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
Ok(InMemoryLayer {
file_id: key,
@@ -598,7 +598,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.read().await.len();
let base_offset = inner.file.len();
let SerializedValueBatch {
raw,
@@ -608,12 +608,8 @@ impl InMemoryLayer {
} = serialized_batch;
// Write the batch to the file
// FIXME: can't borrow arc
let new_size = {
let mut locked = inner.file.write().await;
locked.write_raw(&raw, ctx).await?;
locked.len()
};
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
@@ -676,7 +672,7 @@ impl InMemoryLayer {
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.read().await.len();
let size = inner.file.len();
inner.resource_units.publish_size(size)
}
@@ -772,7 +768,7 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents = inner.file.read().await.load_to_io_buf(ctx).await?;
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in inner.index.iter() {