Compare commits

..

29 Commits

Author SHA1 Message Date
Yuchen Liang
e565c4fbe9 Merge branch 'yuchen/direct-io-aligned-alloc' into yuchen/direct-io-aligned-alloc-usage-wip 2024-10-07 13:08:55 -04:00
Yuchen Liang
a46757b769 add with_capacity_aligned_zeroed and leak
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-07 13:06:00 -04:00
Yuchen Liang
9c32bfee3b fix put_io_mode to use the correct http endpoint
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 16:47:36 -04:00
Yuchen Liang
69ef8caf58 simplify virtual file wrapper
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:31:55 +00:00
Yuchen Liang
b7443dd643 add set_io_mode option to getpage_latest_lsn
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:16:42 +00:00
Yuchen Liang
cc433c76a3 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:05:01 +00:00
Yuchen Liang
2034ec906a remove unused imports
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 12:04:11 +00:00
Yuchen Liang
f48ab8bcaa use O_DIRECT as preferred
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 03:57:58 +00:00
Yuchen Liang
2607a57990 pageserver: add direct io config to virtual file
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 03:57:13 +00:00
Yuchen Liang
f04c1c230c incr len in with_capacity_aligned_zeroed add a test
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 23:47:00 +00:00
Yuchen Liang
13f1931a09 fix build
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:18:29 +00:00
Yuchen Liang
e98a4eb5e2 add safety comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
e01d145066 remove example; add with_capacity_aligned_zeroed
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
9e9d76d6f2 use IoBufferMut for pagecache
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
14ec379d2b enable O_DIRECT for delta and image layers
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
ebfe88a463 use IoBufferMut for delta and image layers
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-29 21:00:05 +00:00
Yuchen Liang
eb16aa9e81 Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-09-29 16:59:27 -04:00
Yuchen Liang
f6d0ed6454 implement reserve for IoBufferMut
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-26 14:12:49 +00:00
Yuchen Liang
a2be8a440b Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-09-24 21:29:33 -04:00
Yuchen Liang
ff4a1db223 make sure we can Send IoBufferMut
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-03 23:52:34 -04:00
Yuchen Liang
29d54ccd20 Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-09-03 11:41:49 -04:00
Yuchen Liang
68a1fe20f2 review: use doc comments to reference struct in safety comment
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-09-03 11:41:02 -04:00
Yuchen Liang
e8408c797a remove unused comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-03 11:38:58 -04:00
Yuchen Liang
027f28deb9 remove Vec dependency
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-03 11:38:00 -04:00
Yuchen Liang
ea6f9798c6 add safety comment
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-16 18:06:16 +00:00
Yuchen Liang
253e4d5843 Merge branch 'main' into yuchen/direct-io-aligned-alloc 2024-08-16 13:14:20 -04:00
Yuchen Liang
852099bc83 remove aligned-vec, use ManuallyDrop<Vec<u8>>
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-16 17:13:30 +00:00
Yuchen Liang
148e230d11 add mut version marker trait
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-15 04:03:13 +00:00
Yuchen Liang
6d664788c1 feat(pageserver): newtype aligned-vec as aligned buffer allocation
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-15 03:43:58 +00:00
40 changed files with 935 additions and 312 deletions

View File

@@ -557,7 +557,7 @@ jobs:
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/psql /tmp/neon/pg_install/v16/bin/psql
ln -s /home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu /tmp/neon/pg_install/v16/lib
LD_LIBRARY_PATH="/home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu:${LD_LIBRARY_PATH:-}"
LD_LIBRARY_PATH="/home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu:${LD_LIBRARY_PATH}"
export LD_LIBRARY_PATH
echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> ${GITHUB_ENV}

View File

@@ -773,7 +773,7 @@ jobs:
matrix:
version: [ v14, v15, v16, v17 ]
env:
VM_BUILDER_VERSION: v0.35.0
VM_BUILDER_VERSION: v0.29.3
steps:
- uses: actions/checkout@v4

View File

@@ -104,7 +104,7 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
pub io_buffer_alignment: usize,
}
@@ -381,7 +381,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
virtual_file_io_mode: None,
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,

View File

@@ -972,8 +972,6 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::path::PathBuf;
#[derive(
Copy,
Clone,
@@ -994,50 +992,49 @@ pub mod virtual_file {
}
/// Direct IO modes for a pageserver.
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum DirectIoMode {
/// Direct IO disabled (uses usual buffered IO).
#[default]
Disabled,
/// Direct IO disabled (performs checks and perf simulations).
Evaluate {
/// Alignment check level
alignment_check: DirectIoAlignmentCheckLevel,
/// Latency padded for performance simulation.
latency_padding: DirectIoLatencyPadding,
},
/// Direct IO enabled.
Enabled {
/// Actions to perform on alignment error.
on_alignment_error: DirectIoOnAlignmentErrorAction,
},
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO, error out if the operation fails.
#[cfg(target_os = "linux")]
Direct,
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoAlignmentCheckLevel {
#[default]
Error,
Log,
None,
impl IoMode {
pub const fn preferred() -> Self {
if cfg!(target_os = "linux") {
Self::Direct
} else {
Self::Buffered
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoOnAlignmentErrorAction {
Error,
#[default]
FallbackToBuffered,
}
impl TryFrom<u8> for IoMode {
type Error = u8;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum DirectIoLatencyPadding {
/// Pad virtual file operations with IO to a fake file.
FakeFileRW { path: PathBuf },
#[default]
None,
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
x => return Err(x),
})
}
}
}

View File

@@ -164,12 +164,10 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(conf.page_cache_size);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(16384, virtual_file::io_engine_for_bench(), align);
page_cache::init(conf.page_cache_size, align);
{
let mut group = c.benchmark_group("ingest-small-values");

View File

@@ -550,6 +550,19 @@ impl Client {
.map_err(Error::ReceiveBody)
}
/// Configs io mode at runtime.
pub async fn put_io_mode(
&self,
mode: &pageserver_api::models::virtual_file::IoMode,
) -> Result<()> {
let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, mode)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
self.get(uri)

View File

@@ -151,13 +151,10 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
pageserver::page_cache::init(100, align);
let mut total_delta_layers = 0usize;
let mut total_image_layers = 0usize;

View File

@@ -59,8 +59,9 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
page_cache::init(100);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
page_cache::init(100, align);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
@@ -190,12 +191,10 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
pageserver::page_cache::init(100, align);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -205,12 +205,9 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(100);
let align = DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
page_cache::init(100, align);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
}

View File

@@ -63,6 +63,10 @@ pub(crate) struct Args {
#[clap(long)]
set_io_alignment: Option<usize>,
/// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
#[clap(long)]
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -133,6 +137,10 @@ async fn main_impl(
mgmt_api_client.put_io_alignment(align).await?;
}
if let Some(mode) = &args.set_io_mode {
mgmt_api_client.put_io_mode(mode).await?;
}
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,

View File

@@ -125,7 +125,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.virtual_file_io_mode, "starting with virtual_file Direct IO settings");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
// The tenants directory contains all the pageserver local disk state.
@@ -173,7 +173,7 @@ fn main() -> anyhow::Result<()> {
conf.virtual_file_io_engine,
conf.io_buffer_alignment,
);
page_cache::init(conf.page_cache_size);
page_cache::init(conf.page_cache_size, conf.io_buffer_alignment);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -174,7 +174,7 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,
/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,
pub virtual_file_io_mode: virtual_file::IoMode,
pub io_buffer_alignment: usize,
}
@@ -325,7 +325,7 @@ impl PageServerConf {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_direct_io,
virtual_file_io_mode,
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
@@ -368,7 +368,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
virtual_file_direct_io,
io_buffer_alignment,
// ------------------------------------------------------------
@@ -408,6 +407,7 @@ impl PageServerConf {
l0_flush: l0_flush
.map(crate::l0_flush::L0FlushConfig::from)
.unwrap_or_default(),
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
};
// ------------------------------------------------------------

View File

@@ -17,6 +17,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
@@ -56,7 +57,6 @@ use utils::http::endpoint::request_span;
use utils::http::request::must_parse_query_param;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::pgdatadir_mapping::LsnForTimestamp;
@@ -81,6 +81,7 @@ use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
@@ -1719,13 +1720,8 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let gc_result = state
.tenant_manager
.immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)
.await?;
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
json_response(StatusCode::OK, gc_result)
}
@@ -2386,6 +2382,16 @@ async fn put_io_alignment_handler(
json_response(StatusCode::OK, ())
}
async fn put_io_mode_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let mode: IoMode = json_request(&mut r).await?;
crate::virtual_file::set_io_mode(mode);
json_response(StatusCode::OK, ())
}
/// Polled by control plane.
///
/// See [`crate::utilization`].
@@ -3076,6 +3082,7 @@ pub fn make_router(
.put("/v1/io_alignment", |r| {
api_handler(r, put_io_alignment_handler)
})
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),

View File

@@ -82,6 +82,7 @@ use once_cell::sync::OnceCell;
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
virtual_file::{self, dio::IoBufferMut},
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -90,8 +91,8 @@ const TEST_PAGE_CACHE_SIZE: usize = 50;
///
/// Initialize the page cache. This must be called once at page server startup.
///
pub fn init(size: usize) {
if PAGE_CACHE.set(PageCache::new(size)).is_err() {
pub fn init(size: usize, align: usize) {
if PAGE_CACHE.set(PageCache::new(size, align)).is_err() {
panic!("page cache already initialized");
}
}
@@ -106,7 +107,12 @@ pub fn get() -> &'static PageCache {
// page cache is usable in unit tests.
//
if cfg!(test) {
PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
PAGE_CACHE.get_or_init(|| {
PageCache::new(
TEST_PAGE_CACHE_SIZE,
virtual_file::get_io_buffer_alignment(),
)
})
} else {
PAGE_CACHE.get().expect("page cache not initialized")
}
@@ -637,13 +643,11 @@ impl PageCache {
/// Initialize a new page cache
///
/// This should be called only once at page server startup.
fn new(num_pages: usize) -> Self {
fn new(num_pages: usize, align: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let page_buffer =
IoBufferMut::with_capacity_aligned_zeroed(num_pages * PAGE_SZ, align).leak();
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);

View File

@@ -84,7 +84,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.buffered_writer.as_inner().as_inner().path;
let path = self.buffered_writer.as_inner().as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
@@ -356,7 +356,7 @@ mod tests {
}
let file_contents =
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let buffer_contents = file.buffered_writer.inspect_buffer();
@@ -392,7 +392,7 @@ mod tests {
.buffered_writer
.as_inner()
.as_inner()
.path
.path()
.metadata()
.unwrap();
assert_eq!(

View File

@@ -2197,82 +2197,6 @@ impl TenantManager {
Ok((wanted_bytes, shard_count as u32))
}
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = self.tenants.read().unwrap();
guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?
};
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx: RequestContext =
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(
total = js.len(),
"starting to wait for the gc'd layers to be dropped"
);
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().map(|x| &x.remote_client);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(|e| match e {
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
GcError::TimelineNotFound => {
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
}
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
}
#[derive(Debug, thiserror::Error)]
@@ -2417,7 +2341,7 @@ enum TenantSlotDropError {
/// Errors that can happen any time we are walking the tenant map to try and acquire
/// the TenantSlot for a particular tenant.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapError {
pub enum TenantMapError {
// Tried to read while initializing
#[error("tenant map is still initializing")]
StillInitializing,
@@ -2447,7 +2371,7 @@ pub(crate) enum TenantMapError {
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
/// `drop_old_value`. It is an error to call this without shutting down
/// the conents of `old_value`.
pub(crate) struct SlotGuard {
pub struct SlotGuard {
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -2840,6 +2764,81 @@ use {
utils::http::error::ApiError,
};
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = TENANTS.read().unwrap();
guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?
};
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx: RequestContext =
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(
total = js.len(),
"starting to wait for the gc'd layers to be dropped"
);
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().map(|x| &x.remote_client);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(|e| match e {
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
GcError::TimelineNotFound => {
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
}
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

View File

@@ -43,12 +43,12 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -572,7 +572,7 @@ impl DeltaLayerWriterInner {
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path,
file.path(),
metadata.len()
);
@@ -790,7 +790,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
@@ -991,7 +991,8 @@ impl DeltaLayerInner {
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
@@ -1010,7 +1011,7 @@ impl DeltaLayerInner {
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);
@@ -1018,7 +1019,7 @@ impl DeltaLayerInner {
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
continue;
}
@@ -1036,7 +1037,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1054,7 +1055,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1186,14 +1187,14 @@ impl DeltaLayerInner {
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
let align = virtual_file::get_io_buffer_alignment();
let max_read_size = self
.max_vectored_read_bytes
.map(|x| x.0.get())
.unwrap_or(8192);
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buffer = Some(IoBufferMut::with_capacity_aligned(max_read_size, align));
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
@@ -1552,12 +1553,12 @@ impl<'a> DeltaLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
@@ -1932,7 +1933,9 @@ pub(crate) mod test {
&vectored_reads,
constants::MAX_VECTORED_READ_BYTES,
);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
for read in vectored_reads {
let blobs_buf = vectored_blob_reader

View File

@@ -40,11 +40,12 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
@@ -388,7 +389,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
let file_id = page_cache::next_file_id();
@@ -542,14 +543,15 @@ impl ImageLayerInner {
.await?;
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let align = virtual_file::get_io_buffer_alignment();
let mut key_count = 0;
for read in plan.into_iter() {
let buf_size = read.size();
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
@@ -597,13 +599,13 @@ impl ImageLayerInner {
);
}
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;
@@ -614,7 +616,7 @@ impl ImageLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -635,7 +637,7 @@ impl ImageLayerInner {
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);
@@ -1039,12 +1041,12 @@ impl<'a> ImageLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
next_batch.push_back((

View File

@@ -18,7 +18,7 @@
use std::collections::BTreeMap;
use std::ops::Deref;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
@@ -27,6 +27,7 @@ use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::{self, VirtualFile};
/// Metadata bundled with the start and end offset of a blob.
@@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob {
/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
/// Buffer for all blobs in this read
pub buf: BytesMut,
pub buf: IoBufferMut,
/// Offsets into the buffer and metadata for all blobs in this read
pub blobs: Vec<VectoredBlob>,
}
@@ -460,7 +461,7 @@ impl<'a> VectoredBlobReader<'a> {
pub async fn read_blobs(
&self,
read: &VectoredRead,
buf: BytesMut,
buf: IoBufferMut,
ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
@@ -945,7 +946,8 @@ mod tests {
// Multiply by two (compressed data might need more space), and add a few bytes for the header
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);
let align = virtual_file::get_io_buffer_alignment();
let mut buf = IoBufferMut::with_capacity_aligned(reserved_bytes, align);
let align = virtual_file::get_io_buffer_alignment();
let vectored_blob_reader = VectoredBlobReader::new(&file);

View File

@@ -23,10 +23,12 @@ use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
@@ -38,10 +40,11 @@ pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use api::DirectIoMode;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
pub(crate) mod dio;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
@@ -53,6 +56,7 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod io_buf_aligned;
pub(crate) mod io_buf_ext;
pub(crate) mod slice;
pub(crate) mod write;
@@ -61,6 +65,176 @@ pub(crate) mod owned_buffers_io {
}
}
#[derive(Debug)]
pub enum VirtualFile {
Buffered(VirtualFileInner),
Direct(VirtualFileInner),
}
impl VirtualFile {
fn inner(&self) -> &VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn inner_mut(&mut self) -> &mut VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn into_inner(self) -> VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open(path, ctx).await?;
Ok(Self::Buffered(file))
}
/// Open a file in read-only mode. Like File::open.
///
/// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
pub async fn open_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::create(path, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn create_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
VirtualFile::open_with_options_v2(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &mut OpenOptions, // Uses `&mut` here to add `O_DIRECT`.
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = match get_io_mode() {
IoMode::Buffered => {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Self::Buffered(file)
}
#[cfg(target_os = "linux")]
IoMode::Direct => {
let file = VirtualFileInner::open_with_options(
path,
open_options.custom_flags(nix::libc::O_DIRECT),
ctx,
)
.await?;
Self::Direct(file)
}
};
Ok(file)
}
pub fn path(&self) -> &Utf8Path {
self.inner().path.as_path()
}
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
}
pub async fn sync_all(&self) -> Result<(), Error> {
self.inner().sync_all().await
}
pub async fn sync_data(&self) -> Result<(), Error> {
self.inner().sync_data().await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner().metadata().await
}
pub fn remove(self) {
self.into_inner().remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner_mut().seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
where
Buf: IoBufMut + Send,
{
self.inner().read_exact_at(slice, offset, ctx).await
}
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
self.inner().read_exact_at_page(page, offset, ctx).await
}
pub async fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
self.inner().write_all_at(buf, offset, ctx).await
}
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner_mut().write_all(buf, ctx).await
}
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -77,7 +251,7 @@ pub(crate) mod owned_buffers_io {
/// 'tag' field is used to detect whether the handle still is valid or not.
///
#[derive(Debug)]
pub struct VirtualFile {
pub struct VirtualFileInner {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
/// belongs to a different VirtualFile.
@@ -350,12 +524,12 @@ macro_rules! with_file {
}};
}
impl VirtualFile {
impl VirtualFileInner {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
@@ -364,7 +538,7 @@ impl VirtualFile {
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
@@ -382,7 +556,7 @@ impl VirtualFile {
path: P,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -413,7 +587,7 @@ impl VirtualFile {
open_options.open(path_ref.as_std_path()).await?
});
// Strip all options other than read and write.
// Strip all options other than read and write (O_DIRECT).
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
@@ -423,7 +597,7 @@ impl VirtualFile {
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path_ref.to_path_buf(),
@@ -1034,6 +1208,21 @@ impl tokio_epoll_uring::IoFd for FileGuard {
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
self.inner().read_blk(blknum, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner_mut().read_to_end(buf, ctx).await
}
}
#[cfg(test)]
impl VirtualFileInner {
pub(crate) async fn read_blk(
&self,
blknum: u32,
@@ -1067,7 +1256,7 @@ impl VirtualFile {
}
}
impl Drop for VirtualFile {
impl Drop for VirtualFileInner {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();
@@ -1216,6 +1405,15 @@ pub(crate) fn get_io_buffer_alignment() -> usize {
}
}
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn get_io_mode() -> IoMode {
IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
}
#[cfg(test)]
mod tests {
use crate::context::DownloadBehavior;
@@ -1524,7 +1722,7 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
@@ -1576,7 +1774,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1585,7 +1783,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1608,7 +1806,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

View File

@@ -0,0 +1,410 @@
#![allow(unused)]
use core::slice;
use std::{
alloc::{self, Layout},
cmp,
mem::{ManuallyDrop, MaybeUninit},
ops::{Deref, DerefMut},
ptr::{addr_of_mut, NonNull},
};
use bytes::buf::UninitSlice;
struct IoBufferPtr(*mut u8);
// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer.
unsafe impl Send for IoBufferPtr {}
/// An aligned buffer type used for I/O.
pub struct IoBufferMut {
ptr: IoBufferPtr,
capacity: usize,
len: usize,
align: usize,
}
impl IoBufferMut {
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
///
/// The buffer will be able to hold at most `capacity` elements and will never resize.
///
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
/// * `align` must not be zero,
///
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity_aligned(capacity: usize, align: usize) -> Self {
let layout = Layout::from_size_align(capacity, align).expect("Invalid layout");
// SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout.
let ptr = unsafe {
let ptr = alloc::alloc(layout);
if ptr.is_null() {
alloc::handle_alloc_error(layout);
}
IoBufferPtr(ptr)
};
IoBufferMut {
ptr,
capacity,
len: 0,
align,
}
}
/// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
pub fn with_capacity_aligned_zeroed(capacity: usize, align: usize) -> Self {
use bytes::BufMut;
let mut buf = Self::with_capacity_aligned(capacity, align);
buf.put_bytes(0, capacity);
buf.len = capacity;
buf
}
/// Returns the total number of bytes the buffer can hold.
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
/// Returns the alignment of the buffer.
#[inline]
pub fn align(&self) -> usize {
self.align
}
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
#[inline]
pub fn len(&self) -> usize {
self.len
}
/// Force the length of the buffer to `new_len`.
#[inline]
unsafe fn set_len(&mut self, new_len: usize) {
debug_assert!(new_len <= self.capacity());
self.len = new_len;
}
#[inline]
fn as_ptr(&self) -> *const u8 {
self.ptr.0
}
#[inline]
fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr.0
}
/// Extracts a slice containing the entire buffer.
///
/// Equivalent to `&s[..]`.
#[inline]
fn as_slice(&self) -> &[u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
}
/// Extracts a mutable slice of the entire buffer.
///
/// Equivalent to `&mut s[..]`.
fn as_mut_slice(&mut self) -> &mut [u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
}
/// Drops the all the contents of the buffer, setting its length to `0`.
#[inline]
pub fn clear(&mut self) {
self.len = 0;
}
/// Reserves capacity for at least `additional` more bytes to be inserted
/// in the given `IoBufferMut`. The collection may reserve more space to
/// speculatively avoid frequent reallocations. After calling `reserve`,
/// capacity will be greater than or equal to `self.len() + additional`.
/// Does nothing if capacity is already sufficient.
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
pub fn reserve(&mut self, additional: usize) {
if additional > self.capacity() - self.len() {
self.reserve_inner(additional);
}
}
fn reserve_inner(&mut self, additional: usize) {
let Some(required_cap) = self.len().checked_add(additional) else {
capacity_overflow()
};
let old_capacity = self.capacity();
let align = self.align();
// This guarantees exponential growth. The doubling cannot overflow
// because `cap <= isize::MAX` and the type of `cap` is `usize`.
let cap = cmp::max(old_capacity * 2, required_cap);
if !is_valid_alloc(cap) {
capacity_overflow()
}
let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout");
let old_ptr = self.as_mut_ptr();
// SAFETY: old allocation was allocated with std::alloc::alloc with the same layout,
// and we panics on null pointer.
let (ptr, cap) = unsafe {
let old_layout = Layout::from_size_align_unchecked(old_capacity, align);
let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size());
if ptr.is_null() {
alloc::handle_alloc_error(new_layout);
}
(IoBufferPtr(ptr), cap)
};
self.ptr = ptr;
self.capacity = cap;
}
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
pub fn leak<'a>(self) -> &'a mut [u8] {
let mut buf = ManuallyDrop::new(self);
// SAFETY: leaking the buffer as intended.
unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) }
}
}
fn capacity_overflow() -> ! {
panic!("capacity overflow")
}
// We need to guarantee the following:
// * We don't ever allocate `> isize::MAX` byte-size objects.
// * We don't overflow `usize::MAX` and actually allocate too little.
//
// On 64-bit we just need to check for overflow since trying to allocate
// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
// an extra guard for this in case we're running on a platform which can use
// all 4GB in user-space, e.g., PAE or x32.
#[inline]
fn is_valid_alloc(alloc_size: usize) -> bool {
!(usize::BITS < 64 && alloc_size > isize::MAX as usize)
}
impl Drop for IoBufferMut {
fn drop(&mut self) {
// SAFETY: memory was allocated with std::alloc::alloc with the same layout.
unsafe {
alloc::dealloc(
self.as_mut_ptr(),
Layout::from_size_align_unchecked(self.capacity, self.align),
)
}
}
}
impl Deref for IoBufferMut {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl DerefMut for IoBufferMut {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut_slice()
}
}
/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
unsafe impl bytes::BufMut for IoBufferMut {
#[inline]
fn remaining_mut(&self) -> usize {
// Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
// Thus, it can have at most `self.capacity` bytes.
self.capacity() - self.len()
}
// SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
let len: usize = self.len();
let remaining = self.remaining_mut();
if remaining < cnt {
panic_advance(cnt, remaining);
}
// Addition will not overflow since the sum is at most the capacity.
self.set_len(len + cnt);
}
#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
let cap = self.capacity();
let len = self.len();
// SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
// valid for `cap - len` bytes. The subtraction will not underflow since
// `len <= cap`.
unsafe { UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len) }
}
}
/// Panic with a nice error message.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
panic!(
"advance out of bounds: the len is {} but advancing by {}",
len, idx
);
}
/// Safety: [`IoBufferMut`] has exclusive ownership of the io buffer,
/// and the location remains stable even if [`Self`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for IoBufferMut {
fn stable_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn bytes_init(&self) -> usize {
self.len()
}
fn bytes_total(&self) -> usize {
self.capacity()
}
}
// SAFETY: See above.
unsafe impl tokio_epoll_uring::IoBufMut for IoBufferMut {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
}
unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.set_len(init_len);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_with_capacity_aligned() {
const ALIGN: usize = 4 * 1024;
let v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
let v = IoBufferMut::with_capacity_aligned(ALIGN / 2, ALIGN);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN / 2);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
fn test_with_capacity_aligned_zeroed() {
const ALIGN: usize = 4 * 1024;
let v = IoBufferMut::with_capacity_aligned_zeroed(ALIGN, ALIGN);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
assert_eq!(&v[..], &[0; ALIGN])
}
#[test]
fn test_reserve() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN, ALIGN);
let capacity = v.capacity();
v.reserve(capacity);
assert_eq!(v.capacity(), capacity);
let data = [b'a'; ALIGN];
v.put(&data[..]);
v.reserve(capacity);
assert!(v.capacity() >= capacity * 2);
assert_eq!(&v[..], &data[..]);
let capacity = v.capacity();
v.clear();
v.reserve(capacity);
assert_eq!(capacity, v.capacity());
}
#[test]
fn test_bytes_put() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
for _ in 0..4 {
v.put(&x[..]);
}
assert_eq!(v.len(), ALIGN * 4);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
#[should_panic]
fn test_bytes_put_panic() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..5 {
v.put_slice(&x[..]);
}
}
#[test]
fn test_io_buf_put_slice() {
use tokio_epoll_uring::BoundedBufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
v.put_slice(&x[..]);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
}

View File

@@ -0,0 +1,9 @@
#![allow(unused)]
use tokio_epoll_uring::IoBufMut;
use crate::virtual_file::dio::IoBufferMut;
pub(crate) trait IoBufAlignedMut: IoBufMut {}
impl IoBufAlignedMut for IoBufferMut {}

View File

@@ -1,5 +1,6 @@
//! See [`FullSlice`].
use crate::virtual_file::dio::IoBufferMut;
use bytes::{Bytes, BytesMut};
use std::ops::{Deref, Range};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
@@ -76,3 +77,4 @@ macro_rules! impl_io_buf_ext {
impl_io_buf_ext!(Bytes);
impl_io_buf_ext!(BytesMut);
impl_io_buf_ext!(Vec<u8>);
impl_io_buf_ext!(IoBufferMut);

View File

@@ -1,6 +1,8 @@
# neon extension
comment = 'cloud storage for PostgreSQL'
default_version = '1.5'
# TODO: bump default version to 1.5, after we are certain that we don't
# need to rollback the compute image
default_version = '1.4'
module_pathname = '$libdir/neon'
relocatable = true
trusted = true

View File

@@ -274,7 +274,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
},
proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
require_client_ip: false,
handshake_timeout: Duration::from_secs(10),
region: "local".into(),
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,

View File

@@ -17,7 +17,6 @@ use proxy::config::AuthenticationConfig;
use proxy::config::CacheOptions;
use proxy::config::HttpConfig;
use proxy::config::ProjectInfoCacheOptions;
use proxy::config::ProxyProtocolV2;
use proxy::console;
use proxy::context::parquet::ParquetUploadArgs;
use proxy::http;
@@ -145,6 +144,9 @@ struct ProxyCliArgs {
/// size of the threadpool for password hashing
#[clap(long, default_value_t = 4)]
scram_thread_pool_size: u8,
/// Require that all incoming requests have a Proxy Protocol V2 packet **and** have an IP address associated.
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
require_client_ip: bool,
/// Disable dynamic rate limiter and store the metrics to ensure its production behaviour.
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
disable_dynamic_rate_limiter: bool,
@@ -227,11 +229,6 @@ struct ProxyCliArgs {
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,
/// Configure whether all incoming requests have a Proxy Protocol V2 packet.
// TODO(conradludgate): switch default to rejected or required once we've updated all deployments
#[clap(value_enum, long, default_value_t = ProxyProtocolV2::Supported)]
proxy_protocol_v2: ProxyProtocolV2,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -707,7 +704,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,
require_client_ip: args.require_client_ip,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,

View File

@@ -7,7 +7,6 @@ use crate::{
Host,
};
use anyhow::{bail, ensure, Context, Ok};
use clap::ValueEnum;
use itertools::Itertools;
use remote_storage::RemoteStorageConfig;
use rustls::{
@@ -31,7 +30,7 @@ pub struct ProxyConfig {
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub proxy_protocol_v2: ProxyProtocolV2,
pub require_client_ip: bool,
pub region: String,
pub handshake_timeout: Duration,
pub wake_compute_retry_config: RetryConfig,
@@ -39,16 +38,6 @@ pub struct ProxyConfig {
pub connect_to_compute_retry_config: RetryConfig,
}
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
pub enum ProxyProtocolV2 {
/// Connection will error if PROXY protocol v2 header is missing
Required,
/// Connection will parse PROXY protocol v2 header, but accept the connection if it's missing.
Supported,
/// Connection will error if PROXY protocol v2 header is provided
Rejected,
}
#[derive(Debug)]
pub struct MetricCollectionConfig {
pub endpoint: reqwest::Url,

View File

@@ -10,7 +10,6 @@ pub(crate) mod wake_compute;
pub use copy_bidirectional::copy_bidirectional_client_compute;
pub use copy_bidirectional::ErrorSource;
use crate::config::ProxyProtocolV2;
use crate::{
auth,
cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal},
@@ -94,19 +93,15 @@ pub async fn task_main(
connections.spawn(async move {
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
Ok((socket, Some(addr))) => (socket, addr.ip()),
Err(e) => {
error!("per-client task finished with an error: {e:#}");
return;
}
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
error!("missing required proxy protocol header");
Ok((_socket, None)) if config.require_client_ip => {
error!("missing required client IP");
return;
}
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
error!("proxy protocol header not supported");
return;
}
Ok((socket, Some(addr))) => (socket, addr.ip()),
Ok((socket, None)) => (socket, peer_addr.ip()),
};

View File

@@ -1,15 +1,13 @@
//! This module implements Timeline lifecycle management and has all necessary code
//! to glue together SafeKeeper and all other background services.
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self, File};
use tokio::io::AsyncWriteExt;
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::TenantId;
use std::cmp::max;
@@ -29,8 +27,6 @@ use utils::{
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::control_file::{ CONTROL_FILE_NAME};
use crate::pull_timeline::create_temp_timeline_dir;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{
@@ -618,49 +614,23 @@ impl Timeline {
}
}
// Create a temporary timeline directory
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, self.ttid).await?;
// Init the control file
let init_control_file = async {
let guard = shared_state.sk.state_mut();
let path = tli_dir_path.join(CONTROL_FILE_NAME);
let buf = guard.write_to_buf()?;
let mut control_file = File::create(&path)
.await
.with_context(|| format!("failed to create init control file at: {}", path))?;
control_file.write_all(&buf).await.with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
path
)
})?;
control_file.flush().await.with_context(|| {
format!(
"failed to flush safekeeper state into control file at: {}",
path
)
})?;
drop(control_file);
Ok(())
};
// Create timeline directory.
fs::create_dir_all(&self.timeline_dir).await?;
// Write timeline to disk and start background tasks.
if let Err(e) = init_control_file.await {
if let Err(e) = shared_state.sk.state_mut().flush().await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
warn!(
"failed to remove timeline {} directory after bootstrap failure: {}",
self.ttid, fs_err
);
}
return Err(e);
}
info!(
"moving timeline {} from {} to {}",
self.ttid, tli_dir_path, self.timeline_dir
);
tokio::fs::create_dir_all(&self.timeline_dir).await?;
// fsync tenant dir creation
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
durable_rename(&tli_dir_path, &self.timeline_dir, !conf.no_sync).await?;
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
Ok(())
}

View File

@@ -401,6 +401,7 @@ class NeonEnvBuilder:
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
pageserver_virtual_file_io_mode: Optional[str] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -455,6 +456,7 @@ class NeonEnvBuilder:
self.storage_controller_port_override = storage_controller_port_override
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
assert test_name.startswith(
"test_"
@@ -950,6 +952,9 @@ class NeonEnv:
safekeepers - An array containing objects representing the safekeepers
pg_bin - pg_bin.run() can be used to execute Postgres client binaries,
like psql or pg_dump
initial_tenant - tenant ID of the initial tenant created in the repository
neon_cli - can be used to run the 'neon' CLI tool
@@ -1025,6 +1030,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
# Create the neon_local's `NeonLocalInitConf`
cfg: Dict[str, Any] = {
@@ -1088,7 +1094,10 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
if self.pageserver_io_buffer_alignment is not None:
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -1327,6 +1336,7 @@ def neon_simple_env(
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
@@ -1353,6 +1363,7 @@ def neon_simple_env(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
env = builder.init_start()
@@ -1377,6 +1388,7 @@ def neon_env_builder(
pageserver_aux_file_policy: Optional[AuxFileStore],
record_property: Callable[[str, object], None],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1412,6 +1424,7 @@ def neon_env_builder(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
yield builder
# Propogate `preserve_database_files` to make it possible to use in other fixtures,
@@ -3297,8 +3310,6 @@ class PgBin:
@pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -> PgBin:
"""pg_bin.run() can be used to execute Postgres client binaries, like psql or pg_dump"""
return PgBin(test_output_dir, pg_distrib_dir, pg_version)

View File

@@ -39,6 +39,11 @@ def pageserver_io_buffer_alignment() -> Optional[int]:
return None
@pytest.fixture(scope="function", autouse=True)
def pageserver_virtual_file_io_mode() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
@pytest.fixture(scope="function", autouse=True)
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None

View File

@@ -53,7 +53,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
tenant, timeline_main = env.neon_cli.create_tenant(
tenant, _ = env.neon_cli.create_tenant(
conf={
# disable background GC
"gc_period": "0s",
@@ -70,7 +70,8 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
}
)
endpoint_main = env.endpoints.create_start("main", tenant_id=tenant)
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
main_cur = endpoint_main.connect().cursor()
@@ -91,7 +92,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
pageserver_http_client.timeline_gc(tenant, timeline_main, lsn2 - lsn1 + 1024)
env.neon_cli.create_branch(
"test_branch", ancestor_branch_name="main", ancestor_start_lsn=lsn1, tenant_id=tenant
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
)
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)

View File

@@ -252,7 +252,7 @@ def test_forward_compatibility(
# not using env.pageserver.version because it was initialized before
prev_pageserver_version_str = env.get_binary_version("pageserver")
prev_pageserver_version_match = re.search(
"Neon page server git(?:-env)?:(.*) failpoints: (.*), features: (.*)",
"Neon page server git-env:(.*) failpoints: (.*), features: (.*)",
prev_pageserver_version_str,
)
if prev_pageserver_version_match is not None:
@@ -263,12 +263,12 @@ def test_forward_compatibility(
)
# does not include logs from previous runs
assert not env.pageserver.log_contains(f"git(-env)?:{prev_pageserver_version}")
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
env.start()
# ensure the specified pageserver is running
assert env.pageserver.log_contains(f"git(-env)?:{prev_pageserver_version}")
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)
check_neon_works(
env,

View File

@@ -31,7 +31,9 @@ def helper_compare_timeline_list(
)
)
timelines_cli = env.neon_cli.list_timelines(initial_tenant)
timelines_cli = env.neon_cli.list_timelines()
assert timelines_cli == env.neon_cli.list_timelines(initial_tenant)
cli_timeline_ids = sorted([timeline_id for (_, timeline_id) in timelines_cli])
assert timelines_api == cli_timeline_ids

View File

@@ -24,7 +24,7 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.5",)
assert cur.fetchone() == ("1.4",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
res = cur.fetchall()
log.info(res)
@@ -48,7 +48,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.5",)
assert cur.fetchone() == ("1.4",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
current_version = "1.5"

View File

@@ -174,7 +174,8 @@ def test_pageserver_chaos(
"checkpoint_distance": "5000000",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
env.neon_cli.create_timeline("test_pageserver_chaos", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_pageserver_chaos", tenant_id=tenant)
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer

View File

@@ -27,15 +27,20 @@ def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()
env.start()
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
(tenant_id, _) = env.neon_cli.create_tenant()
http_client = env.pageserver.http_client()
initial_size = http_client.tenant_size(tenant_id)
# we should never have zero, because there should be the initdb "changes"
assert initial_size > 0, "initial implementation returns ~initdb tenant_size"
main_branch_name = "main"
branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
assert branch_name == main_branch_name
endpoint = env.endpoints.create_start(
"main",
main_branch_name,
tenant_id=tenant_id,
config_lines=["autovacuum=off", "checkpoint_timeout=10min"],
)
@@ -49,7 +54,7 @@ def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
# The transaction above will make the compute generate a checkpoint.
# In turn, the pageserver persists the checkpoint. This should only be
# one key with a size of a couple hundred bytes.
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, main_timeline_id)
size = http_client.tenant_size(tenant_id)
assert size >= initial_size and size - initial_size < 1024
@@ -301,8 +306,7 @@ def test_single_branch_get_tenant_size_grows(
env = neon_env_builder.init_start(initial_tenant_conf=tenant_config)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
branch_name = "main"
branch_name, timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
http_client = env.pageserver.http_client()
@@ -512,8 +516,7 @@ def test_get_tenant_size_with_multiple_branches(
env.pageserver.allowed_errors.append(".*InternalServerError\\(No such file or directory.*")
tenant_id = env.initial_tenant
main_timeline_id = env.initial_timeline
main_branch_name = "main"
main_branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
http_client = env.pageserver.http_client()

View File

@@ -71,9 +71,10 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder):
"checkpoint_distance": "5000000",
}
)
env.neon_cli.create_timeline("test_tenants_many", tenant_id=tenant)
endpoint = env.endpoints.create_start(
"main",
"test_tenants_many",
tenant_id=tenant,
)
tenants_endpoints.append((tenant, endpoint))

View File

@@ -638,7 +638,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=10)
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
def test_timeline_delete_works_for_remote_smoke(

View File

@@ -26,7 +26,8 @@ def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
env.neon_cli.create_timeline("test_truncate", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_truncate", tenant_id=tenant)
cur = endpoint.connect().cursor()
cur.execute("create table t1(x integer)")
cur.execute(f"insert into t1 values (generate_series(1,{n_records}))")