Compare commits

..

11 Commits

Author SHA1 Message Date
Alex Chi Z
a6b6597d1b refactor(safekeeper): use temp dir when creating timeline
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-30 14:47:51 -04:00
Arthur Petukhovsky
c07cea80bd Bump vm-builder v0.29.3 -> v0.35.0 (#9208)
We haven't updated it for a while. Now I need the update to add quotas
support to compute images
(https://github.com/neondatabase/cloud/issues/13127).

Previous update: https://github.com/neondatabase/neon/pull/7849
2024-09-30 19:18:42 +01:00
Conrad Ludgate
a2e2362ee9 add proxy-protocol header disable option (#9203)
resolves https://github.com/neondatabase/cloud/issues/18026
2024-09-30 18:11:50 +00:00
Heikki Linnakangas
0a567acdb9 tests: Move comment to more appropriate place
There is no 'pg_bin' in NeonEnv.
2024-09-30 17:56:43 +03:00
Heikki Linnakangas
69ea2776e9 tests: Remove creation of extra timelines in some tests
neon_cli.create_tenant() creates a new tenant *and* a timeline on the
tenant, with name "main". In most tests, there's no need to create
another timeline on the same tenant.

There are some more tests that do that, but in the remaining cases, I
wasn't be 100% if the presence of extra root timelines affect what the
tests test, so I left them alone.
2024-09-30 17:56:40 +03:00
Heikki Linnakangas
4dc9cb7cf9 tests: Remove some spurious list_timelines calls
These calls seem really out of place. We know what the initial tenant
and branch are in these tests, just like in all other tests.
2024-09-30 17:56:37 +03:00
John Spray
7424e7269c tests: longer timeout in test_delete_timeline_client_hangup (#9161)
## Problem

This test waits for a request to finish, and then expects deletion to
complete almost immediately. The request completes, but it's a 202, the
timeline is still deleting in the background: we need to be more
patient.

## Summary of changes

- Adjust iterations from 2 to 10 when waiting for deletion
2024-09-30 15:46:07 +01:00
a-masterov
5dc68e4e6a test_compatibility: fix the regexes detecting the version (#9205)
## Problem
The Neon components, built locally and by the GitHub workflow have
slightly different version prefixes (git: vs git-env:)
This does not allow running tests against local builds correctly.

## Summary of changes
The regular expressions were changed to work with both
prefixes.
2024-09-30 16:37:14 +02:00
John Spray
7cfd116856 pageserver: refactor immediate_gc into TenantManager (#9183)
## Problem

Legacy functions that were called as `mgr::` and relied on the static
TENANTS, see #5796

## Summary of changes

- Move the last stray function (immediate_gc) into TenantManager

Closes: https://github.com/neondatabase/neon/issues/5796
2024-09-30 09:27:28 +01:00
Heikki Linnakangas
d696c41807 Bump default neon extension version to 1.5 (#9188)
Commit 263dfba6ee introduced neon extension version 1.5, which included
some new functions and views for metrics. It didn't bump the default
neon extension number yet, so that we could still safely roll back to
the old binary if necessary. This bumps the default version.
2024-09-30 09:20:52 +03:00
Alexander Bayandin
3c72192065 CI(benchmarking): fix setting LD_LIBRARY_PATH (#9191)
## Problem

`pgbench-pgvector` job from Nightly Benchmarks fails with the error:

```
/__w/_temp/f45bc2eb-4c4c-4f0a-8030-99079303fa65.sh: line 17: LD_LIBRARY_PATH: unbound variable
```

## Summary of changes
- Fix `LD_LIBRARY_PATH: unbound variable` error in benchmarks
2024-09-29 22:27:53 +00:00
40 changed files with 312 additions and 935 deletions

View File

@@ -557,7 +557,7 @@ jobs:
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/psql /tmp/neon/pg_install/v16/bin/psql
ln -s /home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu /tmp/neon/pg_install/v16/lib
LD_LIBRARY_PATH="/home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu:${LD_LIBRARY_PATH}"
LD_LIBRARY_PATH="/home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu:${LD_LIBRARY_PATH:-}"
export LD_LIBRARY_PATH
echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> ${GITHUB_ENV}

View File

@@ -773,7 +773,7 @@ jobs:
matrix:
version: [ v14, v15, v16, v17 ]
env:
VM_BUILDER_VERSION: v0.29.3
VM_BUILDER_VERSION: v0.35.0
steps:
- uses: actions/checkout@v4

View File

@@ -104,7 +104,7 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
@@ -381,7 +381,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_io_mode: None,
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,

View File

@@ -972,6 +972,8 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::path::PathBuf;
#[derive(
Copy,
Clone,
@@ -992,49 +994,50 @@ pub mod virtual_file {
}
/// Direct IO modes for a pageserver.
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO, error out if the operation fails.
#[cfg(target_os = "linux")]
Direct,
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum DirectIoMode {
/// Direct IO disabled (uses usual buffered IO).
#[default]
Disabled,
/// Direct IO disabled (performs checks and perf simulations).
Evaluate {
/// Alignment check level
alignment_check: DirectIoAlignmentCheckLevel,
/// Latency padded for performance simulation.
latency_padding: DirectIoLatencyPadding,
},
/// Direct IO enabled.
Enabled {
/// Actions to perform on alignment error.
on_alignment_error: DirectIoOnAlignmentErrorAction,
},
}
impl IoMode {
pub const fn preferred() -> Self {
if cfg!(target_os = "linux") {
Self::Direct
} else {
Self::Buffered
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoAlignmentCheckLevel {
#[default]
Error,
Log,
None,
}
impl TryFrom<u8> for IoMode {
type Error = u8;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoOnAlignmentErrorAction {
Error,
#[default]
FallbackToBuffered,
}
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
x => return Err(x),
})
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum DirectIoLatencyPadding {
/// Pad virtual file operations with IO to a fake file.
FakeFileRW { path: PathBuf },
#[default]
None,
}
}

View File

@@ -164,10 +164,12 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(16384, virtual_file::io_engine_for_bench(), align);
page_cache::init(conf.page_cache_size, align);
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(conf.page_cache_size);
{
let mut group = c.benchmark_group("ingest-small-values");

View File

@@ -550,19 +550,6 @@ impl Client {
.map_err(Error::ReceiveBody)
}
/// Configs io mode at runtime.
pub async fn put_io_mode(
&self,
mode: &pageserver_api::models::virtual_file::IoMode,
) -> Result<()> {
let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, mode)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
self.get(uri)

View File

@@ -151,10 +151,13 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
pageserver::page_cache::init(100, align);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;
let mut total_image_layers = 0usize;

View File

@@ -59,9 +59,8 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
page_cache::init(100, align);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
@@ -191,10 +190,12 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
let align = pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
pageserver::page_cache::init(100, align);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -205,9 +205,12 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
let align = DEFAULT_IO_BUFFER_ALIGNMENT;
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, align);
page_cache::init(100, align);
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
}

View File

@@ -63,10 +63,6 @@ pub(crate) struct Args {
#[clap(long)]
set_io_alignment: Option<usize>,
/// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
#[clap(long)]
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -137,10 +133,6 @@ async fn main_impl(
mgmt_api_client.put_io_alignment(align).await?;
}
if let Some(mode) = &args.set_io_mode {
mgmt_api_client.put_io_mode(mode).await?;
}
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,

View File

@@ -125,7 +125,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file Direct IO settings");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
// The tenants directory contains all the pageserver local disk state.
@@ -173,7 +173,7 @@ fn main() -> anyhow::Result<()> {
conf.virtual_file_io_engine,
conf.io_buffer_alignment,
);
page_cache::init(conf.page_cache_size, conf.io_buffer_alignment);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -174,7 +174,7 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,
/// Direct IO settings
pub virtual_file_io_mode: virtual_file::IoMode,
pub virtual_file_direct_io: virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
@@ -325,7 +325,7 @@ impl PageServerConf {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_io_mode,
virtual_file_direct_io,
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
@@ -368,6 +368,7 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
virtual_file_direct_io,
io_buffer_alignment,
// ------------------------------------------------------------
@@ -407,7 +408,6 @@ impl PageServerConf {
l0_flush: l0_flush
.map(crate::l0_flush::L0FlushConfig::from)
.unwrap_or_default(),
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
};
// ------------------------------------------------------------

View File

@@ -17,7 +17,6 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
@@ -57,6 +56,7 @@ use utils::http::endpoint::request_span;
use utils::http::request::must_parse_query_param;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::pgdatadir_mapping::LsnForTimestamp;
@@ -81,7 +81,6 @@ use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
@@ -1720,8 +1719,13 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
let gc_result = state
.tenant_manager
.immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)
.await?;
json_response(StatusCode::OK, gc_result)
}
@@ -2382,16 +2386,6 @@ async fn put_io_alignment_handler(
json_response(StatusCode::OK, ())
}
async fn put_io_mode_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let mode: IoMode = json_request(&mut r).await?;
crate::virtual_file::set_io_mode(mode);
json_response(StatusCode::OK, ())
}
/// Polled by control plane.
///
/// See [`crate::utilization`].
@@ -3082,7 +3076,6 @@ pub fn make_router(
.put("/v1/io_alignment", |r| {
api_handler(r, put_io_alignment_handler)
})
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),

View File

@@ -82,7 +82,6 @@ use once_cell::sync::OnceCell;
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
virtual_file::{self, dio::IoBufferMut},
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -91,8 +90,8 @@ const TEST_PAGE_CACHE_SIZE: usize = 50;
///
/// Initialize the page cache. This must be called once at page server startup.
///
pub fn init(size: usize, align: usize) {
if PAGE_CACHE.set(PageCache::new(size, align)).is_err() {
pub fn init(size: usize) {
if PAGE_CACHE.set(PageCache::new(size)).is_err() {
panic!("page cache already initialized");
}
}
@@ -107,12 +106,7 @@ pub fn get() -> &'static PageCache {
// page cache is usable in unit tests.
//
if cfg!(test) {
PAGE_CACHE.get_or_init(|| {
PageCache::new(
TEST_PAGE_CACHE_SIZE,
virtual_file::get_io_buffer_alignment(),
)
})
PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
} else {
PAGE_CACHE.get().expect("page cache not initialized")
}
@@ -643,11 +637,13 @@ impl PageCache {
/// Initialize a new page cache
///
/// This should be called only once at page server startup.
fn new(num_pages: usize, align: usize) -> Self {
fn new(num_pages: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
let page_buffer =
IoBufferMut::with_capacity_aligned_zeroed(num_pages * PAGE_SZ, align).leak();
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);

View File

@@ -84,7 +84,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = self.buffered_writer.as_inner().as_inner().path();
let path = &self.buffered_writer.as_inner().as_inner().path;
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
@@ -356,7 +356,7 @@ mod tests {
}
let file_contents =
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let buffer_contents = file.buffered_writer.inspect_buffer();
@@ -392,7 +392,7 @@ mod tests {
.buffered_writer
.as_inner()
.as_inner()
.path()
.path
.metadata()
.unwrap();
assert_eq!(

View File

@@ -2197,6 +2197,82 @@ impl TenantManager {
Ok((wanted_bytes, shard_count as u32))
}
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = self.tenants.read().unwrap();
guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?
};
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx: RequestContext =
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(
total = js.len(),
"starting to wait for the gc'd layers to be dropped"
);
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().map(|x| &x.remote_client);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(|e| match e {
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
GcError::TimelineNotFound => {
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
}
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
}
#[derive(Debug, thiserror::Error)]
@@ -2341,7 +2417,7 @@ enum TenantSlotDropError {
/// Errors that can happen any time we are walking the tenant map to try and acquire
/// the TenantSlot for a particular tenant.
#[derive(Debug, thiserror::Error)]
pub enum TenantMapError {
pub(crate) enum TenantMapError {
// Tried to read while initializing
#[error("tenant map is still initializing")]
StillInitializing,
@@ -2371,7 +2447,7 @@ pub enum TenantMapError {
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
/// `drop_old_value`. It is an error to call this without shutting down
/// the conents of `old_value`.
pub struct SlotGuard {
pub(crate) struct SlotGuard {
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -2764,81 +2840,6 @@ use {
utils::http::error::ApiError,
};
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = TENANTS.read().unwrap();
guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?
};
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx: RequestContext =
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(
total = js.len(),
"starting to wait for the gc'd layers to be dropped"
);
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().map(|x| &x.remote_client);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(|e| match e {
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
GcError::TimelineNotFound => {
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
}
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

View File

@@ -43,12 +43,12 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -572,7 +572,7 @@ impl DeltaLayerWriterInner {
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
file.path,
metadata.len()
);
@@ -790,7 +790,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open_v2(path, ctx)
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
@@ -991,8 +991,7 @@ impl DeltaLayerInner {
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
let mut buf = Some(BytesMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
@@ -1011,7 +1010,7 @@ impl DeltaLayerInner {
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path(),
self.file.path,
kind
)),
);
@@ -1019,7 +1018,7 @@ impl DeltaLayerInner {
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
buf = Some(BytesMut::with_capacity(buf_size));
continue;
}
@@ -1037,7 +1036,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path(),
self.file.path,
))),
);
@@ -1055,7 +1054,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path(),
self.file.path,
))),
);
@@ -1187,14 +1186,14 @@ impl DeltaLayerInner {
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
let align = virtual_file::get_io_buffer_alignment();
let max_read_size = self
.max_vectored_read_bytes
.map(|x| x.0.get())
.unwrap_or(8192);
let align = virtual_file::get_io_buffer_alignment();
let mut buffer = Some(IoBufferMut::with_capacity_aligned(max_read_size, align));
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
@@ -1553,12 +1552,12 @@ impl<'a> DeltaLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let view = BufView::new_slice(&blobs_buf.buf);
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
@@ -1933,9 +1932,7 @@ pub(crate) mod test {
&vectored_reads,
constants::MAX_VECTORED_READ_BYTES,
);
let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));
let mut buf = Some(BytesMut::with_capacity(buf_size));
for read in vectored_reads {
let blobs_buf = vectored_blob_reader

View File

@@ -40,12 +40,11 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
@@ -389,7 +388,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open_v2(path, ctx)
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
let file_id = page_cache::next_file_id();
@@ -543,15 +542,14 @@ impl ImageLayerInner {
.await?;
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let align = virtual_file::get_io_buffer_alignment();
let mut key_count = 0;
for read in plan.into_iter() {
let buf_size = read.size();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let view = BufView::new_slice(&blobs_buf.buf);
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
@@ -599,13 +597,13 @@ impl ImageLayerInner {
);
}
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
match res {
Ok(blobs_buf) => {
let view = BufView::new_slice(&blobs_buf.buf);
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;
@@ -616,7 +614,7 @@ impl ImageLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path(),
self.file.path,
))),
);
@@ -637,7 +635,7 @@ impl ImageLayerInner {
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path(),
self.file.path,
kind
)),
);
@@ -1041,12 +1039,12 @@ impl<'a> ImageLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let view = BufView::new_slice(&blobs_buf.buf);
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
next_batch.push_back((

View File

@@ -18,7 +18,7 @@
use std::collections::BTreeMap;
use std::ops::Deref;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
@@ -27,7 +27,6 @@ use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::{self, VirtualFile};
/// Metadata bundled with the start and end offset of a blob.
@@ -159,7 +158,7 @@ impl std::fmt::Display for VectoredBlob {
/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
/// Buffer for all blobs in this read
pub buf: IoBufferMut,
pub buf: BytesMut,
/// Offsets into the buffer and metadata for all blobs in this read
pub blobs: Vec<VectoredBlob>,
}
@@ -461,7 +460,7 @@ impl<'a> VectoredBlobReader<'a> {
pub async fn read_blobs(
&self,
read: &VectoredRead,
buf: IoBufferMut,
buf: BytesMut,
ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
@@ -946,8 +945,7 @@ mod tests {
// Multiply by two (compressed data might need more space), and add a few bytes for the header
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let align = virtual_file::get_io_buffer_alignment();
let mut buf = IoBufferMut::with_capacity_aligned(reserved_bytes, align);
let mut buf = BytesMut::with_capacity(reserved_bytes);
let align = virtual_file::get_io_buffer_alignment();
let vectored_blob_reader = VectoredBlobReader::new(&file);

View File

@@ -23,12 +23,10 @@ use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
@@ -40,11 +38,10 @@ pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use api::IoMode;
pub(crate) use api::DirectIoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
pub(crate) mod dio;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
@@ -56,7 +53,6 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod io_buf_aligned;
pub(crate) mod io_buf_ext;
pub(crate) mod slice;
pub(crate) mod write;
@@ -65,176 +61,6 @@ pub(crate) mod owned_buffers_io {
}
}
#[derive(Debug)]
pub enum VirtualFile {
Buffered(VirtualFileInner),
Direct(VirtualFileInner),
}
impl VirtualFile {
fn inner(&self) -> &VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn inner_mut(&mut self) -> &mut VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
fn into_inner(self) -> VirtualFileInner {
match self {
Self::Buffered(file) => file,
Self::Direct(file) => file,
}
}
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open(path, ctx).await?;
Ok(Self::Buffered(file))
}
/// Open a file in read-only mode. Like File::open.
///
/// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
pub async fn open_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::create(path, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn create_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
VirtualFile::open_with_options_v2(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(Self::Buffered(file))
}
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &mut OpenOptions, // Uses `&mut` here to add `O_DIRECT`.
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = match get_io_mode() {
IoMode::Buffered => {
let file = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Self::Buffered(file)
}
#[cfg(target_os = "linux")]
IoMode::Direct => {
let file = VirtualFileInner::open_with_options(
path,
open_options.custom_flags(nix::libc::O_DIRECT),
ctx,
)
.await?;
Self::Direct(file)
}
};
Ok(file)
}
pub fn path(&self) -> &Utf8Path {
self.inner().path.as_path()
}
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
}
pub async fn sync_all(&self) -> Result<(), Error> {
self.inner().sync_all().await
}
pub async fn sync_data(&self) -> Result<(), Error> {
self.inner().sync_data().await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner().metadata().await
}
pub fn remove(self) {
self.into_inner().remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner_mut().seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
where
Buf: IoBufMut + Send,
{
self.inner().read_exact_at(slice, offset, ctx).await
}
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
self.inner().read_exact_at_page(page, offset, ctx).await
}
pub async fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
self.inner().write_all_at(buf, offset, ctx).await
}
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner_mut().write_all(buf, ctx).await
}
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -251,7 +77,7 @@ impl VirtualFile {
/// 'tag' field is used to detect whether the handle still is valid or not.
///
#[derive(Debug)]
pub struct VirtualFileInner {
pub struct VirtualFile {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
/// belongs to a different VirtualFile.
@@ -524,12 +350,12 @@ macro_rules! with_file {
}};
}
impl VirtualFileInner {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
@@ -538,7 +364,7 @@ impl VirtualFileInner {
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
@@ -556,7 +382,7 @@ impl VirtualFileInner {
path: P,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFileInner, std::io::Error> {
) -> Result<VirtualFile, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -587,7 +413,7 @@ impl VirtualFileInner {
open_options.open(path_ref.as_std_path()).await?
});
// Strip all options other than read and write (O_DIRECT).
// Strip all options other than read and write.
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
@@ -597,7 +423,7 @@ impl VirtualFileInner {
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFileInner {
let vfile = VirtualFile {
handle: RwLock::new(handle),
pos: 0,
path: path_ref.to_path_buf(),
@@ -1208,21 +1034,6 @@ impl tokio_epoll_uring::IoFd for FileGuard {
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
self.inner().read_blk(blknum, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner_mut().read_to_end(buf, ctx).await
}
}
#[cfg(test)]
impl VirtualFileInner {
pub(crate) async fn read_blk(
&self,
blknum: u32,
@@ -1256,7 +1067,7 @@ impl VirtualFileInner {
}
}
impl Drop for VirtualFileInner {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();
@@ -1405,15 +1216,6 @@ pub(crate) fn get_io_buffer_alignment() -> usize {
}
}
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn get_io_mode() -> IoMode {
IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
}
#[cfg(test)]
mod tests {
use crate::context::DownloadBehavior;
@@ -1722,7 +1524,7 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFileInner::open_with_options(
let f = VirtualFile::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
@@ -1774,7 +1576,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1783,7 +1585,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1806,7 +1608,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

View File

@@ -1,410 +0,0 @@
#![allow(unused)]
use core::slice;
use std::{
alloc::{self, Layout},
cmp,
mem::{ManuallyDrop, MaybeUninit},
ops::{Deref, DerefMut},
ptr::{addr_of_mut, NonNull},
};
use bytes::buf::UninitSlice;
struct IoBufferPtr(*mut u8);
// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer.
unsafe impl Send for IoBufferPtr {}
/// An aligned buffer type used for I/O.
pub struct IoBufferMut {
ptr: IoBufferPtr,
capacity: usize,
len: usize,
align: usize,
}
impl IoBufferMut {
/// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
///
/// The buffer will be able to hold at most `capacity` elements and will never resize.
///
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
/// * `align` must not be zero,
///
/// * `align` must be a power of two,
///
/// * `capacity`, when rounded up to the nearest multiple of `align`,
/// must not overflow isize (i.e., the rounded value must be
/// less than or equal to `isize::MAX`).
pub fn with_capacity_aligned(capacity: usize, align: usize) -> Self {
let layout = Layout::from_size_align(capacity, align).expect("Invalid layout");
// SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout.
let ptr = unsafe {
let ptr = alloc::alloc(layout);
if ptr.is_null() {
alloc::handle_alloc_error(layout);
}
IoBufferPtr(ptr)
};
IoBufferMut {
ptr,
capacity,
len: 0,
align,
}
}
/// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
pub fn with_capacity_aligned_zeroed(capacity: usize, align: usize) -> Self {
use bytes::BufMut;
let mut buf = Self::with_capacity_aligned(capacity, align);
buf.put_bytes(0, capacity);
buf.len = capacity;
buf
}
/// Returns the total number of bytes the buffer can hold.
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
/// Returns the alignment of the buffer.
#[inline]
pub fn align(&self) -> usize {
self.align
}
/// Returns the number of bytes in the buffer, also referred to as its 'length'.
#[inline]
pub fn len(&self) -> usize {
self.len
}
/// Force the length of the buffer to `new_len`.
#[inline]
unsafe fn set_len(&mut self, new_len: usize) {
debug_assert!(new_len <= self.capacity());
self.len = new_len;
}
#[inline]
fn as_ptr(&self) -> *const u8 {
self.ptr.0
}
#[inline]
fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr.0
}
/// Extracts a slice containing the entire buffer.
///
/// Equivalent to `&s[..]`.
#[inline]
fn as_slice(&self) -> &[u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
}
/// Extracts a mutable slice of the entire buffer.
///
/// Equivalent to `&mut s[..]`.
fn as_mut_slice(&mut self) -> &mut [u8] {
// SAFETY: The pointer is valid and `len` bytes are initialized.
unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
}
/// Drops the all the contents of the buffer, setting its length to `0`.
#[inline]
pub fn clear(&mut self) {
self.len = 0;
}
/// Reserves capacity for at least `additional` more bytes to be inserted
/// in the given `IoBufferMut`. The collection may reserve more space to
/// speculatively avoid frequent reallocations. After calling `reserve`,
/// capacity will be greater than or equal to `self.len() + additional`.
/// Does nothing if capacity is already sufficient.
///
/// # Panics
///
/// Panics if the new capacity exceeds `isize::MAX` _bytes_.
pub fn reserve(&mut self, additional: usize) {
if additional > self.capacity() - self.len() {
self.reserve_inner(additional);
}
}
fn reserve_inner(&mut self, additional: usize) {
let Some(required_cap) = self.len().checked_add(additional) else {
capacity_overflow()
};
let old_capacity = self.capacity();
let align = self.align();
// This guarantees exponential growth. The doubling cannot overflow
// because `cap <= isize::MAX` and the type of `cap` is `usize`.
let cap = cmp::max(old_capacity * 2, required_cap);
if !is_valid_alloc(cap) {
capacity_overflow()
}
let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout");
let old_ptr = self.as_mut_ptr();
// SAFETY: old allocation was allocated with std::alloc::alloc with the same layout,
// and we panics on null pointer.
let (ptr, cap) = unsafe {
let old_layout = Layout::from_size_align_unchecked(old_capacity, align);
let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size());
if ptr.is_null() {
alloc::handle_alloc_error(new_layout);
}
(IoBufferPtr(ptr), cap)
};
self.ptr = ptr;
self.capacity = cap;
}
/// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
pub fn leak<'a>(self) -> &'a mut [u8] {
let mut buf = ManuallyDrop::new(self);
// SAFETY: leaking the buffer as intended.
unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) }
}
}
fn capacity_overflow() -> ! {
panic!("capacity overflow")
}
// We need to guarantee the following:
// * We don't ever allocate `> isize::MAX` byte-size objects.
// * We don't overflow `usize::MAX` and actually allocate too little.
//
// On 64-bit we just need to check for overflow since trying to allocate
// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
// an extra guard for this in case we're running on a platform which can use
// all 4GB in user-space, e.g., PAE or x32.
#[inline]
fn is_valid_alloc(alloc_size: usize) -> bool {
!(usize::BITS < 64 && alloc_size > isize::MAX as usize)
}
impl Drop for IoBufferMut {
fn drop(&mut self) {
// SAFETY: memory was allocated with std::alloc::alloc with the same layout.
unsafe {
alloc::dealloc(
self.as_mut_ptr(),
Layout::from_size_align_unchecked(self.capacity, self.align),
)
}
}
}
impl Deref for IoBufferMut {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl DerefMut for IoBufferMut {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut_slice()
}
}
/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
unsafe impl bytes::BufMut for IoBufferMut {
#[inline]
fn remaining_mut(&self) -> usize {
// Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
// Thus, it can have at most `self.capacity` bytes.
self.capacity() - self.len()
}
// SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
let len: usize = self.len();
let remaining = self.remaining_mut();
if remaining < cnt {
panic_advance(cnt, remaining);
}
// Addition will not overflow since the sum is at most the capacity.
self.set_len(len + cnt);
}
#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
let cap = self.capacity();
let len = self.len();
// SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
// valid for `cap - len` bytes. The subtraction will not underflow since
// `len <= cap`.
unsafe { UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len) }
}
}
/// Panic with a nice error message.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
panic!(
"advance out of bounds: the len is {} but advancing by {}",
len, idx
);
}
/// Safety: [`IoBufferMut`] has exclusive ownership of the io buffer,
/// and the location remains stable even if [`Self`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for IoBufferMut {
fn stable_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn bytes_init(&self) -> usize {
self.len()
}
fn bytes_total(&self) -> usize {
self.capacity()
}
}
// SAFETY: See above.
unsafe impl tokio_epoll_uring::IoBufMut for IoBufferMut {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
}
unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.set_len(init_len);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_with_capacity_aligned() {
const ALIGN: usize = 4 * 1024;
let v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
let v = IoBufferMut::with_capacity_aligned(ALIGN / 2, ALIGN);
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN / 2);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
fn test_with_capacity_aligned_zeroed() {
const ALIGN: usize = 4 * 1024;
let v = IoBufferMut::with_capacity_aligned_zeroed(ALIGN, ALIGN);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
assert_eq!(&v[..], &[0; ALIGN])
}
#[test]
fn test_reserve() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN, ALIGN);
let capacity = v.capacity();
v.reserve(capacity);
assert_eq!(v.capacity(), capacity);
let data = [b'a'; ALIGN];
v.put(&data[..]);
v.reserve(capacity);
assert!(v.capacity() >= capacity * 2);
assert_eq!(&v[..], &data[..]);
let capacity = v.capacity();
v.clear();
v.reserve(capacity);
assert_eq!(capacity, v.capacity());
}
#[test]
fn test_bytes_put() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
for _ in 0..4 {
v.put(&x[..]);
}
assert_eq!(v.len(), ALIGN * 4);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN * 4);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
#[test]
#[should_panic]
fn test_bytes_put_panic() {
use bytes::BufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN * 4, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..5 {
v.put_slice(&x[..]);
}
}
#[test]
fn test_io_buf_put_slice() {
use tokio_epoll_uring::BoundedBufMut;
const ALIGN: usize = 4 * 1024;
let mut v = IoBufferMut::with_capacity_aligned(ALIGN, ALIGN);
let x = [b'a'; ALIGN];
for _ in 0..2 {
v.put_slice(&x[..]);
assert_eq!(v.len(), ALIGN);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
v.clear()
}
assert_eq!(v.len(), 0);
assert_eq!(v.capacity(), ALIGN);
assert_eq!(v.align(), ALIGN);
assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
}
}

View File

@@ -1,9 +0,0 @@
#![allow(unused)]
use tokio_epoll_uring::IoBufMut;
use crate::virtual_file::dio::IoBufferMut;
pub(crate) trait IoBufAlignedMut: IoBufMut {}
impl IoBufAlignedMut for IoBufferMut {}

View File

@@ -1,6 +1,5 @@
//! See [`FullSlice`].
use crate::virtual_file::dio::IoBufferMut;
use bytes::{Bytes, BytesMut};
use std::ops::{Deref, Range};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
@@ -77,4 +76,3 @@ macro_rules! impl_io_buf_ext {
impl_io_buf_ext!(Bytes);
impl_io_buf_ext!(BytesMut);
impl_io_buf_ext!(Vec<u8>);
impl_io_buf_ext!(IoBufferMut);

View File

@@ -1,8 +1,6 @@
# neon extension
comment = 'cloud storage for PostgreSQL'
# TODO: bump default version to 1.5, after we are certain that we don't
# need to rollback the compute image
default_version = '1.4'
default_version = '1.5'
module_pathname = '$libdir/neon'
relocatable = true
trusted = true

View File

@@ -274,7 +274,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
},
require_client_ip: false,
proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
handshake_timeout: Duration::from_secs(10),
region: "local".into(),
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,

View File

@@ -17,6 +17,7 @@ use proxy::config::AuthenticationConfig;
use proxy::config::CacheOptions;
use proxy::config::HttpConfig;
use proxy::config::ProjectInfoCacheOptions;
use proxy::config::ProxyProtocolV2;
use proxy::console;
use proxy::context::parquet::ParquetUploadArgs;
use proxy::http;
@@ -144,9 +145,6 @@ struct ProxyCliArgs {
/// size of the threadpool for password hashing
#[clap(long, default_value_t = 4)]
scram_thread_pool_size: u8,
/// Require that all incoming requests have a Proxy Protocol V2 packet **and** have an IP address associated.
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
require_client_ip: bool,
/// Disable dynamic rate limiter and store the metrics to ensure its production behaviour.
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
disable_dynamic_rate_limiter: bool,
@@ -229,6 +227,11 @@ struct ProxyCliArgs {
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,
/// Configure whether all incoming requests have a Proxy Protocol V2 packet.
// TODO(conradludgate): switch default to rejected or required once we've updated all deployments
#[clap(value_enum, long, default_value_t = ProxyProtocolV2::Supported)]
proxy_protocol_v2: ProxyProtocolV2,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -704,7 +707,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
require_client_ip: args.require_client_ip,
proxy_protocol_v2: args.proxy_protocol_v2,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,

View File

@@ -7,6 +7,7 @@ use crate::{
Host,
};
use anyhow::{bail, ensure, Context, Ok};
use clap::ValueEnum;
use itertools::Itertools;
use remote_storage::RemoteStorageConfig;
use rustls::{
@@ -30,7 +31,7 @@ pub struct ProxyConfig {
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,
pub proxy_protocol_v2: ProxyProtocolV2,
pub region: String,
pub handshake_timeout: Duration,
pub wake_compute_retry_config: RetryConfig,
@@ -38,6 +39,16 @@ pub struct ProxyConfig {
pub connect_to_compute_retry_config: RetryConfig,
}
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
pub enum ProxyProtocolV2 {
/// Connection will error if PROXY protocol v2 header is missing
Required,
/// Connection will parse PROXY protocol v2 header, but accept the connection if it's missing.
Supported,
/// Connection will error if PROXY protocol v2 header is provided
Rejected,
}
#[derive(Debug)]
pub struct MetricCollectionConfig {
pub endpoint: reqwest::Url,

View File

@@ -10,6 +10,7 @@ pub(crate) mod wake_compute;
pub use copy_bidirectional::copy_bidirectional_client_compute;
pub use copy_bidirectional::ErrorSource;
use crate::config::ProxyProtocolV2;
use crate::{
auth,
cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal},
@@ -93,15 +94,19 @@ pub async fn task_main(
connections.spawn(async move {
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
Ok((socket, Some(addr))) => (socket, addr.ip()),
Err(e) => {
error!("per-client task finished with an error: {e:#}");
return;
}
Ok((_socket, None)) if config.require_client_ip => {
error!("missing required client IP");
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
error!("missing required proxy protocol header");
return;
}
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
error!("proxy protocol header not supported");
return;
}
Ok((socket, Some(addr))) => (socket, addr.ip()),
Ok((socket, None)) => (socket, peer_addr.ip()),
};

View File

@@ -1,13 +1,15 @@
//! This module implements Timeline lifecycle management and has all necessary code
//! to glue together SafeKeeper and all other background services.
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Context, Result};
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio::fs::{self, File};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::TenantId;
use std::cmp::max;
@@ -27,6 +29,8 @@ use utils::{
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::control_file::{ CONTROL_FILE_NAME};
use crate::pull_timeline::create_temp_timeline_dir;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{
@@ -614,23 +618,49 @@ impl Timeline {
}
}
// Create timeline directory.
fs::create_dir_all(&self.timeline_dir).await?;
// Create a temporary timeline directory
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, self.ttid).await?;
// Init the control file
let init_control_file = async {
let guard = shared_state.sk.state_mut();
let path = tli_dir_path.join(CONTROL_FILE_NAME);
let buf = guard.write_to_buf()?;
let mut control_file = File::create(&path)
.await
.with_context(|| format!("failed to create init control file at: {}", path))?;
control_file.write_all(&buf).await.with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
path
)
})?;
control_file.flush().await.with_context(|| {
format!(
"failed to flush safekeeper state into control file at: {}",
path
)
})?;
drop(control_file);
Ok(())
};
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.state_mut().flush().await {
if let Err(e) = init_control_file.await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
warn!(
"failed to remove timeline {} directory after bootstrap failure: {}",
self.ttid, fs_err
);
}
return Err(e);
}
info!(
"moving timeline {} from {} to {}",
self.ttid, tli_dir_path, self.timeline_dir
);
tokio::fs::create_dir_all(&self.timeline_dir).await?;
// fsync tenant dir creation
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
durable_rename(&tli_dir_path, &self.timeline_dir, !conf.no_sync).await?;
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
Ok(())
}

View File

@@ -401,7 +401,6 @@ class NeonEnvBuilder:
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
pageserver_virtual_file_io_mode: Optional[str] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -456,7 +455,6 @@ class NeonEnvBuilder:
self.storage_controller_port_override = storage_controller_port_override
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
assert test_name.startswith(
"test_"
@@ -952,9 +950,6 @@ class NeonEnv:
safekeepers - An array containing objects representing the safekeepers
pg_bin - pg_bin.run() can be used to execute Postgres client binaries,
like psql or pg_dump
initial_tenant - tenant ID of the initial tenant created in the repository
neon_cli - can be used to run the 'neon' CLI tool
@@ -1030,7 +1025,6 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
# Create the neon_local's `NeonLocalInitConf`
cfg: Dict[str, Any] = {
@@ -1094,10 +1088,7 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
if self.pageserver_io_buffer_alignment is not None:
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -1336,7 +1327,6 @@ def neon_simple_env(
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
@@ -1363,7 +1353,6 @@ def neon_simple_env(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
env = builder.init_start()
@@ -1388,7 +1377,6 @@ def neon_env_builder(
pageserver_aux_file_policy: Optional[AuxFileStore],
record_property: Callable[[str, object], None],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1424,7 +1412,6 @@ def neon_env_builder(
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
yield builder
# Propogate `preserve_database_files` to make it possible to use in other fixtures,
@@ -3310,6 +3297,8 @@ class PgBin:
@pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -> PgBin:
"""pg_bin.run() can be used to execute Postgres client binaries, like psql or pg_dump"""
return PgBin(test_output_dir, pg_distrib_dir, pg_version)

View File

@@ -39,11 +39,6 @@ def pageserver_io_buffer_alignment() -> Optional[int]:
return None
@pytest.fixture(scope="function", autouse=True)
def pageserver_virtual_file_io_mode() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
@pytest.fixture(scope="function", autouse=True)
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None

View File

@@ -53,7 +53,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
tenant, _ = env.neon_cli.create_tenant(
tenant, timeline_main = env.neon_cli.create_tenant(
conf={
# disable background GC
"gc_period": "0s",
@@ -70,8 +70,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
}
)
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
endpoint_main = env.endpoints.create_start("main", tenant_id=tenant)
main_cur = endpoint_main.connect().cursor()
@@ -92,7 +91,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
pageserver_http_client.timeline_gc(tenant, timeline_main, lsn2 - lsn1 + 1024)
env.neon_cli.create_branch(
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
"test_branch", ancestor_branch_name="main", ancestor_start_lsn=lsn1, tenant_id=tenant
)
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)

View File

@@ -252,7 +252,7 @@ def test_forward_compatibility(
# not using env.pageserver.version because it was initialized before
prev_pageserver_version_str = env.get_binary_version("pageserver")
prev_pageserver_version_match = re.search(
"Neon page server git-env:(.*) failpoints: (.*), features: (.*)",
"Neon page server git(?:-env)?:(.*) failpoints: (.*), features: (.*)",
prev_pageserver_version_str,
)
if prev_pageserver_version_match is not None:
@@ -263,12 +263,12 @@ def test_forward_compatibility(
)
# does not include logs from previous runs
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
assert not env.pageserver.log_contains(f"git(-env)?:{prev_pageserver_version}")
env.start()
# ensure the specified pageserver is running
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)
assert env.pageserver.log_contains(f"git(-env)?:{prev_pageserver_version}")
check_neon_works(
env,

View File

@@ -31,9 +31,7 @@ def helper_compare_timeline_list(
)
)
timelines_cli = env.neon_cli.list_timelines()
assert timelines_cli == env.neon_cli.list_timelines(initial_tenant)
timelines_cli = env.neon_cli.list_timelines(initial_tenant)
cli_timeline_ids = sorted([timeline_id for (_, timeline_id) in timelines_cli])
assert timelines_api == cli_timeline_ids

View File

@@ -24,7 +24,7 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.4",)
assert cur.fetchone() == ("1.5",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
res = cur.fetchall()
log.info(res)
@@ -48,7 +48,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.4",)
assert cur.fetchone() == ("1.5",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
current_version = "1.5"

View File

@@ -174,8 +174,7 @@ def test_pageserver_chaos(
"checkpoint_distance": "5000000",
}
)
env.neon_cli.create_timeline("test_pageserver_chaos", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_pageserver_chaos", tenant_id=tenant)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer

View File

@@ -27,20 +27,15 @@ def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()
env.start()
(tenant_id, _) = env.neon_cli.create_tenant()
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
http_client = env.pageserver.http_client()
initial_size = http_client.tenant_size(tenant_id)
# we should never have zero, because there should be the initdb "changes"
assert initial_size > 0, "initial implementation returns ~initdb tenant_size"
main_branch_name = "main"
branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
assert branch_name == main_branch_name
endpoint = env.endpoints.create_start(
main_branch_name,
"main",
tenant_id=tenant_id,
config_lines=["autovacuum=off", "checkpoint_timeout=10min"],
)
@@ -54,7 +49,7 @@ def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
# The transaction above will make the compute generate a checkpoint.
# In turn, the pageserver persists the checkpoint. This should only be
# one key with a size of a couple hundred bytes.
wait_for_last_flush_lsn(env, endpoint, tenant_id, main_timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
size = http_client.tenant_size(tenant_id)
assert size >= initial_size and size - initial_size < 1024
@@ -306,7 +301,8 @@ def test_single_branch_get_tenant_size_grows(
env = neon_env_builder.init_start(initial_tenant_conf=tenant_config)
tenant_id = env.initial_tenant
branch_name, timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
timeline_id = env.initial_timeline
branch_name = "main"
http_client = env.pageserver.http_client()
@@ -516,7 +512,8 @@ def test_get_tenant_size_with_multiple_branches(
env.pageserver.allowed_errors.append(".*InternalServerError\\(No such file or directory.*")
tenant_id = env.initial_tenant
main_branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
main_timeline_id = env.initial_timeline
main_branch_name = "main"
http_client = env.pageserver.http_client()

View File

@@ -71,10 +71,9 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder):
"checkpoint_distance": "5000000",
}
)
env.neon_cli.create_timeline("test_tenants_many", tenant_id=tenant)
endpoint = env.endpoints.create_start(
"test_tenants_many",
"main",
tenant_id=tenant,
)
tenants_endpoints.append((tenant, endpoint))

View File

@@ -638,7 +638,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=10)
def test_timeline_delete_works_for_remote_smoke(

View File

@@ -26,8 +26,7 @@ def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
}
)
env.neon_cli.create_timeline("test_truncate", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_truncate", tenant_id=tenant)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
cur = endpoint.connect().cursor()
cur.execute("create table t1(x integer)")
cur.execute(f"insert into t1 values (generate_series(1,{n_records}))")