layer impls: run get_value_reconstruct_data in spawn_blocking

Effectively, this means we use the tokio runtime's spawn_blocking-thread-pool
to execute the layer reads, instead of doing the reads on the
tokio runtime's main executor threads.

The use of the thread pool adds some overhead, but, not blocking
the main executor threads is more important, because they can now
execute other async tasks while we do the IO.

With a sufficiently large spawn_blocking-thread-pool, we also get more
IO parallelism between timelines than with blocking the main executor
threads. So, we might push the pageserver's NVMe closer to its limits.
But right now, there's lots of headroom.
This commit is contained in:
Christian Schwarz
2023-05-11 17:10:15 +02:00
parent 6718cf0ae6
commit 5443c73d20
3 changed files with 133 additions and 120 deletions

View File

@@ -325,83 +325,86 @@ impl Layer for DeltaLayer {
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
tokio::task::spawn_blocking(move || {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
ensure!(self.key_range.contains(&key));
ensure!(self.key_range.contains(&key));
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// release metadata lock and close the file
}
// release metadata lock and close the file
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
.await
.context("spawn_blocking")?
})
}
}

View File

@@ -204,33 +204,37 @@ impl Layer for ImageLayer {
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
tokio::task::spawn_blocking(move || {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
}
reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
}
})
.await
.context("spawn_blocking")?
})
}
}

View File

@@ -12,7 +12,7 @@ use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{ensure, Result};
use anyhow::{ensure, Context, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -198,46 +198,52 @@ impl Layer for InMemoryLayer {
_ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
// The in-memory layer isn't actually in-memory. It uses EphemeralFile.
// So, this does do IO.
tokio::task::spawn_blocking(move || {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
let inner = self.inner.read().unwrap();
let inner = self.inner.read().unwrap();
let mut reader = inner.file.block_cursor();
let mut reader = inner.file.block_cursor();
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
}
// release lock on 'inner'
// release lock on 'inner'
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
.await
.context("spawn_blocking")?
})
}
}