From e5b7ddfeeebc25d9ca0f183704244d6abb90a449 Mon Sep 17 00:00:00 2001
From: arpad-m
Date: Mon, 24 Jul 2023 14:01:54 +0200
Subject: [PATCH] Preparatory pageserver async conversions (#4773)

In #4743, we'd like to convert the read path to `async` Rust. In preparation
for that, this PR switches some functions that call lower-level functions like
`BlockReader::read_blk`, `BlockCursor::read_blob`, etc. to `async`.

The PR does not switch all functions, however; it only covers the ones that
are easy to switch. This leaves behind some functions that are (currently)
unnecessarily `async`, but in return it keeps the diffs of future changes
smaller.

Part of #4743 (but does not completely address it).
---
 Cargo.lock                                     |  1 +
 pageserver/ctl/Cargo.toml                      |  1 +
 pageserver/ctl/src/layer_map_analyzer.rs       |  6 +--
 pageserver/ctl/src/layers.rs                   |  7 ++-
 pageserver/ctl/src/main.rs                     | 13 +++---
 pageserver/src/tenant.rs                       | 14 ++++--
 pageserver/src/tenant/blob_io.rs               | 22 +++------
 pageserver/src/tenant/ephemeral_file.rs        |  2 +-
 pageserver/src/tenant/layer_map.rs             |  6 +--
 pageserver/src/tenant/storage_layer.rs         |  7 +--
 .../src/tenant/storage_layer/delta_layer.rs    |  7 +--
 .../src/tenant/storage_layer/image_layer.rs    |  7 +--
 .../tenant/storage_layer/inmemory_layer.rs     |  7 +--
 .../src/tenant/storage_layer/remote_layer.rs   |  5 ++-
 pageserver/src/tenant/timeline.rs              | 45 +++++++++++--------
 15 files changed, 82 insertions(+), 68 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b9ca53064f..05a70bfe55 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2506,6 +2506,7 @@ dependencies = [
  "pageserver",
  "postgres_ffi",
  "svg_fmt",
+ "tokio",
  "utils",
  "workspace_hack",
 ]
diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml
index 89e0d0486e..b3f12b72b3 100644
--- a/pageserver/ctl/Cargo.toml
+++ b/pageserver/ctl/Cargo.toml
@@ -13,6 +13,7 @@ clap = { workspace = true, features = ["string"] }
 git-version.workspace = true
 pageserver = { path = ".."
} postgres_ffi.workspace = true +tokio.workspace = true utils.workspace = true svg_fmt.workspace = true workspace_hack.workspace = true diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index f2ced6154f..da83c061c0 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -95,7 +95,7 @@ pub(crate) fn parse_filename(name: &str) -> Option { } // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH" -fn get_holes(path: &Path, max_holes: usize) -> Result> { +async fn get_holes(path: &Path, max_holes: usize) -> Result> { let file = FileBlockReader::new(VirtualFile::open(path)?); let summary_blk = file.read_blk(0)?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; @@ -129,7 +129,7 @@ fn get_holes(path: &Path, max_holes: usize) -> Result> { Ok(holes) } -pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { +pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let storage_path = &cmd.path; let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES); @@ -160,7 +160,7 @@ pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { parse_filename(&layer.file_name().into_string().unwrap()) { if layer_file.is_delta { - layer_file.holes = get_holes(&layer.path(), max_holes)?; + layer_file.holes = get_holes(&layer.path(), max_holes).await?; n_deltas += 1; } layers.push(layer_file); diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index d77cf0908c..d81303d533 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -43,8 +43,7 @@ pub(crate) enum LayerCmd { }, } -fn read_delta_file(path: impl AsRef) -> Result<()> { - use pageserver::tenant::blob_io::BlobCursor; +async fn read_delta_file(path: impl AsRef) -> Result<()> { use pageserver::tenant::block_io::BlockReader; let path = path.as_ref(); @@ -78,7 +77,7 @@ fn read_delta_file(path: impl AsRef) -> Result<()> { Ok(()) } -pub(crate) fn main(cmd: &LayerCmd) -> Result<()> { +pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { match cmd { LayerCmd::List { path } => { for tenant in fs::read_dir(path.join("tenants"))? 
{ @@ -153,7 +152,7 @@ pub(crate) fn main(cmd: &LayerCmd) -> Result<()> { ); if layer_file.is_delta { - read_delta_file(layer.path())?; + read_delta_file(layer.path()).await?; } else { anyhow::bail!("not supported yet :("); } diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 55db9eb7e7..0d154aca0c 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -72,12 +72,13 @@ struct AnalyzeLayerMapCmd { max_holes: Option, } -fn main() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { let cli = CliOpts::parse(); match cli.command { Commands::Layer(cmd) => { - layers::main(&cmd)?; + layers::main(&cmd).await?; } Commands::Metadata(cmd) => { handle_metadata(&cmd)?; @@ -86,7 +87,7 @@ fn main() -> anyhow::Result<()> { draw_timeline_dir::main()?; } Commands::AnalyzeLayerMap(cmd) => { - layer_map_analyzer::main(&cmd)?; + layer_map_analyzer::main(&cmd).await?; } Commands::PrintLayerFile(cmd) => { if let Err(e) = read_pg_control_file(&cmd.path) { @@ -94,7 +95,7 @@ fn main() -> anyhow::Result<()> { "Failed to read input file as a pg control one: {e:#}\n\ Attempting to read it as layer file" ); - print_layerfile(&cmd.path)?; + print_layerfile(&cmd.path).await?; } } }; @@ -113,12 +114,12 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> { Ok(()) } -fn print_layerfile(path: &Path) -> anyhow::Result<()> { +async fn print_layerfile(path: &Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup virtual_file::init(10); page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); - dump_layerfile_from_path(path, true, &ctx) + dump_layerfile_from_path(path, true, &ctx).await } fn handle_metadata( diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 44f1a6cd65..81dce84b04 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3210,7 +3210,7 @@ impl Drop for Tenant { } } /// Dump contents of a layer file to stdout. -pub fn dump_layerfile_from_path( +pub async fn dump_layerfile_from_path( path: &Path, verbose: bool, ctx: &RequestContext, @@ -3224,8 +3224,16 @@ pub fn dump_layerfile_from_path( file.read_exact_at(&mut header_buf, 0)?; match u16::from_be_bytes(header_buf) { - crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?, - crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?, + crate::IMAGE_FILE_MAGIC => { + ImageLayer::new_for_path(path, file)? + .dump(verbose, ctx) + .await? + } + crate::DELTA_FILE_MAGIC => { + DeltaLayer::new_for_path(path, file)? + .dump(verbose, ctx) + .await? + } magic => bail!("unrecognized magic identifier: {:?}", magic), } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 52eafc72ee..4dcf7fe5fe 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -16,29 +16,19 @@ use crate::tenant::block_io::{BlockCursor, BlockReader}; use std::cmp::min; use std::io::{Error, ErrorKind}; -/// For reading -pub trait BlobCursor { +impl BlockCursor +where + R: BlockReader, +{ /// Read a blob into a new buffer. - fn read_blob(&mut self, offset: u64) -> Result, std::io::Error> { + pub fn read_blob(&mut self, offset: u64) -> Result, std::io::Error> { let mut buf = Vec::new(); self.read_blob_into_buf(offset, &mut buf)?; Ok(buf) } - /// Read blob into the given buffer. Any previous contents in the buffer /// are overwritten. 
- fn read_blob_into_buf( - &mut self, - offset: u64, - dstbuf: &mut Vec, - ) -> Result<(), std::io::Error>; -} - -impl BlobCursor for BlockCursor -where - R: BlockReader, -{ - fn read_blob_into_buf( + pub fn read_blob_into_buf( &mut self, offset: u64, dstbuf: &mut Vec, diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 0d3c5da91c..f44534249c 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -328,7 +328,7 @@ fn to_io_error(e: anyhow::Error, context: &str) -> io::Error { #[cfg(test)] mod tests { use super::*; - use crate::tenant::blob_io::{BlobCursor, BlobWriter}; + use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::BlockCursor; use rand::{seq::SliceRandom, thread_rng, RngCore}; use std::fs; diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 2908d3a83c..c4b894fcf5 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -626,17 +626,17 @@ impl LayerMap { /// debugging function to print out the contents of the layer map #[allow(unused)] - pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { + pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { println!("Begin dump LayerMap"); println!("open_layer:"); if let Some(open_layer) = &self.open_layer { - open_layer.dump(verbose, ctx)?; + open_layer.dump(verbose, ctx).await?; } println!("frozen_layers:"); for frozen_layer in self.frozen_layers.iter() { - frozen_layer.dump(verbose, ctx)?; + frozen_layer.dump(verbose, ctx).await?; } println!("historic_layers:"); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index c6d1a0052a..05381cb56d 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -338,7 +338,8 @@ impl LayerAccessStats { /// All layers should implement a minimal `std::fmt::Debug` without tenant or /// timeline names, because those are known in the context of which the layers /// are used in (timeline). -pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync { +#[async_trait::async_trait] +pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static { /// Range of keys that this layer covers fn get_key_range(&self) -> Range; @@ -368,7 +369,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync { /// is available. If this returns ValueReconstructResult::Continue, look up /// the predecessor layer and call again with the same 'reconstruct_data' to /// collect more data. 
- fn get_value_reconstruct_data( + async fn get_value_reconstruct_data( &self, key: Key, lsn_range: Range, @@ -377,7 +378,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync { ) -> Result; /// Dump summary of the contents of the layer to stdout - fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>; + async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>; } /// Returned by [`PersistentLayer::iter`] diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 83a22f9f13..9585c04120 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -31,7 +31,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::{PageReadGuard, PAGE_SZ}; use crate::repository::{Key, Value, KEY_SIZE}; -use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; +use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -223,9 +223,10 @@ impl std::fmt::Debug for DeltaLayerInner { } } +#[async_trait::async_trait] impl Layer for DeltaLayer { /// debugging function to print out the contents of the layer - fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { + async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { println!( "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----", self.desc.tenant_id, @@ -300,7 +301,7 @@ impl Layer for DeltaLayer { Ok(()) } - fn get_value_reconstruct_data( + async fn get_value_reconstruct_data( &self, key: Key, lsn_range: Range, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b8601af818..53cff824e3 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,7 +27,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, KEY_SIZE}; -use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; +use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -155,9 +155,10 @@ impl std::fmt::Debug for ImageLayerInner { } } +#[async_trait::async_trait] impl Layer for ImageLayer { /// debugging function to print out the contents of the layer - fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { + async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { println!( "----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----", self.desc.tenant_id, @@ -189,7 +190,7 @@ impl Layer for ImageLayer { } /// Look up given page in the file - fn get_value_reconstruct_data( + async fn get_value_reconstruct_data( &self, key: Key, lsn_range: Range, diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 77778822cf..31d0b5997a 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -7,7 +7,7 @@ use 
crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::{Key, Value}; -use crate::tenant::blob_io::{BlobCursor, BlobWriter}; +use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::BlockReader; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState}; @@ -110,6 +110,7 @@ impl InMemoryLayer { } } +#[async_trait::async_trait] impl Layer for InMemoryLayer { fn get_key_range(&self) -> Range { Key::MIN..Key::MAX @@ -132,7 +133,7 @@ impl Layer for InMemoryLayer { } /// debugging function to print out the contents of the layer - fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> { + async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> { let inner = self.inner.read().unwrap(); let end_str = inner @@ -183,7 +184,7 @@ impl Layer for InMemoryLayer { } /// Look up given value in the layer. - fn get_value_reconstruct_data( + async fn get_value_reconstruct_data( &self, key: Key, lsn_range: Range, diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index d3c40d93bb..e5511d6051 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -65,8 +65,9 @@ impl std::fmt::Debug for RemoteLayer { } } +#[async_trait::async_trait] impl Layer for RemoteLayer { - fn get_value_reconstruct_data( + async fn get_value_reconstruct_data( &self, _key: Key, _lsn_range: Range, @@ -77,7 +78,7 @@ impl Layer for RemoteLayer { } /// debugging function to print out the contents of the layer - fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { + async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { println!( "----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----", self.desc.tenant_id, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 616d184393..eceb54efc5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2387,12 +2387,15 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match open_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { + result = match open_layer + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx, + ) + .await + { Ok(result) => result, Err(e) => return Err(PageReconstructError::from(e)), }; @@ -2414,12 +2417,15 @@ impl Timeline { if cont_lsn > start_lsn { //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display()); let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match frozen_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { + result = match frozen_layer + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx, + ) + .await + { Ok(result) => result, Err(e) => return Err(PageReconstructError::from(e)), }; @@ -2450,12 +2456,15 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. 
let lsn_floor = max(cached_lsn + 1, lsn_floor); - result = match layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { + result = match layer + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx, + ) + .await + { Ok(result) => result, Err(e) => return Err(PageReconstructError::from(e)), };
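
The central pattern in this patch is making the `Layer` trait's `get_value_reconstruct_data` and `dump` methods `async`, annotating the trait and each impl with `#[async_trait::async_trait]`, and threading `.await` through the call sites in `Timeline::get_reconstruct_data`. A minimal sketch of that pattern follows; `StubLayer`, the `u64` key, and the `Vec<u8>` payload are simplified stand-ins, not the real pageserver signatures.

    // Sketch only: stand-in types, not the pageserver's real Key/Lsn/context types.
    use async_trait::async_trait;

    #[async_trait]
    pub trait Layer: Send + Sync + 'static {
        /// `async_trait` rewrites async trait methods into methods returning
        /// boxed futures, so the trait stays object-safe (`dyn Layer` keeps working).
        async fn get_value_reconstruct_data(&self, key: u64) -> anyhow::Result<Vec<u8>>;

        /// Dump a summary of the layer's contents to stdout.
        async fn dump(&self, verbose: bool) -> anyhow::Result<()>;
    }

    pub struct StubLayer;

    #[async_trait]
    impl Layer for StubLayer {
        async fn get_value_reconstruct_data(&self, _key: u64) -> anyhow::Result<Vec<u8>> {
            // A real layer would read blocks/blobs here; making the method
            // `async` is what lets the read path await I/O in the follow-up work.
            Ok(Vec::new())
        }

        async fn dump(&self, _verbose: bool) -> anyhow::Result<()> {
            println!("stub layer");
            Ok(())
        }
    }

    // Call sites gain an `.await`, mirroring Timeline::get_reconstruct_data:
    pub async fn read_through(layer: &dyn Layer, key: u64) -> anyhow::Result<Vec<u8>> {
        layer.get_value_reconstruct_data(key).await
    }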
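
On the `pagectl` side, the synchronous `main` becomes an `async fn` driven by `#[tokio::main]`, so subcommand handlers such as `layers::main` and `layer_map_analyzer::main` can be awaited. A sketch of that entry-point shape, assuming tokio is available with the `macros` and `rt` features (as the Cargo.toml change above implies); `analyze_layer_map` is a placeholder, not the tool's real handler.

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Formerly a plain function call; the handler is now async and awaited,
        // e.g. layer_map_analyzer::main(&cmd).await? in the real tool.
        analyze_layer_map("some/path").await?;
        Ok(())
    }

    async fn analyze_layer_map(path: &str) -> anyhow::Result<()> {
        println!("analyzing {path}");
        Ok(())
    }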
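
The `blob_io.rs` change removes the `BlobCursor` trait: `read_blob` and `read_blob_into_buf` become inherent methods on `BlockCursor<R>`, so callers (the delta/image/in-memory layers and the ephemeral-file tests) no longer import the trait. A sketch of that refactor with a stubbed reader; the placeholder body and the literal 8192 (standing in for `PAGE_SZ`) are assumptions, and the real `read_blob_into_buf` decodes a length-prefixed blob via `read_blk`, possibly across block boundaries.

    use std::io;

    /// Simplified stand-in for the pageserver's `BlockReader`.
    pub trait BlockReader {
        fn read_blk(&self, blknum: u32) -> io::Result<Vec<u8>>;
    }

    pub struct BlockCursor<R> {
        reader: R,
    }

    // Before: these lived on a `BlobCursor` trait implemented for BlockCursor<R>.
    // After: they are inherent methods, so call sites need no trait import.
    impl<R> BlockCursor<R>
    where
        R: BlockReader,
    {
        pub fn read_blob(&mut self, offset: u64) -> io::Result<Vec<u8>> {
            let mut buf = Vec::new();
            self.read_blob_into_buf(offset, &mut buf)?;
            Ok(buf)
        }

        pub fn read_blob_into_buf(&mut self, offset: u64, dstbuf: &mut Vec<u8>) -> io::Result<()> {
            // Placeholder body: copy the rest of the containing block into dstbuf.
            let blk = self.reader.read_blk((offset / 8192) as u32)?;
            dstbuf.clear();
            dstbuf.extend_from_slice(&blk[(offset % 8192) as usize..]);
            Ok(())
        }
    }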