Compare commits

..

8 Commits

Author SHA1 Message Date
Arseny Sher
bce8d4b3f5 Stop postgres in immediate mode in VM image.
PR #6712 might add a delay in graceful shutdown without change to compute -> sk
protocol to force sync of control file on safekeeper to persist commit_lsn, so
use 'immediate' mode instead; 'fast' one isn't useful for neon anyway.
2024-02-14 19:50:04 +03:00
John Spray
840abe3954 pageserver: store aux files as deltas (#6742)
## Problem

Aux files were stored with an O(N^2) cost, since on each modification
the entire map is re-written as a page image.

This addresses one axis of the inefficiency in logical replication's use
of storage (https://github.com/neondatabase/neon/issues/6626). It will
still be writing a large amount of duplicative data if writing the same
slot's state every 15 seconds, but the impact will be O(N) instead of
O(N^2).

## Summary of changes

- Introduce `NeonWalRecord::AuxFile`
- In `DatadirModification`, if the AUX_FILES_KEY has already been set,
then write a delta instead of an image
2024-02-14 15:01:16 +00:00
Christian Schwarz
774a6e7475 refactor(virtual_file) make write_all_at take owned buffers (#6673)
context: https://github.com/neondatabase/neon/issues/6663

Building atop #6664, this PR switches `write_all_at` to take owned
buffers.

The main challenge here is the `EphemeralFile::mutable_tail`, for which
I'm picking the ugly solution of an `Option` that is `None` while the IO
is in flight.

After this, we will be able to switch `write_at` to take owned buffers
and call tokio-epoll-uring's `write` function with that owned buffer.
That'll be done in #6378.
2024-02-14 15:59:06 +01:00
Christian Schwarz
df5d588f63 refactor(VirtualFile::crashsafe_overwrite): avoid Handle::block_on in callers (#6731)
Some callers of `VirtualFile::crashsafe_overwrite` call it on the
executor thread, thereby potentially stalling it.

Others are more diligent and wrap it in `spawn_blocking(...,
Handle::block_on, ... )` to avoid stalling the executor thread.

However, because `crashsafe_overwrite` uses
VirtualFile::open_with_options internally, we spawn a new thread-local
`tokio-epoll-uring::System` in the blocking pool thread that's used for
the `spawn_blocking` call.

This PR refactors the situation such that we do the `spawn_blocking`
inside `VirtualFile::crashsafe_overwrite`. This unifies the situation
for the better:

1. Callers who didn't wrap in `spawn_blocking(..., Handle::block_on,
...)` before no longer stall the executor.
2. Callers who did it before now can avoid the `block_on`, resolving the
problem with the short-lived `tokio-epoll-uring::System`s in the
blocking pool threads.

A future PR will build on top of this and divert to tokio-epoll-uring if
it's configures as the IO engine.

Changes
-------

- Convert implementation to std::fs and move it into `crashsafe.rs`
- Yes, I know, Safekeepers (cc @arssher ) added `durable_rename` and
`fsync_async_opt` recently. However, `crashsafe_overwrite` is different
in the sense that it's higher level, i.e., it's more like
`std::fs::write` and the Safekeeper team's code is more building block
style.
- The consequence is that we don't use the VirtualFile file descriptor
cache anymore.
- I don't think it's a big deal because we have plenty of slack wrt
production file descriptor limit rlimit (see [this
dashboard](https://neonprod.grafana.net/d/e4a40325-9acf-4aa0-8fd9-f6322b3f30bd/pageserver-open-file-descriptors?orgId=1))

- Use `tokio::task::spawn_blocking` in
`VirtualFile::crashsafe_overwrite` to call the new
`crashsafe::overwrite` API.
- Inspect all callers to remove any double-`spawn_blocking`
- spawn_blocking requires the captures data to be 'static + Send. So,
refactor the callers. We'll need this for future tokio-epoll-uring
support anyway, because tokio-epoll-uring requires owned buffers.

Related Issues
--------------

- overall epic to enable write path to tokio-epoll-uring: #6663
- this is also kind of relevant to the tokio-epoll-uring System creation
failures that we encountered in staging, investigation being tracked in
#6667
- why is it relevant? Because this PR removes two uses of
`spawn_blocking+Handle::block_on`
2024-02-14 14:22:41 +00:00
John Spray
f39b0fce9b Revert #6666 "tests: try to make restored-datadir comparison tests not flaky" (#6751)
The #6666  change appears to have made the test fail more often.

PR https://github.com/neondatabase/neon/pull/6712 should re-instate this
change, along with its change to make the overall flow more reliable.

This reverts commit 568f91420a.
2024-02-14 10:57:01 +00:00
Conrad Ludgate
a9ec4eb4fc hold cancel session (#6750)
## Problem

In a recent refactor, we accidentally dropped the cancel session early

## Summary of changes

Hold the cancel session during proxy passthrough
2024-02-14 10:26:32 +00:00
Heikki Linnakangas
a97b54e3b9 Cherry-pick Postgres bugfix to 'mmap' DSM implementation
Cherry-pick Upstream commit fbf9a7ac4d to neon stable branches. We'll
get it in the next PostgreSQL minor release anyway, but we need it
now, if we want to start using the 'mmap' implementation.

See https://github.com/neondatabase/autoscaling/issues/800 for the
plans on doing that.
2024-02-14 11:37:52 +02:00
Heikki Linnakangas
a5114a99b2 Create a symlink from pg_dynshmem to /dev/shm
See included comment and issue
https://github.com/neondatabase/autoscaling/issues/800 for details.

This has no effect, unless you set "dynamic_shared_memory_type = mmap"
in postgresql.conf.
2024-02-14 11:37:52 +02:00
22 changed files with 475 additions and 235 deletions

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::BufRead;
use std::os::unix::fs::PermissionsExt;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
@@ -634,6 +634,48 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
// /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
//
// Why on earth don't we just stick to the 'posix' default, you might
// ask. It turns out that making large allocations with 'posix' doesn't
// work very well with autoscaling. The behavior we want is that:
//
// 1. You can make large DSM allocations, larger than the current RAM
// size of the VM, without errors
//
// 2. If the allocated memory is really used, the VM is scaled up
// automatically to accommodate that
//
// We try to make that possible by having swap in the VM. But with the
// default 'posix' DSM implementation, we fail step 1, even when there's
// plenty of swap available. PostgreSQL uses posix_fallocate() to create
// the shmem segment, which is really just a file in /dev/shm in Linux,
// but posix_fallocate() on tmpfs returns ENOMEM if the size is larger
// than available RAM.
//
// Using 'dynamic_shared_memory_type = mmap' works around that, because
// the Postgres 'mmap' DSM implementation doesn't use
// posix_fallocate(). Instead, it uses repeated calls to write(2) to
// fill the file with zeros. It's weird that that differs between
// 'posix' and 'mmap', but we take advantage of it. When the file is
// filled slowly with write(2), the kernel allows it to grow larger, as
// long as there's swap available.
//
// In short, using 'dynamic_shared_memory_type = mmap' allows us one DSM
// segment to be larger than currently available RAM. But because we
// don't want to store it on a real file, which the kernel would try to
// flush to disk, so symlink pg_dynshm to /dev/shm.
//
// We don't set 'dynamic_shared_memory_type = mmap' here, we let the
// control plane control that option. If 'mmap' is not used, this
// symlink doesn't affect anything.
//
// See https://github.com/neondatabase/autoscaling/issues/800
std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Replica | ComputeMode::Static(..) => {

View File

@@ -1,7 +1,7 @@
use std::{
borrow::Cow,
fs::{self, File},
io,
io::{self, Write},
};
use camino::{Utf8Path, Utf8PathBuf};
@@ -161,6 +161,48 @@ pub async fn durable_rename(
Ok(())
}
/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
///
/// The file is first written to the specified `tmp_path`, and in a second
/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
/// and atomic rename guarantee that, if we crash at any point, there will never
/// be a partially written file at `final_path` (but maybe at `tmp_path`).
///
/// Callers are responsible for serializing calls of this function for a given `final_path`.
/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
/// be no error and the content of `final_path` will be the "winner" caller's `content`.
/// I.e., the atomticity guarantees still hold.
pub fn overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true)
.open(tmp_path)?;
file.write_all(content)?;
file.sync_all()?;
drop(file); // don't keep the fd open for longer than we have to
std::fs::rename(tmp_path, final_path)?;
let final_parent_dirfd = std::fs::OpenOptions::new()
.read(true)
.open(final_path_parent)?;
final_parent_dirfd.sync_all()?;
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
.await
.maybe_fatal_err("save deletion header")?;
@@ -325,7 +325,8 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)

View File

@@ -156,6 +156,7 @@ impl Timeline {
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_aux_files: None,
pending_directory_entries: Vec::new(),
lsn,
}
@@ -870,6 +871,14 @@ pub struct DatadirModification<'a> {
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
// If we already wrote any aux file changes in this modification, stash the latest dir. If set,
// [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
// if AUX_FILES_KEY is already set.
pending_aux_files: Option<AuxFilesDirectory>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
}
@@ -1384,31 +1393,76 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
Ok(buf) => AuxFilesDirectory::des(&buf)?,
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
AuxFilesDirectory {
files: HashMap::new(),
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
// We already updated aux files in `self`: emit a delta and update our latest value
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
}
Err(
e @ (PageReconstructError::AncestorStopping(_)
| PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
// we are assuming that all _other_ possible errors represents a missing key. If some
// other error occurs, we may incorrectly reset the map of aux files.
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
dir
}
}
};
let path = path.to_string();
if content.is_empty() {
dir.files.remove(&path);
} else {
dir.files.insert(path, Bytes::copy_from_slice(content));
}
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, dir.files.len()));
self.pending_aux_files = Some(dir);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
Ok(())
}
@@ -1618,8 +1672,18 @@ struct RelDirectory {
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct AuxFilesDirectory {
files: HashMap<String, Bytes>,
pub(crate) struct AuxFilesDirectory {
pub(crate) files: HashMap<String, Bytes>,
}
impl AuxFilesDirectory {
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
if let Some(value) = value {
self.files.insert(key, value);
} else {
self.files.remove(&key);
}
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -1655,8 +1719,60 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
//use super::repo_harness::*;
//use super::*;
use hex_literal::hex;
use utils::id::TimelineId;
use super::*;
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
/// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
let name = "aux_files_round_trip";
let harness = TenantHarness::create(name)?;
pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tline.raw_timeline().unwrap();
// First modification: insert two keys
let mut modification = tline.begin_modification(Lsn(0x1000));
modification.put_file("foo/bar1", b"content1", &ctx).await?;
modification.set_lsn(Lsn(0x1008))?;
modification.put_file("foo/bar2", b"content2", &ctx).await?;
modification.commit(&ctx).await?;
let expect_1008 = HashMap::from([
("foo/bar1".to_string(), Bytes::from_static(b"content1")),
("foo/bar2".to_string(), Bytes::from_static(b"content2")),
]);
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
assert_eq!(readback, expect_1008);
// Second modification: update one key, remove the other
let mut modification = tline.begin_modification(Lsn(0x2000));
modification.put_file("foo/bar1", b"content3", &ctx).await?;
modification.set_lsn(Lsn(0x2008))?;
modification.put_file("foo/bar2", b"", &ctx).await?;
modification.commit(&ctx).await?;
let expect_2008 =
HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
assert_eq!(readback, expect_2008);
// Reading back in time works
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
assert_eq!(readback, expect_1008);
Ok(())
}
/*
fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {

View File

@@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -2878,17 +2877,10 @@ impl Tenant {
let tenant_shard_id = *tenant_shard_id;
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {config_path}")
})
})
})
.await??;
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
Ok(())
}
@@ -2915,17 +2907,12 @@ impl Tenant {
let tenant_shard_id = *tenant_shard_id;
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})
})
})
.await??;
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})?;
Ok(())
}
@@ -3914,6 +3901,7 @@ pub(crate) mod harness {
use utils::lsn::Lsn;
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::walredo::apply_neon;
use crate::{
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
};
@@ -4173,20 +4161,34 @@ pub(crate) mod harness {
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> anyhow::Result<Bytes> {
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,
lsn,
if base_img.is_some() {
"base image"
} else {
"no base image"
},
records.len()
);
println!("{s}");
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
Ok(TEST_IMG(&s))
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
let base_img = base_img.expect("Neon WAL redo requires base image").1;
let mut page = BytesMut::new();
page.extend_from_slice(&base_img);
for (_record_lsn, record) in records {
apply_neon::apply_in_neon(&record, key, &mut page)?;
}
Ok(page.freeze())
} else {
// We never spawn a postgres walredo process in unit tests: just log what we might have done.
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,
lsn,
if base_img.is_some() {
"base image"
} else {
"no base image"
},
records.len()
);
println!("{s}");
Ok(TEST_IMG(&s))
}
}
}
}

View File

@@ -6,6 +6,7 @@ use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
@@ -26,7 +27,10 @@ pub struct EphemeralFile {
/// An ephemeral file is append-only.
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
/// The other pages, which can no longer be modified, are accessed through the page cache.
mutable_tail: [u8; PAGE_SZ],
///
/// None <=> IO is ongoing.
/// Size is fixed to PAGE_SZ at creation time and must not be changed.
mutable_tail: Option<BytesMut>,
}
impl EphemeralFile {
@@ -60,7 +64,7 @@ impl EphemeralFile {
_timeline_id: timeline_id,
file,
len: 0,
mutable_tail: [0u8; PAGE_SZ],
mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
})
}
@@ -103,7 +107,13 @@ impl EphemeralFile {
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
Ok(BlockLease::EphemeralFileMutableTail(
self.mutable_tail
.as_deref()
.expect("we're not doing IO, it must be Some()")
.try_into()
.expect("we ensure that it's always PAGE_SZ"),
))
}
}
@@ -135,21 +145,27 @@ impl EphemeralFile {
) -> Result<(), io::Error> {
let mut src_remaining = src;
while !src_remaining.is_empty() {
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
let dst_remaining = &mut self
.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not yet ongoing")[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
match self
let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
.expect("IO is not yet ongoing");
let (mutable_tail, res) = self
.ephemeral_file
.file
.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
)
.await
{
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
.await;
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
// I.e., the IO isn't retryable if we panic.
self.ephemeral_file.mutable_tail = Some(mutable_tail);
match res {
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
@@ -169,7 +185,12 @@ impl EphemeralFile {
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
buf.copy_from_slice(
self.ephemeral_file
.mutable_tail
.as_deref()
.expect("IO is not ongoing"),
);
let _ = write_guard.mark_valid();
// pre-warm successful
}
@@ -181,7 +202,11 @@ impl EphemeralFile {
// Zero the buffer for re-use.
// Zeroing is critical for correcntess because the write_blob code below
// and similarly read_blk expect zeroed pages.
self.ephemeral_file.mutable_tail.fill(0);
self.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not ongoing")
.fill(0);
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;

View File

@@ -279,7 +279,7 @@ pub async fn save_metadata(
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes)
.await
.context("write metadata")?;
Ok(())

View File

@@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> {
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
let heatmap_path_bg = heatmap_path.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
})
})
.await
.expect("Blocking task is never aborted")
.maybe_fatal_err(&context_msg)?;
VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
.await
.maybe_fatal_err(&context_msg)?;
tracing::debug!("Wrote local heatmap to {}", heatmap_path);

View File

@@ -19,14 +19,13 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
@@ -404,47 +403,34 @@ impl VirtualFile {
Ok(vfile)
}
/// Writes a file to the specified `final_path` in a crash safe fasion
/// Async version of [`::utils::crashsafe::overwrite`].
///
/// The file is first written to the specified tmp_path, and in a second
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
/// # NB:
///
/// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
/// it did at an earlier time.
/// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
let (_content, res) = file.write_all(content).await;
res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
final_parent_dirfd.sync_all().await?;
Ok(())
// TODO: use tokio_epoll_uring if configured as `io_engine`.
// See https://github.com/neondatabase/neon/issues/6663
tokio::task::spawn_blocking(move || {
let slice_storage;
let content_len = content.bytes_init();
let content = if content.bytes_init() > 0 {
slice_storage = Some(content.slice(0..content_len));
slice_storage.as_deref().expect("just set it to Some()")
} else {
&[]
};
utils::crashsafe::overwrite(&final_path, &tmp_path, content)
})
.await
.expect("blocking task is never aborted")
}
/// Call File::sync_all() on the underlying File.
@@ -582,24 +568,37 @@ impl VirtualFile {
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
pub async fn write_all_at<B: BoundedBuf>(
&self,
buf: B,
mut offset: u64,
) -> (B::Buf, Result<(), Error>) {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(()));
}
let mut buf = buf.slice(0..buf_len);
while !buf.is_empty() {
match self.write_at(buf, offset).await {
// TODO: push `buf` further down
match self.write_at(&buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
return (
Slice::into_inner(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = &buf[n..];
buf = buf.slice(n..);
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
Err(e) => return (Slice::into_inner(buf), Err(e)),
}
}
Ok(())
(Slice::into_inner(buf), Ok(()))
}
/// Writes `buf.slice(0..buf.bytes_init())`.
@@ -1064,10 +1063,19 @@ mod tests {
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all_at(buf, offset).await;
res
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all_at(&buf.slice(0..buf_len), offset)
}
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
@@ -1214,8 +1222,8 @@ mod tests {
.to_owned(),
)
.await?;
file_b.write_all_at(b"BAR", 3).await?;
file_b.write_all_at(b"FOO", 0).await?;
file_b.write_all_at(b"BAR".to_vec(), 3).await?;
file_b.write_all_at(b"FOO".to_vec(), 0).await?;
assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
@@ -1315,7 +1323,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1324,7 +1332,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1346,7 +1354,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

View File

@@ -44,6 +44,11 @@ pub enum NeonWalRecord {
moff: MultiXactOffset,
members: Vec<MultiXactMember>,
},
/// Update the map of AUX files, either writing or dropping an entry
AuxFile {
file_path: String,
content: Option<Bytes>,
},
}
impl NeonWalRecord {

View File

@@ -22,7 +22,7 @@
mod process;
/// Code to apply [`NeonWalRecord`]s.
mod apply_neon;
pub(crate) mod apply_neon;
use crate::config::PageServerConf;
use crate::metrics::{

View File

@@ -1,7 +1,8 @@
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants;
@@ -12,6 +13,7 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use tracing::*;
use utils::bin_ser::BeSer;
/// Can this request be served by neon redo functions
/// or we need to pass it to wal-redo postgres process?
@@ -230,6 +232,72 @@ pub(crate) fn apply_in_neon(
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
}
}
NeonWalRecord::AuxFile { file_path, content } => {
let mut dir = AuxFilesDirectory::des(page)?;
dir.upsert(file_path.clone(), content.clone());
page.clear();
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
}
Ok(())
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::key::AUX_FILES_KEY;
use super::*;
use std::collections::HashMap;
use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
#[test]
fn apply_aux_file_deltas() -> anyhow::Result<()> {
let base_dir = AuxFilesDirectory {
files: HashMap::from([
("two".to_string(), Bytes::from_static(b"content0")),
("three".to_string(), Bytes::from_static(b"contentX")),
]),
};
let base_image = AuxFilesDirectory::ser(&base_dir)?;
let deltas = vec![
// Insert
NeonWalRecord::AuxFile {
file_path: "one".to_string(),
content: Some(Bytes::from_static(b"content1")),
},
// Update
NeonWalRecord::AuxFile {
file_path: "two".to_string(),
content: Some(Bytes::from_static(b"content99")),
},
// Delete
NeonWalRecord::AuxFile {
file_path: "three".to_string(),
content: None,
},
];
let file_path = AUX_FILES_KEY;
let mut page = BytesMut::from_iter(base_image);
for record in deltas {
apply_in_neon(&record, file_path, &mut page)?;
}
let reconstructed = AuxFilesDirectory::des(&page)?;
let expect = HashMap::from([
("one".to_string(), Bytes::from_static(b"content1")),
("two".to_string(), Bytes::from_static(b"content99")),
]);
assert_eq!(reconstructed.files, expect);
Ok(())
}
}

View File

@@ -234,9 +234,11 @@ async fn main() -> anyhow::Result<()> {
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
let cancel_map = CancelMap::default();
let redis_publisher = match &args.redis_notifications {
Some(url) => Some(Arc::new(Mutex::new(
RedisPublisherClient::new(url, args.region.clone(), &config.redis_rps_limit).await?,
))),
Some(url) => Some(Arc::new(Mutex::new(RedisPublisherClient::new(
url,
args.region.clone(),
&config.redis_rps_limit,
)?))),
None => None,
};
let cancellation_handler = Arc::new(CancellationHandler::new(

View File

@@ -331,6 +331,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
compute: node,
req: _request_gauge,
conn: _client_gauge,
cancel: session,
}))
}

View File

@@ -1,4 +1,5 @@
use crate::{
cancellation,
compute::PostgresConnection,
console::messages::MetricsAuxInfo,
metrics::NUM_BYTES_PROXIED_COUNTER,
@@ -57,6 +58,7 @@ pub struct ProxyPassthrough<S> {
pub req: IntCounterPairGuard,
pub conn: IntCounterPairGuard,
pub cancel: cancellation::Session,
}
impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {

View File

@@ -1,12 +1,5 @@
use std::hash::BuildHasherDefault;
use lasso::{Spur, ThreadedRodeo};
use pq_proto::CancelKeyData;
use redis::{
streams::{StreamReadOptions, StreamReadReply},
AsyncCommands, FromRedisValue,
};
use rustc_hash::FxHasher;
use redis::AsyncCommands;
use uuid::Uuid;
use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
@@ -21,74 +14,15 @@ pub struct RedisPublisherClient {
}
impl RedisPublisherClient {
pub async fn new(
pub fn new(
url: &str,
region_id: String,
info: &'static [RateBucketInfo],
) -> anyhow::Result<Self> {
let client = redis::Client::open(url)?;
let mut conn = client.get_async_connection().await.unwrap();
let len: u64 = conn.xlen("neon:endpoints:testing").await.unwrap();
tracing::info!("starting with {len} endpoints in redis");
for i in len..300000u64 {
let _key: String = conn
.xadd(
"neon:endpoints:testing",
"*",
&[("endpoint_id", format!("ep-hello-world-{i}"))],
)
.await
.unwrap();
if i.is_power_of_two() {
tracing::debug!("endpoints written {i}");
}
// client.
}
tracing::info!("written endpoints to redis");
// start reading
let s = ThreadedRodeo::<Spur, BuildHasherDefault<FxHasher>>::with_hasher(
BuildHasherDefault::default(),
);
let opts = StreamReadOptions::default().count(1000);
let mut id = "0-0".to_string();
loop {
let mut res: StreamReadReply = conn
.xread_options(&["neon:endpoints:testing"], &[&id], &opts)
.await
.unwrap();
if res.keys.is_empty() {
break;
}
assert_eq!(res.keys.len(), 1);
let res = res.keys.pop().unwrap();
assert_eq!(res.key, "neon:endpoints:testing");
if res.ids.is_empty() {
break;
}
for x in res.ids {
id = x.id;
if let Some(ep) = x.map.get("endpoint_id") {
let ep = String::from_redis_value(ep).unwrap();
s.get_or_intern(ep);
if s.len().is_power_of_two() {
tracing::debug!("endpoints read {}", s.len());
}
}
}
}
tracing::info!("read {} endpoints from redis", s.len());
Ok(Self {
client,
publisher: Some(conn),
publisher: None,
region_id,
limiter: RedisRateLimiter::new(info),
})

View File

@@ -3967,27 +3967,24 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
# Get the timeline ID. We need it for the 'basebackup' command
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
# many tests already checkpoint, but do it just in case
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CHECKPOINT")
# wait for pageserver to catch up
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
# stop postgres to ensure that files won't change
endpoint.stop()
# Read the shutdown checkpoint's LSN
pg_controldata_path = os.path.join(pg_bin.pg_bin_path, "pg_controldata")
cmd = f"{pg_controldata_path} -D {endpoint.pgdata_dir}"
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
checkpoint_lsn = re.findall(
"Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
)[0]
log.debug(f"last checkpoint at {checkpoint_lsn}")
# Take a basebackup from pageserver
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
@@ -3995,7 +3992,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg} \
-c 'basebackup {endpoint.tenant_id} {timeline_id} {checkpoint_lsn}' \
-c 'basebackup {endpoint.tenant_id} {timeline_id}' \
| tar -x -C {restored_dir_path}
"""

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
"postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
"postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
}

View File

@@ -18,7 +18,7 @@ commands:
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m immediate --wait -t 10'
files:
- filename: pgbouncer.ini
content: |