tokio-epoll-uring: fallback to std-fs if not available & not explicitly requested (#7120)

fixes https://github.com/neondatabase/neon/issues/7116

Changes:

- refactor PageServerConfigBuilder: support not-set values
- implement runtime feature test
- use runtime feature test to determine `virtual_file_io_engine` if not
explicitly configured in the config
- log the effective engine at startup
- drive-by: improve assertion messages in `test_pageserver_init_node_id`

This needed a tiny bit of tokio-epoll-uring work, hence bumping it.
Changelog:

```
    git log --no-decorate --oneline --reverse 868d2c42b5d54ca82fead6e8f2f233b69a540d3e..342ddd197a060a8354e8f11f4d12994419fff939
    c7a74c6 Bump mio from 0.8.8 to 0.8.11
    4df3466 Bump mio from 0.8.8 to 0.8.11 (#47)
    342ddd1 lifecycle: expose `LaunchResult` enum (#49)
```
This commit is contained in:
Christian Schwarz
2024-03-15 18:46:04 +01:00
committed by GitHub
parent bc1efa827f
commit 60f30000ef
6 changed files with 195 additions and 140 deletions

4
Cargo.lock generated
View File

@@ -5887,7 +5887,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#868d2c42b5d54ca82fead6e8f2f233b69a540d3e"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
dependencies = [
"futures",
"nix 0.26.4",
@@ -6424,7 +6424,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#868d2c42b5d54ca82fead6e8f2f233b69a540d3e"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
dependencies = [
"bytes",
"io-uring",

View File

@@ -120,6 +120,9 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);
// after setting up logging, log the effective IO engine choice
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
utils::crashsafe::create_dir_all(conf.tenants_path())

View File

@@ -30,15 +30,14 @@ use utils::{
logging::LogFormat,
};
use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::virtual_file;
use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
use crate::{tenant::config::TenantConf, virtual_file};
use crate::{
IGNORED_TENANT_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
@@ -291,16 +290,23 @@ pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
// use dedicated enum for builder to better indicate the intention
// and avoid possible confusion with nested options
#[derive(Clone, Default)]
pub enum BuilderValue<T> {
Set(T),
#[default]
NotSet,
}
impl<T> BuilderValue<T> {
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
impl<T: Clone> BuilderValue<T> {
pub fn ok_or(&self, field_name: &'static str, default: BuilderValue<T>) -> anyhow::Result<T> {
match self {
Self::Set(v) => Ok(v),
Self::NotSet => Err(err),
Self::Set(v) => Ok(v.clone()),
Self::NotSet => match default {
BuilderValue::Set(v) => Ok(v.clone()),
BuilderValue::NotSet => {
anyhow::bail!("missing config value {field_name:?}")
}
},
}
}
}
@@ -326,6 +332,7 @@ pub(crate) struct NodeMetadata {
}
// needed to simplify config construction
#[derive(Default)]
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
@@ -393,8 +400,9 @@ struct PageServerConfigBuilder {
validate_vectored_get: BuilderValue<bool>,
}
impl Default for PageServerConfigBuilder {
fn default() -> Self {
impl PageServerConfigBuilder {
#[inline(always)]
fn default_values() -> Self {
use self::BuilderValue::*;
use defaults::*;
Self {
@@ -647,125 +655,96 @@ impl PageServerConfigBuilder {
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
.ok_or(anyhow!("missing concurrent_tenant_warmup"))?;
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
.ok_or(anyhow!(
"missing concurrent_tenant_size_logical_size_queries"
))?;
Ok(PageServerConf {
listen_pg_addr: self
.listen_pg_addr
.ok_or(anyhow!("missing listen_pg_addr"))?,
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow!("missing listen_http_addr"))?,
availability_zone: self
.availability_zone
.ok_or(anyhow!("missing availability_zone"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow!("missing wait_lsn_timeout"))?,
wal_redo_timeout: self
.wal_redo_timeout
.ok_or(anyhow!("missing wal_redo_timeout"))?,
superuser: self.superuser.ok_or(anyhow!("missing superuser"))?,
page_cache_size: self
.page_cache_size
.ok_or(anyhow!("missing page_cache_size"))?,
max_file_descriptors: self
.max_file_descriptors
.ok_or(anyhow!("missing max_file_descriptors"))?,
workdir: self.workdir.ok_or(anyhow!("missing workdir"))?,
pg_distrib_dir: self
.pg_distrib_dir
.ok_or(anyhow!("missing pg_distrib_dir"))?,
http_auth_type: self
.http_auth_type
.ok_or(anyhow!("missing http_auth_type"))?,
pg_auth_type: self.pg_auth_type.ok_or(anyhow!("missing pg_auth_type"))?,
auth_validation_public_key_path: self
.auth_validation_public_key_path
.ok_or(anyhow!("missing auth_validation_public_key_path"))?,
remote_storage_config: self
.remote_storage_config
.ok_or(anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow!("missing id"))?,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
broker_endpoint: self
.broker_endpoint
.ok_or(anyhow!("No broker endpoints provided"))?,
broker_keepalive_interval: self
.broker_keepalive_interval
.ok_or(anyhow!("No broker keepalive interval provided"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
concurrent_tenant_warmup: ConfigurableSemaphore::new(concurrent_tenant_warmup),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
concurrent_tenant_size_logical_size_queries,
),
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
concurrent_tenant_size_logical_size_queries,
),
metric_collection_interval: self
.metric_collection_interval
.ok_or(anyhow!("missing metric_collection_interval"))?,
cached_metric_collection_interval: self
.cached_metric_collection_interval
.ok_or(anyhow!("missing cached_metric_collection_interval"))?,
metric_collection_endpoint: self
.metric_collection_endpoint
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
synthetic_size_calculation_interval: self
.synthetic_size_calculation_interval
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
disk_usage_based_eviction: self
.disk_usage_based_eviction
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
test_remote_failures: self
.test_remote_failures
.ok_or(anyhow!("missing test_remote_failuers"))?,
ondemand_download_behavior_treat_error_as_warn: self
.ondemand_download_behavior_treat_error_as_warn
.ok_or(anyhow!(
"missing ondemand_download_behavior_treat_error_as_warn"
))?,
background_task_maximum_delay: self
.background_task_maximum_delay
.ok_or(anyhow!("missing background_task_maximum_delay"))?,
control_plane_api: self
.control_plane_api
.ok_or(anyhow!("missing control_plane_api"))?,
control_plane_api_token: self
.control_plane_api_token
.ok_or(anyhow!("missing control_plane_api_token"))?,
control_plane_emergency_mode: self
.control_plane_emergency_mode
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
heatmap_upload_concurrency: self
.heatmap_upload_concurrency
.ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
secondary_download_concurrency: self
.secondary_download_concurrency
.ok_or(anyhow!("missing secondary_download_concurrency"))?,
ingest_batch_size: self
.ingest_batch_size
.ok_or(anyhow!("missing ingest_batch_size"))?,
virtual_file_io_engine: self
.virtual_file_io_engine
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
get_vectored_impl: self
.get_vectored_impl
.ok_or(anyhow!("missing get_vectored_impl"))?,
max_vectored_read_bytes: self
.max_vectored_read_bytes
.ok_or(anyhow!("missing max_vectored_read_bytes"))?,
validate_vectored_get: self
.validate_vectored_get
.ok_or(anyhow!("missing validate_vectored_get"))?,
})
let default = Self::default_values();
macro_rules! conf {
(USING DEFAULT { $($field:ident,)* } CUSTOM LOGIC { $($custom_field:ident : $custom_value:expr,)* } ) => {
PageServerConf {
$(
$field: self.$field.ok_or(stringify!($field), default.$field)?,
)*
$(
$custom_field: $custom_value,
)*
}
};
}
Ok(conf!(
USING DEFAULT
{
listen_pg_addr,
listen_http_addr,
availability_zone,
wait_lsn_timeout,
wal_redo_timeout,
superuser,
page_cache_size,
max_file_descriptors,
workdir,
pg_distrib_dir,
http_auth_type,
pg_auth_type,
auth_validation_public_key_path,
remote_storage_config,
id,
broker_endpoint,
broker_keepalive_interval,
log_format,
metric_collection_interval,
cached_metric_collection_interval,
metric_collection_endpoint,
synthetic_size_calculation_interval,
disk_usage_based_eviction,
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api,
control_plane_api_token,
control_plane_emergency_mode,
heatmap_upload_concurrency,
secondary_download_concurrency,
ingest_batch_size,
get_vectored_impl,
max_vectored_read_bytes,
validate_vectored_get,
}
CUSTOM LOGIC
{
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
concurrent_tenant_warmup: ConfigurableSemaphore::new({
self
.concurrent_tenant_warmup
.ok_or("concurrent_tenant_warmpup",
default.concurrent_tenant_warmup)?
}),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
self
.concurrent_tenant_size_logical_size_queries
.ok_or("concurrent_tenant_size_logical_size_queries",
default.concurrent_tenant_size_logical_size_queries.clone())?
),
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
// re-use `concurrent_tenant_size_logical_size_queries`
self
.concurrent_tenant_size_logical_size_queries
.ok_or("eviction_task_immitated_concurrent_logical_size_queries",
default.concurrent_tenant_size_logical_size_queries.clone())?,
),
virtual_file_io_engine: match self.virtual_file_io_engine {
BuilderValue::Set(v) => v,
BuilderValue::NotSet => match crate::virtual_file::io_engine_feature_test().context("auto-detect virtual_file_io_engine")? {
io_engine::FeatureTestResult::PlatformPreferred(v) => v, // make no noise
io_engine::FeatureTestResult::Worse { engine, remark } => {
// TODO: bubble this up to the caller so we can tracing::warn! it.
eprintln!("auto-detected IO engine is not platform-preferred: engine={engine:?} remark={remark:?}");
engine
}
},
},
}
))
}
}

View File

@@ -28,6 +28,8 @@ use tokio::time::Instant;
pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
pub use io_engine::feature_test as io_engine_feature_test;
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
pub(crate) use io_engine::IoEngineKind;

View File

@@ -253,3 +253,79 @@ impl IoEngine {
}
}
}
pub enum FeatureTestResult {
PlatformPreferred(IoEngineKind),
Worse {
engine: IoEngineKind,
remark: String,
},
}
impl FeatureTestResult {
#[cfg(target_os = "linux")]
const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
#[cfg(not(target_os = "linux"))]
const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
}
impl From<FeatureTestResult> for IoEngineKind {
fn from(val: FeatureTestResult) -> Self {
match val {
FeatureTestResult::PlatformPreferred(e) => e,
FeatureTestResult::Worse { engine, .. } => engine,
}
}
}
/// Somewhat costly under the hood, do only once.
/// Panics if we can't set up the feature test.
pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
std::thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
#[cfg(not(target_os = "linux"))]
{
return Ok(FeatureTestResult::PlatformPreferred(
FeatureTestResult::PLATFORM_DEFAULT,
));
}
#[cfg(target_os = "linux")]
Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
Ok(_) => FeatureTestResult::PlatformPreferred({
assert!(matches!(
IoEngineKind::TokioEpollUring,
FeatureTestResult::PLATFORM_PREFERRED
));
FeatureTestResult::PLATFORM_PREFERRED
}),
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
let remark = match e.raw_os_error() {
Some(nix::libc::EPERM) => {
// fall back
"creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
.to_string()
}
Some(nix::libc::EFAULT) => {
// fail feature test
anyhow::bail!(
"creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
);
}
Some(_) | None => {
// fall back
format!("creating tokio-epoll-uring fails with error: {e:#}")
}
};
FeatureTestResult::Worse {
engine: IoEngineKind::StdFs,
remark,
}
}
})
})
.join()
.unwrap()
}

View File

@@ -37,23 +37,18 @@ def test_pageserver_init_node_id(
assert (
bad_init.returncode == 1
), "pageserver should not be able to init new config without the node id"
assert "missing id" in bad_init.stderr
assert 'missing config value "id"' in bad_init.stderr
assert not pageserver_config.exists(), "config file should not be created after init error"
completed_init = run_pageserver(
["--init", "-c", "id = 12345", "-c", f'pg_distrib_dir="{pg_distrib_dir}"']
)
good_init_cmd = ["--init", "-c", "id = 12345", "-c", f'pg_distrib_dir="{pg_distrib_dir}"']
completed_init = run_pageserver(good_init_cmd)
assert (
completed_init.returncode == 0
), "pageserver should be able to create a new config with the node id given"
assert pageserver_config.exists(), "config file should be created successfully"
bad_reinit = run_pageserver(
["--init", "-c", "id = 12345", "-c", f'pg_distrib_dir="{pg_distrib_dir}"']
)
assert (
bad_reinit.returncode == 1
), "pageserver should not be able to init new config without the node id"
bad_reinit = run_pageserver(good_init_cmd)
assert bad_reinit.returncode == 1, "pageserver refuses to init if already exists"
assert "already exists, cannot init it" in bad_reinit.stderr
bad_update = run_pageserver(["--update-config", "-c", "id = 3"])