Compare commits

..

20 Commits

Author SHA1 Message Date
Alexey Kondratov
0eca1d19de Add safety notes, benchmark. Optimize checksum calculation 2022-07-07 20:45:50 +02:00
Alexey Kondratov
53b9cb915e Turn off data-checksums for old tenants by default and explicitly enable for new ones 2022-07-07 19:44:47 +02:00
Alexey Kondratov
cc6ffb558d Verify checksum of the page and WAL records before sending to the redo process 2022-07-07 19:44:47 +02:00
Alexey Kondratov
b135dbb85d Bump vendor/postgres 2022-07-07 19:44:47 +02:00
Alexey Kondratov
6059801943 Enable Postgres data checksums (neondatabase/cloud#536)
We need checksums to verify data integrity when we read data from an
untrusted place (e.g. local disk) or via an untrusted communication channel
(e.g. network). At the same time, we trust the pageserver <-> redo process
communication channel, as it is just a pipe.

Here we enable calculation of data checksums in the WAL redo process and
when we extract FPIs during WAL ingestion. The compute node (Postgres) will
verify the checksum of every page after receiving it back from the pageserver,
so it is pretty similar to how vanilla Postgres checks them.

There are two other places where we should verify checksums to
detect data corruption earlier:
- when we receive WAL records from safekeepers (already implemented,
  see: WalStreamDecoder::poll_decode)
- when we write layer files to disk and read them back into memory from
  local disk or S3
2022-07-07 19:44:47 +02:00
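
For illustration, a minimal sketch of stamping and re-checking a page checksum with the postgres_ffi helpers added in this series (page_set_checksum / page_verify_checksum, shown in the diffs below); the function name stamp_and_verify is hypothetical:

use postgres_ffi::pg_constants::BLCKSZ;
use postgres_ffi::{page_set_checksum, page_verify_checksum};

fn stamp_and_verify(blkno: u32) -> bool {
    // A standard 8 KB page image, as required by the safety notes of pg_checksum_page.
    let mut page = [0u8; BLCKSZ as usize];
    // Stamp the checksum, e.g. when an FPI is extracted during WAL ingestion.
    unsafe { page_set_checksum(&mut page, blkno) };
    // Verify it again before the page is sent to the redo process.
    unsafe { page_verify_checksum(&page, blkno) }
}
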
Konstantin Knizhnik
2501afba6e Calculate postgres checksum for FPI stored in pageserver (neondatabase/cloud#536) 2022-07-07 19:44:47 +02:00
Andrey Taranik
ae116ff0a9 update timeout for proxy deploy (#2047) 2022-07-07 18:09:57 +03:00
Heikki Linnakangas
e6ea049165 If an error happens during import of base backup or WAL, log it.
Previously, we only sent the error to the client, with no trace in the pageserver log.
Log it, similarly to how we log errors in GetPage@LSN requests.
2022-07-07 16:05:13 +03:00
Alexey Kondratov
747d009bb4 Fix panic while waiting for Postgres readiness in the compute_ctl (#2021)
We were reading the Postgres pid file and looking for the 'ready' status,
but the file could be empty or unreadable, so add all the necessary checks.
2022-07-07 11:56:58 +02:00
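
For illustration, a minimal sketch of the defensive pattern (the actual fix is in the compute_ctl diff below); read_last_pid_line is a hypothetical helper name:

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

// Return the last line of the pid file, or None if the file is missing,
// unreadable, or empty, instead of panicking on unwrap().
fn read_last_pid_line(pid_path: &Path) -> Option<String> {
    let file = File::open(pid_path).ok()?;
    BufReader::new(file).lines().last()?.ok()
}
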
Alexander Bayandin
cb5df3c627 github/actions: set missing VIP_VAP_ACCESS_TOKEN (#2045) 2022-07-07 10:47:03 +01:00
Heikki Linnakangas
0e3456351f Shrink thread pools used for WAL receivers and background tasks.
I noticed that the pageserver has a very large virtual memory size,
several GB, even though it doesn't actually use that much
memory. That's not much of a problem normally, but I hit it because I
wanted to run tests with a limited virtual memory size, by calling
setrlimit(RLIMIT_AS), but the highest limit you can set is 2 GB. I was
not able to start pageserver with a limit of 2 GB.

On Linux, each thread allocates 32 MB of virtual memory. I read this
on some random forum on the Internet, but unfortunately could not find
the source again now. Empirically, reducing the number of threads clearly
helps to bring down the virtual memory size.

Aside from the virtual memory usage, it seems excessive to launch 40
threads in both of those thread pools. The tokio default is to have as
many worker threads as there are CPU cores in the system. That seems
like a fine heuristic for us, too, so remove the explicit setting of
the pool size and rely on the default. Note that the GC and compaction
tasks are actually run with tokio spawn_blocking, so the threads that
are actually doing the work, and possibly waiting on I/O, are not
consuming threads from the thread pool. The WAL receiver work is done
in the tokio worker threads, but the WAL receivers are more CPU bound
so that seems OK.

Also remove the explicit maximum on blocking tasks. I'm not sure what
the right value for that would be, or whether the value we set (100)
would be better than the tokio default (512). Since the value was
arbitrary, let's just rely on the tokio default for that, too.
2022-07-06 22:36:38 +03:00
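
For illustration, a minimal sketch of a runtime built this way (assuming the tokio and anyhow crates; the function name is illustrative), mirroring the tenant-task pool change in the diff below by simply dropping the explicit sizes:

fn init_tenant_task_pool_runtime() -> anyhow::Result<tokio::runtime::Runtime> {
    Ok(tokio::runtime::Builder::new_multi_thread()
        .thread_name("tenant-task-worker")
        // No explicit worker_threads()/max_blocking_threads(): rely on the tokio
        // defaults (one worker thread per CPU core, up to 512 blocking threads).
        .enable_all()
        .build()?)
}
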
Alexander Bayandin
1faf49da0f github/actions: set PERF_TEST_RESULT_CONNSTR from secrets (#2040) 2022-07-06 19:24:06 +01:00
bojanserafimov
4a96259bdd Add export/import test (#2036) 2022-07-06 13:45:26 -04:00
bojanserafimov
242af75653 Fix signal file parsing (#2042) 2022-07-06 13:45:02 -04:00
Arthur Petukhovsky
8fabdc6708 Add tests with concurrent computes.
Remove test_restart_compute, as the added test_compute_restarts is stronger.
2022-07-06 18:07:29 +04:00
Alexander Bayandin
07df7c2edd github/actions: fix storing perf data for main (#2038) 2022-07-06 13:15:15 +01:00
Kirill Bulatov
50821c0a3c Return download stream directly from the remote storage API 2022-07-05 21:45:15 +03:00
Andrey Taranik
68adfe0fc8 inventory file fix for neon-stress env 2022-07-05 21:29:03 +04:00
Dmitry Rodionov
cfdf79aceb harden create_empty_timeline
Reorder the checks so that we verify whether the timeline already exists
before writing anything to disk and possibly replacing valid content
2022-07-05 16:44:18 +03:00
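
For illustration, a simplified sketch of the reordered pattern (reserve the map entry, check the on-disk path, only then write), with hypothetical types standing in for the repository internals; the real change is in the pageserver diff below:

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::Path;

fn create_once(timelines: &mut HashMap<u64, String>, id: u64, dir: &Path) -> anyhow::Result<()> {
    // Reserve the map slot first, so a duplicate id fails before anything is written.
    let vacant = match timelines.entry(id) {
        Entry::Occupied(_) => anyhow::bail!("Timeline already exists"),
        Entry::Vacant(v) => v,
    };
    if dir.exists() {
        anyhow::bail!("Timeline directory already exists, but timeline is missing in repository map");
    }
    // Only now write to disk and register the new entry.
    std::fs::create_dir_all(dir)?;
    vacant.insert(format!("timeline-{id}"));
    Ok(())
}
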
bojanserafimov
32560e75d2 Enable relocation test (#1974) 2022-07-05 08:27:57 -04:00
42 changed files with 945 additions and 687 deletions

View File

@@ -12,6 +12,7 @@ pageservers
safekeepers
[storage:vars]
env_name = neon-stress
console_mgmt_base_url = http://neon-stress-console.local
bucket_name = neon-storage-ireland
bucket_region = eu-west-1

View File

@@ -495,8 +495,8 @@ jobs:
name: Re-deploy proxy
command: |
DOCKER_TAG=$(git log --oneline|wc -l)
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
deploy-neon-stress:
docker:

View File

@@ -85,7 +85,7 @@ runs:
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
fi
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "main" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
mkdir -p "$PERF_REPORT_DIR"
EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS"
fi
@@ -115,7 +115,7 @@ runs:
-rA $TEST_SELECTION $EXTRA_PARAMS
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "main" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO=local
scripts/generate_and_push_perf_report.sh

View File

@@ -271,6 +271,9 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: true
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

View File

@@ -248,18 +248,20 @@ pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()
bail!("Postgres exited unexpectedly with code {}", code);
}
if pid_path.exists() {
let file = BufReader::new(File::open(&pid_path)?);
let status = file
.lines()
.last()
.unwrap()
.unwrap_or_else(|_| "unknown".to_string());
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
let file = BufReader::new(file);
let last_line = file.lines().last();
// Now Postgres is ready to accept connections
if status.trim() == "ready" && can_connect {
break;
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
// Now Postgres is ready to accept connections
if status == "ready" && can_connect {
break;
}
}
}

View File

@@ -427,6 +427,7 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
data_checksums_enabled: Some(true),
})
.send()?
.error_from_body()?
@@ -436,7 +437,7 @@ impl PageServerNode {
.map(|id| {
id.parse().with_context(|| {
format!(
"Failed to parse tennat creation response as tenant id: {}",
"Failed to parse tenant creation response as tenant id: {}",
id
)
})

View File

@@ -9,6 +9,7 @@
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
use utils::pg_checksum_page::pg_checksum_page;
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
@@ -56,3 +57,55 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
}
/// Calculate page checksum and stamp it onto the page.
/// NB: this will zero out and ignore any existing checksum.
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_set_checksum(page: &mut [u8], blkno: u32) {
let checksum = pg_checksum_page(page, blkno);
page[8..10].copy_from_slice(&checksum.to_le_bytes());
}
/// Check if page checksum is valid.
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_verify_checksum(page: &[u8], blkno: u32) -> bool {
let checksum = pg_checksum_page(page, blkno);
checksum == u16::from_le_bytes(page[8..10].try_into().unwrap())
}
#[cfg(test)]
mod tests {
use crate::pg_constants::BLCKSZ;
use crate::{page_set_checksum, page_verify_checksum};
use utils::pg_checksum_page::pg_checksum_page;
#[test]
fn set_and_verify_checksum() {
// Create a page with some content and without a correct checksum.
let mut page: [u8; BLCKSZ as usize] = [0; BLCKSZ as usize];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ as usize) {
*byte = i as u8;
}
// Calculate the checksum.
let checksum = unsafe { pg_checksum_page(&page[..], 0) };
// Sanity check: random bytes in the checksum attribute should not be
// a valid checksum.
assert_ne!(
checksum,
u16::from_le_bytes(page[8..10].try_into().unwrap())
);
// Set the actual checksum.
unsafe { page_set_checksum(&mut page, 0) };
// Verify the checksum.
assert!(unsafe { page_verify_checksum(&page[..], 0) });
// Checksum is not valid with another block number.
assert!(!unsafe { page_verify_checksum(&page[..], 1) });
}
}

View File

@@ -14,7 +14,6 @@ use super::XLogLongPageHeaderData;
use super::XLogPageHeaderData;
use super::XLogRecord;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
use thiserror::Error;
@@ -198,18 +197,12 @@ impl WalStreamDecoder {
}
// We now have a record in the 'recordbuf' local variable.
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
}
})?;
let xlogrec = XLogRecord::from_buf(&recordbuf).map_err(|e| WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
})?;
let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
if !wal_record_verify_checksum(&xlogrec, &recordbuf) {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,

View File

@@ -477,6 +477,10 @@ impl XLogRecord {
XLogRecord::des(buf)
}
pub fn from_buf(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
XLogRecord::from_slice(&buf[0..XLOG_SIZE_OF_XLOG_RECORD])
}
pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
use utils::bin_ser::LeSer;
XLogRecord::des_from(&mut buf.reader())
@@ -742,3 +746,11 @@ mod tests {
assert_eq!(checkpoint.nextXid.value, 2048);
}
}
pub fn wal_record_verify_checksum(rec: &XLogRecord, recordbuf: &Bytes) -> bool {
let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
crc == rec.xl_crc
}

View File

@@ -56,6 +56,7 @@ impl Conf {
.new_pg_command("initdb")?
.arg("-D")
.arg(self.datadir.as_os_str())
.arg("--data-checksums")
.args(&["-U", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);

View File

@@ -12,8 +12,10 @@ use std::{
borrow::Cow,
collections::HashMap,
ffi::OsStr,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
pin::Pin,
};
use anyhow::{bail, Context};
@@ -70,11 +72,7 @@ pub trait RemoteStorage: Send + Sync {
/// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;
/// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
@@ -83,12 +81,49 @@ pub trait RemoteStorage: Send + Sync {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
) -> Result<Download, DownloadError>;
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
}
pub struct Download {
pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send>>,
/// Extra key-value data, associated with the current remote file.
pub metadata: Option<StorageMetadata>,
}
impl Debug for Download {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Download")
.field("metadata", &self.metadata)
.finish()
}
}
#[derive(Debug)]
pub enum DownloadError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
}
}
}
impl std::error::Error for DownloadError {}
/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
pub enum GenericRemoteStorage {
@@ -180,7 +215,7 @@ pub struct S3Config {
pub concurrency_limit: NonZeroUsize,
}
impl std::fmt::Debug for S3Config {
impl Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)

View File

@@ -17,7 +17,7 @@ use tokio::{
};
use tracing::*;
use crate::path_with_suffix_extension;
use crate::{path_with_suffix_extension, Download, DownloadError};
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
@@ -192,15 +192,12 @@ impl RemoteStorage for LocalFs {
Ok(())
}
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
let mut source = io::BufReader::new(
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)
@@ -210,22 +207,20 @@ impl RemoteStorage for LocalFs {
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
})
.map_err(DownloadError::Other)?,
);
io::copy(&mut source, to).await.with_context(|| {
format!(
"Failed to download file '{}' from the local storage",
file_path.display()
)
})?;
source.flush().await?;
self.read_storage_metadata(&file_path).await
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;
Ok(Download {
metadata,
download_stream: Box::pin(source),
})
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
Err(DownloadError::NotFound)
}
}
@@ -234,22 +229,19 @@ impl RemoteStorage for LocalFs {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
) -> Result<Download, DownloadError> {
if let Some(end_exclusive) = end_exclusive {
ensure!(
end_exclusive > start_inclusive,
"Invalid range, start ({}) is bigger then end ({:?})",
start_inclusive,
end_exclusive
);
if end_exclusive <= start_inclusive {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
};
if start_inclusive == end_exclusive.saturating_sub(1) {
return Ok(None);
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
}
}
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let mut source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
@@ -260,31 +252,31 @@ impl RemoteStorage for LocalFs {
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
})
.map_err(DownloadError::Other)?,
);
source
.seek(io::SeekFrom::Start(start_inclusive))
.await
.context("Failed to seek to the range start in a local storage file")?;
match end_exclusive {
Some(end_exclusive) => {
io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
}
None => io::copy(&mut source, to).await,
}
.with_context(|| {
format!(
"Failed to download file '{}' range from the local storage",
file_path.display()
)
})?;
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;
self.read_storage_metadata(&file_path).await
Ok(match end_exclusive {
Some(end_exclusive) => Download {
metadata,
download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
},
None => Download {
metadata,
download_stream: Box::pin(source),
},
})
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
Err(DownloadError::NotFound)
}
}
@@ -352,6 +344,19 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
Ok(())
}
fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
if file_path.exists() {
ensure!(
file_path.is_file(),
"file path '{}' is not a file",
file_path.display()
);
Ok(true)
} else {
Ok(false)
}
}
#[cfg(test)]
mod pure_tests {
use tempfile::tempdir;
@@ -518,6 +523,31 @@ mod fs_tests {
use std::{collections::HashMap, io::Write};
use tempfile::tempdir;
async fn read_and_assert_remote_file_contents(
storage: &LocalFs,
#[allow(clippy::ptr_arg)]
// have to use &PathBuf due to `storage.local_path` parameter requirements
remote_storage_path: &PathBuf,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
let mut download = storage
.download(remote_storage_path)
.await
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
ensure!(
download.metadata.as_ref() == expected_metadata,
"Unexpected metadata returned for the downloaded file"
);
let mut contents = String::new();
download
.download_stream
.read_to_string(&mut contents)
.await
.context("Failed to read remote file contents into string")?;
Ok(contents)
}
#[tokio::test]
async fn upload_file() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
@@ -568,15 +598,7 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage.download(&upload_target, &mut content_bytes).await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
contents,
@@ -584,13 +606,9 @@ mod fs_tests {
);
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage.download(&non_existing_path, &mut io::sink()).await {
Ok(_) => panic!("Should not allow downloading non-existing storage files"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
match storage.download(&non_existing_path).await {
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
}
Ok(())
}
@@ -603,58 +621,31 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(&upload_target, 0, None, &mut full_range_bytes)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
full_range_bytes.flush().await?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
String::from_utf8(full_range_bytes.into_inner().into_inner())?,
full_range_download_contents,
"Download full range should return the whole upload"
);
let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let same_byte = 1_000_000_000;
let metadata = storage
.download_byte_range(
&upload_target,
same_byte,
Some(same_byte + 1), // exclusive end
&mut zero_range_bytes,
)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
zero_range_bytes.flush().await?;
assert!(
zero_range_bytes.into_inner().into_inner().is_empty(),
"Zero byte range should not download any part of the file"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
let mut first_part_download = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.await?;
assert!(
metadata.is_none(),
first_part_download.metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut first_part_download.download_stream,
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
@@ -663,20 +654,24 @@ mod fs_tests {
"First part bytes should be returned when requested"
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
let mut second_part_download = storage
.download_byte_range(
&upload_target,
first_part_local.len() as u64,
Some((first_part_local.len() + second_part_local.len()) as u64),
&mut second_part_remote,
)
.await?;
assert!(
metadata.is_none(),
second_part_download.metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut second_part_download.download_stream,
&mut second_part_remote,
)
.await?;
second_part_remote.flush().await?;
let second_part_remote = second_part_remote.into_inner().into_inner();
assert_eq!(
@@ -696,11 +691,30 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let start = 1_000_000_000;
let end = start + 1;
match storage
.download_byte_range(
&upload_target,
start,
Some(end), // exclusive end
)
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("zero bytes"));
assert!(error_string.contains(&start.to_string()));
assert!(error_string.contains(&end.to_string()));
}
}
let start = 10000;
let end = 234;
assert!(start > end, "Should test an incorrect range");
match storage
.download_byte_range(&upload_target, start, Some(end), &mut io::sink())
.download_byte_range(&upload_target, start, Some(end))
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
@@ -712,18 +726,6 @@ mod fs_tests {
}
}
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage
.download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink())
.await
{
Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
}
Ok(())
}
@@ -762,35 +764,26 @@ mod fs_tests {
let upload_target =
upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
assert_eq!(
dummy_contents(upload_name),
contents,
full_range_download_contents,
"We should upload and download the same contents"
);
assert_eq!(
full_download_metadata.as_ref(),
Some(&metadata),
"We should get the same metadata back for full download"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, _) = uploaded_bytes.split_at(3);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let partial_download_metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
let mut partial_download_with_metadata = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.await?;
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut partial_download_with_metadata.download_stream,
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
@@ -800,8 +793,8 @@ mod fs_tests {
);
assert_eq!(
partial_download_metadata.as_ref(),
Some(&metadata),
partial_download_with_metadata.metadata,
Some(metadata),
"We should get the same metadata back for partial download"
);
@@ -843,7 +836,7 @@ mod fs_tests {
}
fn dummy_contents(name: &str) -> String {
format!("contents for {}", name)
format!("contents for {name}")
}
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {

View File

@@ -9,17 +9,17 @@ use std::path::{Path, PathBuf};
use anyhow::Context;
use rusoto_core::{
credential::{InstanceMetadataProvider, StaticProvider},
HttpClient, Region,
HttpClient, Region, RusotoError,
};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
StreamingBody, S3,
DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
S3Client, StreamingBody, S3,
};
use tokio::{io, sync::Semaphore};
use tokio_util::io::ReaderStream;
use tracing::debug;
use crate::{strip_path_prefix, RemoteStorage, S3Config};
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
use super::StorageMetadata;
@@ -187,6 +187,39 @@ impl S3Bucket {
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
})
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")
.map_err(DownloadError::Other)?;
metrics::inc_get_object();
match self.client.get_object(request).await {
Ok(object_output) => match object_output.body {
None => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Got no body for the S3 object given"
)))
}
Some(body) => Ok(Download {
metadata: object_output.metadata.map(StorageMetadata),
download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
}),
},
Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
Err(e) => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Failed to download S3 object: {e}"
)))
}
}
}
}
#[async_trait::async_trait]
@@ -283,38 +316,13 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")?;
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(object_output.metadata.map(StorageMetadata))
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
}
async fn download_byte_range(
@@ -322,8 +330,7 @@ impl RemoteStorage for S3Bucket {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
) -> Result<Download, DownloadError> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be inclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
@@ -331,34 +338,14 @@ impl RemoteStorage for S3Bucket {
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
None => format!("bytes={}-", start_inclusive),
});
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 range download")?;
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(object_output.metadata.map(StorageMetadata))
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
}
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {

View File

@@ -1,6 +1,6 @@
#![allow(unused)]
use criterion::{criterion_group, criterion_main, Criterion};
use utils::pg_checksum_page::pg_checksum_page;
use utils::zid;
pub fn bench_zid_stringify(c: &mut Criterion) {
@@ -18,5 +18,20 @@ pub fn bench_zid_stringify(c: &mut Criterion) {
});
}
criterion_group!(benches, bench_zid_stringify);
// NB: adding `black_box` around arguments doesn't seem to change anything.
pub fn pg_checksum_page_basic(c: &mut Criterion) {
const BLCKSZ: usize = 8192;
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
*byte = i as u8;
}
c.bench_function("pg_checksum_page_basic", |b| {
b.iter(|| {
unsafe { pg_checksum_page(&page[..], 0) };
})
});
}
criterion_group!(benches, pg_checksum_page_basic, bench_zid_stringify);
criterion_main!(benches);

View File

@@ -5,7 +5,7 @@ DATA_DIR=$3
PORT=$4
SYSID=`od -A n -j 24 -N 8 -t d8 $WAL_PATH/000000010000000000000002* | cut -c 3-`
rm -fr $DATA_DIR
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --sysid=$SYSID
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --data-checksums --sysid=$SYSID
echo port=$PORT >> $DATA_DIR/postgresql.conf
REDO_POS=0x`$PG_BIN/pg_controldata -D $DATA_DIR | fgrep "REDO location"| cut -c 42-`
declare -i WAL_SIZE=$REDO_POS+114

View File

@@ -54,6 +54,9 @@ pub mod nonblock;
// Default signal handling
pub mod signals;
// Postgres checksum calculation
pub mod pg_checksum_page;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -0,0 +1,136 @@
///
/// Rust implementation of Postgres pg_checksum_page
/// See: https://github.com/postgres/postgres/blob/88210542106de5b26fe6aa088d1811b68502d224/src/include/storage/checksum_impl.h
/// for additional comments.
///
/// This is not a direct port of pg_checksum_page from Postgres, though.
/// For example, in the current state it can only produce a valid result
/// on the little-endian platform and with the standard 8 KB page size.
///
const BLCKSZ: usize = 8192;
const N_SUMS: usize = 32;
// Prime multiplier of FNV-1a hash
const FNV_PRIME: u32 = 16777619;
// Base offsets to initialize each of the parallel FNV hashes into a
// different initial state.
const CHECKSUM_BASE_OFFSETS: [u32; N_SUMS] = [
0x5B1F36E9, 0xB8525960, 0x02AB50AA, 0x1DE66D2A, 0x79FF467A, 0x9BB9F8A3, 0x217E7CD2, 0x83E13D2C,
0xF8D4474F, 0xE39EB970, 0x42C6AE16, 0x993216FA, 0x7B093B5D, 0x98DAFF3C, 0xF718902A, 0x0B1C9CDB,
0xE58F764B, 0x187636BC, 0x5D7B3BB1, 0xE73DE7DE, 0x92BEC979, 0xCCA6C0B2, 0x304A0979, 0x85AA43D4,
0x783125BB, 0x6CA8EAA2, 0xE407EAC6, 0x4B5CFC3E, 0x9FBF8C76, 0x15CA20BE, 0xF2CA9FD3, 0x959BD756,
];
// Calculate one round of the checksum.
fn checksum_comp(checksum: u32, value: u32) -> u32 {
let tmp = checksum ^ value;
tmp.wrapping_mul(FNV_PRIME) ^ (tmp >> 17)
}
/// Compute the checksum for a Postgres page.
///
/// The page must be adequately aligned (at least on a 4-byte boundary).
///
/// The checksum includes the block number (to detect the case where a page is
/// somehow moved to a different location), the page header (excluding the
/// checksum itself), and the page data.
///
/// As in the C implementation in Postgres, the checksum attribute on the page is
/// excluded from the calculation and preserved.
///
/// NB: after making any modifications, run `cargo bench`. The baseline on a more
/// or less recent Intel laptop is around 700ns. If it's significantly higher,
/// then it's worth looking into.
///
/// # Arguments
/// * `data` - the page to checksum
/// * `blkno` - the block number of the page
///
/// # Safety
/// This function is safe to call only if:
/// * `data` is strictly a standard 8 KB Postgres page
/// * it's called on the little-endian platform
pub unsafe fn pg_checksum_page(data: &[u8], blkno: u32) -> u16 {
let page = std::mem::transmute::<&[u8], &[u32]>(data);
let mut checksum: u32 = 0;
let mut sums = CHECKSUM_BASE_OFFSETS;
// Calculate the checksum of the first 'row' of the page. Do it separately as
// we do an expensive comparison here, which is not required for the rest of the
// page. Putting it into the main loop slows it down ~3 times.
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
// Third 32-bit chunk of the page contains the checksum in the lower half
// (assuming we are on little-endian machine), which we need to zero out.
// See also `PageHeaderData` for reference.
let chunk: u32 = if j == 2 {
page[j] & 0xFFFF_0000
} else {
page[j]
};
*sum = checksum_comp(*sum, chunk);
}
// Main checksum calculation loop
for i in 1..(BLCKSZ / (4 * N_SUMS)) {
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
*sum = checksum_comp(*sum, page[i * N_SUMS + j]);
}
}
// Finally, add in two rounds of zeroes for additional mixing
for _i in 0..2 {
for s in sums.iter_mut().take(N_SUMS) {
*s = checksum_comp(*s, 0);
}
}
// Xor fold partial checksums together
for sum in sums {
checksum ^= sum;
}
// Mix in the block number to detect transposed pages
checksum ^= blkno;
// Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
// one. That avoids checksums of zero, which seems like a good idea.
((checksum % 65535) + 1) as u16
}
#[cfg(test)]
mod tests {
use super::{pg_checksum_page, BLCKSZ};
#[test]
fn page_with_and_without_checksum() {
// Create a page with some content and without a correct checksum.
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
*byte = i as u8;
}
// Calculate the checksum.
let checksum = unsafe { pg_checksum_page(&page[..], 0) };
// Zero the checksum attribute on the page.
page[8..10].copy_from_slice(&[0u8; 2]);
// Calculate the checksum again, should be the same.
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
assert_eq!(checksum, new_checksum);
// Set the correct checksum into the page.
page[8..10].copy_from_slice(&checksum.to_le_bytes());
// Calculate the checksum again, should be the same.
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
assert_eq!(checksum, new_checksum);
// Check that we protect from the page transposition, i.e. page is the
// same, but in the wrong place.
let wrong_blockno_checksum = unsafe { pg_checksum_page(&page[..], 1) };
assert_ne!(checksum, wrong_blockno_checksum);
}
}

View File

@@ -38,6 +38,7 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}
#[serde_as]

View File

@@ -494,6 +494,8 @@ components:
type: string
compaction_threshold:
type: string
data_checksums_enabled:
type: boolean
TenantConfigInfo:
type: object
properties:

View File

@@ -412,6 +412,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
// Turn on data checksums for all new tenants
tenant_conf.data_checksums_enabled = Some(request_data.data_checksums_enabled.unwrap_or(true));
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?);

View File

@@ -516,10 +516,23 @@ pub fn import_file<R: Repository, Reader: Read>(
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
// zenith.signal format is "PREV LSN: prev_lsn"
let zenith_signal = std::str::from_utf8(&bytes)?;
let zenith_signal = zenith_signal.split(':').collect::<Vec<_>>();
let prev_lsn = zenith_signal[1].trim().parse::<Lsn>()?;
// TODO write serialization and deserialization in the same place.
let zenith_signal = std::str::from_utf8(&bytes)?.trim();
let prev_lsn = match zenith_signal {
"PREV LSN: none" => Lsn(0),
"PREV LSN: invalid" => Lsn(0),
other => {
let split = other.split(':').collect::<Vec<_>>();
split[1]
.trim()
.parse::<Lsn>()
.context("can't parse zenith.signal")?
}
};
// zenith.signal is not necessarily the last file that we handle,
// but it is ok to call `finish_write()`, because the final `modification.commit()`
// will update the lsn once more to the final one.
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);

View File

@@ -232,23 +232,32 @@ impl Repository for LayeredRepository {
fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<LayeredTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
let vacant_timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant_entry) => vacant_entry,
};
let timeline_path = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if timeline_path.exists() {
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
}
// Create the timeline directory, and write initial metadata to file.
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenant_id))?;
crashsafe_dir::create_dir_all(timeline_path)?;
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
Self::save_metadata(self.conf, timelineid, self.tenant_id, &metadata, true)?;
Self::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?;
let timeline = LayeredTimeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,
None,
timelineid,
timeline_id,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
@@ -257,12 +266,7 @@ impl Repository for LayeredRepository {
// Insert if not exists
let timeline = Arc::new(timeline);
match timelines.entry(timelineid) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant) => {
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
}
};
vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)));
Ok(timeline)
}

View File

@@ -951,7 +951,10 @@ impl postgres_backend::Handler for PageServerHandler {
match self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
Err(e) => {
error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
}
};
} else if query_string.starts_with("import wal ") {
// Import the `pg_wal` section of a basebackup.
@@ -970,7 +973,10 @@ impl postgres_backend::Handler for PageServerHandler {
match self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
Err(e) => {
error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
}
};
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"

View File

@@ -225,7 +225,7 @@ pub trait Repository: Send + Sync {
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<Self::Timeline>>;
@@ -473,6 +473,7 @@ pub mod repo_harness {
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
data_checksums_enabled: Some(tenant_conf.data_checksums_enabled),
}
}
}
@@ -636,6 +637,19 @@ mod tests {
Ok(())
}
#[test]
fn no_duplicate_timelines() -> Result<()> {
let repo = RepoHarness::create("no_duplicate_timelines")?.load();
let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(e.to_string(), "Timeline already exists"),
}
Ok(())
}
/// Convenience function to create a page image with given string as the only content
pub fn test_value(s: &str) -> Value {
let mut buf = BytesMut::new();

View File

@@ -44,13 +44,23 @@ where
index_part_path.display()
)
})?;
let mut index_part_download =
storage
.download(&part_storage_path)
.await
.with_context(|| {
format!("Failed to open download stream for for storage path {part_storage_path:?}")
})?;
let mut index_part_bytes = Vec::new();
storage
.download(&part_storage_path, &mut index_part_bytes)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
@@ -162,15 +172,19 @@ where
temp_file_path.display()
)
})?;
storage
.download(&layer_storage_path, &mut destination_file)
let mut download = storage
.download(&layer_storage_path)
.await
.with_context(|| {
format!(
"Failed to download a layer from storage path '{layer_storage_path:?}'"
"Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
)
})?;
io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
format!(
"Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
)
})?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations

View File

@@ -38,6 +38,10 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
// Turn off data checksums by default so as not to affect old tenants.
// We turn them on explicitly for all new tenants.
pub const DEFAULT_DATA_CHECKSUMS: bool = false;
}
/// Per-tenant configuration options
@@ -83,6 +87,7 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub data_checksums_enabled: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -105,6 +110,7 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}
impl TenantConfOpt {
@@ -135,6 +141,9 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
data_checksums_enabled: self
.data_checksums_enabled
.unwrap_or(global_conf.data_checksums_enabled),
}
}
@@ -172,6 +181,9 @@ impl TenantConfOpt {
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(data_checksums_enabled) = other.data_checksums_enabled {
self.data_checksums_enabled = Some(data_checksums_enabled);
}
}
}
@@ -199,6 +211,7 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
data_checksums_enabled: DEFAULT_DATA_CHECKSUMS,
}
}
@@ -229,6 +242,7 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
data_checksums_enabled: defaults::DEFAULT_DATA_CHECKSUMS,
}
}
}

View File

@@ -11,7 +11,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr::ThreadKind;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{tenant_config, thread_mgr, timelines, walreceiver};
use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};
@@ -266,7 +266,14 @@ pub fn create_tenant_repository(
Ok(None)
}
Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
let wal_redo_manager = Arc::new(PostgresRedoManager::new(
conf,
data_checksums_enabled,
tenant_id,
));
let repo = timelines::create_repo(
conf,
tenant_conf,
@@ -567,10 +574,16 @@ fn load_local_repo(
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<RepositoryImpl>> {
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
let mut m = tenants_state::write_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
let walredo_mgr = PostgresRedoManager::new(conf, data_checksums_enabled, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
@@ -588,8 +601,6 @@ fn load_local_repo(
}
});
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
tenant.repo.update_tenant_config(tenant_conf)?;
Ok(Arc::clone(&tenant.repo))

View File

@@ -119,8 +119,6 @@ pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40) // Way more than necessary
.max_blocking_threads(100) // Way more than necessary
.enable_all()
.build()?;

View File

@@ -253,6 +253,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
.args(&["-D", &initdbpath.to_string_lossy()])
.args(&["-U", &conf.superuser])
.args(&["-E", "utf8"])
.arg("--data-checksums")
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it

View File

@@ -24,7 +24,7 @@
use anyhow::Context;
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{page_is_new, page_set_lsn};
use postgres_ffi::{page_is_new, page_set_checksum, page_set_lsn};
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
@@ -313,6 +313,8 @@ impl<'a, R: Repository> WalIngest<'a, R> {
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
unsafe { page_set_checksum(&mut image, blk.blkno) };
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
} else {

View File

@@ -91,7 +91,6 @@ pub fn init_wal_receiver_main_thread(
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("wal-receiver-runtime-thread")
.worker_threads(40)
.enable_all()
.on_thread_start(|| IS_WAL_RECEIVER.with(|c| c.set(true)))
.build()

View File

@@ -48,7 +48,8 @@ use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift;
use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::wal_record_verify_checksum;
use postgres_ffi::{page_verify_checksum, pg_constants, XLogRecord};
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -131,6 +132,7 @@ lazy_static! {
pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
data_checksums_enabled: bool,
process: Mutex<Option<PostgresRedoProcess>>,
}
@@ -229,11 +231,16 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
pub fn new(
conf: &'static PageServerConf,
data_checksums_enabled: bool,
tenantid: ZTenantId,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenantid,
conf,
data_checksums_enabled,
process: Mutex::new(None),
}
}
@@ -268,7 +275,13 @@ impl PostgresRedoManager {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = process
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
.apply_wal_records(
buf_tag,
base_img,
records,
wal_redo_timeout,
self.data_checksums_enabled,
)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
@@ -619,6 +632,7 @@ impl PostgresRedoProcess {
info!("running initdb in {:?}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("--data-checksums")
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
@@ -716,6 +730,7 @@ impl PostgresRedoProcess {
base_img: Option<Bytes>,
records: &[(Lsn, ZenithWalRecord)],
wal_redo_timeout: Duration,
data_checksums_enabled: bool,
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
@@ -725,6 +740,15 @@ impl PostgresRedoProcess {
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
// Checksums might not be stamped for old tenants, so check them only if they
// are enabled (this is controlled by the per-tenant config).
if data_checksums_enabled && !unsafe { page_verify_checksum(&img, tag.blknum) } {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("block {} of relation {} is invalid", tag.blknum, tag.rel),
));
}
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
@@ -733,6 +757,27 @@ impl PostgresRedoProcess {
rec: postgres_rec,
} = rec
{
let xlogrec = XLogRecord::from_buf(postgres_rec).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"could not deserialize WAL record for relation {} at LSN {}: {}",
tag.rel, lsn, e
),
)
})?;
// WAL records always have a checksum; check it before sending the record to the
// redo process, which doesn't do these checks itself.
if !wal_record_verify_checksum(&xlogrec, postgres_rec) {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"WAL record for relation {} at LSN {} is invalid",
tag.rel, lsn
),
));
}
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(Error::new(

View File

@@ -2,18 +2,16 @@ use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;
use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::xlog_utils::{
XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
};
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;
@@ -452,45 +450,41 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
pub async fn read_object(
file_path: PathBuf,
offset: u64,
) -> (impl AsyncRead, JoinHandle<Result<()>>) {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
let download = match REMOTE_STORAGE
.get()
.context("Failed to get remote storage")?
.as_ref()
.context("No remote storage configured")?
{
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;
let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);
let copy_result = tokio::spawn(async move {
let res = match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None, &mut pipe_writer)
.await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage
.download_byte_range(&s3key, offset, None, &mut pipe_writer)
.await
}
};
if let Err(e) = res {
error!("failed to download WAL segment from remote storage: {}", e);
Err(e)
} else {
Ok(())
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None)
.await
}
});
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;
(pipe_reader, copy_result)
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage.download_byte_range(&s3key, offset, None).await
}
}
.with_context(|| {
format!(
"Failed to open WAL segment download stream for {}",
file_path.display()
)
})?;
Ok(download.download_stream)
}

View File

@@ -604,8 +604,7 @@ impl WalReader {
// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
return Ok(Box::pin(reader));
return read_object(wal_file_path, xlogoff as u64).await;
}
bail!("WAL segment is not found")

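The hunk above only shows the remote branch: if remote reads are enabled, the WAL segment is streamed from remote storage starting at xlogoff, otherwise the reader bails out. A rough Python sketch of the overall fallback shape, assuming the local-file attempt happens earlier in the function; open_wal_segment and fetch_remote are hypothetical stand-ins, not safekeeper APIs (fetch_remote plays the role of read_object).

import os

def open_wal_segment(local_path, xlogoff, enable_remote_read, fetch_remote):
    # Prefer the local segment file, positioned at the requested offset.
    if os.path.exists(local_path):
        f = open(local_path, "rb")
        f.seek(xlogoff)
        return f
    # Otherwise stream the same byte range from remote storage, if allowed.
    if enable_remote_read:
        return fetch_remote(local_path, xlogoff)
    raise FileNotFoundError("WAL segment is not found")
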
View File

@@ -1,222 +0,0 @@
#
# Simple script to export tenant timelines from one pageserver
# and import them into another pageserver
#
from os import path
import os
import requests
import uuid
import subprocess
import argparse
from pathlib import Path
# directory to save exported tar files to
basepath = path.dirname(path.abspath(__file__))
class NeonPageserverApiException(Exception):
pass
class NeonPageserverHttpClient(requests.Session):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()['msg']
except:
msg = ''
raise NeonPageserverApiException(msg) from e
def check_status(self):
self.get(f"http://{self.host}:{self.port}/v1/status").raise_for_status()
def tenant_list(self):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: uuid.UUID, ok_if_exists):
res = self.post(
f"http://{self.host}:{self.port}/v1/tenant",
json={
'new_tenant_id': new_tenant_id.hex,
},
)
if res.status_code == 409:
if ok_if_exists:
print(f'could not create tenant: already exists for id {new_tenant_id}')
else:
res.raise_for_status()
elif res.status_code == 201:
print(f'created tenant {new_tenant_id}')
else:
self.verbose_error(res)
return new_tenant_id
def timeline_list(self, tenant_id: uuid.UUID):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def main(args: argparse.Namespace):
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host
tenants = args.tenants
old_http_client = NeonPageserverHttpClient(old_pageserver_host, args.old_pageserver_http_port)
old_http_client.check_status()
old_pageserver_connstr = f"postgresql://{old_pageserver_host}:{args.old_pageserver_pg_port}"
new_http_client = NeonPageserverHttpClient(new_pageserver_host, args.new_pageserver_http_port)
new_http_client.check_status()
new_pageserver_connstr = f"postgresql://{new_pageserver_host}:{args.new_pageserver_pg_port}"
psql_env = {**os.environ, 'LD_LIBRARY_PATH': '/usr/local/lib/'}
for tenant_id in tenants:
print(f"Tenant: {tenant_id}")
timelines = old_http_client.timeline_list(uuid.UUID(tenant_id))
print(f"Timelines: {timelines}")
# Create tenant in new pageserver
if args.only_import is False:
new_http_client.tenant_create(uuid.UUID(tenant_id), args.ok_if_exists)
for timeline in timelines:
# Export timelines from old pageserver
if args.only_import is False:
query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"
cmd = ["psql", "--no-psqlrc", old_pageserver_connstr, "-c", query]
print(f"Running: {cmd}")
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
stderr_filename = path.join(
basepath, f"{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
with open(tar_filename, 'w') as stdout_f:
with open(stderr_filename, 'w') as stderr_f:
print(f"(capturing output to {tar_filename})")
subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env)
print(f"Done export: {tar_filename}")
# Import timelines to new pageserver
psql_path = Path(args.psql_path)
import_cmd = f"import basebackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']} {timeline['local']['last_record_lsn']}"
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
full_cmd = rf"""cat {tar_filename} | {psql_path} {new_pageserver_connstr} -c '{import_cmd}' """
stderr_filename2 = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
stdout_filename = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stdout")
print(f"Running: {full_cmd}")
with open(stdout_filename, 'w') as stdout_f:
with open(stderr_filename2, 'w') as stderr_f:
print(f"(capturing output to {stdout_filename})")
subprocess.run(full_cmd,
stdout=stdout_f,
stderr=stderr_f,
env=psql_env,
shell=True)
print(f"Done import")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--tenant-id',
dest='tenants',
required=True,
nargs='+',
help='Id of the tenant to migrate. You can pass multiple arguments',
)
parser.add_argument(
'--from-host',
dest='old_pageserver_host',
required=True,
help='Host of the pageserver to migrate data from',
)
parser.add_argument(
'--from-http-port',
dest='old_pageserver_http_port',
required=False,
type=int,
default=9898,
help='HTTP port of the pageserver to migrate data from. Default: 9898',
)
parser.add_argument(
'--from-pg-port',
dest='old_pageserver_pg_port',
required=False,
type=int,
default=6400,
help='pg port of the pageserver to migrate data from. Default: 6400',
)
parser.add_argument(
'--to-host',
dest='new_pageserver_host',
required=True,
help='Host of the pageserver to migrate data to',
)
parser.add_argument(
'--to-http-port',
dest='new_pageserver_http_port',
required=False,
default=9898,
type=int,
help='HTTP port of the pageserver to migrate data to. Default: 9898',
)
parser.add_argument(
'--to-pg-port',
dest='new_pageserver_pg_port',
required=False,
default=6400,
type=int,
help='pg port of the pageserver to migrate data to. Default: 6400',
)
parser.add_argument(
'--ignore-tenant-exists',
dest='ok_if_exists',
required=False,
help=
'Ignore the error if the tenant we are trying to create already exists. This can be dangerous if the existing tenant already contains data.',
)
parser.add_argument(
'--psql-path',
dest='psql_path',
required=False,
default='/usr/local/bin/psql',
help='Path to the psql binary. Default: /usr/local/bin/psql',
)
parser.add_argument(
'--only-import',
dest='only_import',
required=False,
default=False,
action='store_true',
help='Skip export and tenant creation part',
)
args = parser.parse_args()
main(args)

View File

@@ -37,7 +37,7 @@ You can run all the tests with:
If you want to run all the tests in a particular file:
`./scripts/pytest test_pgbench.py`
`./scripts/pytest test_runner/batch_others/test_restart_compute.py`
If you want to run all tests that have the string "bench" in their names:

View File

@@ -1,74 +0,0 @@
import pytest
from contextlib import closing
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
#
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_restart_compute(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
neon_env_builder.auth_enabled = True
if with_safekeepers:
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_restart_compute')
pg = env.postgres.create_start('test_restart_compute')
log.info("postgres is running on 'test_restart_compute' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
log.info(f"res = {r}")
# Remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the row
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
log.info(f"res = {r}")
# Insert another row
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")
# Again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')
# That SELECT causes lots of FPIs and increases the probability of safekeepers
# lagging behind after query completion
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")
# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")

View File

@@ -10,8 +10,8 @@ from typing import Optional
import signal
import pytest
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
from fixtures.utils import lsn_from_hex
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir
from fixtures.utils import lsn_from_hex, subprocess_capture
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -101,13 +101,23 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')
@pytest.mark.skip(
reason=
"need to replace the callmemaybe call with a better way to migrate timelines between pageservers"
)
@pytest.mark.parametrize(
'method',
[
# A minor migration involves no storage breaking changes.
# It is done by attaching the tenant to a new pageserver.
'minor',
# A major migration involves exporting a postgres datadir
# basebackup and importing it into the new pageserver.
# This kind of migration can tolerate breaking changes
# to storage format
pytest.param('major', marks=pytest.mark.xfail(reason="Not implemented")),
])
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
port_distributor: PortDistributor,
test_output_dir,
method: str,
with_load: str):
neon_env_builder.enable_local_fs_remote_storage()
@@ -157,8 +167,11 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
load_stop_event = threading.Event()
load_ok_event = threading.Event()
load_thread = threading.Thread(target=load,
args=(tenant_pg, load_stop_event, load_ok_event))
load_thread = threading.Thread(
target=load,
args=(tenant_pg, load_stop_event, load_ok_event),
daemon=True, # To make sure the child dies when the parent errors
)
load_thread.start()
# run checkpoint manually to be sure that data landed in remote storage
@@ -188,30 +201,47 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
new_pageserver_http_port,
neon_env_builder.broker):
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# the new pageserver should be in sync (modulo WAL tail or vacuum activity) with the old one because there were no new writes since the checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# Migrate either by attaching from S3 or by exporting/importing a basebackup
if method == "major":
cmd = [
"python",
os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
"--tenant-id",
tenant.hex,
"--from-host",
"localhost",
"--from-http-port",
str(pageserver_http.port),
"--from-pg-port",
str(env.pageserver.service_port.pg),
"--to-host",
"localhost",
"--to-http-port",
str(new_pageserver_http_port),
"--to-pg-port",
str(new_pageserver_pg_port),
"--psql-path",
os.path.join(pg_distrib_dir, "bin", "psql"),
"--work-dir",
os.path.join(test_output_dir),
]
subprocess_capture(str(env.repo_dir), cmd, check=True)
elif method == "minor":
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# when load is active these checks can break because LSNs are not static,
# so let's check with some margin
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
# the new pageserver should be in sync (modulo WAL tail or vacuum activity) with the old one because there were no new writes since the checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# callmemaybe to start replication from safekeeper to the new pageserver
# when there is no load there is a clean checkpoint and no wal delta
# needs to be streamed to the new pageserver
# TODO (rodionov) use attach to start replication
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
cur.execute("callmemaybe {} {} {}".format(tenant.hex,
timeline.hex,
safekeeper_connstring))
# when load is active these checks can break because LSNs are not static,
# so let's check with some margin
assert_abs_margin_ratio(
lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
tenant_pg.stop()

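The "major" path exercised here drives the same two psql commands as the export/import script removed above: fullbackup against the source pageserver to produce a tar, and import basebackup piped into the destination. A condensed Python sketch of that flow with placeholder connection strings; unlike the real script it skips tenant creation, stderr capture and LD_LIBRARY_PATH handling.

import subprocess

def migrate_timeline(old_connstr, new_connstr, tenant, timeline, lsn, tar_path):
    # Export a full basebackup tar from the old pageserver at the given LSN.
    export_query = f"fullbackup {tenant} {timeline} {lsn}"
    with open(tar_path, "wb") as tar:
        subprocess.run(["psql", "--no-psqlrc", old_connstr, "-c", export_query],
                       stdout=tar, check=True)
    # Feed the tar back into the new pageserver; start and end LSN get the same
    # value here, mirroring the removed script, which passes last_record_lsn twice.
    import_query = f"import basebackup {tenant} {timeline} {lsn} {lsn}"
    with open(tar_path, "rb") as tar:
        subprocess.run(["psql", "--no-psqlrc", new_connstr, "-c", import_query],
                       stdin=tar, check=True)
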
View File

@@ -682,7 +682,7 @@ class ProposerPostgres(PgProtocol):
def initdb(self):
""" Run initdb """
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path()]
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path(), "--data-checksums"]
self.pg_bin.run(args)
def start(self):

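Since the fixture now runs initdb with --data-checksums, it can be useful to assert in a test that the setting actually took effect. A small hedged helper, assuming pg_controldata from the same Postgres build is on PATH; alternatively, `SHOW data_checksums` over a regular connection reports on/off.

import subprocess

def data_checksums_enabled(pgdata: str) -> bool:
    # pg_controldata reports "Data page checksum version: 1" (non-zero) when the
    # cluster was created with --data-checksums, and 0 otherwise.
    out = subprocess.run(["pg_controldata", pgdata], check=True,
                         capture_output=True, text=True).stdout
    for line in out.splitlines():
        if line.startswith("Data page checksum version"):
            return line.split(":", 1)[1].strip() != "0"
    raise RuntimeError("Data page checksum version not found in pg_controldata output")
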
View File

@@ -1,5 +1,6 @@
import asyncio
import uuid
import asyncpg
import random
import time
@@ -7,7 +8,7 @@ import time
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List
from typing import List, Optional
log = getLogger('root.safekeeper_async')
@@ -234,3 +235,156 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
# we try to simulate a large (flush_lsn - truncate_lsn) lag, to test that WAL segments
# are not removed before being broadcast to all safekeepers, with the help of a replication slot
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5))
def postgres_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
pg = Postgres(
env,
tenant_id=env.initial_tenant,
port=env.port_distributor.get_port(),
# In these tests the compute has a high probability of terminating on its own
# before our stop() due to lost consensus leadership.
check_stop_result=False)
# embed current time in node name
node_name = pgdir_name or f'pg_node_{time.time()}'
return pg.create_start(branch_name=branch,
node_name=node_name,
config_lines=['log_statement=all'])
async def exec_compute_query(env: NeonEnv,
branch: str,
query: str,
pgdir_name: Optional[str] = None):
with postgres_create_start(env, branch=branch, pgdir_name=pgdir_name) as pg:
before_conn = time.time()
conn = await pg.connect_async()
res = await conn.fetch(query)
await conn.close()
after_conn = time.time()
log.info(f'{query} took {after_conn - before_conn}s')
return res
async def run_compute_restarts(env: NeonEnv,
queries=16,
batch_insert=10000,
branch='test_compute_restarts'):
cnt = 0
sum = 0
await exec_compute_query(env, branch, 'CREATE TABLE t (i int)')
for i in range(queries):
if i % 4 == 0:
await exec_compute_query(
env, branch, f'INSERT INTO t SELECT 1 FROM generate_series(1, {batch_insert})')
sum += batch_insert
cnt += batch_insert
elif (i % 4 == 1) or (i % 4 == 3):
# Note that the SELECT causes lots of FPIs and increases the probability of safekeepers
# ending up at different LSNs after compute termination.
actual_sum = (await exec_compute_query(env, branch, 'SELECT SUM(i) FROM t'))[0][0]
assert actual_sum == sum, f'Expected sum={sum}, actual={actual_sum}'
elif i % 4 == 2:
await exec_compute_query(env, branch, 'UPDATE t SET i = i + 1')
sum += cnt
# A test which creates a compute for every query and destroys it right after.
def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_compute_restarts')
asyncio.run(run_compute_restarts(env))
class BackgroundCompute(object):
def __init__(self, index: int, env: NeonEnv, branch: str):
self.index = index
self.env = env
self.branch = branch
self.running = False
self.stopped = False
self.total_tries = 0
self.successful_queries: List[int] = []
async def run(self):
if self.running:
raise Exception('BackgroundCompute is already running')
self.running = True
i = 0
while not self.stopped:
try:
verify_key = (self.index << 16) + i
i += 1
self.total_tries += 1
res = await exec_compute_query(
self.env,
self.branch,
f'INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key',
pgdir_name=f'bgcompute{self.index}_key{verify_key}',
)
log.info(f'result: {res}')
if len(res) != 1:
raise Exception('No result returned')
if res[0][0] != verify_key:
raise Exception('Wrong result returned')
self.successful_queries.append(verify_key)
except Exception as e:
log.info(f'BackgroundCompute {self.index} query failed: {e}')
# With less sleep there is a high chance of committing nothing at all,
# or only a single xact, during the test run.
await asyncio.sleep(2 * random.random())
self.running = False
async def run_concurrent_computes(env: NeonEnv,
num_computes=10,
run_seconds=20,
branch='test_concurrent_computes'):
await exec_compute_query(
env,
branch,
'CREATE TABLE query_log (t timestamp default now(), index int, verify_key int)')
computes = [BackgroundCompute(i, env, branch) for i in range(num_computes)]
background_tasks = [asyncio.create_task(compute.run()) for compute in computes]
await asyncio.sleep(run_seconds)
for compute in computes[1:]:
compute.stopped = True
log.info("stopped all tasks but one")
# work for some time with only one compute -- it should be able to make some xacts
await asyncio.sleep(8)
computes[0].stopped = True
await asyncio.gather(*background_tasks)
result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
# we should have inserted something while single compute was running
assert len(result) >= 4
log.info(f'Executed {len(result)} queries')
for row in result:
log.info(f'{row[0]} {row[1]} {row[2]}')
# ensure everything reported as committed wasn't lost
for compute in computes:
for verify_key in compute.successful_queries:
assert verify_key in [row[2] for row in result]
# Run multiple computes concurrently, creating and destroying each one after a single
# query. Ensure we don't lose any xacts reported as committed and can still make
# progress once only one compute remains.
def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_concurrent_computes')
asyncio.run(run_concurrent_computes(env))

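The verify_key scheme above packs the compute index and a per-compute attempt counter into a single integer, so every row in query_log can be attributed to the compute that wrote it. A tiny illustration, valid as long as the counter stays below 2**16 (which these short runs guarantee):

from typing import Tuple

def make_verify_key(index: int, i: int) -> int:
    # Same packing as BackgroundCompute.run(): compute index in the high bits,
    # per-compute attempt counter in the low 16 bits.
    return (index << 16) + i

def split_verify_key(key: int) -> Tuple[int, int]:
    # Inverse mapping, handy when attributing a query_log row to its compute.
    return key >> 16, key & 0xFFFF

assert split_verify_key(make_verify_key(3, 42)) == (3, 42)
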
View File

@@ -1160,6 +1160,7 @@ class NeonCli:
node_name: str,
tenant_id: Optional[uuid.UUID] = None,
destroy=False,
check_return_code=True,
) -> 'subprocess.CompletedProcess[str]':
args = [
'pg',
@@ -1172,7 +1173,7 @@ class NeonCli:
if node_name is not None:
args.append(node_name)
return self.raw_cli(args)
return self.raw_cli(args, check_return_code=check_return_code)
def raw_cli(self,
arguments: List[str],
@@ -1188,6 +1189,8 @@ class NeonCli:
>>> result = env.neon_cli.raw_cli(...)
>>> assert result.stderr == ""
>>> log.info(result.stdout)
If `check_return_code` is set, a non-zero exit code logs the failure and raises an exception.
"""
assert type(arguments) == list
@@ -1213,27 +1216,27 @@ class NeonCli:
env_vars[var] = val
# Intercept CalledProcessError and print more info
try:
res = subprocess.run(args,
env=env_vars,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
res = subprocess.run(args,
env=env_vars,
check=False,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if not res.returncode:
log.info(f"Run success: {res.stdout}")
except subprocess.CalledProcessError as exc:
elif check_return_code:
# this way the command output will be recorded and shown in the CI failure message
msg = f"""\
Run failed: {exc}
stdout: {exc.stdout}
stderr: {exc.stderr}
Run {res.args} failed:
stdout: {res.stdout}
stderr: {res.stderr}
"""
log.info(msg)
raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
res.args,
res.stdout,
res.stderr)
raise Exception(msg) from exc
if check_return_code:
res.check_returncode()
return res
@@ -1526,7 +1529,11 @@ def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, env: NeonEnv, tenant_id: uuid.UUID, port: int):
def __init__(self,
env: NeonEnv,
tenant_id: uuid.UUID,
port: int,
check_stop_result: bool = True):
super().__init__(host='localhost', port=port, user='cloud_admin', dbname='postgres')
self.env = env
self.running = False
@@ -1534,6 +1541,7 @@ class Postgres(PgProtocol):
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.check_stop_result = check_stop_result
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
def create(
@@ -1585,8 +1593,6 @@ class Postgres(PgProtocol):
port=self.port)
self.running = True
log.info(f"stdout: {run_result.stdout}")
return self
def pg_data_dir_path(self) -> str:
@@ -1650,7 +1656,9 @@ class Postgres(PgProtocol):
if self.running:
assert self.node_name is not None
self.env.neon_cli.pg_stop(self.node_name, self.tenant_id)
self.env.neon_cli.pg_stop(self.node_name,
self.tenant_id,
check_return_code=self.check_stop_result)
self.running = False
return self
@@ -1662,7 +1670,10 @@ class Postgres(PgProtocol):
"""
assert self.node_name is not None
self.env.neon_cli.pg_stop(self.node_name, self.tenant_id, True)
self.env.neon_cli.pg_stop(self.node_name,
self.tenant_id,
True,
check_return_code=self.check_stop_result)
self.node_name = None
self.running = False
@@ -1681,6 +1692,8 @@ class Postgres(PgProtocol):
Returns self.
"""
started_at = time.time()
self.create(
branch_name=branch_name,
node_name=node_name,
@@ -1688,6 +1701,8 @@ class Postgres(PgProtocol):
lsn=lsn,
).start()
log.info(f"Postgres startup took {time.time() - started_at} seconds")
return self
def __enter__(self):