Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-29 00:00:38 +00:00)
Compare commits: proxy-test...vm-pg-stop
8 Commits
| Author | SHA1 | Date |
|---|---|---|
| | bce8d4b3f5 | |
| | 840abe3954 | |
| | 774a6e7475 | |
| | df5d588f63 | |
| | f39b0fce9b | |
| | a9ec4eb4fc | |
| | a97b54e3b9 | |
| | a5114a99b2 | |
@@ -2,7 +2,7 @@ use std::collections::HashMap;
 use std::env;
 use std::fs;
 use std::io::BufRead;
-use std::os::unix::fs::PermissionsExt;
+use std::os::unix::fs::{symlink, PermissionsExt};
 use std::path::Path;
 use std::process::{Command, Stdio};
 use std::str::FromStr;
@@ -634,6 +634,48 @@ impl ComputeNode {
         // Update pg_hba.conf received with basebackup.
         update_pg_hba(pgdata_path)?;

+        // Place pg_dynshmem under /dev/shm. This allows us to use
+        // 'dynamic_shared_memory_type = mmap' so that the files are placed in
+        // /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
+        //
+        // Why on earth don't we just stick to the 'posix' default, you might
+        // ask. It turns out that making large allocations with 'posix' doesn't
+        // work very well with autoscaling. The behavior we want is that:
+        //
+        // 1. You can make large DSM allocations, larger than the current RAM
+        //    size of the VM, without errors
+        //
+        // 2. If the allocated memory is really used, the VM is scaled up
+        //    automatically to accommodate that
+        //
+        // We try to make that possible by having swap in the VM. But with the
+        // default 'posix' DSM implementation, we fail step 1, even when there's
+        // plenty of swap available. PostgreSQL uses posix_fallocate() to create
+        // the shmem segment, which is really just a file in /dev/shm in Linux,
+        // but posix_fallocate() on tmpfs returns ENOMEM if the size is larger
+        // than available RAM.
+        //
+        // Using 'dynamic_shared_memory_type = mmap' works around that, because
+        // the Postgres 'mmap' DSM implementation doesn't use
+        // posix_fallocate(). Instead, it uses repeated calls to write(2) to
+        // fill the file with zeros. It's weird that that differs between
+        // 'posix' and 'mmap', but we take advantage of it. When the file is
+        // filled slowly with write(2), the kernel allows it to grow larger, as
+        // long as there's swap available.
+        //
+        // In short, using 'dynamic_shared_memory_type = mmap' allows a single
+        // DSM segment to be larger than currently available RAM. And because we
+        // don't want to store it in a real file, which the kernel would try to
+        // flush to disk, we symlink pg_dynshmem to /dev/shm.
+        //
+        // We don't set 'dynamic_shared_memory_type = mmap' here, we let the
+        // control plane control that option. If 'mmap' is not used, this
+        // symlink doesn't affect anything.
+        //
+        // See https://github.com/neondatabase/autoscaling/issues/800
+        std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
+        symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
+
         match spec.mode {
             ComputeMode::Primary => {}
             ComputeMode::Replica | ComputeMode::Static(..) => {
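The comment above hinges on a subtle tmpfs behavior that a minimal standalone probe can reproduce. This is a sketch, not part of the change: the `probe` function and the `/dev/shm/probe` path are made up for illustration, while `libc::posix_fallocate` is the same call the PostgreSQL 'posix' DSM implementation relies on.

```rust
use std::fs::File;
use std::io::Write;
use std::os::unix::io::AsRawFd;

/// Try to reserve `size` bytes in a tmpfs file two ways. The fallocate path
/// fails with ENOMEM once `size` exceeds available RAM; the write(2) path
/// can keep growing into swap.
fn probe(size: i64) -> std::io::Result<()> {
    // 'posix' DSM style: one big upfront allocation via posix_fallocate().
    let f = File::create("/dev/shm/probe")?;
    let rc = unsafe { libc::posix_fallocate(f.as_raw_fd(), 0, size) };
    if rc != 0 {
        // On tmpfs this is ENOMEM for size > RAM, even with swap free.
        eprintln!("posix_fallocate: {}", std::io::Error::from_raw_os_error(rc));
    }

    // 'mmap' DSM style: grow the file incrementally with write(2).
    let mut f = File::create("/dev/shm/probe")?;
    let zeros = vec![0u8; 8192];
    let mut written: i64 = 0;
    while written < size {
        f.write_all(&zeros)?; // allowed to grow as long as RAM + swap suffice
        written += zeros.len() as i64;
    }
    Ok(())
}
```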
@@ -1,7 +1,7 @@
 use std::{
     borrow::Cow,
     fs::{self, File},
-    io,
+    io::{self, Write},
 };

 use camino::{Utf8Path, Utf8PathBuf};
@@ -161,6 +161,48 @@ pub async fn durable_rename(
     Ok(())
 }

+/// Writes a file to the specified `final_path` in a crash safe fashion, using [`std::fs`].
+///
+/// The file is first written to the specified `tmp_path`, and in a second
+/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
+/// and atomic rename guarantee that, if we crash at any point, there will never
+/// be a partially written file at `final_path` (but maybe at `tmp_path`).
+///
+/// Callers are responsible for serializing calls of this function for a given `final_path`.
+/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
+/// be no error and the content of `final_path` will be the "winner" caller's `content`.
+/// I.e., the atomicity guarantees still hold.
+pub fn overwrite(
+    final_path: &Utf8Path,
+    tmp_path: &Utf8Path,
+    content: &[u8],
+) -> std::io::Result<()> {
+    let Some(final_path_parent) = final_path.parent() else {
+        return Err(std::io::Error::from_raw_os_error(
+            nix::errno::Errno::EINVAL as i32,
+        ));
+    };
+    std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
+    let mut file = std::fs::OpenOptions::new()
+        .write(true)
+        // Use `create_new` so that, if we race with ourselves or something else,
+        // we bail out instead of causing damage.
+        .create_new(true)
+        .open(tmp_path)?;
+    file.write_all(content)?;
+    file.sync_all()?;
+    drop(file); // don't keep the fd open for longer than we have to
+
+    std::fs::rename(tmp_path, final_path)?;
+
+    let final_parent_dirfd = std::fs::OpenOptions::new()
+        .read(true)
+        .open(final_path_parent)?;
+    final_parent_dirfd.sync_all()?;
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
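A minimal sketch of a call site for this helper, which per the later hunks is exported as `utils::crashsafe::overwrite`. The `save_settings` wrapper and the `/data` paths are hypothetical; the key property is that the tmp file lives next to the final file, so the rename stays on one filesystem and remains atomic.

```rust
use camino::Utf8Path;

fn save_settings(bytes: &[u8]) -> std::io::Result<()> {
    utils::crashsafe::overwrite(
        Utf8Path::new("/data/settings.json"),
        Utf8Path::new("/data/settings.json.tmp"),
        bytes,
    )
}
```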
@@ -234,7 +234,7 @@ impl DeletionHeader {
         let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
         let header_path = conf.deletion_header_path();
         let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
-        VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
+        VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
             .await
             .maybe_fatal_err("save deletion header")?;
@@ -325,7 +325,8 @@ impl DeletionList {
         let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);

         let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
-        VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
+
+        VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
             .await
             .maybe_fatal_err("save deletion list")
             .map_err(Into::into)
@@ -156,6 +156,7 @@ impl Timeline {
             pending_updates: HashMap::new(),
             pending_deletions: Vec::new(),
             pending_nblocks: 0,
+            pending_aux_files: None,
             pending_directory_entries: Vec::new(),
             lsn,
         }
@@ -870,6 +871,14 @@ pub struct DatadirModification<'a> {
     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     pending_deletions: Vec<(Range<Key>, Lsn)>,
     pending_nblocks: i64,

+    // If we already wrote any aux file changes in this modification, stash the latest dir. If set,
+    // [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
+    // if AUX_FILES_KEY is already set.
+    pending_aux_files: Option<AuxFilesDirectory>,
+
     /// For special "directory" keys that store key-value maps, track the size of the map
     /// if it was updated in this modification.
     pending_directory_entries: Vec<(DirectoryKind, usize)>,
 }
@@ -1384,31 +1393,76 @@ impl<'a> DatadirModification<'a> {
         content: &[u8],
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
-            Ok(buf) => AuxFilesDirectory::des(&buf)?,
-            Err(e) => {
-                // This is expected: historical databases do not have the key.
-                debug!("Failed to get info about AUX files: {}", e);
-                AuxFilesDirectory {
-                    files: HashMap::new(),
-                }
-            }
-        };
-        let path = path.to_string();
-        if content.is_empty() {
-            dir.files.remove(&path);
-        } else {
-            dir.files.insert(path, Bytes::copy_from_slice(content));
-        }
+        let file_path = path.to_string();
+        let content = if content.is_empty() {
+            None
+        } else {
+            Some(Bytes::copy_from_slice(content))
+        };
+
+        let dir = if let Some(mut dir) = self.pending_aux_files.take() {
+            // We already updated aux files in `self`: emit a delta and update our latest value
+            self.put(
+                AUX_FILES_KEY,
+                Value::WalRecord(NeonWalRecord::AuxFile {
+                    file_path: file_path.clone(),
+                    content: content.clone(),
+                }),
+            );
+
+            dir.upsert(file_path, content);
+            dir
+        } else {
+            // Check if the AUX_FILES_KEY is initialized
+            match self.get(AUX_FILES_KEY, ctx).await {
+                Ok(dir_bytes) => {
+                    let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
+                    // Key is already set, we may append a delta
+                    self.put(
+                        AUX_FILES_KEY,
+                        Value::WalRecord(NeonWalRecord::AuxFile {
+                            file_path: file_path.clone(),
+                            content: content.clone(),
+                        }),
+                    );
+                    dir.upsert(file_path, content);
+                    dir
+                }
+                Err(
+                    e @ (PageReconstructError::AncestorStopping(_)
+                    | PageReconstructError::Cancelled
+                    | PageReconstructError::AncestorLsnTimeout(_)),
+                ) => {
+                    // Important that we do not interpret a shutdown error as "not found" and thereby
+                    // reset the map.
+                    return Err(e.into());
+                }
+                // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
+                // we are assuming that all _other_ possible errors represent a missing key. If some
+                // other error occurs, we may incorrectly reset the map of aux files.
+                Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
+                    // Key is missing, we must insert an image as the basis for subsequent deltas.
+                    let mut dir = AuxFilesDirectory {
+                        files: HashMap::new(),
+                    };
+                    dir.upsert(file_path, content);
+                    self.put(
+                        AUX_FILES_KEY,
+                        Value::Image(Bytes::from(
+                            AuxFilesDirectory::ser(&dir).context("serialize")?,
+                        )),
+                    );
+                    dir
+                }
+            }
+        };

         self.pending_directory_entries
             .push((DirectoryKind::AuxFiles, dir.files.len()));
+        self.pending_aux_files = Some(dir);

-        self.put(
-            AUX_FILES_KEY,
-            Value::Image(Bytes::from(
-                AuxFilesDirectory::ser(&dir).context("serialize")?,
-            )),
-        );
         Ok(())
     }
@@ -1618,8 +1672,18 @@ struct RelDirectory {
 }

 #[derive(Debug, Serialize, Deserialize, Default)]
-struct AuxFilesDirectory {
-    files: HashMap<String, Bytes>,
+pub(crate) struct AuxFilesDirectory {
+    pub(crate) files: HashMap<String, Bytes>,
 }

+impl AuxFilesDirectory {
+    pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
+        if let Some(value) = value {
+            self.files.insert(key, value);
+        } else {
+            self.files.remove(&key);
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
@@ -1655,8 +1719,60 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 #[allow(clippy::bool_assert_comparison)]
 #[cfg(test)]
 mod tests {
-    //use super::repo_harness::*;
-    //use super::*;
+    use hex_literal::hex;
+    use utils::id::TimelineId;
+
+    use super::*;
+
+    use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
+
+    /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
+    #[tokio::test]
+    async fn aux_files_round_trip() -> anyhow::Result<()> {
+        let name = "aux_files_round_trip";
+        let harness = TenantHarness::create(name)?;
+
+        pub const TIMELINE_ID: TimelineId =
+            TimelineId::from_array(hex!("11223344556677881122334455667788"));
+
+        let (tenant, ctx) = harness.load().await;
+        let tline = tenant
+            .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
+            .await?;
+        let tline = tline.raw_timeline().unwrap();
+
+        // First modification: insert two keys
+        let mut modification = tline.begin_modification(Lsn(0x1000));
+        modification.put_file("foo/bar1", b"content1", &ctx).await?;
+        modification.set_lsn(Lsn(0x1008))?;
+        modification.put_file("foo/bar2", b"content2", &ctx).await?;
+        modification.commit(&ctx).await?;
+        let expect_1008 = HashMap::from([
+            ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
+            ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
+        ]);
+
+        let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
+        assert_eq!(readback, expect_1008);
+
+        // Second modification: update one key, remove the other
+        let mut modification = tline.begin_modification(Lsn(0x2000));
+        modification.put_file("foo/bar1", b"content3", &ctx).await?;
+        modification.set_lsn(Lsn(0x2008))?;
+        modification.put_file("foo/bar2", b"", &ctx).await?;
+        modification.commit(&ctx).await?;
+        let expect_2008 =
+            HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
+
+        let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
+        assert_eq!(readback, expect_2008);
+
+        // Reading back in time works
+        let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
+        assert_eq!(readback, expect_1008);
+
+        Ok(())
+    }
+
     /*
     fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
@@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage;
 use std::fmt;
 use storage_broker::BrokerClientChannel;
 use tokio::io::BufReader;
-use tokio::runtime::Handle;
 use tokio::sync::watch;
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
@@ -2878,17 +2877,10 @@ impl Tenant {
         let tenant_shard_id = *tenant_shard_id;
         let config_path = config_path.to_owned();
-        tokio::task::spawn_blocking(move || {
-            Handle::current().block_on(async move {
-                let conf_content = conf_content.into_bytes();
-                VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
-                    .await
-                    .with_context(|| {
-                        format!("write tenant {tenant_shard_id} config to {config_path}")
-                    })
-            })
-        })
-        .await??;
+        let conf_content = conf_content.into_bytes();
+        VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
+            .await
+            .with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;

         Ok(())
     }
@@ -2915,17 +2907,12 @@ impl Tenant {
         let tenant_shard_id = *tenant_shard_id;
         let target_config_path = target_config_path.to_owned();
-        tokio::task::spawn_blocking(move || {
-            Handle::current().block_on(async move {
-                let conf_content = conf_content.into_bytes();
-                VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
-                    .await
-                    .with_context(|| {
-                        format!("write tenant {tenant_shard_id} config to {target_config_path}")
-                    })
-            })
-        })
-        .await??;
+        let conf_content = conf_content.into_bytes();
+        VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
+            .await
+            .with_context(|| {
+                format!("write tenant {tenant_shard_id} config to {target_config_path}")
+            })?;
         Ok(())
     }
@@ -3914,6 +3901,7 @@ pub(crate) mod harness {
     use utils::lsn::Lsn;

     use crate::deletion_queue::mock::MockDeletionQueue;
+    use crate::walredo::apply_neon;
     use crate::{
         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
     };
@@ -4173,20 +4161,34 @@ pub(crate) mod harness {
         records: Vec<(Lsn, NeonWalRecord)>,
         _pg_version: u32,
     ) -> anyhow::Result<Bytes> {
-        let s = format!(
-            "redo for {} to get to {}, with {} and {} records",
-            key,
-            lsn,
-            if base_img.is_some() {
-                "base image"
-            } else {
-                "no base image"
-            },
-            records.len()
-        );
-        println!("{s}");
-
-        Ok(TEST_IMG(&s))
+        let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
+        if records_neon {
+            // For Neon wal records, we can decode without spawning postgres, so do so.
+            let base_img = base_img.expect("Neon WAL redo requires base image").1;
+            let mut page = BytesMut::new();
+            page.extend_from_slice(&base_img);
+            for (_record_lsn, record) in records {
+                apply_neon::apply_in_neon(&record, key, &mut page)?;
+            }
+            Ok(page.freeze())
+        } else {
+            // We never spawn a postgres walredo process in unit tests: just log what we might have done.
+            let s = format!(
+                "redo for {} to get to {}, with {} and {} records",
+                key,
+                lsn,
+                if base_img.is_some() {
+                    "base image"
+                } else {
+                    "no base image"
+                },
+                records.len()
+            );
+            println!("{s}");
+
+            Ok(TEST_IMG(&s))
+        }
     }
 }
@@ -6,6 +6,7 @@ use crate::context::RequestContext;
 use crate::page_cache::{self, PAGE_SZ};
 use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
 use crate::virtual_file::{self, VirtualFile};
+use bytes::BytesMut;
 use camino::Utf8PathBuf;
 use pageserver_api::shard::TenantShardId;
 use std::cmp::min;
@@ -26,7 +27,10 @@ pub struct EphemeralFile {
     /// An ephemeral file is append-only.
     /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
     /// The other pages, which can no longer be modified, are accessed through the page cache.
-    mutable_tail: [u8; PAGE_SZ],
+    ///
+    /// None <=> IO is ongoing.
+    /// Size is fixed to PAGE_SZ at creation time and must not be changed.
+    mutable_tail: Option<BytesMut>,
 }

 impl EphemeralFile {
@@ -60,7 +64,7 @@ impl EphemeralFile {
             _timeline_id: timeline_id,
             file,
             len: 0,
-            mutable_tail: [0u8; PAGE_SZ],
+            mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
         })
     }
@@ -103,7 +107,13 @@ impl EphemeralFile {
             };
         } else {
             debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
-            Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
+            Ok(BlockLease::EphemeralFileMutableTail(
+                self.mutable_tail
+                    .as_deref()
+                    .expect("we're not doing IO, it must be Some()")
+                    .try_into()
+                    .expect("we ensure that it's always PAGE_SZ"),
+            ))
         }
     }
@@ -135,21 +145,27 @@ impl EphemeralFile {
     ) -> Result<(), io::Error> {
         let mut src_remaining = src;
         while !src_remaining.is_empty() {
-            let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
+            let dst_remaining = &mut self
+                .ephemeral_file
+                .mutable_tail
+                .as_deref_mut()
+                .expect("IO is not yet ongoing")[self.off..];
             let n = min(dst_remaining.len(), src_remaining.len());
             dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
             self.off += n;
             src_remaining = &src_remaining[n..];
             if self.off == PAGE_SZ {
-                match self
-                    .ephemeral_file
-                    .file
-                    .write_all_at(
-                        &self.ephemeral_file.mutable_tail,
-                        self.blknum as u64 * PAGE_SZ as u64,
-                    )
-                    .await
-                {
+                let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
+                    .expect("IO is not yet ongoing");
+                let (mutable_tail, res) = self
+                    .ephemeral_file
+                    .file
+                    .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
+                    .await;
+                // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
+                // I.e., the IO isn't retryable if we panic.
+                self.ephemeral_file.mutable_tail = Some(mutable_tail);
+                match res {
                     Ok(_) => {
                         // Pre-warm the page cache with what we just wrote.
                         // This isn't necessary for coherency/correctness, but it's how we've always done it.
@@ -169,7 +185,12 @@ impl EphemeralFile {
                         Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
                             let buf: &mut [u8] = write_guard.deref_mut();
                             debug_assert_eq!(buf.len(), PAGE_SZ);
-                            buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
+                            buf.copy_from_slice(
+                                self.ephemeral_file
+                                    .mutable_tail
+                                    .as_deref()
+                                    .expect("IO is not ongoing"),
+                            );
                             let _ = write_guard.mark_valid();
                             // pre-warm successful
                         }
@@ -181,7 +202,11 @@ impl EphemeralFile {
                         // Zero the buffer for re-use.
                         // Zeroing is critical for correctness because the write_blob code below
                         // and similarly read_blk expect zeroed pages.
-                        self.ephemeral_file.mutable_tail.fill(0);
+                        self.ephemeral_file
+                            .mutable_tail
+                            .as_deref_mut()
+                            .expect("IO is not ongoing")
+                            .fill(0);
                         // This block is done, move to next one.
                         self.blknum += 1;
                         self.off = 0;
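The `Option<BytesMut>` dance above follows a general owned-buffer IO pattern: take the buffer out of the struct, hand it to the write by value, and put it back before inspecting the result. A self-contained sketch of just that pattern; all names here (`Tail`, `flush`, `write_owned`) are hypothetical, with `write_owned` standing in for an owned-buffer write like `VirtualFile::write_all_at`.

```rust
use bytes::BytesMut;

struct Tail {
    // None <=> the buffer is currently lent out to an in-flight write.
    buf: Option<BytesMut>,
}

// Owned-buffer IO needs the buffer by value, so we take() it out of the
// Option, run the IO, and restore it whatever the result was.
async fn flush(tail: &mut Tail) -> std::io::Result<()> {
    let buf = tail.buf.take().expect("no IO in flight");
    let (buf, res) = write_owned(buf).await;
    tail.buf = Some(buf); // put it back before looking at the result
    res
}

// Dummy owned-buffer write with the same (buffer, result) return shape.
async fn write_owned(buf: BytesMut) -> (BytesMut, std::io::Result<()>) {
    (buf, Ok(()))
}
```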
@@ -279,7 +279,7 @@ pub async fn save_metadata(
     let path = conf.metadata_path(tenant_shard_id, timeline_id);
     let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
     let metadata_bytes = data.to_bytes().context("serialize metadata")?;
-    VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
+    VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes)
         .await
         .context("write metadata")?;
     Ok(())
@@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> {
         let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
         let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
         let heatmap_path_bg = heatmap_path.clone();
-        tokio::task::spawn_blocking(move || {
-            tokio::runtime::Handle::current().block_on(async move {
-                VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
-            })
-        })
-        .await
-        .expect("Blocking task is never aborted")
-        .maybe_fatal_err(&context_msg)?;
+        VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
+            .await
+            .maybe_fatal_err(&context_msg)?;

         tracing::debug!("Wrote local heatmap to {}", heatmap_path);
@@ -19,14 +19,13 @@ use once_cell::sync::OnceCell;
 use pageserver_api::shard::TenantShardId;
 use std::fs::{self, File};
 use std::io::{Error, ErrorKind, Seek, SeekFrom};
-use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
+use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};

 use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
 use std::os::unix::fs::FileExt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 use tokio::time::Instant;
-use utils::fs_ext;

 pub use pageserver_api::models::virtual_file as api;
 pub(crate) mod io_engine;
@@ -404,47 +403,34 @@ impl VirtualFile {
         Ok(vfile)
     }

-    /// Writes a file to the specified `final_path` in a crash safe fashion
+    /// Async version of [`::utils::crashsafe::overwrite`].
     ///
-    /// The file is first written to the specified tmp_path, and in a second
-    /// step, the tmp path is renamed to the final path. As renames are
-    /// atomic, a crash during the write operation will never leave behind a
-    /// partially written file.
-    pub async fn crashsafe_overwrite<B: BoundedBuf>(
-        final_path: &Utf8Path,
-        tmp_path: &Utf8Path,
+    /// # NB:
+    ///
+    /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
+    /// it did at an earlier time.
+    /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
+    pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
+        final_path: Utf8PathBuf,
+        tmp_path: Utf8PathBuf,
         content: B,
     ) -> std::io::Result<()> {
-        let Some(final_path_parent) = final_path.parent() else {
-            return Err(std::io::Error::from_raw_os_error(
-                nix::errno::Errno::EINVAL as i32,
-            ));
-        };
-        std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
-        let mut file = Self::open_with_options(
-            tmp_path,
-            OpenOptions::new()
-                .write(true)
-                // Use `create_new` so that, if we race with ourselves or something else,
-                // we bail out instead of causing damage.
-                .create_new(true),
-        )
-        .await?;
-        let (_content, res) = file.write_all(content).await;
-        res?;
-        file.sync_all().await?;
-        drop(file); // before the rename, that's important!
-                    // renames are atomic
-        std::fs::rename(tmp_path, final_path)?;
-        // Only open final path parent dirfd now, so that this operation only
-        // ever holds one VirtualFile fd at a time. That's important because
-        // the current `find_victim_slot` impl might pick the same slot for both
-        // VirtualFiles, and it eventually does a blocking write lock instead of
-        // try_lock.
-        let final_parent_dirfd =
-            Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
-        final_parent_dirfd.sync_all().await?;
-        Ok(())
+        // TODO: use tokio_epoll_uring if configured as `io_engine`.
+        // See https://github.com/neondatabase/neon/issues/6663
+
+        tokio::task::spawn_blocking(move || {
+            let slice_storage;
+            let content_len = content.bytes_init();
+            let content = if content.bytes_init() > 0 {
+                slice_storage = Some(content.slice(0..content_len));
+                slice_storage.as_deref().expect("just set it to Some()")
+            } else {
+                &[]
+            };
+            utils::crashsafe::overwrite(&final_path, &tmp_path, content)
+        })
+        .await
+        .expect("blocking task is never aborted")
     }

     /// Call File::sync_all() on the underlying File.
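For reference, a call site of the new owned-argument signature, mirroring what the updated tests below do with `b"foo".to_vec()`. The `persist` wrapper and the `/data` paths are made up for illustration; paths and buffer are passed by value because they are moved into the `spawn_blocking` closure.

```rust
use camino::Utf8PathBuf;

// assumes: use crate::virtual_file::VirtualFile;
async fn persist(bytes: Vec<u8>) -> std::io::Result<()> {
    let final_path = Utf8PathBuf::from("/data/state.json");
    let tmp_path = Utf8PathBuf::from("/data/state.json.tmp");
    VirtualFile::crashsafe_overwrite(final_path, tmp_path, bytes).await
}
```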
@@ -582,24 +568,37 @@ impl VirtualFile {
     }

     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
-    pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
+    pub async fn write_all_at<B: BoundedBuf>(
+        &self,
+        buf: B,
+        mut offset: u64,
+    ) -> (B::Buf, Result<(), Error>) {
+        let buf_len = buf.bytes_init();
+        if buf_len == 0 {
+            return (Slice::into_inner(buf.slice_full()), Ok(()));
+        }
+        let mut buf = buf.slice(0..buf_len);
         while !buf.is_empty() {
-            match self.write_at(buf, offset).await {
+            // TODO: push `buf` further down
+            match self.write_at(&buf, offset).await {
                 Ok(0) => {
-                    return Err(Error::new(
-                        std::io::ErrorKind::WriteZero,
-                        "failed to write whole buffer",
-                    ));
+                    return (
+                        Slice::into_inner(buf),
+                        Err(Error::new(
+                            std::io::ErrorKind::WriteZero,
+                            "failed to write whole buffer",
+                        )),
+                    );
                 }
                 Ok(n) => {
-                    buf = &buf[n..];
+                    buf = buf.slice(n..);
                     offset += n as u64;
                 }
                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
-                Err(e) => return Err(e),
+                Err(e) => return (Slice::into_inner(buf), Err(e)),
             }
         }
-        Ok(())
+        (Slice::into_inner(buf), Ok(()))
     }

     /// Writes `buf.slice(0..buf.bytes_init())`.
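A sketch of the resulting call pattern; `write_page` is made up for illustration. The point of the new return shape is that the buffer always comes back alongside the `Result`, so ownership round-trips through the IO and the caller can reuse (or drop) it either way.

```rust
// assumes: use crate::virtual_file::VirtualFile;
async fn write_page(
    file: &VirtualFile,
    page: Vec<u8>,
    offset: u64,
) -> std::io::Result<Vec<u8>> {
    let (page, res) = file.write_all_at(page, offset).await;
    res?; // surface the error; the buffer was still returned to us
    Ok(page)
}
```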
@@ -1064,10 +1063,19 @@ mod tests {
             MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
         }
     }
-    async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
+    async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
         match self {
-            MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
-            MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
+            MaybeVirtualFile::VirtualFile(file) => {
+                let (_buf, res) = file.write_all_at(buf, offset).await;
+                res
+            }
+            MaybeVirtualFile::File(file) => {
+                let buf_len = buf.bytes_init();
+                if buf_len == 0 {
+                    return Ok(());
+                }
+                file.write_all_at(&buf.slice(0..buf_len), offset)
+            }
         }
     }
     async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
@@ -1214,8 +1222,8 @@ mod tests {
                 .to_owned(),
         )
         .await?;
-        file_b.write_all_at(b"BAR", 3).await?;
-        file_b.write_all_at(b"FOO", 0).await?;
+        file_b.write_all_at(b"BAR".to_vec(), 3).await?;
+        file_b.write_all_at(b"FOO".to_vec(), 0).await?;

         assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
@@ -1315,7 +1323,7 @@ mod tests {
         let path = testdir.join("myfile");
         let tmp_path = testdir.join("myfile.tmp");

-        VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
+        VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
             .await
             .unwrap();
         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1324,7 +1332,7 @@ mod tests {
         assert!(!tmp_path.exists());
         drop(file);

-        VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
+        VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
             .await
             .unwrap();
         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1346,7 +1354,7 @@ mod tests {
         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
         assert!(tmp_path.exists());

-        VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
+        VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
             .await
             .unwrap();
@@ -44,6 +44,11 @@ pub enum NeonWalRecord {
         moff: MultiXactOffset,
         members: Vec<MultiXactMember>,
     },
+    /// Update the map of AUX files, either writing or dropping an entry
+    AuxFile {
+        file_path: String,
+        content: Option<Bytes>,
+    },
 }

 impl NeonWalRecord {
@@ -22,7 +22,7 @@
 mod process;

 /// Code to apply [`NeonWalRecord`]s.
-mod apply_neon;
+pub(crate) mod apply_neon;

 use crate::config::PageServerConf;
 use crate::metrics::{
@@ -1,7 +1,8 @@
+use crate::pgdatadir_mapping::AuxFilesDirectory;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use byteorder::{ByteOrder, LittleEndian};
-use bytes::BytesMut;
+use bytes::{BufMut, BytesMut};
 use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
 use pageserver_api::reltag::SlruKind;
 use postgres_ffi::pg_constants;
@@ -12,6 +13,7 @@ use postgres_ffi::v14::nonrelfile_utils::{
 };
 use postgres_ffi::BLCKSZ;
 use tracing::*;
+use utils::bin_ser::BeSer;

 /// Can this request be served by neon redo functions,
 /// or do we need to pass it to the wal-redo postgres process?
@@ -230,6 +232,72 @@ pub(crate) fn apply_in_neon(
                 LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
             }
         }
+        NeonWalRecord::AuxFile { file_path, content } => {
+            let mut dir = AuxFilesDirectory::des(page)?;
+            dir.upsert(file_path.clone(), content.clone());
+
+            page.clear();
+            let mut writer = page.writer();
+            dir.ser_into(&mut writer)?;
+        }
     }
     Ok(())
 }
+
+#[cfg(test)]
+mod test {
+    use bytes::Bytes;
+    use pageserver_api::key::AUX_FILES_KEY;
+
+    use super::*;
+    use std::collections::HashMap;
+
+    use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};
+
+    /// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
+    #[test]
+    fn apply_aux_file_deltas() -> anyhow::Result<()> {
+        let base_dir = AuxFilesDirectory {
+            files: HashMap::from([
+                ("two".to_string(), Bytes::from_static(b"content0")),
+                ("three".to_string(), Bytes::from_static(b"contentX")),
+            ]),
+        };
+        let base_image = AuxFilesDirectory::ser(&base_dir)?;
+
+        let deltas = vec![
+            // Insert
+            NeonWalRecord::AuxFile {
+                file_path: "one".to_string(),
+                content: Some(Bytes::from_static(b"content1")),
+            },
+            // Update
+            NeonWalRecord::AuxFile {
+                file_path: "two".to_string(),
+                content: Some(Bytes::from_static(b"content99")),
+            },
+            // Delete
+            NeonWalRecord::AuxFile {
+                file_path: "three".to_string(),
+                content: None,
+            },
+        ];
+
+        let file_path = AUX_FILES_KEY;
+        let mut page = BytesMut::from_iter(base_image);
+
+        for record in deltas {
+            apply_in_neon(&record, file_path, &mut page)?;
+        }
+
+        let reconstructed = AuxFilesDirectory::des(&page)?;
+        let expect = HashMap::from([
+            ("one".to_string(), Bytes::from_static(b"content1")),
+            ("two".to_string(), Bytes::from_static(b"content99")),
+        ]);
+
+        assert_eq!(reconstructed.files, expect);
+
+        Ok(())
+    }
+}
@@ -234,9 +234,11 @@ async fn main() -> anyhow::Result<()> {
     let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
     let cancel_map = CancelMap::default();
     let redis_publisher = match &args.redis_notifications {
-        Some(url) => Some(Arc::new(Mutex::new(
-            RedisPublisherClient::new(url, args.region.clone(), &config.redis_rps_limit).await?,
-        ))),
+        Some(url) => Some(Arc::new(Mutex::new(RedisPublisherClient::new(
+            url,
+            args.region.clone(),
+            &config.redis_rps_limit,
+        )?))),
         None => None,
     };
     let cancellation_handler = Arc::new(CancellationHandler::new(
@@ -331,6 +331,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
         compute: node,
         req: _request_gauge,
         conn: _client_gauge,
+        cancel: session,
     }))
 }
@@ -1,4 +1,5 @@
 use crate::{
+    cancellation,
     compute::PostgresConnection,
     console::messages::MetricsAuxInfo,
     metrics::NUM_BYTES_PROXIED_COUNTER,
@@ -57,6 +58,7 @@ pub struct ProxyPassthrough<S> {
     pub req: IntCounterPairGuard,
     pub conn: IntCounterPairGuard,
+    pub cancel: cancellation::Session,
 }

 impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
@@ -1,12 +1,5 @@
-use std::hash::BuildHasherDefault;
-
-use lasso::{Spur, ThreadedRodeo};
 use pq_proto::CancelKeyData;
-use redis::{
-    streams::{StreamReadOptions, StreamReadReply},
-    AsyncCommands, FromRedisValue,
-};
-use rustc_hash::FxHasher;
+use redis::AsyncCommands;
 use uuid::Uuid;

 use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
@@ -21,74 +14,15 @@ pub struct RedisPublisherClient {
 }

 impl RedisPublisherClient {
-    pub async fn new(
+    pub fn new(
         url: &str,
         region_id: String,
         info: &'static [RateBucketInfo],
     ) -> anyhow::Result<Self> {
         let client = redis::Client::open(url)?;
-
-        let mut conn = client.get_async_connection().await.unwrap();
-
-        let len: u64 = conn.xlen("neon:endpoints:testing").await.unwrap();
-        tracing::info!("starting with {len} endpoints in redis");
-
-        for i in len..300000u64 {
-            let _key: String = conn
-                .xadd(
-                    "neon:endpoints:testing",
-                    "*",
-                    &[("endpoint_id", format!("ep-hello-world-{i}"))],
-                )
-                .await
-                .unwrap();
-            if i.is_power_of_two() {
-                tracing::debug!("endpoints written {i}");
-            }
-            // client.
-        }
-        tracing::info!("written endpoints to redis");
-
-        // start reading
-
-        let s = ThreadedRodeo::<Spur, BuildHasherDefault<FxHasher>>::with_hasher(
-            BuildHasherDefault::default(),
-        );
-        let opts = StreamReadOptions::default().count(1000);
-        let mut id = "0-0".to_string();
-        loop {
-            let mut res: StreamReadReply = conn
-                .xread_options(&["neon:endpoints:testing"], &[&id], &opts)
-                .await
-                .unwrap();
-            if res.keys.is_empty() {
-                break;
-            }
-
-            assert_eq!(res.keys.len(), 1);
-            let res = res.keys.pop().unwrap();
-            assert_eq!(res.key, "neon:endpoints:testing");
-
-            if res.ids.is_empty() {
-                break;
-            }
-            for x in res.ids {
-                id = x.id;
-                if let Some(ep) = x.map.get("endpoint_id") {
-                    let ep = String::from_redis_value(ep).unwrap();
-                    s.get_or_intern(ep);
-
-                    if s.len().is_power_of_two() {
-                        tracing::debug!("endpoints read {}", s.len());
-                    }
-                }
-            }
-        }
-        tracing::info!("read {} endpoints from redis", s.len());
-
         Ok(Self {
             client,
-            publisher: Some(conn),
+            publisher: None,
             region_id,
             limiter: RedisRateLimiter::new(info),
         })
@@ -3967,27 +3967,24 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:

 # pg is the existing and running compute node, that we want to compare with a basebackup
 def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
+    pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
+
     # Get the timeline ID. We need it for the 'basebackup' command
     timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])

     # many tests already checkpoint, but do it just in case
     with closing(endpoint.connect()) as conn:
         with conn.cursor() as cur:
             cur.execute("CHECKPOINT")

     # wait for pageserver to catch up
     wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
     # stop postgres to ensure that files won't change
     endpoint.stop()

-    # Read the shutdown checkpoint's LSN
-    pg_controldata_path = os.path.join(pg_bin.pg_bin_path, "pg_controldata")
-    cmd = f"{pg_controldata_path} -D {endpoint.pgdata_dir}"
-    result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
-    checkpoint_lsn = re.findall(
-        "Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
-    )[0]
-    log.debug(f"last checkpoint at {checkpoint_lsn}")
-
     # Take a basebackup from pageserver
     restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
     restored_dir_path.mkdir(exist_ok=True)

-    pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
     psql_path = os.path.join(pg_bin.pg_bin_path, "psql")

     pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
@@ -3995,7 +3992,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
     {psql_path} \
     --no-psqlrc \
     postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg} \
-    -c 'basebackup {endpoint.tenant_id} {timeline_id} {checkpoint_lsn}' \
+    -c 'basebackup {endpoint.tenant_id} {timeline_id}' \
     | tar -x -C {restored_dir_path}
     """
Submodule vendor/postgres-v14 updated: 018fb05201...9dd9956c55
Submodule vendor/postgres-v15 updated: 6ee78a3c29...ca2def9993
Submodule vendor/postgres-v16 updated: 550cdd26d4...9c37a49884

vendor/revisions.json:
@@ -1,5 +1,5 @@
 {
-    "postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
-    "postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
-    "postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
+    "postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
+    "postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
+    "postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
 }
@@ -18,7 +18,7 @@ commands:
     sysvInitAction: respawn
     shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml'
   shutdownHook: |
-    su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
+    su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m immediate --wait -t 10'
 files:
   - filename: pgbouncer.ini
     content: |