tokio-epoll-uring: use it for on-demand downloads (#6992)

# Problem

On-demand downloads still go through `tokio::fs`, which we know is
inefficient: every operation takes a detour through tokio's blocking
thread pool.

# Changes

- Add `pagebench ondemand-download-churn` to quantify on-demand download
  throughput.
  - This requires dumping the layer map, which in turn required making
    `history_buffer` impl `Deserialize`.
- Implement an equivalent of `tokio::io::copy_buf` for owned buffers =>
  `owned_buffers_io` module and children (see the sketch after this list).
- Make layer file download sensitive to `io_engine::get()`, using
  VirtualFile + the above copy loop.
  - For this, I had to move some code into the `download_retry` closure,
    e.g., the `sync_all()` call.
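
In essence, the io_engine-sensitive download path looks like this (a condensed
sketch of the `TokioEpollUring` arm of `download_object` in the diff below;
`stream` and `file` stand in for the download stream and the destination
`VirtualFile`):

```rust
// Condensed sketch, not the verbatim diff: wrap the destination file in a
// size-tracking writer, batch the stream's chunks through the owned-buffers
// BufferedWriter, then flush and recover the byte count.
let size_tracking = size_tracking_writer::Writer::new(file);
let mut buffered =
    owned_buffers_io::write::BufferedWriter::<BUFFER_SIZE, _>::new(size_tracking);
while let Some(chunk) = futures::StreamExt::next(&mut stream).await {
    // each chunk is an owned buffer; slice_full() hands ownership to the writer
    buffered
        .write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk?))
        .await?;
}
let (bytes_amount, file) = buffered.flush_and_into_inner().await?.into_inner();
```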

Drive-by:
- Fix missing escaping in `scripts/ps_ec2_setup_instance_store`.
- If we failed to create a file inside `download_retry`, we'd try to
  remove it, encounter `NotFound`, and `abort()` the process via
  `on_fatal_io_error`. This PR treats `NotFound` as a success.

# Testing

Functional

- The copy loop is generic & unit tested.

Performance

- Used the `ondemand-download-churn` benchmark to manually test against
real S3.
- Results (public Notion page):
https://neondatabase.notion.site/Benchmarking-tokio-epoll-uring-on-demand-downloads-2024-04-15-newer-code-03c0fdc475c54492b44d9627b6e4e710?pvs=4
- Performance is equivalent at low concurrency. The picture is jumpier at
  high concurrency, but tokio-epoll-uring still uses less CPU per unit of
  throughput.
  - It’s a win.
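
For reference, a run of the new benchmark looks roughly like
`pagebench ondemand-download-churn --mgmt-api-endpoint http://localhost:9898 --set-io-engine tokio-epoll-uring <tenant>/<timeline>`
(a sketch; flags per the benchmark's `Args` struct in the diff below, exact
invocations are in the results document).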

# Future Work

Turn the manual performance testing described in the above results
document into a performance regression test:
https://github.com/neondatabase/neon/issues/7146
Commit ad6f538aef (parent 1aa159acca) by Christian Schwarz,
2024-03-15 19:57:05 +01:00, committed via GitHub.
12 changed files with 845 additions and 102 deletions

View File

@@ -4,6 +4,7 @@ pub mod utilization;
pub use utilization::PageserverUtilization;
use std::{
borrow::Cow,
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
@@ -577,7 +578,7 @@ pub struct TimelineInfo {
pub walreceiver_status: String,
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerMapInfo {
pub in_memory_layers: Vec<InMemoryLayerInfo>,
pub historic_layers: Vec<HistoricLayerInfo>,
@@ -595,7 +596,7 @@ pub enum LayerAccessKind {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStatFullDetails {
pub when_millis_since_epoch: u64,
pub task_kind: &'static str,
pub task_kind: Cow<'static, str>,
pub access_kind: LayerAccessKind,
}
@@ -654,23 +655,23 @@ impl LayerResidenceEvent {
}
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStats {
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
pub task_kind_access_flag: Vec<&'static str>,
pub task_kind_access_flag: Vec<Cow<'static, str>>,
pub first: Option<LayerAccessStatFullDetails>,
pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open { lsn_start: Lsn },
Frozen { lsn_start: Lsn, lsn_end: Lsn },
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
Delta {
@@ -692,6 +693,32 @@ pub enum HistoricLayerInfo {
},
}
impl HistoricLayerInfo {
pub fn layer_file_name(&self) -> &str {
match self {
HistoricLayerInfo::Delta {
layer_file_name, ..
} => layer_file_name,
HistoricLayerInfo::Image {
layer_file_name, ..
} => layer_file_name,
}
}
pub fn is_remote(&self) -> bool {
match self {
HistoricLayerInfo::Delta { remote, .. } => *remote,
HistoricLayerInfo::Image { remote, .. } => *remote,
}
}
pub fn set_remote(&mut self, value: bool) {
let field = match self {
HistoricLayerInfo::Delta { remote, .. } => remote,
HistoricLayerInfo::Image { remote, .. } => remote,
};
*field = value;
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadRemoteLayersTaskSpawnRequest {
pub max_concurrent_downloads: NonZeroUsize,

View File

@@ -47,9 +47,10 @@ impl<T, const L: usize> ops::Deref for HistoryBufferWithDropCounter<T, L> {
}
}
#[derive(serde::Serialize)]
#[derive(serde::Serialize, serde::Deserialize)]
struct SerdeRepr<T> {
buffer: Vec<T>,
buffer_size: usize,
drop_count: u64,
}
@@ -61,6 +62,7 @@ where
let HistoryBufferWithDropCounter { buffer, drop_count } = value;
SerdeRepr {
buffer: buffer.iter().cloned().collect(),
buffer_size: L,
drop_count: *drop_count,
}
}
@@ -78,19 +80,52 @@ where
}
}
impl<'de, T, const L: usize> serde::de::Deserialize<'de> for HistoryBufferWithDropCounter<T, L>
where
T: Clone + serde::Deserialize<'de>,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let SerdeRepr {
buffer: des_buffer,
drop_count,
buffer_size,
} = SerdeRepr::<T>::deserialize(deserializer)?;
if buffer_size != L {
use serde::de::Error;
return Err(D::Error::custom(format!(
"invalid buffer_size, expecting {L} got {buffer_size}"
)));
}
let mut buffer = HistoryBuffer::new();
buffer.extend(des_buffer);
Ok(HistoryBufferWithDropCounter { buffer, drop_count })
}
}
#[cfg(test)]
mod test {
use super::HistoryBufferWithDropCounter;
#[test]
fn test_basics() {
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
let mut b = HistoryBufferWithDropCounter::<usize, 2>::default();
b.write(1);
b.write(2);
b.write(3);
assert!(b.iter().any(|e| *e == 2));
assert!(b.iter().any(|e| *e == 3));
assert!(!b.iter().any(|e| *e == 1));
// round-trip serde
let round_tripped: HistoryBufferWithDropCounter<usize, 2> =
serde_json::from_str(&serde_json::to_string(&b).unwrap()).unwrap();
assert_eq!(
round_tripped.iter().cloned().collect::<Vec<_>>(),
b.iter().cloned().collect::<Vec<_>>()
);
}
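// Editor's sketch, not part of the diff: the `buffer_size` field makes
// capacity mismatches fail loudly instead of silently truncating.
#[test]
fn test_buffer_size_mismatch_rejected() {
let err = serde_json::from_str::<HistoryBufferWithDropCounter<usize, 4>>(
r#"{"buffer":[2,3],"buffer_size":2,"drop_count":1}"#,
)
.unwrap_err();
assert!(err.to_string().contains("invalid buffer_size"));
}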
#[test]

View File

@@ -169,7 +169,7 @@ impl Client {
self.request(Method::GET, uri, ()).await
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
@@ -181,7 +181,16 @@ impl Client {
} else {
req
};
let res = req.json(&body).send().await.map_err(Error::ReceiveBody)?;
req.json(&body).send().await.map_err(Error::ReceiveBody)
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
@@ -425,4 +434,68 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn layer_map_info(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<LayerMapInfo> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/layer",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
);
self.get(&uri)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn layer_evict(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer_file_name: &str,
) -> Result<bool> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/layer/{}",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
);
let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
match resp.status() {
StatusCode::OK => Ok(true),
StatusCode::NOT_MODIFIED => Ok(false),
// TODO: dedupe this pattern / introduce separate error variant?
status => Err(match resp.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
}
}),
}
}
pub async fn layer_ondemand_download(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer_file_name: &str,
) -> Result<bool> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/layer/{}",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
);
let resp = self.request_noerror(Method::GET, &uri, ()).await?;
match resp.status() {
StatusCode::OK => Ok(true),
StatusCode::NOT_MODIFIED => Ok(false),
// TODO: dedupe this pattern / introduce separate error variant?
status => Err(match resp.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
}
}),
}
}
}
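// Editor's note (sketch, not part of the diff): both new endpoints map
// HTTP 200 to Ok(true) ("the action was performed") and 304 to Ok(false)
// ("the layer was already in the requested state"). Caller-side:
//
// let evicted = client
// .layer_evict(tenant_shard_id, timeline_id, layer_file_name)
// .await?;
// if !evicted {
// // 304: the layer was already remote, i.e., our layer map view was stale
// }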

View File

@@ -0,0 +1,272 @@
use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId};
use pageserver_client::mgmt_api;
use rand::seq::SliceRandom;
use tracing::{debug, info};
use utils::id::{TenantTimelineId, TimelineId};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit},
task::JoinSet,
};
use std::{
num::NonZeroUsize,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};
/// Evict & on-demand download random layers.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long)]
runtime: Option<humantime::Duration>,
#[clap(long, default_value = "1")]
tasks_per_target: NonZeroUsize,
#[clap(long, default_value = "1")]
concurrency_per_target: NonZeroUsize,
/// Limit the benchmark to the first N discovered targets.
#[clap(long)]
limit_to_first_n_targets: Option<usize>,
/// Before starting the benchmark, live-reconfigure the pageserver to use the given
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
targets: Option<Vec<TenantTimelineId>>,
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let task = rt.spawn(main_impl(args));
rt.block_on(task).unwrap().unwrap();
Ok(())
}
#[derive(Debug, Default)]
struct LiveStats {
evictions: AtomicU64,
downloads: AtomicU64,
timeline_restarts: AtomicU64,
}
impl LiveStats {
fn eviction_done(&self) {
self.evictions.fetch_add(1, Ordering::Relaxed);
}
fn download_done(&self) {
self.downloads.fetch_add(1, Ordering::Relaxed);
}
fn timeline_restart_done(&self) {
self.timeline_restarts.fetch_add(1, Ordering::Relaxed);
}
}
async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;
}
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,
crate::util::cli::targets::Spec {
limit_to_first_n_targets: args.limit_to_first_n_targets,
targets: args.targets.clone(),
},
)
.await?;
let mut tasks = JoinSet::new();
let live_stats = Arc::new(LiveStats::default());
tasks.spawn({
let live_stats = Arc::clone(&live_stats);
async move {
let mut last_at = Instant::now();
loop {
tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await;
let now = Instant::now();
let delta: Duration = now - last_at;
last_at = now;
let LiveStats {
evictions,
downloads,
timeline_restarts,
} = &*live_stats;
let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed);
info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}");
}
}
});
for tl in timelines {
for _ in 0..args.tasks_per_target.get() {
tasks.spawn(timeline_actor(
args,
Arc::clone(&mgmt_api_client),
tl,
Arc::clone(&live_stats),
));
}
}
while let Some(res) = tasks.join_next().await {
res.unwrap();
}
Ok(())
}
async fn timeline_actor(
args: &'static Args,
mgmt_api_client: Arc<pageserver_client::mgmt_api::Client>,
timeline: TenantTimelineId,
live_stats: Arc<LiveStats>,
) {
// TODO: support sharding
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
struct Timeline {
joinset: JoinSet<()>,
layers: Vec<mpsc::Sender<OwnedSemaphorePermit>>,
concurrency: Arc<tokio::sync::Semaphore>,
}
loop {
debug!("restarting timeline");
let layer_map_info = mgmt_api_client
.layer_map_info(tenant_shard_id, timeline.timeline_id)
.await
.unwrap();
let concurrency = Arc::new(tokio::sync::Semaphore::new(
args.concurrency_per_target.get(),
));
let mut joinset = JoinSet::new();
let layers = layer_map_info
.historic_layers
.into_iter()
.map(|historic_layer| {
let (tx, rx) = mpsc::channel(1);
joinset.spawn(layer_actor(
tenant_shard_id,
timeline.timeline_id,
historic_layer,
rx,
Arc::clone(&mgmt_api_client),
Arc::clone(&live_stats),
));
tx
})
.collect::<Vec<_>>();
let mut timeline = Timeline {
joinset,
layers,
concurrency,
};
live_stats.timeline_restart_done();
loop {
assert!(!timeline.joinset.is_empty());
if let Some(res) = timeline.joinset.try_join_next() {
debug!(?res, "a layer actor exited, should not happen");
timeline.joinset.shutdown().await;
break;
}
let mut permit = Some(
Arc::clone(&timeline.concurrency)
.acquire_owned()
.await
.unwrap(),
);
loop {
let layer_tx = {
let mut rng = rand::thread_rng();
timeline.layers.choose_mut(&mut rng).expect("no layers")
};
match layer_tx.try_send(permit.take().unwrap()) {
Ok(_) => break,
Err(e) => match e {
mpsc::error::TrySendError::Full(back) => {
// TODO: retrying introduces bias away from slow downloaders
permit.replace(back);
}
mpsc::error::TrySendError::Closed(_) => panic!(),
},
}
}
}
}
}
async fn layer_actor(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
mut layer: HistoricLayerInfo,
mut rx: mpsc::Receiver<tokio::sync::OwnedSemaphorePermit>,
mgmt_api_client: Arc<mgmt_api::Client>,
live_stats: Arc<LiveStats>,
) {
#[derive(Clone, Copy)]
enum Action {
Evict,
OnDemandDownload,
}
while let Some(_permit) = rx.recv().await {
let action = if layer.is_remote() {
Action::OnDemandDownload
} else {
Action::Evict
};
let did_it = match action {
Action::Evict => {
let did_it = mgmt_api_client
.layer_evict(tenant_shard_id, timeline_id, layer.layer_file_name())
.await
.unwrap();
live_stats.eviction_done();
did_it
}
Action::OnDemandDownload => {
let did_it = mgmt_api_client
.layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name())
.await
.unwrap();
live_stats.download_done();
did_it
}
};
if !did_it {
debug!("local copy of layer map appears out of sync, re-downloading");
return;
}
debug!("did it");
layer.set_remote(match action {
Action::Evict => true,
Action::OnDemandDownload => false,
});
}
}

View File

@@ -16,6 +16,7 @@ mod util {
mod cmd {
pub(super) mod basebackup;
pub(super) mod getpage_latest_lsn;
pub(super) mod ondemand_download_churn;
pub(super) mod trigger_initial_size_calculation;
}
@@ -25,6 +26,7 @@ enum Args {
Basebackup(cmd::basebackup::Args),
GetPageLatestLsn(cmd::getpage_latest_lsn::Args),
TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args),
OndemandDownloadChurn(cmd::ondemand_download_churn::Args),
}
fn main() {
@@ -43,6 +45,7 @@ fn main() {
Args::TriggerInitialSizeCalculation(args) => {
cmd::trigger_initial_size_calculation::main(args)
}
Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
}
.unwrap()
}

View File

@@ -23,7 +23,7 @@ use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::Generation;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::TimelineId;
@@ -73,55 +73,13 @@ pub async fn download_layer_file<'a>(
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
let (mut destination_file, bytes_amount) = download_retry(
|| async {
let destination_file = tokio::fs::File::create(&temp_file_path)
.await
.with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
.map_err(DownloadError::Other)?;
let download = storage.download(&remote_path, cancel).await?;
let mut destination_file =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await;
match bytes_amount {
Ok(bytes_amount) => {
let destination_file = destination_file.into_inner();
Ok((destination_file, bytes_amount))
}
Err(e) => {
if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
}
Err(e.into())
}
}
},
let bytes_amount = download_retry(
|| async { download_object(storage, &remote_path, &temp_file_path, cancel).await },
&format!("download {remote_path:?}"),
cancel,
)
.await?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
// we assume that the `destination_file` is fully written, i.e., there are no pending .write(...).await operations.
// But for additional safety, let's check/wait for any pending operations.
destination_file
.flush()
.await
.with_context(|| format!("flush source file at {temp_file_path}"))
.map_err(DownloadError::Other)?;
let expected = layer_metadata.file_size();
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
@@ -129,14 +87,6 @@ pub async fn download_layer_file<'a>(
)));
}
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.with_context(|| format!("failed to fsync source file at {temp_file_path}"))
.map_err(DownloadError::Other)?;
drop(destination_file);
fail::fail_point!("remote-storage-download-pre-rename", |_| {
Err(DownloadError::Other(anyhow!(
"remote-storage-download-pre-rename failpoint triggered"
@@ -169,6 +119,128 @@ pub async fn download_layer_file<'a>(
Ok(bytes_amount)
}
/// Download the object `src_path` in the remote `storage` to local path `dst_path`.
///
/// If Ok() is returned, the download succeeded and the inode & data have been made durable.
/// (Note that the directory entry for the inode is not made durable.)
/// The file size in bytes is returned.
///
/// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
/// The unlinking has _not_ been made durable.
async fn download_object<'a>(
storage: &'a GenericRemoteStorage,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
cancel: &CancellationToken,
) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
crate::virtual_file::io_engine::IoEngine::StdFs => {
async {
let destination_file = tokio::fs::File::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let download = storage.download(src_path, cancel).await?;
let mut buf_writer =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
buf_writer.flush().await?;
let mut destination_file = buf_writer.into_inner();
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
// we assume that the `destination_file` is fully written, i.e., there are no pending .write(...).await operations.
// But for additional safety, let's check/wait for any pending operations.
destination_file
.flush()
.await
.with_context(|| format!("flush source file at {dst_path}"))
.map_err(DownloadError::Other)?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
async {
let destination_file = VirtualFile::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let mut download = storage.download(src_path, cancel).await?;
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let size_tracking = size_tracking_writer::Writer::new(destination_file);
let mut buffered = owned_buffers_io::write::BufferedWriter::<
{ super::BUFFER_SIZE },
_,
>::new(size_tracking);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(e),
};
buffered
.write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk))
.await?;
}
let size_tracking = buffered.flush_and_into_inner().await?;
Ok(size_tracking.into_inner())
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
};
// in case the download failed, clean up
match res {
Ok(bytes_amount) => Ok(bytes_amount),
Err(e) => {
if let Err(e) = tokio::fs::remove_file(dst_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
}
}
Err(e)
}
}
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {

View File

@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::borrow::Cow;
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
@@ -427,7 +428,7 @@ impl LayerAccessStatFullDetails {
} = self;
pageserver_api::models::LayerAccessStatFullDetails {
when_millis_since_epoch: system_time_to_millis_since_epoch(when),
task_kind: task_kind.into(), // into static str, powered by strum_macros
task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
access_kind: *access_kind,
}
}
@@ -525,7 +526,7 @@ impl LayerAccessStats {
.collect(),
task_kind_access_flag: task_kind_flag
.iter()
.map(|task_kind| task_kind.into()) // into static str, powered by strum_macros
.map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
.collect(),
first: first_access.as_ref().map(|a| a.as_api_model()),
accesses_history: last_accesses.map(|m| m.as_api_model()),

View File

@@ -36,6 +36,23 @@ pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
//!
//! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
//! reason we need this abstraction.
//!
//! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod write;
pub(crate) mod util {
pub(crate) mod size_tracking_writer;
}
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,

View File

@@ -282,49 +282,52 @@ impl From<FeatureTestResult> for IoEngineKind {
/// Panics if we can't set up the feature test.
pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
std::thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
#[cfg(not(target_os = "linux"))]
{
return Ok(FeatureTestResult::PlatformPreferred(
FeatureTestResult::PLATFORM_DEFAULT,
));
Ok(FeatureTestResult::PlatformPreferred(
FeatureTestResult::PLATFORM_PREFERRED,
))
}
#[cfg(target_os = "linux")]
Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
Ok(_) => FeatureTestResult::PlatformPreferred({
assert!(matches!(
IoEngineKind::TokioEpollUring,
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
Ok(_) => FeatureTestResult::PlatformPreferred({
assert!(matches!(
IoEngineKind::TokioEpollUring,
FeatureTestResult::PLATFORM_PREFERRED
));
FeatureTestResult::PLATFORM_PREFERRED
));
FeatureTestResult::PLATFORM_PREFERRED
}),
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
let remark = match e.raw_os_error() {
Some(nix::libc::EPERM) => {
// fall back
"creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
.to_string()
}),
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
let remark = match e.raw_os_error() {
Some(nix::libc::EPERM) => {
// fall back
"creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
.to_string()
}
Some(nix::libc::EFAULT) => {
// fail feature test
anyhow::bail!(
"creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
);
}
Some(_) | None => {
// fall back
format!("creating tokio-epoll-uring fails with error: {e:#}")
}
};
FeatureTestResult::Worse {
engine: IoEngineKind::StdFs,
remark,
}
Some(nix::libc::EFAULT) => {
// fail feature test
anyhow::bail!(
"creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
);
}
Some(_) | None => {
// fall back
format!("creating tokio-epoll-uring fails with error: {e:#}")
}
};
FeatureTestResult::Worse {
engine: IoEngineKind::StdFs,
remark,
}
}
})
})
}
})
.join()
.unwrap()

View File

@@ -0,0 +1,34 @@
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
use tokio_epoll_uring::{BoundedBuf, IoBuf};
pub struct Writer {
dst: VirtualFile,
bytes_amount: u64,
}
impl Writer {
pub fn new(dst: VirtualFile) -> Self {
Self {
dst,
bytes_amount: 0,
}
}
/// Returns the wrapped `VirtualFile` object as well as the number
/// of bytes that were written to it through this object.
pub fn into_inner(self) -> (u64, VirtualFile) {
(self.bytes_amount, self.dst)
}
}
impl OwnedAsyncWriter for Writer {
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = self.dst.write_all(buf).await;
let nwritten = res?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok((nwritten, buf))
}
}

View File

@@ -0,0 +1,206 @@
use bytes::BytesMut;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
pub trait OwnedAsyncWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)>;
}
/// A wrapper around an [`OwnedAsyncWriter`] that batches smaller writes
/// into `BUFFER_SIZE`-sized writes.
///
/// # Passthrough Of Large Writers
///
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
/// buffer to be flushed, even if it is not full yet. Then, the large
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
///
/// This pass-through is generally beneficial for throughput, but if
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
/// unlimited large writes may cause latency or fairness issues.
///
/// In such cases, a different implementation that always buffers in memory
/// may be preferable.
pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
writer: W,
// invariant: always remains Some(buf)
// with buf.capacity() == BUFFER_SIZE except
// - while IO is ongoing => goes back to Some() once the IO completed successfully
// - after an IO error => stays `None` forever
// In these exceptional cases, it's `None`.
buf: Option<BytesMut>,
}
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
where
W: OwnedAsyncWriter,
{
pub fn new(writer: W) -> Self {
Self {
writer,
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
}
}
pub async fn flush_and_into_inner(mut self) -> std::io::Result<W> {
self.flush().await?;
let Self { buf, writer } = self;
assert!(buf.is_some());
Ok(writer)
}
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<()>
where
B: IoBuf + Send,
{
// avoid memcpy for the middle of the chunk
if chunk.len() >= BUFFER_SIZE {
self.flush().await?;
// do a big write, bypassing `buf`
assert_eq!(
self.buf
.as_ref()
.expect("must not use after an error")
.len(),
0
);
let chunk_len = chunk.len();
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
assert_eq!(nwritten, chunk_len);
drop(chunk);
return Ok(());
}
// in-memory copy the < BUFFER_SIZE tail of the chunk
assert!(chunk.len() < BUFFER_SIZE);
let mut chunk = &chunk[..];
while !chunk.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = BUFFER_SIZE - buf.len();
let have = chunk.len();
let n = std::cmp::min(need, have);
buf.extend_from_slice(&chunk[..n]);
chunk = &chunk[n..];
if buf.len() >= BUFFER_SIZE {
assert_eq!(buf.len(), BUFFER_SIZE);
self.flush().await?;
}
}
assert!(chunk.is_empty(), "by now we should have drained the chunk");
Ok(())
}
async fn flush(&mut self) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error");
if buf.is_empty() {
self.buf = Some(buf);
return std::io::Result::Ok(());
}
let buf_len = buf.len();
let (nwritten, mut buf) = self.writer.write_all(buf).await?;
assert_eq!(nwritten, buf_len);
buf.clear();
self.buf = Some(buf);
Ok(())
}
}
impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return Ok((0, Slice::into_inner(buf.slice_full())));
}
let buf = buf.slice(0..nbytes);
self.extend_from_slice(&buf[..]);
Ok((buf.len(), Slice::into_inner(buf)))
}
}
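// Editor's sketch, not in the diff: the Vec<u8> impl above makes
// BufferedWriter exercisable without real IO. Assuming a tokio runtime
// and tokio_epoll_uring::BoundedBuf in scope:
//
// let mut w: BufferedWriter<2, Vec<u8>> = BufferedWriter::new(Vec::new());
// // 5 bytes > BUFFER_SIZE = 2, so this takes the passthrough path
// w.write_buffered(bytes::Bytes::from_static(b"hello").slice_full()).await?;
// let v = w.flush_and_into_inner().await?;
// assert_eq!(v, b"hello".to_vec());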
#[cfg(test)]
mod tests {
use super::*;
#[derive(Default)]
struct RecorderWriter {
writes: Vec<Vec<u8>>,
}
impl OwnedAsyncWriter for RecorderWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
self.writes.push(vec![]);
return Ok((0, Slice::into_inner(buf.slice_full())));
}
let buf = buf.slice(0..nbytes);
self.writes.push(Vec::from(&buf[..]));
Ok((buf.len(), Slice::into_inner(buf)))
}
}
macro_rules! write {
($writer:ident, $data:literal) => {{
$writer
.write_buffered(::bytes::Bytes::from_static($data).slice_full())
.await?;
}};
}
#[tokio::test]
async fn test_buffered_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
write!(writer, b"a");
write!(writer, b"b");
write!(writer, b"c");
write!(writer, b"d");
write!(writer, b"e");
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")]
);
Ok(())
}
#[tokio::test]
async fn test_passthrough_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
write!(writer, b"abc");
write!(writer, b"de");
write!(writer, b"");
write!(writer, b"fghijk");
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")]
);
Ok(())
}
#[tokio::test]
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
write!(writer, b"a");
write!(writer, b"bc");
write!(writer, b"d");
write!(writer, b"e");
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")]
);
Ok(())
}
}

View File

@@ -40,7 +40,7 @@ To run your local neon.git build on the instance store volume,
run the following commands from the top of the neon.git checkout
# raise file descriptor limit of your shell and its child processes
sudo prlimit -p $$ --nofile=800000:800000
sudo prlimit -p \$\$ --nofile=800000:800000
# test suite run
export TEST_OUTPUT="$TEST_OUTPUT"