mirror of https://github.com/neondatabase/neon.git (synced 2026-01-18 02:42:56 +00:00)
Compare commits: projects_m ... pg-checksu (20 commits)
| SHA1 |
|---|
| 0eca1d19de |
| 53b9cb915e |
| cc6ffb558d |
| b135dbb85d |
| 6059801943 |
| 2501afba6e |
| ae116ff0a9 |
| e6ea049165 |
| 747d009bb4 |
| cb5df3c627 |
| 0e3456351f |
| 1faf49da0f |
| 4a96259bdd |
| 242af75653 |
| 8fabdc6708 |
| 07df7c2edd |
| 50821c0a3c |
| 68adfe0fc8 |
| cfdf79aceb |
| 32560e75d2 |
@@ -12,6 +12,7 @@ pageservers
safekeepers

[storage:vars]
env_name = neon-stress
console_mgmt_base_url = http://neon-stress-console.local
bucket_name = neon-storage-ireland
bucket_region = eu-west-1

@@ -495,8 +495,8 @@ jobs:
name: Re-deploy proxy
command: |
DOCKER_TAG=$(git log --oneline|wc -l)
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

deploy-neon-stress:
docker:

@@ -85,7 +85,7 @@ runs:
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
fi
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "main" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
mkdir -p "$PERF_REPORT_DIR"
EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS"
fi

@@ -115,7 +115,7 @@ runs:
-rA $TEST_SELECTION $EXTRA_PARAMS

if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "main" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO=local
scripts/generate_and_push_perf_report.sh

.github/workflows/build_and_test.yml (vendored, 3 changed lines)

@@ -271,6 +271,9 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: true
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

@@ -248,18 +248,20 @@ pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()
bail!("Postgres exited unexpectedly with code {}", code);
}

if pid_path.exists() {
let file = BufReader::new(File::open(&pid_path)?);
let status = file
.lines()
.last()
.unwrap()
.unwrap_or_else(|_| "unknown".to_string());
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
let file = BufReader::new(file);
let last_line = file.lines().last();

// Now Postgres is ready to accept connections
if status.trim() == "ready" && can_connect {
break;
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();

// Now Postgres is ready to accept connections
if status == "ready" && can_connect {
break;
}
}
}

@@ -427,6 +427,7 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
data_checksums_enabled: Some(true),
})
.send()?
.error_from_body()?

@@ -436,7 +437,7 @@ impl PageServerNode {
.map(|id| {
id.parse().with_context(|| {
format!(
"Failed to parse tennat creation response as tenant id: {}",
"Failed to parse tenant creation response as tenant id: {}",
id
)
})

@@ -9,6 +9,7 @@

use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
use utils::pg_checksum_page::pg_checksum_page;

include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

@@ -56,3 +57,55 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
}

/// Calculate page checksum and stamp it onto the page.
/// NB: this will zero out and ignore any existing checksum.
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_set_checksum(page: &mut [u8], blkno: u32) {
let checksum = pg_checksum_page(page, blkno);
page[8..10].copy_from_slice(&checksum.to_le_bytes());
}

/// Check if page checksum is valid.
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_verify_checksum(page: &[u8], blkno: u32) -> bool {
let checksum = pg_checksum_page(page, blkno);
checksum == u16::from_le_bytes(page[8..10].try_into().unwrap())
}

#[cfg(test)]
mod tests {
use crate::pg_constants::BLCKSZ;
use crate::{page_set_checksum, page_verify_checksum};
use utils::pg_checksum_page::pg_checksum_page;

#[test]
fn set_and_verify_checksum() {
// Create a page with some content and without a correct checksum.
let mut page: [u8; BLCKSZ as usize] = [0; BLCKSZ as usize];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ as usize) {
*byte = i as u8;
}

// Calculate the checksum.
let checksum = unsafe { pg_checksum_page(&page[..], 0) };

// Sanity check: random bytes in the checksum attribute should not be
// a valid checksum.
assert_ne!(
checksum,
u16::from_le_bytes(page[8..10].try_into().unwrap())
);

// Set the actual checksum.
unsafe { page_set_checksum(&mut page, 0) };

// Verify the checksum.
assert!(unsafe { page_verify_checksum(&page[..], 0) });

// Checksum is not valid with another block number.
assert!(!unsafe { page_verify_checksum(&page[..], 1) });
}
}

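A brief note on the hard-coded slice indices used by `page_set_lsn`, `page_set_checksum`, and `page_verify_checksum` above: they assume the standard little-endian `PageHeaderData` layout, where `pd_lsn` occupies the first 8 bytes of the page and `pd_checksum` the following 2. A sketch of the offsets these helpers rely on (the constant names are illustrative, not part of the patch):

```rust
// Assumed on-disk layout at the start of a Postgres page (little-endian):
//   bytes 0..8  -> pd_lsn      (written by page_set_lsn as two 32-bit halves)
//   bytes 8..10 -> pd_checksum (written/read by page_set_checksum / page_verify_checksum)
const PD_LSN_RANGE: std::ops::Range<usize> = 0..8;
const PD_CHECKSUM_RANGE: std::ops::Range<usize> = 8..10;
```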
@@ -14,7 +14,6 @@ use super::XLogLongPageHeaderData;
use super::XLogPageHeaderData;
use super::XLogRecord;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
use thiserror::Error;

@@ -198,18 +197,12 @@ impl WalStreamDecoder {
}

// We now have a record in the 'recordbuf' local variable.
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
}
})?;
let xlogrec = XLogRecord::from_buf(&recordbuf).map_err(|e| WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
})?;

let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
if !wal_record_verify_checksum(&xlogrec, &recordbuf) {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,

@@ -477,6 +477,10 @@ impl XLogRecord {
XLogRecord::des(buf)
}

pub fn from_buf(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
XLogRecord::from_slice(&buf[0..XLOG_SIZE_OF_XLOG_RECORD])
}

pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
use utils::bin_ser::LeSer;
XLogRecord::des_from(&mut buf.reader())

@@ -742,3 +746,11 @@ mod tests {
assert_eq!(checkpoint.nextXid.value, 2048);
}
}

pub fn wal_record_verify_checksum(rec: &XLogRecord, recordbuf: &Bytes) -> bool {
let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);

crc == rec.xl_crc
}

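A small sketch of how the two helpers added above are meant to be used together by a caller such as the WAL decoder (the wrapper function is hypothetical and the error conversion is simplified):

```rust
use bytes::Bytes;

// Hypothetical wrapper: parse the fixed-size record header from the start of
// `recordbuf`, then check the CRC that covers the rest of the record.
fn parse_and_verify(recordbuf: &Bytes) -> anyhow::Result<XLogRecord> {
    let rec = XLogRecord::from_buf(recordbuf)
        .map_err(|e| anyhow::anyhow!("xlog record deserialization failed {e}"))?;
    anyhow::ensure!(
        wal_record_verify_checksum(&rec, recordbuf),
        "WAL record crc mismatch"
    );
    Ok(rec)
}
```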
@@ -56,6 +56,7 @@ impl Conf {
.new_pg_command("initdb")?
.arg("-D")
.arg(self.datadir.as_os_str())
.arg("--data-checksums")
.args(&["-U", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);

@@ -12,8 +12,10 @@ use std::{
borrow::Cow,
collections::HashMap,
ffi::OsStr,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
pin::Pin,
};

use anyhow::{bail, Context};

@@ -70,11 +72,7 @@ pub trait RemoteStorage: Send + Sync {

/// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;

/// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.

@@ -83,12 +81,49 @@ pub trait RemoteStorage: Send + Sync {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
) -> Result<Download, DownloadError>;

async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
}

pub struct Download {
pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send>>,
/// Extra key-value data, associated with the current remote file.
pub metadata: Option<StorageMetadata>,
}

impl Debug for Download {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Download")
.field("metadata", &self.metadata)
.finish()
}
}

#[derive(Debug)]
pub enum DownloadError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}

impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
}
}
}

impl std::error::Error for DownloadError {}

/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
pub enum GenericRemoteStorage {

@@ -180,7 +215,7 @@ pub struct S3Config {
pub concurrency_limit: NonZeroUsize,
}

impl std::fmt::Debug for S3Config {
impl Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)

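As a quick orientation for the reworked trait above: callers now pull bytes out of `Download::download_stream` themselves instead of handing a writer to the storage. A minimal sketch of a consumer under that assumption (the helper function itself is not part of the patch):

```rust
use tokio::io::AsyncReadExt;

// Hypothetical helper showing the new calling convention: fetch a remote
// object into memory and return its bytes together with any stored metadata.
async fn download_to_vec<S: RemoteStorage>(
    storage: &S,
    id: &S::RemoteObjectId,
) -> Result<(Vec<u8>, Option<StorageMetadata>), DownloadError> {
    let mut download = storage.download(id).await?;
    let mut buf = Vec::new();
    download
        .download_stream
        .read_to_end(&mut buf)
        .await
        .map_err(|e| DownloadError::Other(e.into()))?;
    Ok((buf, download.metadata))
}
```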
@@ -17,7 +17,7 @@ use tokio::{
};
use tracing::*;

use crate::path_with_suffix_extension;
use crate::{path_with_suffix_extension, Download, DownloadError};

use super::{strip_path_prefix, RemoteStorage, StorageMetadata};

@@ -192,15 +192,12 @@ impl RemoteStorage for LocalFs {
Ok(())
}

async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let file_path = self.resolve_in_storage(from)?;

if file_path.exists() && file_path.is_file() {
let mut source = io::BufReader::new(
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)

@@ -210,22 +207,20 @@ impl RemoteStorage for LocalFs {
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
})
.map_err(DownloadError::Other)?,
);
io::copy(&mut source, to).await.with_context(|| {
format!(
"Failed to download file '{}' from the local storage",
file_path.display()
)
})?;
source.flush().await?;

self.read_storage_metadata(&file_path).await
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;
Ok(Download {
metadata,
download_stream: Box::pin(source),
})
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
Err(DownloadError::NotFound)
}
}

@@ -234,22 +229,19 @@ impl RemoteStorage for LocalFs {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
) -> Result<Download, DownloadError> {
if let Some(end_exclusive) = end_exclusive {
ensure!(
end_exclusive > start_inclusive,
"Invalid range, start ({}) is bigger then end ({:?})",
start_inclusive,
end_exclusive
);
if end_exclusive <= start_inclusive {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
};
if start_inclusive == end_exclusive.saturating_sub(1) {
return Ok(None);
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
}
}
let file_path = self.resolve_in_storage(from)?;

if file_path.exists() && file_path.is_file() {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let mut source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)

@@ -260,31 +252,31 @@ impl RemoteStorage for LocalFs {
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
})
.map_err(DownloadError::Other)?,
);
source
.seek(io::SeekFrom::Start(start_inclusive))
.await
.context("Failed to seek to the range start in a local storage file")?;
match end_exclusive {
Some(end_exclusive) => {
io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
}
None => io::copy(&mut source, to).await,
}
.with_context(|| {
format!(
"Failed to download file '{}' range from the local storage",
file_path.display()
)
})?;
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;

self.read_storage_metadata(&file_path).await
Ok(match end_exclusive {
Some(end_exclusive) => Download {
metadata,
download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
},
None => Download {
metadata,
download_stream: Box::pin(source),
},
})
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
Err(DownloadError::NotFound)
}
}

@@ -352,6 +344,19 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
Ok(())
}

fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
if file_path.exists() {
ensure!(
file_path.is_file(),
"file path '{}' is not a file",
file_path.display()
);
Ok(true)
} else {
Ok(false)
}
}

#[cfg(test)]
mod pure_tests {
use tempfile::tempdir;
@@ -518,6 +523,31 @@ mod fs_tests {
use std::{collections::HashMap, io::Write};
use tempfile::tempdir;

async fn read_and_assert_remote_file_contents(
storage: &LocalFs,
#[allow(clippy::ptr_arg)]
// have to use &PathBuf due to `storage.local_path` parameter requirements
remote_storage_path: &PathBuf,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
let mut download = storage
.download(remote_storage_path)
.await
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
ensure!(
download.metadata.as_ref() == expected_metadata,
"Unexpected metadata returned for the downloaded file"
);

let mut contents = String::new();
download
.download_stream
.read_to_string(&mut contents)
.await
.context("Failed to read remote file contents into string")?;
Ok(contents)
}

#[tokio::test]
async fn upload_file() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();

@@ -568,15 +598,7 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;

let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage.download(&upload_target, &mut content_bytes).await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);

content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
contents,

@@ -584,13 +606,9 @@ mod fs_tests {
);

let non_existing_path = PathBuf::from("somewhere").join("else");
match storage.download(&non_existing_path, &mut io::sink()).await {
Ok(_) => panic!("Should not allow downloading non-existing storage files"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
match storage.download(&non_existing_path).await {
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
}
Ok(())
}

@@ -603,58 +621,31 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;

let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(&upload_target, 0, None, &mut full_range_bytes)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
full_range_bytes.flush().await?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
String::from_utf8(full_range_bytes.into_inner().into_inner())?,
full_range_download_contents,
"Download full range should return the whole upload"
);

let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let same_byte = 1_000_000_000;
let metadata = storage
.download_byte_range(
&upload_target,
same_byte,
Some(same_byte + 1), // exclusive end
&mut zero_range_bytes,
)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
zero_range_bytes.flush().await?;
assert!(
zero_range_bytes.into_inner().into_inner().is_empty(),
"Zero byte range should not download any part of the file"
);

let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
let mut first_part_download = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.await?;
assert!(
metadata.is_none(),
first_part_download.metadata.is_none(),
"No metadata should be returned for no metadata upload"
);

let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut first_part_download.download_stream,
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(

@@ -663,20 +654,24 @@ mod fs_tests {
"First part bytes should be returned when requested"
);

let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
let mut second_part_download = storage
.download_byte_range(
&upload_target,
first_part_local.len() as u64,
Some((first_part_local.len() + second_part_local.len()) as u64),
&mut second_part_remote,
)
.await?;
assert!(
metadata.is_none(),
second_part_download.metadata.is_none(),
"No metadata should be returned for no metadata upload"
);

let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut second_part_download.download_stream,
&mut second_part_remote,
)
.await?;
second_part_remote.flush().await?;
let second_part_remote = second_part_remote.into_inner().into_inner();
assert_eq!(

@@ -696,11 +691,30 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;

let start = 1_000_000_000;
let end = start + 1;
match storage
.download_byte_range(
&upload_target,
start,
Some(end), // exclusive end
)
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("zero bytes"));
assert!(error_string.contains(&start.to_string()));
assert!(error_string.contains(&end.to_string()));
}
}

let start = 10000;
let end = 234;
assert!(start > end, "Should test an incorrect range");
match storage
.download_byte_range(&upload_target, start, Some(end), &mut io::sink())
.download_byte_range(&upload_target, start, Some(end))
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),

@@ -712,18 +726,6 @@ mod fs_tests {
}
}

let non_existing_path = PathBuf::from("somewhere").join("else");
match storage
.download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink())
.await
{
Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
}
Ok(())
}

@@ -762,35 +764,26 @@ mod fs_tests {
let upload_target =
upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;

let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;

content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
assert_eq!(
dummy_contents(upload_name),
contents,
full_range_download_contents,
"We should upload and download the same contents"
);

assert_eq!(
full_download_metadata.as_ref(),
Some(&metadata),
"We should get the same metadata back for full download"
);

let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, _) = uploaded_bytes.split_at(3);

let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let partial_download_metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
let mut partial_download_with_metadata = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.await?;
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut partial_download_with_metadata.download_stream,
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(

@@ -800,8 +793,8 @@ mod fs_tests {
);

assert_eq!(
partial_download_metadata.as_ref(),
Some(&metadata),
partial_download_with_metadata.metadata,
Some(metadata),
"We should get the same metadata back for partial download"
);

@@ -843,7 +836,7 @@ mod fs_tests {
}

fn dummy_contents(name: &str) -> String {
format!("contents for {}", name)
format!("contents for {name}")
}

async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {

@@ -9,17 +9,17 @@ use std::path::{Path, PathBuf};
use anyhow::Context;
use rusoto_core::{
credential::{InstanceMetadataProvider, StaticProvider},
HttpClient, Region,
HttpClient, Region, RusotoError,
};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
StreamingBody, S3,
DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
S3Client, StreamingBody, S3,
};
use tokio::{io, sync::Semaphore};
use tokio_util::io::ReaderStream;
use tracing::debug;

use crate::{strip_path_prefix, RemoteStorage, S3Config};
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};

use super::StorageMetadata;

@@ -187,6 +187,39 @@ impl S3Bucket {
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
})
}

async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")
.map_err(DownloadError::Other)?;

metrics::inc_get_object();

match self.client.get_object(request).await {
Ok(object_output) => match object_output.body {
None => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Got no body for the S3 object given"
)))
}
Some(body) => Ok(Download {
metadata: object_output.metadata.map(StorageMetadata),
download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
}),
},
Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
Err(e) => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Failed to download S3 object: {e}"
)))
}
}
}
}

#[async_trait::async_trait]

@@ -283,38 +316,13 @@ impl RemoteStorage for S3Bucket {
Ok(())
}

async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")?;

metrics::inc_get_object();

let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;

if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}

Ok(object_output.metadata.map(StorageMetadata))
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
}

async fn download_byte_range(

@@ -322,8 +330,7 @@ impl RemoteStorage for S3Bucket {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
) -> Result<Download, DownloadError> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be exclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));

@@ -331,34 +338,14 @@ impl RemoteStorage for S3Bucket {
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
None => format!("bytes={}-", start_inclusive),
});
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 range download")?;

metrics::inc_get_object();

let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;

if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}

Ok(object_output.metadata.map(StorageMetadata))
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
}

async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {

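One detail worth spelling out in the range handling above: an HTTP `Range` header is inclusive on both ends, which is why the exclusive upper bound is converted with `saturating_sub(1)` before being formatted. A tiny illustration with made-up values:

```rust
// start_inclusive = 0 and end_exclusive = Some(5) cover bytes 0..=4,
// so the resulting header value is "bytes=0-4"; an open-ended range is "bytes=0-".
let (start_inclusive, end_exclusive) = (0u64, Some(5u64));
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
assert_eq!(
    format!("bytes={}-{}", start_inclusive, end_inclusive.unwrap()),
    "bytes=0-4"
);
```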
@@ -1,6 +1,6 @@
#![allow(unused)]

use criterion::{criterion_group, criterion_main, Criterion};

use utils::pg_checksum_page::pg_checksum_page;
use utils::zid;

pub fn bench_zid_stringify(c: &mut Criterion) {

@@ -18,5 +18,20 @@ pub fn bench_zid_stringify(c: &mut Criterion) {
});
}

criterion_group!(benches, bench_zid_stringify);
// NB: adding `black_box` around arguments doesn't seem to change anything.
pub fn pg_checksum_page_basic(c: &mut Criterion) {
const BLCKSZ: usize = 8192;
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
*byte = i as u8;
}

c.bench_function("pg_checksum_page_basic", |b| {
b.iter(|| {
unsafe { pg_checksum_page(&page[..], 0) };
})
});
}

criterion_group!(benches, pg_checksum_page_basic, bench_zid_stringify);
criterion_main!(benches);

@@ -5,7 +5,7 @@ DATA_DIR=$3
PORT=$4
SYSID=`od -A n -j 24 -N 8 -t d8 $WAL_PATH/000000010000000000000002* | cut -c 3-`
rm -fr $DATA_DIR
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --sysid=$SYSID
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --data-checksums --sysid=$SYSID
echo port=$PORT >> $DATA_DIR/postgresql.conf
REDO_POS=0x`$PG_BIN/pg_controldata -D $DATA_DIR | fgrep "REDO location"| cut -c 42-`
declare -i WAL_SIZE=$REDO_POS+114

@@ -54,6 +54,9 @@ pub mod nonblock;
// Default signal handling
pub mod signals;

// Postgres checksum calculation
pub mod pg_checksum_page;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

libs/utils/src/pg_checksum_page.rs (new file, 136 lines)

@@ -0,0 +1,136 @@
///
/// Rust implementation of Postgres pg_checksum_page
/// See: https://github.com/postgres/postgres/blob/88210542106de5b26fe6aa088d1811b68502d224/src/include/storage/checksum_impl.h
/// for additional comments.
///
/// This is not a direct port of pg_checksum_page from Postgres, though.
/// For example, in the current state it can only produce a valid result
/// on the little-endian platform and with the standard 8 KB page size.
///

const BLCKSZ: usize = 8192;
const N_SUMS: usize = 32;
// Prime multiplier of FNV-1a hash
const FNV_PRIME: u32 = 16777619;

// Base offsets to initialize each of the parallel FNV hashes into a
// different initial state.
const CHECKSUM_BASE_OFFSETS: [u32; N_SUMS] = [
0x5B1F36E9, 0xB8525960, 0x02AB50AA, 0x1DE66D2A, 0x79FF467A, 0x9BB9F8A3, 0x217E7CD2, 0x83E13D2C,
0xF8D4474F, 0xE39EB970, 0x42C6AE16, 0x993216FA, 0x7B093B5D, 0x98DAFF3C, 0xF718902A, 0x0B1C9CDB,
0xE58F764B, 0x187636BC, 0x5D7B3BB1, 0xE73DE7DE, 0x92BEC979, 0xCCA6C0B2, 0x304A0979, 0x85AA43D4,
0x783125BB, 0x6CA8EAA2, 0xE407EAC6, 0x4B5CFC3E, 0x9FBF8C76, 0x15CA20BE, 0xF2CA9FD3, 0x959BD756,
];

// Calculate one round of the checksum.
fn checksum_comp(checksum: u32, value: u32) -> u32 {
let tmp = checksum ^ value;
tmp.wrapping_mul(FNV_PRIME) ^ (tmp >> 17)
}

/// Compute the checksum for a Postgres page.
///
/// The page must be adequately aligned (at least on a 4-byte boundary).
///
/// The checksum includes the block number (to detect the case where a page is
/// somehow moved to a different location), the page header (excluding the
/// checksum itself), and the page data.
///
/// As in C implementation in Postgres, the checksum attribute on the page is
/// excluded from the calculation and preserved.
///
/// NB: after doing any modifications run `cargo bench`. The baseline on the more
/// or less recent Intel laptop is around 700ns. If it's significantly higher,
/// then it's worth looking into.
///
/// # Arguments
/// * `data` - the page to checksum
/// * `blkno` - the block number of the page
///
/// # Safety
/// This function is safe to call only if:
/// * `data` is strictly a standard 8 KB Postgres page
/// * it's called on the little-endian platform
pub unsafe fn pg_checksum_page(data: &[u8], blkno: u32) -> u16 {
let page = std::mem::transmute::<&[u8], &[u32]>(data);
let mut checksum: u32 = 0;
let mut sums = CHECKSUM_BASE_OFFSETS;

// Calculate the checksum of the first 'row' of the page. Do it separately as
// we do an expensive comparison here, which is not required for the rest of the
// page. Putting it into the main loop slows it down ~3 times.
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
// Third 32-bit chunk of the page contains the checksum in the lower half
// (assuming we are on little-endian machine), which we need to zero out.
// See also `PageHeaderData` for reference.
let chunk: u32 = if j == 2 {
page[j] & 0xFFFF_0000
} else {
page[j]
};

*sum = checksum_comp(*sum, chunk);
}

// Main checksum calculation loop
for i in 1..(BLCKSZ / (4 * N_SUMS)) {
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
*sum = checksum_comp(*sum, page[i * N_SUMS + j]);
}
}

// Finally, add in two rounds of zeroes for additional mixing
for _i in 0..2 {
for s in sums.iter_mut().take(N_SUMS) {
*s = checksum_comp(*s, 0);
}
}

// Xor fold partial checksums together
for sum in sums {
checksum ^= sum;
}

// Mix in the block number to detect transposed pages
checksum ^= blkno;

// Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
// one. That avoids checksums of zero, which seems like a good idea.
((checksum % 65535) + 1) as u16
}

#[cfg(test)]
mod tests {
use super::{pg_checksum_page, BLCKSZ};

#[test]
fn page_with_and_without_checksum() {
// Create a page with some content and without a correct checksum.
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
*byte = i as u8;
}

// Calculate the checksum.
let checksum = unsafe { pg_checksum_page(&page[..], 0) };

// Zero the checksum attribute on the page.
page[8..10].copy_from_slice(&[0u8; 2]);

// Calculate the checksum again, should be the same.
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
assert_eq!(checksum, new_checksum);

// Set the correct checksum into the page.
page[8..10].copy_from_slice(&checksum.to_le_bytes());

// Calculate the checksum again, should be the same.
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
assert_eq!(checksum, new_checksum);

// Check that we protect from the page transposition, i.e. page is the
// same, but in the wrong place.
let wrong_blockno_checksum = unsafe { pg_checksum_page(&page[..], 1) };
assert_ne!(checksum, wrong_blockno_checksum);
}
}
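For readers comparing this with the C implementation, the per-word mixing step `checksum_comp` and the final reduction can be written compactly as follows (a restatement of the code above, with s the running partial sum, v the next 32-bit page word, and C the XOR-fold of all partial sums):

```latex
\[
  t = s \oplus v, \qquad
  s' = \bigl(t \cdot 16777619 \bmod 2^{32}\bigr) \oplus (t \gg 17)
\]
\[
  \mathrm{checksum}_{16} = \bigl((C \oplus \mathrm{blkno}) \bmod 65535\bigr) + 1
\]
```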
@@ -38,6 +38,7 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}

#[serde_as]

@@ -494,6 +494,8 @@ components:
type: string
compaction_threshold:
type: string
data_checksums_enabled:
type: boolean
TenantConfigInfo:
type: object
properties:

@@ -412,6 +412,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;

// Turn on data checksums for all new tenants
tenant_conf.data_checksums_enabled = Some(request_data.data_checksums_enabled.unwrap_or(true));

if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?);

@@ -516,10 +516,23 @@ pub fn import_file<R: Repository, Reader: Read>(
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
// zenith.signal format is "PREV LSN: prev_lsn"
let zenith_signal = std::str::from_utf8(&bytes)?;
let zenith_signal = zenith_signal.split(':').collect::<Vec<_>>();
let prev_lsn = zenith_signal[1].trim().parse::<Lsn>()?;
// TODO write serialization and deserialization in the same place.
let zenith_signal = std::str::from_utf8(&bytes)?.trim();
let prev_lsn = match zenith_signal {
"PREV LSN: none" => Lsn(0),
"PREV LSN: invalid" => Lsn(0),
other => {
let split = other.split(':').collect::<Vec<_>>();
split[1]
.trim()
.parse::<Lsn>()
.context("can't parse zenith.signal")?
}
};

// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);

@@ -232,23 +232,32 @@ impl Repository for LayeredRepository {

fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<LayeredTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
let vacant_timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant_entry) => vacant_entry,
};

let timeline_path = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if timeline_path.exists() {
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
}

// Create the timeline directory, and write initial metadata to file.
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenant_id))?;
crashsafe_dir::create_dir_all(timeline_path)?;

let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
Self::save_metadata(self.conf, timelineid, self.tenant_id, &metadata, true)?;
Self::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?;

let timeline = LayeredTimeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,
None,
timelineid,
timeline_id,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,

@@ -257,12 +266,7 @@ impl Repository for LayeredRepository {

// Insert if not exists
let timeline = Arc::new(timeline);
match timelines.entry(timelineid) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant) => {
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
}
};
vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)));

Ok(timeline)
}

@@ -951,7 +951,10 @@ impl postgres_backend::Handler for PageServerHandler {

match self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
Err(e) => {
error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
}
};
} else if query_string.starts_with("import wal ") {
// Import the `pg_wal` section of a basebackup.

@@ -970,7 +973,10 @@ impl postgres_backend::Handler for PageServerHandler {

match self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
Err(e) => {
error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
}
};
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"

@@ -225,7 +225,7 @@ pub trait Repository: Send + Sync {
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<Self::Timeline>>;

@@ -473,6 +473,7 @@ pub mod repo_harness {
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
data_checksums_enabled: Some(tenant_conf.data_checksums_enabled),
}
}
}

@@ -636,6 +637,19 @@ mod tests {
Ok(())
}

#[test]
fn no_duplicate_timelines() -> Result<()> {
let repo = RepoHarness::create("no_duplicate_timelines")?.load();
let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;

match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(e.to_string(), "Timeline already exists"),
}

Ok(())
}

/// Convenience function to create a page image with given string as the only content
pub fn test_value(s: &str) -> Value {
let mut buf = BytesMut::new();

@@ -44,13 +44,23 @@ where
index_part_path.display()
)
})?;

let mut index_part_download =
storage
.download(&part_storage_path)
.await
.with_context(|| {
format!("Failed to open download stream for for storage path {part_storage_path:?}")
})?;
let mut index_part_bytes = Vec::new();
storage
.download(&part_storage_path, &mut index_part_bytes)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;

let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")

@@ -162,15 +172,19 @@ where
temp_file_path.display()
)
})?;

storage
.download(&layer_storage_path, &mut destination_file)
let mut download = storage
.download(&layer_storage_path)
.await
.with_context(|| {
format!(
"Failed to download a layer from storage path '{layer_storage_path:?}'"
"Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
)
})?;
io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
format!(
"Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
)
})?;

// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations

@@ -38,6 +38,10 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;

// Turn off data checksums by default to do not affect old tenants.
// We turn it on explicitly for all new tenants.
pub const DEFAULT_DATA_CHECKSUMS: bool = false;
}

/// Per-tenant configuration options

@@ -83,6 +87,7 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub data_checksums_enabled: bool,
}

/// Same as TenantConf, but this struct preserves the information about

@@ -105,6 +110,7 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}

impl TenantConfOpt {

@@ -135,6 +141,9 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
data_checksums_enabled: self
.data_checksums_enabled
.unwrap_or(global_conf.data_checksums_enabled),
}
}

@@ -172,6 +181,9 @@ impl TenantConfOpt {
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(data_checksums_enabled) = other.data_checksums_enabled {
self.data_checksums_enabled = Some(data_checksums_enabled);
}
}
}

@@ -199,6 +211,7 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
data_checksums_enabled: DEFAULT_DATA_CHECKSUMS,
}
}

@@ -229,6 +242,7 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
data_checksums_enabled: defaults::DEFAULT_DATA_CHECKSUMS,
}
}
}

@@ -11,7 +11,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr::ThreadKind;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{tenant_config, thread_mgr, timelines, walreceiver};
use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};

@@ -266,7 +266,14 @@ pub fn create_tenant_repository(
Ok(None)
}
Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
let wal_redo_manager = Arc::new(PostgresRedoManager::new(
conf,
data_checksums_enabled,
tenant_id,
));
let repo = timelines::create_repo(
conf,
tenant_conf,

@@ -567,10 +574,16 @@ fn load_local_repo(
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<RepositoryImpl>> {
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;

let mut m = tenants_state::write_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
let walredo_mgr = PostgresRedoManager::new(conf, data_checksums_enabled, tenant_id);

// Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(

@@ -588,8 +601,6 @@ fn load_local_repo(
}
});

// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
tenant.repo.update_tenant_config(tenant_conf)?;

Ok(Arc::clone(&tenant.repo))

@@ -119,8 +119,6 @@ pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40) // Way more than necessary
.max_blocking_threads(100) // Way more than necessary
.enable_all()
.build()?;

@@ -253,6 +253,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
.args(&["-D", &initdbpath.to_string_lossy()])
.args(&["-U", &conf.superuser])
.args(&["-E", "utf8"])
.arg("--data-checksums")
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it

@@ -24,7 +24,7 @@
|
||||
use anyhow::Context;
|
||||
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
|
||||
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||
use postgres_ffi::{page_is_new, page_set_lsn};
|
||||
use postgres_ffi::{page_is_new, page_set_checksum, page_set_lsn};
|
||||
|
||||
use anyhow::Result;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
@@ -313,6 +313,8 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
unsafe { page_set_checksum(&mut image, blk.blkno) };
|
||||
|
||||
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
|
||||
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
|
||||
} else {
|
||||
|
||||
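The ordering in the WalIngest hunk above matters: the page LSN is part of the checksummed page header, so the checksum is stamped after page_set_lsn, and the LSN itself is only written for pages that are not new. A toy sketch of that stamp-after-update flow; the XOR "checksum" stands in for PostgreSQL's real pg_checksum_page algorithm, and the layout constants are only assumptions:

const BLCKSZ: usize = 8192; // assumed Postgres block size
const CHECKSUM_OFFSET: usize = 8; // illustrative offset of the checksum field

// Toy checksum: XOR of all bytes mixed with the block number. The real
// algorithm (pg_checksum_page) is an FNV-1a variant, not this.
fn toy_checksum(page: &[u8], blkno: u32) -> u16 {
    let mut acc = blkno as u16;
    for (i, b) in page.iter().enumerate() {
        // Skip the checksum field itself, as Postgres does.
        if i == CHECKSUM_OFFSET || i == CHECKSUM_OFFSET + 1 {
            continue;
        }
        acc ^= (*b as u16).rotate_left((i % 13) as u32);
    }
    acc
}

fn page_is_new(page: &[u8]) -> bool {
    page.iter().all(|b| *b == 0)
}

fn page_set_lsn(page: &mut [u8], lsn: u64) {
    page[0..8].copy_from_slice(&lsn.to_le_bytes());
}

fn page_set_checksum(page: &mut [u8], blkno: u32) {
    let sum = toy_checksum(page, blkno);
    page[CHECKSUM_OFFSET..CHECKSUM_OFFSET + 2].copy_from_slice(&sum.to_le_bytes());
}

fn main() {
    let mut page = vec![0u8; BLCKSZ];
    page[100] = 0xAB; // pretend the page has some content
    if !page_is_new(&page) {
        page_set_lsn(&mut page, 0x1_0000_2000); // LSN first ...
    }
    page_set_checksum(&mut page, 7); // ... checksum last, so it covers the LSN
    println!("checksum bytes: {:02x?}", &page[CHECKSUM_OFFSET..CHECKSUM_OFFSET + 2]);
}
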
@@ -91,7 +91,6 @@ pub fn init_wal_receiver_main_thread(

let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("wal-receiver-runtime-thread")
.worker_threads(40)
.enable_all()
.on_thread_start(|| IS_WAL_RECEIVER.with(|c| c.set(true)))
.build()

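The two runtime hunks (the tenant task pool earlier and the WAL receiver here) mostly show context around removed lines, but the builder pattern they use is worth a standalone sketch. A minimal dedicated multi-thread tokio runtime with a thread name and an on-start hook; the thread count and names are arbitrary here, and the example assumes tokio as a dependency:

use std::cell::Cell;

thread_local! {
    // Stand-in for the pageserver's IS_WAL_RECEIVER thread-local marker.
    static IS_WORKER: Cell<bool> = Cell::new(false);
}

fn main() -> Result<(), std::io::Error> {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .thread_name("example-runtime-thread")
        .worker_threads(4) // far fewer than the 40 in the diff; pick to taste
        .enable_all()
        .on_thread_start(|| IS_WORKER.with(|c| c.set(true)))
        .build()?;

    let flagged = runtime.spawn(async { IS_WORKER.with(|c| c.get()) });
    let flagged = runtime.block_on(flagged).expect("task panicked");
    println!("worker thread flagged: {flagged}");
    Ok(())
}
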
@@ -48,7 +48,8 @@ use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift;
use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::wal_record_verify_checksum;
use postgres_ffi::{page_verify_checksum, pg_constants, XLogRecord};

///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -131,6 +132,7 @@ lazy_static! {
pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
data_checksums_enabled: bool,

process: Mutex<Option<PostgresRedoProcess>>,
}
@@ -229,11 +231,16 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
pub fn new(
conf: &'static PageServerConf,
data_checksums_enabled: bool,
tenantid: ZTenantId,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenantid,
conf,
data_checksums_enabled,
process: Mutex::new(None),
}
}
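
The constructor above does not spawn anything; the "launched lazily, on first request" comment is backed by the process: Mutex<Option<PostgresRedoProcess>> field. A small self-contained sketch of that launch-on-first-use shape, with a plain struct standing in for the real redo process:

use std::sync::Mutex;

// Placeholder for the wal-redo child process; only illustrates the pattern.
struct RedoProcess {
    pid: u32,
}

struct RedoManager {
    data_checksums_enabled: bool,
    process: Mutex<Option<RedoProcess>>,
}

impl RedoManager {
    fn new(data_checksums_enabled: bool) -> Self {
        // Nothing is spawned here; the process slot starts empty.
        RedoManager { data_checksums_enabled, process: Mutex::new(None) }
    }

    fn with_process<R>(&self, f: impl FnOnce(&RedoProcess) -> R) -> R {
        let mut guard = self.process.lock().unwrap();
        // First caller pays the launch cost; later callers reuse the handle.
        let proc = guard.get_or_insert_with(|| RedoProcess { pid: 12345 });
        f(proc)
    }
}

fn main() {
    let mgr = RedoManager::new(true);
    let pid = mgr.with_process(|p| p.pid);
    println!("checksums={}, redo pid={}", mgr.data_checksums_enabled, pid);
}
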
@@ -268,7 +275,13 @@ impl PostgresRedoManager {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = process
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
.apply_wal_records(
buf_tag,
base_img,
records,
wal_redo_timeout,
self.data_checksums_enabled,
)
.map_err(WalRedoError::IoError);

let end_time = Instant::now();
@@ -619,6 +632,7 @@ impl PostgresRedoProcess {
info!("running initdb in {:?}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("--data-checksums")
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
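
Both initdb call sites now pass --data-checksums: run_initdb earlier in the diff and the wal-redo scratch datadir here, so pages stamped during ingest and pages produced by the redo process agree on whether checksums are expected. If the two invocations ever needed to be kept from drifting apart, a shared builder is one option; this helper is only an illustration, not something the commit introduces:

use std::process::Command;

// Illustrative helper only: the commit itself adds the flag to each call site directly.
fn base_initdb(initdb_bin: &str, datadir: &str) -> Command {
    let mut cmd = Command::new(initdb_bin);
    cmd.args(["-D", datadir])
        .arg("--data-checksums") // keep ingest-time and redo-time clusters in agreement
        .arg("-N"); // skip fsync; both datadirs are short-lived scratch installs
    cmd
}

fn main() {
    // Print the assembled command line instead of running initdb.
    println!("{:?}", base_initdb("initdb", "/tmp/example-datadir"));
}
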
@@ -716,6 +730,7 @@ impl PostgresRedoProcess {
base_img: Option<Bytes>,
records: &[(Lsn, ZenithWalRecord)],
wal_redo_timeout: Duration,
data_checksums_enabled: bool,
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
@@ -725,6 +740,15 @@ impl PostgresRedoProcess {
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
// Checksums could be not stamped for old tenants, so check them only if they
// are enabled (this is controlled by per-tenant config).
if data_checksums_enabled && !unsafe { page_verify_checksum(&img, tag.blknum) } {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("block {} of relation {} is invalid", tag.blknum, tag.rel),
));
}

build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
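
The comment in the hunk above captures the compatibility concern: pages written before this feature may never have been stamped, so verification of the base image only runs when the per-tenant flag is on. A small sketch of that gate; the verifier is passed in as a parameter here because the real check is postgres_ffi::page_verify_checksum, which this example does not reimplement:

use std::io::{Error, ErrorKind};

fn check_base_image(
    page: &[u8],
    blkno: u32,
    data_checksums_enabled: bool,
    verify: impl Fn(&[u8], u32) -> bool,
) -> Result<(), Error> {
    // Old tenants may hold pages that were never stamped, so only verify when the
    // per-tenant flag is on; otherwise accept the page as-is.
    if data_checksums_enabled && !verify(page, blkno) {
        return Err(Error::new(
            ErrorKind::InvalidData,
            format!("block {} failed checksum verification", blkno),
        ));
    }
    Ok(())
}

fn main() -> Result<(), Error> {
    let page = vec![0u8; 8192];
    // A permissive stand-in verifier keeps the example self-contained.
    check_base_image(&page, 7, true, |_page, _blkno| true)?;
    check_base_image(&page, 7, false, |_page, _blkno| false)?; // disabled: not checked
    println!("both pages accepted");
    Ok(())
}
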
@@ -733,6 +757,27 @@ impl PostgresRedoProcess {
rec: postgres_rec,
} = rec
{
let xlogrec = XLogRecord::from_buf(postgres_rec).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"could not deserialize WAL record for relation {} at LSN {}: {}",
tag.rel, lsn, e
),
)
})?;
// WAL records always have a checksum, check it before sending to redo process.
// It doesn't do these checks itself.
if !wal_record_verify_checksum(&xlogrec, postgres_rec) {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"WAL record for relation {} at LSN {} is invalid",
tag.rel, lsn
),
));
}

build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(Error::new(

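Unlike page checksums, the record check above is unconditional: as the hunk's comment says, WAL records always carry a checksum (a CRC-32C in the XLogRecord header), and the redo process does not validate it itself. A toy sketch of the same reject-before-redo guard; the two-byte additive checksum and record layout are stand-ins, not the actual WAL format:

use std::io::{Error, ErrorKind};

// Toy record layout: [2-byte stored checksum][payload...].
fn stored_checksum(rec: &[u8]) -> u16 {
    u16::from_le_bytes([rec[0], rec[1]])
}

fn compute_checksum(payload: &[u8]) -> u16 {
    payload.iter().fold(0u16, |acc, b| acc.wrapping_add(*b as u16))
}

fn verify_record(rec: &[u8], lsn: u64) -> Result<(), Error> {
    // Reject corrupted records before they ever reach the redo process,
    // mirroring the wal_record_verify_checksum guard in the hunk above.
    if compute_checksum(&rec[2..]) != stored_checksum(rec) {
        return Err(Error::new(
            ErrorKind::InvalidData,
            format!("WAL record at LSN {:X} is invalid", lsn),
        ));
    }
    Ok(())
}

fn main() -> Result<(), Error> {
    let payload = b"toy wal record payload";
    let mut rec = compute_checksum(payload).to_le_bytes().to_vec();
    rec.extend_from_slice(payload);
    verify_record(&rec, 0x1_6B37_2A80)?;
    println!("record accepted");
    Ok(())
}
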
@@ -2,18 +2,16 @@ use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;

use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use postgres_ffi::xlog_utils::{
XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
};
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;
@@ -452,45 +450,41 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
pub async fn read_object(
file_path: PathBuf,
offset: u64,
) -> (impl AsyncRead, JoinHandle<Result<()>>) {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
let download = match REMOTE_STORAGE
.get()
.context("Failed to get remote storage")?
.as_ref()
.context("No remote storage configured")?
{
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;

let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);

let copy_result = tokio::spawn(async move {
let res = match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;

info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None, &mut pipe_writer)
.await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;

info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage
.download_byte_range(&s3key, offset, None, &mut pipe_writer)
.await
}
};

if let Err(e) = res {
error!("failed to download WAL segment from remote storage: {}", e);
Err(e)
} else {
Ok(())
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None)
.await
}
});
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;

(pipe_reader, copy_result)
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage.download_byte_range(&s3key, offset, None).await
}
}
.with_context(|| {
format!(
"Failed to open WAL segment download stream for local storage path {}",
file_path.display()
)
})?;

Ok(download.download_stream)
}

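The rewritten read_object above dispatches directly on the GenericRemoteStorage enum and lets a missing configuration surface through anyhow::Context instead of panicking inside a spawned copy task. A reduced sketch of that enum dispatch, assuming anyhow as a dependency; the variants and fields are simplified stand-ins for the real remote_storage types:

use anyhow::Context;

// Simplified stand-ins for the two remote-storage backends in the diff.
enum GenericRemoteStorage {
    Local { root: String },
    S3 { bucket: String },
}

fn describe_download(
    storage: Option<&GenericRemoteStorage>,
    path: &str,
    offset: u64,
) -> anyhow::Result<String> {
    // Each arm evaluates to the same type, so the match itself is the result,
    // and a missing configuration becomes an error instead of a panic.
    let desc = match storage.context("No remote storage configured")? {
        GenericRemoteStorage::Local { root } => {
            format!("local download from {root}/{path} at offset {offset}")
        }
        GenericRemoteStorage::S3 { bucket } => {
            format!("S3 download from s3://{bucket}/{path} at offset {offset}")
        }
    };
    Ok(desc)
}

fn main() -> anyhow::Result<()> {
    let storage = GenericRemoteStorage::S3 { bucket: "example-wal-bucket".to_string() };
    println!("{}", describe_download(Some(&storage), "000000010000000000000001", 1024)?);
    Ok(())
}
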
@@ -604,8 +604,7 @@ impl WalReader {

// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
return Ok(Box::pin(reader));
return read_object(wal_file_path, xlogoff as u64).await;
}

bail!("WAL segment is not found")

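Callers like WalReader above now return the read_object result directly, because the function yields anyhow::Result<Pin<Box<dyn AsyncRead>>> rather than a (reader, JoinHandle) pair whose copy-task errors were easy to drop. A minimal sketch of returning a type-erased async reader; it assumes tokio and anyhow as dependencies, and a Cursor stands in for the real download stream:

use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt};

// Any concrete stream can be handed back behind one boxed trait object.
async fn open_wal_stream(bytes: Vec<u8>) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>> {
    // A real implementation would return the remote-storage download stream here.
    Ok(Box::pin(std::io::Cursor::new(bytes)))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut reader = open_wal_stream(b"fake WAL segment bytes".to_vec()).await?;
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    println!("read {} bytes from the boxed stream", buf.len());
    Ok(())
}
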
@@ -1,222 +0,0 @@
#
# Simple script to export nodes from one pageserver
# and import them into another page server
#
from os import path
import os
import requests
import uuid
import subprocess
import argparse
from pathlib import Path

# directory to save exported tar files to
basepath = path.dirname(path.abspath(__file__))


class NeonPageserverApiException(Exception):
pass


class NeonPageserverHttpClient(requests.Session):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port

def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()['msg']
except:
msg = ''
raise NeonPageserverApiException(msg) from e

def check_status(self):
self.get(f"http://{self.host}:{self.port}/v1/status").raise_for_status()

def tenant_list(self):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json

def tenant_create(self, new_tenant_id: uuid.UUID, ok_if_exists):
res = self.post(
f"http://{self.host}:{self.port}/v1/tenant",
json={
'new_tenant_id': new_tenant_id.hex,
},
)

if res.status_code == 409:
if ok_if_exists:
print(f'could not create tenant: already exists for id {new_tenant_id}')
else:
res.raise_for_status()
elif res.status_code == 201:
print(f'created tenant {new_tenant_id}')
else:
self.verbose_error(res)

return new_tenant_id

def timeline_list(self, tenant_id: uuid.UUID):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json


def main(args: argparse.Namespace):
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host
tenants = args.tenants

old_http_client = NeonPageserverHttpClient(old_pageserver_host, args.old_pageserver_http_port)
old_http_client.check_status()
old_pageserver_connstr = f"postgresql://{old_pageserver_host}:{args.old_pageserver_pg_port}"

new_http_client = NeonPageserverHttpClient(new_pageserver_host, args.new_pageserver_http_port)
new_http_client.check_status()
new_pageserver_connstr = f"postgresql://{new_pageserver_host}:{args.new_pageserver_pg_port}"

psql_env = {**os.environ, 'LD_LIBRARY_PATH': '/usr/local/lib/'}

for tenant_id in tenants:
print(f"Tenant: {tenant_id}")
timelines = old_http_client.timeline_list(uuid.UUID(tenant_id))
print(f"Timelines: {timelines}")

# Create tenant in new pageserver
if args.only_import is False:
new_http_client.tenant_create(uuid.UUID(tenant_id), args.ok_if_exists)

for timeline in timelines:

# Export timelines from old pageserver
if args.only_import is False:
query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"

cmd = ["psql", "--no-psqlrc", old_pageserver_connstr, "-c", query]
print(f"Running: {cmd}")

tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
stderr_filename = path.join(
basepath, f"{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")

with open(tar_filename, 'w') as stdout_f:
with open(stderr_filename, 'w') as stderr_f:
print(f"(capturing output to {tar_filename})")
subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env)

print(f"Done export: {tar_filename}")

# Import timelines to new pageserver
psql_path = Path(args.psql_path)
import_cmd = f"import basebackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']} {timeline['local']['last_record_lsn']}"
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
full_cmd = rf"""cat {tar_filename} | {psql_path} {new_pageserver_connstr} -c '{import_cmd}' """

stderr_filename2 = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
stdout_filename = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stdout")

print(f"Running: {full_cmd}")

with open(stdout_filename, 'w') as stdout_f:
with open(stderr_filename2, 'w') as stderr_f:
print(f"(capturing output to {stdout_filename})")
subprocess.run(full_cmd,
stdout=stdout_f,
stderr=stderr_f,
env=psql_env,
shell=True)

print(f"Done import")


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--tenant-id',
dest='tenants',
required=True,
nargs='+',
help='Id of the tenant to migrate. You can pass multiple arguments',
)
parser.add_argument(
'--from-host',
dest='old_pageserver_host',
required=True,
help='Host of the pageserver to migrate data from',
)
parser.add_argument(
'--from-http-port',
dest='old_pageserver_http_port',
required=False,
type=int,
default=9898,
help='HTTP port of the pageserver to migrate data from. Default: 9898',
)
parser.add_argument(
'--from-pg-port',
dest='old_pageserver_pg_port',
required=False,
type=int,
default=6400,
help='pg port of the pageserver to migrate data from. Default: 6400',
)
parser.add_argument(
'--to-host',
dest='new_pageserver_host',
required=True,
help='Host of the pageserver to migrate data to',
)
parser.add_argument(
'--to-http-port',
dest='new_pageserver_http_port',
required=False,
default=9898,
type=int,
help='HTTP port of the pageserver to migrate data to. Default: 9898',
)
parser.add_argument(
'--to-pg-port',
dest='new_pageserver_pg_port',
required=False,
default=6400,
type=int,
help='pg port of the pageserver to migrate data to. Default: 6400',
)
parser.add_argument(
'--ignore-tenant-exists',
dest='ok_if_exists',
required=False,
help=
'Ignore error if we are trying to create the tenant that already exists. It can be dangerous if existing tenant already contains some data.',
)
parser.add_argument(
'--psql-path',
dest='psql_path',
required=False,
default='/usr/local/bin/psql',
help='Path to the psql binary. Default: /usr/local/bin/psql',
)
parser.add_argument(
'--only-import',
dest='only_import',
required=False,
default=False,
action='store_true',
help='Skip export and tenant creation part',
)
args = parser.parse_args()
main(args)

@@ -37,7 +37,7 @@ You can run all the tests with:

If you want to run all the tests in a particular file:

`./scripts/pytest test_pgbench.py`
`./scripts/pytest test_runner/batch_others/test_restart_compute.py`

If you want to run all tests that have the string "bench" in their names:


@@ -1,74 +0,0 @@
import pytest

from contextlib import closing
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log


#
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_restart_compute(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
neon_env_builder.auth_enabled = True
if with_safekeepers:
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

env.neon_cli.create_branch('test_restart_compute')
pg = env.postgres.create_start('test_restart_compute')
log.info("postgres is running on 'test_restart_compute' branch")

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
log.info(f"res = {r}")

# Remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the row
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
log.info(f"res = {r}")

# Insert another row
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
cur.execute('SELECT count(*) FROM t')

r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")

# Again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')

# That select causes lots of FPI's and increases probability of wakeepers
# lagging behind after query completion
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')

r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")

# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')

r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")
@@ -10,8 +10,8 @@ from typing import Optional
import signal
import pytest

from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
from fixtures.utils import lsn_from_hex
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir
from fixtures.utils import lsn_from_hex, subprocess_capture


def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -101,13 +101,23 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')


@pytest.mark.skip(
reason=
"needs to replace callmemaybe call with better idea how to migrate timelines between pageservers"
)
@pytest.mark.parametrize(
'method',
[
# A minor migration involves no storage breaking changes.
# It is done by attaching the tenant to a new pageserver.
'minor',
# A major migration involves exporting a postgres datadir
# basebackup and importing it into the new pageserver.
# This kind of migration can tolerate breaking changes
# to storage format
pytest.param('major', marks=pytest.mark.xfail(reason="Not implemented")),
])
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
port_distributor: PortDistributor,
test_output_dir,
method: str,
with_load: str):
neon_env_builder.enable_local_fs_remote_storage()

@@ -157,8 +167,11 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,

load_stop_event = threading.Event()
load_ok_event = threading.Event()
load_thread = threading.Thread(target=load,
args=(tenant_pg, load_stop_event, load_ok_event))
load_thread = threading.Thread(
target=load,
args=(tenant_pg, load_stop_event, load_ok_event),
daemon=True,  # To make sure the child dies when the parent errors
)
load_thread.start()

# run checkpoint manually to be sure that data landed in remote storage
@@ -188,30 +201,47 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
new_pageserver_http_port,
neon_env_builder.broker):

# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# Migrate either by attaching from s3 or import/export basebackup
if method == "major":
cmd = [
"python",
os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
"--tenant-id",
tenant.hex,
"--from-host",
"localhost",
"--from-http-port",
str(pageserver_http.port),
"--from-pg-port",
str(env.pageserver.service_port.pg),
"--to-host",
"localhost",
"--to-http-port",
str(new_pageserver_http_port),
"--to-pg-port",
str(new_pageserver_pg_port),
"--psql-path",
os.path.join(pg_distrib_dir, "bin", "psql"),
"--work-dir",
os.path.join(test_output_dir),
]
subprocess_capture(str(env.repo_dir), cmd, check=True)
elif method == "minor":
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)

# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))

# callmemaybe to start replication from safekeeper to the new pageserver
# when there is no load there is a clean checkpoint and no wal delta
# needs to be streamed to the new pageserver
# TODO (rodionov) use attach to start replication
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
cur.execute("callmemaybe {} {} {}".format(tenant.hex,
timeline.hex,
safekeeper_connstring))
# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(
lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)

tenant_pg.stop()


@@ -682,7 +682,7 @@ class ProposerPostgres(PgProtocol):
def initdb(self):
""" Run initdb """

args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path()]
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path(), "--data-checksums"]
self.pg_bin.run(args)

def start(self):

@@ -1,5 +1,6 @@
import asyncio
import uuid

import asyncpg
import random
import time
@@ -7,7 +8,7 @@ import time
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List
from typing import List, Optional

log = getLogger('root.safekeeper_async')

@@ -234,3 +235,156 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
# we try to simulate large (flush_lsn - truncate_lsn) lag, to test that WAL segments
# are not removed before broadcasted to all safekeepers, with the help of replication slot
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5))


def postgres_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
pg = Postgres(
env,
tenant_id=env.initial_tenant,
port=env.port_distributor.get_port(),
# In these tests compute has high probability of terminating on its own
# before our stop() due to lost consensus leadership.
check_stop_result=False)

# embed current time in node name
node_name = pgdir_name or f'pg_node_{time.time()}'
return pg.create_start(branch_name=branch,
node_name=node_name,
config_lines=['log_statement=all'])


async def exec_compute_query(env: NeonEnv,
branch: str,
query: str,
pgdir_name: Optional[str] = None):
with postgres_create_start(env, branch=branch, pgdir_name=pgdir_name) as pg:
before_conn = time.time()
conn = await pg.connect_async()
res = await conn.fetch(query)
await conn.close()
after_conn = time.time()
log.info(f'{query} took {after_conn - before_conn}s')
return res


async def run_compute_restarts(env: NeonEnv,
queries=16,
batch_insert=10000,
branch='test_compute_restarts'):
cnt = 0
sum = 0

await exec_compute_query(env, branch, 'CREATE TABLE t (i int)')

for i in range(queries):
if i % 4 == 0:
await exec_compute_query(
env, branch, f'INSERT INTO t SELECT 1 FROM generate_series(1, {batch_insert})')
sum += batch_insert
cnt += batch_insert
elif (i % 4 == 1) or (i % 4 == 3):
# Note that select causes lots of FPI's and increases probability of safekeepers
# standing at different LSNs after compute termination.
actual_sum = (await exec_compute_query(env, branch, 'SELECT SUM(i) FROM t'))[0][0]
assert actual_sum == sum, f'Expected sum={sum}, actual={actual_sum}'
elif i % 4 == 2:
await exec_compute_query(env, branch, 'UPDATE t SET i = i + 1')
sum += cnt


# Add a test which creates compute for every query, and then destroys it right after.
def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

env.neon_cli.create_branch('test_compute_restarts')
asyncio.run(run_compute_restarts(env))


class BackgroundCompute(object):
def __init__(self, index: int, env: NeonEnv, branch: str):
self.index = index
self.env = env
self.branch = branch
self.running = False
self.stopped = False
self.total_tries = 0
self.successful_queries: List[int] = []

async def run(self):
if self.running:
raise Exception('BackgroundCompute is already running')

self.running = True
i = 0
while not self.stopped:
try:
verify_key = (self.index << 16) + i
i += 1
self.total_tries += 1
res = await exec_compute_query(
self.env,
self.branch,
f'INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key',
pgdir_name=f'bgcompute{self.index}_key{verify_key}',
)
log.info(f'result: {res}')
if len(res) != 1:
raise Exception('No result returned')
if res[0][0] != verify_key:
raise Exception('Wrong result returned')
self.successful_queries.append(verify_key)
except Exception as e:
log.info(f'BackgroundCompute {self.index} query failed: {e}')

# With less sleep, there is a very big chance of not committing
# anything or only 1 xact during test run.
await asyncio.sleep(2 * random.random())
self.running = False


async def run_concurrent_computes(env: NeonEnv,
num_computes=10,
run_seconds=20,
branch='test_concurrent_computes'):
await exec_compute_query(
env,
branch,
'CREATE TABLE query_log (t timestamp default now(), index int, verify_key int)')

computes = [BackgroundCompute(i, env, branch) for i in range(num_computes)]
background_tasks = [asyncio.create_task(compute.run()) for compute in computes]

await asyncio.sleep(run_seconds)
for compute in computes[1:]:
compute.stopped = True
log.info("stopped all tasks but one")

# work for some time with only one compute -- it should be able to make some xacts
await asyncio.sleep(8)
computes[0].stopped = True

await asyncio.gather(*background_tasks)

result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
# we should have inserted something while single compute was running
assert len(result) >= 4
log.info(f'Executed {len(result)} queries')
for row in result:
log.info(f'{row[0]} {row[1]} {row[2]}')

# ensure everything reported as committed wasn't lost
for compute in computes:
for verify_key in compute.successful_queries:
assert verify_key in [row[2] for row in result]


# Run multiple computes concurrently, creating-destroying them after single
# query. Ensure we don't lose any xacts reported as committed and be able to
# progress once only one compute remains.
def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

env.neon_cli.create_branch('test_concurrent_computes')
asyncio.run(run_concurrent_computes(env))

@@ -1160,6 +1160,7 @@ class NeonCli:
node_name: str,
tenant_id: Optional[uuid.UUID] = None,
destroy=False,
check_return_code=True,
) -> 'subprocess.CompletedProcess[str]':
args = [
'pg',
@@ -1172,7 +1173,7 @@ class NeonCli:
if node_name is not None:
args.append(node_name)

return self.raw_cli(args)
return self.raw_cli(args, check_return_code=check_return_code)

def raw_cli(self,
arguments: List[str],
@@ -1188,6 +1189,8 @@ class NeonCli:
>>> result = env.neon_cli.raw_cli(...)
>>> assert result.stderr == ""
>>> log.info(result.stdout)

If `check_return_code`, on non-zero exit code logs failure and raises.
"""

assert type(arguments) == list
@@ -1213,27 +1216,27 @@ class NeonCli:
env_vars[var] = val

# Intercept CalledProcessError and print more info
try:
res = subprocess.run(args,
env=env_vars,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
res = subprocess.run(args,
env=env_vars,
check=False,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if not res.returncode:
log.info(f"Run success: {res.stdout}")
except subprocess.CalledProcessError as exc:
elif check_return_code:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
Run failed: {exc}
stdout: {exc.stdout}
stderr: {exc.stderr}
Run {res.args} failed:
stdout: {res.stdout}
stderr: {res.stderr}
"""
log.info(msg)
raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
res.args,
res.stdout,
res.stderr)

raise Exception(msg) from exc

if check_return_code:
res.check_returncode()
return res


@@ -1526,7 +1529,11 @@ def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:

class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, env: NeonEnv, tenant_id: uuid.UUID, port: int):
def __init__(self,
env: NeonEnv,
tenant_id: uuid.UUID,
port: int,
check_stop_result: bool = True):
super().__init__(host='localhost', port=port, user='cloud_admin', dbname='postgres')
self.env = env
self.running = False
@@ -1534,6 +1541,7 @@ class Postgres(PgProtocol):
self.pgdata_dir: Optional[str] = None  # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.check_stop_result = check_stop_result
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf

def create(
@@ -1585,8 +1593,6 @@ class Postgres(PgProtocol):
port=self.port)
self.running = True

log.info(f"stdout: {run_result.stdout}")

return self

def pg_data_dir_path(self) -> str:
@@ -1650,7 +1656,9 @@ class Postgres(PgProtocol):

if self.running:
assert self.node_name is not None
self.env.neon_cli.pg_stop(self.node_name, self.tenant_id)
self.env.neon_cli.pg_stop(self.node_name,
self.tenant_id,
check_return_code=self.check_stop_result)
self.running = False

return self
@@ -1662,7 +1670,10 @@ class Postgres(PgProtocol):
"""

assert self.node_name is not None
self.env.neon_cli.pg_stop(self.node_name, self.tenant_id, True)
self.env.neon_cli.pg_stop(self.node_name,
self.tenant_id,
True,
check_return_code=self.check_stop_result)
self.node_name = None
self.running = False

@@ -1681,6 +1692,8 @@ class Postgres(PgProtocol):
Returns self.
"""

started_at = time.time()

self.create(
branch_name=branch_name,
node_name=node_name,
@@ -1688,6 +1701,8 @@ class Postgres(PgProtocol):
lsn=lsn,
).start()

log.info(f"Postgres startup took {time.time() - started_at} seconds")

return self

def __enter__(self):

Submodule vendor/postgres updated: 35ad142301...b62a5c1aec