mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Preparatory pageserver async conversions (#4773)
In #4743, we'd like to convert the read path to use `async` rust. In preparation of that, this PR switches some functions that are calling lower level functions like `BlockReader::read_blk`, `BlockCursor::read_blob`, etc into `async`. The PR does not switch all functions however, and only focuses on the ones which are easy to switch. This leaves around some async functions that are (currently) unnecessarily `async`, but on the other hand it makes future changes smaller in diff. Part of #4743 (but does not completely address it).
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2506,6 +2506,7 @@ dependencies = [
|
||||
"pageserver",
|
||||
"postgres_ffi",
|
||||
"svg_fmt",
|
||||
"tokio",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -13,6 +13,7 @@ clap = { workspace = true, features = ["string"] }
|
||||
git-version.workspace = true
|
||||
pageserver = { path = ".." }
|
||||
postgres_ffi.workspace = true
|
||||
tokio.workspace = true
|
||||
utils.workspace = true
|
||||
svg_fmt.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -95,7 +95,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
}
|
||||
|
||||
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
|
||||
fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
@@ -129,7 +129,7 @@ fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
Ok(holes)
|
||||
}
|
||||
|
||||
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let storage_path = &cmd.path;
|
||||
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
|
||||
|
||||
@@ -160,7 +160,7 @@ pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
parse_filename(&layer.file_name().into_string().unwrap())
|
||||
{
|
||||
if layer_file.is_delta {
|
||||
layer_file.holes = get_holes(&layer.path(), max_holes)?;
|
||||
layer_file.holes = get_holes(&layer.path(), max_holes).await?;
|
||||
n_deltas += 1;
|
||||
}
|
||||
layers.push(layer_file);
|
||||
|
||||
@@ -43,8 +43,7 @@ pub(crate) enum LayerCmd {
|
||||
},
|
||||
}
|
||||
|
||||
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
use pageserver::tenant::blob_io::BlobCursor;
|
||||
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
use pageserver::tenant::block_io::BlockReader;
|
||||
|
||||
let path = path.as_ref();
|
||||
@@ -78,7 +77,7 @@ fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
match cmd {
|
||||
LayerCmd::List { path } => {
|
||||
for tenant in fs::read_dir(path.join("tenants"))? {
|
||||
@@ -153,7 +152,7 @@ pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
);
|
||||
|
||||
if layer_file.is_delta {
|
||||
read_delta_file(layer.path())?;
|
||||
read_delta_file(layer.path()).await?;
|
||||
} else {
|
||||
anyhow::bail!("not supported yet :(");
|
||||
}
|
||||
|
||||
@@ -72,12 +72,13 @@ struct AnalyzeLayerMapCmd {
|
||||
max_holes: Option<usize>,
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = CliOpts::parse();
|
||||
|
||||
match cli.command {
|
||||
Commands::Layer(cmd) => {
|
||||
layers::main(&cmd)?;
|
||||
layers::main(&cmd).await?;
|
||||
}
|
||||
Commands::Metadata(cmd) => {
|
||||
handle_metadata(&cmd)?;
|
||||
@@ -86,7 +87,7 @@ fn main() -> anyhow::Result<()> {
|
||||
draw_timeline_dir::main()?;
|
||||
}
|
||||
Commands::AnalyzeLayerMap(cmd) => {
|
||||
layer_map_analyzer::main(&cmd)?;
|
||||
layer_map_analyzer::main(&cmd).await?;
|
||||
}
|
||||
Commands::PrintLayerFile(cmd) => {
|
||||
if let Err(e) = read_pg_control_file(&cmd.path) {
|
||||
@@ -94,7 +95,7 @@ fn main() -> anyhow::Result<()> {
|
||||
"Failed to read input file as a pg control one: {e:#}\n\
|
||||
Attempting to read it as layer file"
|
||||
);
|
||||
print_layerfile(&cmd.path)?;
|
||||
print_layerfile(&cmd.path).await?;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -113,12 +114,12 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx)
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
}
|
||||
|
||||
fn handle_metadata(
|
||||
|
||||
@@ -3210,7 +3210,7 @@ impl Drop for Tenant {
|
||||
}
|
||||
}
|
||||
/// Dump contents of a layer file to stdout.
|
||||
pub fn dump_layerfile_from_path(
|
||||
pub async fn dump_layerfile_from_path(
|
||||
path: &Path,
|
||||
verbose: bool,
|
||||
ctx: &RequestContext,
|
||||
@@ -3224,8 +3224,16 @@ pub fn dump_layerfile_from_path(
|
||||
file.read_exact_at(&mut header_buf, 0)?;
|
||||
|
||||
match u16::from_be_bytes(header_buf) {
|
||||
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
|
||||
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
|
||||
crate::IMAGE_FILE_MAGIC => {
|
||||
ImageLayer::new_for_path(path, file)?
|
||||
.dump(verbose, ctx)
|
||||
.await?
|
||||
}
|
||||
crate::DELTA_FILE_MAGIC => {
|
||||
DeltaLayer::new_for_path(path, file)?
|
||||
.dump(verbose, ctx)
|
||||
.await?
|
||||
}
|
||||
magic => bail!("unrecognized magic identifier: {:?}", magic),
|
||||
}
|
||||
|
||||
|
||||
@@ -16,29 +16,19 @@ use crate::tenant::block_io::{BlockCursor, BlockReader};
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
/// For reading
|
||||
pub trait BlobCursor {
|
||||
impl<R> BlockCursor<R>
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
/// Read a blob into a new buffer.
|
||||
fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
pub fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
self.read_blob_into_buf(offset, &mut buf)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
fn read_blob_into_buf(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
) -> Result<(), std::io::Error>;
|
||||
}
|
||||
|
||||
impl<R> BlobCursor for BlockCursor<R>
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
fn read_blob_into_buf(
|
||||
pub fn read_blob_into_buf(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
|
||||
@@ -328,7 +328,7 @@ fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use rand::{seq::SliceRandom, thread_rng, RngCore};
|
||||
use std::fs;
|
||||
|
||||
@@ -626,17 +626,17 @@ impl LayerMap {
|
||||
|
||||
/// debugging function to print out the contents of the layer map
|
||||
#[allow(unused)]
|
||||
pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!("Begin dump LayerMap");
|
||||
|
||||
println!("open_layer:");
|
||||
if let Some(open_layer) = &self.open_layer {
|
||||
open_layer.dump(verbose, ctx)?;
|
||||
open_layer.dump(verbose, ctx).await?;
|
||||
}
|
||||
|
||||
println!("frozen_layers:");
|
||||
for frozen_layer in self.frozen_layers.iter() {
|
||||
frozen_layer.dump(verbose, ctx)?;
|
||||
frozen_layer.dump(verbose, ctx).await?;
|
||||
}
|
||||
|
||||
println!("historic_layers:");
|
||||
|
||||
@@ -338,7 +338,8 @@ impl LayerAccessStats {
|
||||
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
||||
/// timeline names, because those are known in the context of which the layers
|
||||
/// are used in (timeline).
|
||||
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
#[async_trait::async_trait]
|
||||
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
|
||||
/// Range of keys that this layer covers
|
||||
fn get_key_range(&self) -> Range<Key>;
|
||||
|
||||
@@ -368,7 +369,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
/// is available. If this returns ValueReconstructResult::Continue, look up
|
||||
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
||||
/// collect more data.
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
@@ -377,7 +378,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
) -> Result<ValueReconstructResult>;
|
||||
|
||||
/// Dump summary of the contents of the layer to stdout
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Returned by [`PersistentLayer::iter`]
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{PageReadGuard, PAGE_SZ};
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -223,9 +223,10 @@ impl std::fmt::Debug for DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for DeltaLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
@@ -300,7 +301,7 @@ impl Layer for DeltaLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -155,9 +155,10 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for ImageLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
@@ -189,7 +190,7 @@ impl Layer for ImageLayer {
|
||||
}
|
||||
|
||||
/// Look up given page in the file
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
|
||||
@@ -110,6 +110,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for InMemoryLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
Key::MIN..Key::MAX
|
||||
@@ -132,7 +133,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_str = inner
|
||||
@@ -183,7 +184,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// Look up given value in the layer.
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -65,8 +65,9 @@ impl std::fmt::Debug for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for RemoteLayer {
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
@@ -77,7 +78,7 @@ impl Layer for RemoteLayer {
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
|
||||
@@ -2387,12 +2387,15 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match open_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
result = match open_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
@@ -2414,12 +2417,15 @@ impl Timeline {
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match frozen_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
result = match frozen_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
@@ -2450,12 +2456,15 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
result = match layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user