Compare commits


1 Commit

Author: Anastasia Lubennikova
SHA1: 263a3ea5e3
Message: Add script export_import_betwen_pageservers.py to migrate projects between pageservers
Date: 2022-07-05 15:27:31 +03:00
42 changed files with 709 additions and 967 deletions

View File

@@ -12,7 +12,6 @@ pageservers
safekeepers
[storage:vars]
env_name = neon-stress
console_mgmt_base_url = http://neon-stress-console.local
bucket_name = neon-storage-ireland
bucket_region = eu-west-1

View File

@@ -495,8 +495,8 @@ jobs:
name: Re-deploy proxy
command: |
DOCKER_TAG=$(git log --oneline|wc -l)
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/staging.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait
deploy-neon-stress:
docker:

View File

@@ -85,7 +85,7 @@ runs:
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
fi
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
if [[ "$GITHUB_REF" == "main" ]]; then
mkdir -p "$PERF_REPORT_DIR"
EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS"
fi
@@ -115,7 +115,7 @@ runs:
-rA $TEST_SELECTION $EXTRA_PARAMS
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
if [[ "$GITHUB_REF" == "main" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO=local
scripts/generate_and_push_perf_report.sh

View File

@@ -271,9 +271,6 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: true
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

View File

@@ -248,20 +248,18 @@ pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()
bail!("Postgres exited unexpectedly with code {}", code);
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
let file = BufReader::new(file);
let last_line = file.lines().last();
if pid_path.exists() {
let file = BufReader::new(File::open(&pid_path)?);
let status = file
.lines()
.last()
.unwrap()
.unwrap_or_else(|_| "unknown".to_string());
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
// Now Postgres is ready to accept connections
if status == "ready" && can_connect {
break;
}
// Now Postgres is ready to accept connections
if status.trim() == "ready" && can_connect {
break;
}
}
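
The readiness check above boils down to: read the last line of postmaster.pid and try a TCP connection. A minimal standalone sketch of that pattern, with the retry loop and error plumbing of the real wait_for_postgres stripped out (the "ready" status string and pid-file handling follow the hunk; everything else is illustrative):

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::time::Duration;

// Returns true once the last line of the pid file reads "ready" and the
// server accepts TCP connections; callers would poll this in a loop.
fn postgres_is_ready(pid_path: &Path, addr: &SocketAddr, timeout: Duration) -> std::io::Result<bool> {
    if !pid_path.exists() {
        return Ok(false); // postmaster.pid is not written yet
    }
    let reader = BufReader::new(File::open(pid_path)?);
    // The pid file can exist but still be empty or only partially written.
    let status = match reader.lines().last() {
        Some(Ok(line)) => line,
        _ => return Ok(false),
    };
    let can_connect = TcpStream::connect_timeout(addr, timeout).is_ok();
    Ok(status.trim() == "ready" && can_connect)
}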

View File

@@ -427,7 +427,6 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
data_checksums_enabled: Some(true),
})
.send()?
.error_from_body()?
@@ -437,7 +436,7 @@ impl PageServerNode {
.map(|id| {
id.parse().with_context(|| {
format!(
"Failed to parse tenant creation response as tenant id: {}",
"Failed to parse tennat creation response as tenant id: {}",
id
)
})

View File

@@ -9,7 +9,6 @@
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
use utils::pg_checksum_page::pg_checksum_page;
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
@@ -57,55 +56,3 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
}
/// Calculate page checksum and stamp it onto the page.
/// NB: this will zero out and ignore any existing checksum.
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_set_checksum(page: &mut [u8], blkno: u32) {
let checksum = pg_checksum_page(page, blkno);
page[8..10].copy_from_slice(&checksum.to_le_bytes());
}
/// Check if page checksum is valid.
/// # Safety
/// See safety notes for `pg_checksum_page`
pub unsafe fn page_verify_checksum(page: &[u8], blkno: u32) -> bool {
let checksum = pg_checksum_page(page, blkno);
checksum == u16::from_le_bytes(page[8..10].try_into().unwrap())
}
#[cfg(test)]
mod tests {
use crate::pg_constants::BLCKSZ;
use crate::{page_set_checksum, page_verify_checksum};
use utils::pg_checksum_page::pg_checksum_page;
#[test]
fn set_and_verify_checksum() {
// Create a page with some content and without a correct checksum.
let mut page: [u8; BLCKSZ as usize] = [0; BLCKSZ as usize];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ as usize) {
*byte = i as u8;
}
// Calculate the checksum.
let checksum = unsafe { pg_checksum_page(&page[..], 0) };
// Sanity check: random bytes in the checksum attribute should not be
// a valid checksum.
assert_ne!(
checksum,
u16::from_le_bytes(page[8..10].try_into().unwrap())
);
// Set the actual checksum.
unsafe { page_set_checksum(&mut page, 0) };
// Verify the checksum.
assert!(unsafe { page_verify_checksum(&page[..], 0) });
// Checksum is not valid with another block number.
assert!(!unsafe { page_verify_checksum(&page[..], 1) });
}
}

View File

@@ -14,6 +14,7 @@ use super::XLogLongPageHeaderData;
use super::XLogPageHeaderData;
use super::XLogRecord;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
use thiserror::Error;
@@ -197,12 +198,18 @@ impl WalStreamDecoder {
}
// We now have a record in the 'recordbuf' local variable.
let xlogrec = XLogRecord::from_buf(&recordbuf).map_err(|e| WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
})?;
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
}
})?;
if !wal_record_verify_checksum(&xlogrec, &recordbuf) {
let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,

View File

@@ -477,10 +477,6 @@ impl XLogRecord {
XLogRecord::des(buf)
}
pub fn from_buf(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
XLogRecord::from_slice(&buf[0..XLOG_SIZE_OF_XLOG_RECORD])
}
pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
use utils::bin_ser::LeSer;
XLogRecord::des_from(&mut buf.reader())
@@ -746,11 +742,3 @@ mod tests {
assert_eq!(checkpoint.nextXid.value, 2048);
}
}
pub fn wal_record_verify_checksum(rec: &XLogRecord, recordbuf: &Bytes) -> bool {
let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
crc == rec.xl_crc
}
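
For context, the checksum layout both versions rely on follows PostgreSQL: xl_crc covers the record payload after the crc field, then the header bytes up to the crc offset. A hedged standalone sketch; the 20-byte offset is the usual offsetof(XLogRecord, xl_crc) and is stated here as an assumption rather than taken from the diff:

use crc32c::crc32c_append;

// Assumed offset of xl_crc inside the fixed-size record header
// (offsetof(XLogRecord, xl_crc) in PostgreSQL).
const XLOG_RECORD_CRC_OFFS: usize = 20;

// Recompute the CRC the way PostgreSQL does: payload after the crc field
// first, then the header up to (but not including) the crc field.
fn wal_record_crc(recordbuf: &[u8]) -> u32 {
    let mut crc = 0;
    crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
    crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
    crc
}

// `xl_crc` comes from the deserialized record header (XLogRecord).
fn wal_record_checksum_ok(recordbuf: &[u8], xl_crc: u32) -> bool {
    wal_record_crc(recordbuf) == xl_crc
}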

View File

@@ -56,7 +56,6 @@ impl Conf {
.new_pg_command("initdb")?
.arg("-D")
.arg(self.datadir.as_os_str())
.arg("--data-checksums")
.args(&["-U", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);

View File

@@ -12,10 +12,8 @@ use std::{
borrow::Cow,
collections::HashMap,
ffi::OsStr,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
pin::Pin,
};
use anyhow::{bail, Context};
@@ -72,7 +70,11 @@ pub trait RemoteStorage: Send + Sync {
/// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
/// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
@@ -81,49 +83,12 @@ pub trait RemoteStorage: Send + Sync {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError>;
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
}
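
A hedged sketch of what a caller of the writer-based `download` signature looks like: any `AsyncWrite` can serve as the sink, so an in-memory buffer works for small objects. The helper below is hypothetical; only the trait shape and the BufWriter-over-Cursor sink (used by the crate's own tests further down) come from the diff:

use anyhow::Context;
use tokio::io::{self, AsyncWriteExt};

// Hypothetical helper: stream a whole remote object into memory and return
// its bytes plus any stored metadata.
async fn download_to_vec<S: RemoteStorage>(
    storage: &S,
    id: &S::RemoteObjectId,
) -> anyhow::Result<(Vec<u8>, Option<StorageMetadata>)> {
    // Same sink shape as the tests below: a buffered async writer over a cursor.
    let mut sink = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
    let metadata = storage
        .download(id, &mut sink)
        .await
        .context("failed to download remote object")?;
    sink.flush().await?;
    Ok((sink.into_inner().into_inner(), metadata))
}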
pub struct Download {
pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send>>,
/// Extra key-value data, associated with the current remote file.
pub metadata: Option<StorageMetadata>,
}
impl Debug for Download {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Download")
.field("metadata", &self.metadata)
.finish()
}
}
#[derive(Debug)]
pub enum DownloadError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
}
}
}
impl std::error::Error for DownloadError {}
/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
pub enum GenericRemoteStorage {
@@ -215,7 +180,7 @@ pub struct S3Config {
pub concurrency_limit: NonZeroUsize,
}
impl Debug for S3Config {
impl std::fmt::Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)

View File

@@ -17,7 +17,7 @@ use tokio::{
};
use tracing::*;
use crate::{path_with_suffix_extension, Download, DownloadError};
use crate::path_with_suffix_extension;
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
@@ -192,56 +192,14 @@ impl RemoteStorage for LocalFs {
Ok(())
}
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)
.await
.with_context(|| {
format!(
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})
.map_err(DownloadError::Other)?,
);
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;
Ok(Download {
metadata,
download_stream: Box::pin(source),
})
} else {
Err(DownloadError::NotFound)
}
}
async fn download_byte_range(
async fn download(
&self,
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
if let Some(end_exclusive) = end_exclusive {
if end_exclusive <= start_inclusive {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
};
if start_inclusive == end_exclusive.saturating_sub(1) {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
}
}
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
let mut source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
@@ -252,31 +210,81 @@ impl RemoteStorage for LocalFs {
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})
.map_err(DownloadError::Other)?,
})?,
);
io::copy(&mut source, to).await.with_context(|| {
format!(
"Failed to download file '{}' from the local storage",
file_path.display()
)
})?;
source.flush().await?;
self.read_storage_metadata(&file_path).await
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
}
}
async fn download_byte_range(
&self,
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
if let Some(end_exclusive) = end_exclusive {
ensure!(
end_exclusive > start_inclusive,
"Invalid range, start ({}) is bigger then end ({:?})",
start_inclusive,
end_exclusive
);
if start_inclusive == end_exclusive.saturating_sub(1) {
return Ok(None);
}
}
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
let mut source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)
.await
.with_context(|| {
format!(
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
);
source
.seek(io::SeekFrom::Start(start_inclusive))
.await
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;
.context("Failed to seek to the range start in a local storage file")?;
match end_exclusive {
Some(end_exclusive) => {
io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
}
None => io::copy(&mut source, to).await,
}
.with_context(|| {
format!(
"Failed to download file '{}' range from the local storage",
file_path.display()
)
})?;
Ok(match end_exclusive {
Some(end_exclusive) => Download {
metadata,
download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
},
None => Download {
metadata,
download_stream: Box::pin(source),
},
})
self.read_storage_metadata(&file_path).await
} else {
Err(DownloadError::NotFound)
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
}
}
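
The byte-range path above reduces to a seek plus a bounded copy. A minimal sketch of that pattern with plain tokio primitives, assuming a local file and leaving out the metadata handling and error context of the real implementation:

use std::path::Path;
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWrite};

// Copy bytes [start, end_exclusive) of a local file (or everything from
// `start` when no end is given) into the provided async writer.
async fn copy_byte_range(
    path: &Path,
    start: u64,
    end_exclusive: Option<u64>,
    to: &mut (impl AsyncWrite + Unpin),
) -> io::Result<u64> {
    let mut source = io::BufReader::new(File::open(path).await?);
    // Position the reader at the first requested byte.
    source.seek(io::SeekFrom::Start(start)).await?;
    match end_exclusive {
        // `take` caps the reader so the copy stops at the range end.
        Some(end) => io::copy(&mut source.take(end - start), to).await,
        None => io::copy(&mut source, to).await,
    }
}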
@@ -344,19 +352,6 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
Ok(())
}
fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
if file_path.exists() {
ensure!(
file_path.is_file(),
"file path '{}' is not a file",
file_path.display()
);
Ok(true)
} else {
Ok(false)
}
}
#[cfg(test)]
mod pure_tests {
use tempfile::tempdir;
@@ -523,31 +518,6 @@ mod fs_tests {
use std::{collections::HashMap, io::Write};
use tempfile::tempdir;
async fn read_and_assert_remote_file_contents(
storage: &LocalFs,
#[allow(clippy::ptr_arg)]
// have to use &PathBuf due to `storage.local_path` parameter requirements
remote_storage_path: &PathBuf,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
let mut download = storage
.download(remote_storage_path)
.await
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
ensure!(
download.metadata.as_ref() == expected_metadata,
"Unexpected metadata returned for the downloaded file"
);
let mut contents = String::new();
download
.download_stream
.read_to_string(&mut contents)
.await
.context("Failed to read remote file contents into string")?;
Ok(contents)
}
#[tokio::test]
async fn upload_file() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
@@ -598,7 +568,15 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage.download(&upload_target, &mut content_bytes).await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
assert_eq!(
dummy_contents(upload_name),
contents,
@@ -606,9 +584,13 @@ mod fs_tests {
);
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage.download(&non_existing_path).await {
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
match storage.download(&non_existing_path, &mut io::sink()).await {
Ok(_) => panic!("Should not allow downloading non-existing storage files"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
}
Ok(())
}
@@ -621,31 +603,58 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(&upload_target, 0, None, &mut full_range_bytes)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
full_range_bytes.flush().await?;
assert_eq!(
dummy_contents(upload_name),
full_range_download_contents,
String::from_utf8(full_range_bytes.into_inner().into_inner())?,
"Download full range should return the whole upload"
);
let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let same_byte = 1_000_000_000;
let metadata = storage
.download_byte_range(
&upload_target,
same_byte,
Some(same_byte + 1), // exclusive end
&mut zero_range_bytes,
)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
zero_range_bytes.flush().await?;
assert!(
zero_range_bytes.into_inner().into_inner().is_empty(),
"Zero byte range should not download any part of the file"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let mut first_part_download = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
.await?;
assert!(
first_part_download.metadata.is_none(),
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut first_part_download.download_stream,
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
@@ -654,24 +663,20 @@ mod fs_tests {
"First part bytes should be returned when requested"
);
let mut second_part_download = storage
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(
&upload_target,
first_part_local.len() as u64,
Some((first_part_local.len() + second_part_local.len()) as u64),
&mut second_part_remote,
)
.await?;
assert!(
second_part_download.metadata.is_none(),
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut second_part_download.download_stream,
&mut second_part_remote,
)
.await?;
second_part_remote.flush().await?;
let second_part_remote = second_part_remote.into_inner().into_inner();
assert_eq!(
@@ -691,30 +696,11 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let start = 1_000_000_000;
let end = start + 1;
match storage
.download_byte_range(
&upload_target,
start,
Some(end), // exclusive end
)
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("zero bytes"));
assert!(error_string.contains(&start.to_string()));
assert!(error_string.contains(&end.to_string()));
}
}
let start = 10000;
let end = 234;
assert!(start > end, "Should test an incorrect range");
match storage
.download_byte_range(&upload_target, start, Some(end))
.download_byte_range(&upload_target, start, Some(end), &mut io::sink())
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
@@ -726,6 +712,18 @@ mod fs_tests {
}
}
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage
.download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink())
.await
{
Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
}
Ok(())
}
@@ -764,26 +762,35 @@ mod fs_tests {
let upload_target =
upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
assert_eq!(
dummy_contents(upload_name),
full_range_download_contents,
contents,
"We should upload and download the same contents"
);
assert_eq!(
full_download_metadata.as_ref(),
Some(&metadata),
"We should get the same metadata back for full download"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, _) = uploaded_bytes.split_at(3);
let mut partial_download_with_metadata = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.await?;
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut partial_download_with_metadata.download_stream,
&mut first_part_remote,
)
.await?;
let partial_download_metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
@@ -793,8 +800,8 @@ mod fs_tests {
);
assert_eq!(
partial_download_with_metadata.metadata,
Some(metadata),
partial_download_metadata.as_ref(),
Some(&metadata),
"We should get the same metadata back for partial download"
);
@@ -836,7 +843,7 @@ mod fs_tests {
}
fn dummy_contents(name: &str) -> String {
format!("contents for {name}")
format!("contents for {}", name)
}
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {

View File

@@ -9,17 +9,17 @@ use std::path::{Path, PathBuf};
use anyhow::Context;
use rusoto_core::{
credential::{InstanceMetadataProvider, StaticProvider},
HttpClient, Region, RusotoError,
HttpClient, Region,
};
use rusoto_s3::{
DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
S3Client, StreamingBody, S3,
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
StreamingBody, S3,
};
use tokio::{io, sync::Semaphore};
use tokio_util::io::ReaderStream;
use tracing::debug;
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
use crate::{strip_path_prefix, RemoteStorage, S3Config};
use super::StorageMetadata;
@@ -187,39 +187,6 @@ impl S3Bucket {
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
})
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")
.map_err(DownloadError::Other)?;
metrics::inc_get_object();
match self.client.get_object(request).await {
Ok(object_output) => match object_output.body {
None => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Got no body for the S3 object given"
)))
}
Some(body) => Ok(Download {
metadata: object_output.metadata.map(StorageMetadata),
download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
}),
},
Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
Err(e) => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Failed to download S3 object: {e}"
)))
}
}
}
}
#[async_trait::async_trait]
@@ -316,13 +283,38 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")?;
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(object_output.metadata.map(StorageMetadata))
}
async fn download_byte_range(
@@ -330,7 +322,8 @@ impl RemoteStorage for S3Bucket {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be exclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
@@ -338,14 +331,34 @@ impl RemoteStorage for S3Bucket {
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
None => format!("bytes={}-", start_inclusive),
});
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 range download")?;
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(object_output.metadata.map(StorageMetadata))
}
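
The only subtle step in the range variant is translating the half-open [start, end_exclusive) interval into an HTTP Range header, which uses inclusive ends. A small sketch of just that mapping, lifted from the logic above:

// Map [start_inclusive, end_exclusive) onto an HTTP Range header value;
// HTTP byte ranges are inclusive on both ends, hence the saturating_sub(1).
fn http_range_header(start_inclusive: u64, end_exclusive: Option<u64>) -> String {
    match end_exclusive.map(|end| end.saturating_sub(1)) {
        Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
        None => format!("bytes={}-", start_inclusive),
    }
}

// e.g. the first three bytes of an object: http_range_header(0, Some(3)) == "bytes=0-2"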
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {

View File

@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
#![allow(unused)]
use utils::pg_checksum_page::pg_checksum_page;
use criterion::{criterion_group, criterion_main, Criterion};
use utils::zid;
pub fn bench_zid_stringify(c: &mut Criterion) {
@@ -18,20 +18,5 @@ pub fn bench_zid_stringify(c: &mut Criterion) {
});
}
// NB: adding `black_box` around arguments doesn't seem to change anything.
pub fn pg_checksum_page_basic(c: &mut Criterion) {
const BLCKSZ: usize = 8192;
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
*byte = i as u8;
}
c.bench_function("pg_checksum_page_basic", |b| {
b.iter(|| {
unsafe { pg_checksum_page(&page[..], 0) };
})
});
}
criterion_group!(benches, pg_checksum_page_basic, bench_zid_stringify);
criterion_group!(benches, bench_zid_stringify);
criterion_main!(benches);

View File

@@ -5,7 +5,7 @@ DATA_DIR=$3
PORT=$4
SYSID=`od -A n -j 24 -N 8 -t d8 $WAL_PATH/000000010000000000000002* | cut -c 3-`
rm -fr $DATA_DIR
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --data-checksums --sysid=$SYSID
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --sysid=$SYSID
echo port=$PORT >> $DATA_DIR/postgresql.conf
REDO_POS=0x`$PG_BIN/pg_controldata -D $DATA_DIR | fgrep "REDO location"| cut -c 42-`
declare -i WAL_SIZE=$REDO_POS+114

View File

@@ -54,9 +54,6 @@ pub mod nonblock;
// Default signal handling
pub mod signals;
// Postgres checksum calculation
pub mod pg_checksum_page;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -1,136 +0,0 @@
///
/// Rust implementation of Postgres pg_checksum_page
/// See: https://github.com/postgres/postgres/blob/88210542106de5b26fe6aa088d1811b68502d224/src/include/storage/checksum_impl.h
/// for additional comments.
///
/// This is not a direct port of pg_checksum_page from Postgres, though.
/// For example, in the current state it can only produce a valid result
/// on the little-endian platform and with the standard 8 KB page size.
///
const BLCKSZ: usize = 8192;
const N_SUMS: usize = 32;
// Prime multiplier of FNV-1a hash
const FNV_PRIME: u32 = 16777619;
// Base offsets to initialize each of the parallel FNV hashes into a
// different initial state.
const CHECKSUM_BASE_OFFSETS: [u32; N_SUMS] = [
0x5B1F36E9, 0xB8525960, 0x02AB50AA, 0x1DE66D2A, 0x79FF467A, 0x9BB9F8A3, 0x217E7CD2, 0x83E13D2C,
0xF8D4474F, 0xE39EB970, 0x42C6AE16, 0x993216FA, 0x7B093B5D, 0x98DAFF3C, 0xF718902A, 0x0B1C9CDB,
0xE58F764B, 0x187636BC, 0x5D7B3BB1, 0xE73DE7DE, 0x92BEC979, 0xCCA6C0B2, 0x304A0979, 0x85AA43D4,
0x783125BB, 0x6CA8EAA2, 0xE407EAC6, 0x4B5CFC3E, 0x9FBF8C76, 0x15CA20BE, 0xF2CA9FD3, 0x959BD756,
];
// Calculate one round of the checksum.
fn checksum_comp(checksum: u32, value: u32) -> u32 {
let tmp = checksum ^ value;
tmp.wrapping_mul(FNV_PRIME) ^ (tmp >> 17)
}
/// Compute the checksum for a Postgres page.
///
/// The page must be adequately aligned (at least on a 4-byte boundary).
///
/// The checksum includes the block number (to detect the case where a page is
/// somehow moved to a different location), the page header (excluding the
/// checksum itself), and the page data.
///
/// As in C implementation in Postgres, the checksum attribute on the page is
/// excluded from the calculation and preserved.
///
/// NB: after doing any modifications run `cargo bench`. The baseline on the more
/// or less recent Intel laptop is around 700ns. If it's significantly higher,
/// then it's worth looking into.
///
/// # Arguments
/// * `data` - the page to checksum
/// * `blkno` - the block number of the page
///
/// # Safety
/// This function is safe to call only if:
/// * `data` is strictly a standard 8 KB Postgres page
/// * it's called on the little-endian platform
pub unsafe fn pg_checksum_page(data: &[u8], blkno: u32) -> u16 {
let page = std::mem::transmute::<&[u8], &[u32]>(data);
let mut checksum: u32 = 0;
let mut sums = CHECKSUM_BASE_OFFSETS;
// Calculate the checksum of the first 'row' of the page. Do it separately as
// we do an expensive comparison here, which is not required for the rest of the
// page. Putting it into the main loop slows it down ~3 times.
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
// Third 32-bit chunk of the page contains the checksum in the lower half
// (assuming we are on little-endian machine), which we need to zero out.
// See also `PageHeaderData` for reference.
let chunk: u32 = if j == 2 {
page[j] & 0xFFFF_0000
} else {
page[j]
};
*sum = checksum_comp(*sum, chunk);
}
// Main checksum calculation loop
for i in 1..(BLCKSZ / (4 * N_SUMS)) {
for (j, sum) in sums.iter_mut().enumerate().take(N_SUMS) {
*sum = checksum_comp(*sum, page[i * N_SUMS + j]);
}
}
// Finally, add in two rounds of zeroes for additional mixing
for _i in 0..2 {
for s in sums.iter_mut().take(N_SUMS) {
*s = checksum_comp(*s, 0);
}
}
// Xor fold partial checksums together
for sum in sums {
checksum ^= sum;
}
// Mix in the block number to detect transposed pages
checksum ^= blkno;
// Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
// one. That avoids checksums of zero, which seems like a good idea.
((checksum % 65535) + 1) as u16
}
#[cfg(test)]
mod tests {
use super::{pg_checksum_page, BLCKSZ};
#[test]
fn page_with_and_without_checksum() {
// Create a page with some content and without a correct checksum.
let mut page: [u8; BLCKSZ] = [0; BLCKSZ];
for (i, byte) in page.iter_mut().enumerate().take(BLCKSZ) {
*byte = i as u8;
}
// Calculate the checksum.
let checksum = unsafe { pg_checksum_page(&page[..], 0) };
// Zero the checksum attribute on the page.
page[8..10].copy_from_slice(&[0u8; 2]);
// Calculate the checksum again, should be the same.
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
assert_eq!(checksum, new_checksum);
// Set the correct checksum into the page.
page[8..10].copy_from_slice(&checksum.to_le_bytes());
// Calculate the checksum again, should be the same.
let new_checksum = unsafe { pg_checksum_page(&page[..], 0) };
assert_eq!(checksum, new_checksum);
// Check that we protect from the page transposition, i.e. page is the
// same, but in the wrong place.
let wrong_blockno_checksum = unsafe { pg_checksum_page(&page[..], 1) };
assert_ne!(checksum, wrong_blockno_checksum);
}
}

View File

@@ -38,7 +38,6 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}
#[serde_as]

View File

@@ -494,8 +494,6 @@ components:
type: string
compaction_threshold:
type: string
data_checksums_enabled:
type: boolean
TenantConfigInfo:
type: object
properties:

View File

@@ -412,9 +412,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
// Turn on data checksums for all new tenants
tenant_conf.data_checksums_enabled = Some(request_data.data_checksums_enabled.unwrap_or(true));
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?);

View File

@@ -516,23 +516,10 @@ pub fn import_file<R: Repository, Reader: Read>(
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
// zenith.signal format is "PREV LSN: prev_lsn"
// TODO write serialization and deserialization in the same place.
let zenith_signal = std::str::from_utf8(&bytes)?.trim();
let prev_lsn = match zenith_signal {
"PREV LSN: none" => Lsn(0),
"PREV LSN: invalid" => Lsn(0),
other => {
let split = other.split(':').collect::<Vec<_>>();
split[1]
.trim()
.parse::<Lsn>()
.context("can't parse zenith.signal")?
}
};
let zenith_signal = std::str::from_utf8(&bytes)?;
let zenith_signal = zenith_signal.split(':').collect::<Vec<_>>();
let prev_lsn = zenith_signal[1].trim().parse::<Lsn>()?;
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);
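
Both variants of the zenith.signal parsing above fit in a few lines. A hedged sketch of the more defensive one, assuming the `Lsn` type from utils::lsn shown at the top of the file; the helper name is illustrative:

use anyhow::Context;
use utils::lsn::Lsn;

// zenith.signal format is "PREV LSN: <prev_lsn>"; "none" and "invalid" map to Lsn(0).
fn parse_zenith_signal(contents: &str) -> anyhow::Result<Lsn> {
    Ok(match contents.trim() {
        "PREV LSN: none" | "PREV LSN: invalid" => Lsn(0),
        other => other
            .split(':')
            .nth(1)
            .context("zenith.signal has no ':' separator")?
            .trim()
            .parse::<Lsn>()
            .context("can't parse zenith.signal")?,
    })
}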

View File

@@ -232,32 +232,23 @@ impl Repository for LayeredRepository {
fn create_empty_timeline(
&self,
timeline_id: ZTimelineId,
timelineid: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<LayeredTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
let vacant_timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant_entry) => vacant_entry,
};
let timeline_path = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if timeline_path.exists() {
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
}
// Create the timeline directory, and write initial metadata to file.
crashsafe_dir::create_dir_all(timeline_path)?;
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenant_id))?;
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
Self::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?;
Self::save_metadata(self.conf, timelineid, self.tenant_id, &metadata, true)?;
let timeline = LayeredTimeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,
None,
timeline_id,
timelineid,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
@@ -266,7 +257,12 @@ impl Repository for LayeredRepository {
// Insert if not exists
let timeline = Arc::new(timeline);
vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)));
match timelines.entry(timelineid) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant) => {
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
}
};
Ok(timeline)
}

View File

@@ -951,10 +951,7 @@ impl postgres_backend::Handler for PageServerHandler {
match self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => {
error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
}
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
};
} else if query_string.starts_with("import wal ") {
// Import the `pg_wal` section of a basebackup.
@@ -973,10 +970,7 @@ impl postgres_backend::Handler for PageServerHandler {
match self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => {
error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
}
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
};
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"

View File

@@ -225,7 +225,7 @@ pub trait Repository: Send + Sync {
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
fn create_empty_timeline(
&self,
timeline_id: ZTimelineId,
timelineid: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<Self::Timeline>>;
@@ -473,7 +473,6 @@ pub mod repo_harness {
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
data_checksums_enabled: Some(tenant_conf.data_checksums_enabled),
}
}
}
@@ -637,19 +636,6 @@ mod tests {
Ok(())
}
#[test]
fn no_duplicate_timelines() -> Result<()> {
let repo = RepoHarness::create("no_duplicate_timelines")?.load();
let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(e.to_string(), "Timeline already exists"),
}
Ok(())
}
/// Convenience function to create a page image with given string as the only content
pub fn test_value(s: &str) -> Value {
let mut buf = BytesMut::new();

View File

@@ -44,23 +44,13 @@ where
index_part_path.display()
)
})?;
let mut index_part_download =
storage
.download(&part_storage_path)
.await
.with_context(|| {
format!("Failed to open download stream for for storage path {part_storage_path:?}")
})?;
let mut index_part_bytes = Vec::new();
io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
storage
.download(&part_storage_path, &mut index_part_bytes)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
@@ -172,19 +162,15 @@ where
temp_file_path.display()
)
})?;
let mut download = storage
.download(&layer_storage_path)
storage
.download(&layer_storage_path, &mut destination_file)
.await
.with_context(|| {
format!(
"Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
"Failed to download a layer from storage path '{layer_storage_path:?}'"
)
})?;
io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
format!(
"Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
)
})?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations

View File

@@ -38,10 +38,6 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
// Turn off data checksums by default to do not affect old tenants.
// We turn it on explicitly for all new tenants.
pub const DEFAULT_DATA_CHECKSUMS: bool = false;
}
/// Per-tenant configuration options
@@ -87,7 +83,6 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub data_checksums_enabled: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -110,7 +105,6 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub data_checksums_enabled: Option<bool>,
}
impl TenantConfOpt {
@@ -141,9 +135,6 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
data_checksums_enabled: self
.data_checksums_enabled
.unwrap_or(global_conf.data_checksums_enabled),
}
}
@@ -181,9 +172,6 @@ impl TenantConfOpt {
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(data_checksums_enabled) = other.data_checksums_enabled {
self.data_checksums_enabled = Some(data_checksums_enabled);
}
}
}
@@ -211,7 +199,6 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
data_checksums_enabled: DEFAULT_DATA_CHECKSUMS,
}
}
@@ -242,7 +229,6 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
data_checksums_enabled: defaults::DEFAULT_DATA_CHECKSUMS,
}
}
}
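
The merge rules this file applies are simple Option fallbacks: an unset per-tenant field falls back to the global default, and an incoming update only overwrites fields it explicitly sets. A pared-down, hypothetical two-field version (not the crate's actual structs or field types):

struct Conf {
    compaction_threshold: usize,
    gc_horizon: u64,
}

#[derive(Default)]
struct ConfOpt {
    compaction_threshold: Option<usize>,
    gc_horizon: Option<u64>,
}

impl ConfOpt {
    // Unset fields fall back to the global defaults.
    fn merge(&self, global: &Conf) -> Conf {
        Conf {
            compaction_threshold: self
                .compaction_threshold
                .unwrap_or(global.compaction_threshold),
            gc_horizon: self.gc_horizon.unwrap_or(global.gc_horizon),
        }
    }

    // Only explicitly provided fields replace the stored ones.
    fn update(&mut self, other: &ConfOpt) {
        if let Some(threshold) = other.compaction_threshold {
            self.compaction_threshold = Some(threshold);
        }
        if let Some(gc_horizon) = other.gc_horizon {
            self.gc_horizon = Some(gc_horizon);
        }
    }
}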

View File

@@ -11,7 +11,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr::ThreadKind;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{tenant_config, thread_mgr, timelines, walreceiver};
use crate::{thread_mgr, timelines, walreceiver};
use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};
@@ -266,14 +266,7 @@ pub fn create_tenant_repository(
Ok(None)
}
Entry::Vacant(v) => {
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
let wal_redo_manager = Arc::new(PostgresRedoManager::new(
conf,
data_checksums_enabled,
tenant_id,
));
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let repo = timelines::create_repo(
conf,
tenant_conf,
@@ -574,16 +567,10 @@ fn load_local_repo(
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<RepositoryImpl>> {
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
let mut m = tenants_state::write_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
let data_checksums_enabled = tenant_conf
.data_checksums_enabled
.unwrap_or(tenant_config::defaults::DEFAULT_DATA_CHECKSUMS);
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, data_checksums_enabled, tenant_id);
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
@@ -601,6 +588,8 @@ fn load_local_repo(
}
});
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
tenant.repo.update_tenant_config(tenant_conf)?;
Ok(Arc::clone(&tenant.repo))

View File

@@ -119,6 +119,8 @@ pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40) // Way more than necessary
.max_blocking_threads(100) // Way more than necessary
.enable_all()
.build()?;

View File

@@ -253,7 +253,6 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
.args(&["-D", &initdbpath.to_string_lossy()])
.args(&["-U", &conf.superuser])
.args(&["-E", "utf8"])
.arg("--data-checksums")
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it

View File

@@ -24,7 +24,7 @@
use anyhow::Context;
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{page_is_new, page_set_checksum, page_set_lsn};
use postgres_ffi::{page_is_new, page_set_lsn};
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
@@ -313,8 +313,6 @@ impl<'a, R: Repository> WalIngest<'a, R> {
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
unsafe { page_set_checksum(&mut image, blk.blkno) };
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
} else {

View File

@@ -91,6 +91,7 @@ pub fn init_wal_receiver_main_thread(
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("wal-receiver-runtime-thread")
.worker_threads(40)
.enable_all()
.on_thread_start(|| IS_WAL_RECEIVER.with(|c| c.set(true)))
.build()

View File

@@ -48,8 +48,7 @@ use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift;
use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::xlog_utils::wal_record_verify_checksum;
use postgres_ffi::{page_verify_checksum, pg_constants, XLogRecord};
use postgres_ffi::pg_constants;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -132,7 +131,6 @@ lazy_static! {
pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
data_checksums_enabled: bool,
process: Mutex<Option<PostgresRedoProcess>>,
}
@@ -231,16 +229,11 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(
conf: &'static PageServerConf,
data_checksums_enabled: bool,
tenantid: ZTenantId,
) -> PostgresRedoManager {
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenantid,
conf,
data_checksums_enabled,
process: Mutex::new(None),
}
}
@@ -275,13 +268,7 @@ impl PostgresRedoManager {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = process
.apply_wal_records(
buf_tag,
base_img,
records,
wal_redo_timeout,
self.data_checksums_enabled,
)
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
@@ -632,7 +619,6 @@ impl PostgresRedoProcess {
info!("running initdb in {:?}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("--data-checksums")
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
@@ -730,7 +716,6 @@ impl PostgresRedoProcess {
base_img: Option<Bytes>,
records: &[(Lsn, ZenithWalRecord)],
wal_redo_timeout: Duration,
data_checksums_enabled: bool,
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
@@ -740,15 +725,6 @@ impl PostgresRedoProcess {
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
// Checksums could be not stamped for old tenants, so check them only if they
// are enabled (this is controlled by per-tenant config).
if data_checksums_enabled && !unsafe { page_verify_checksum(&img, tag.blknum) } {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("block {} of relation {} is invalid", tag.blknum, tag.rel),
));
}
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
@@ -757,27 +733,6 @@ impl PostgresRedoProcess {
rec: postgres_rec,
} = rec
{
let xlogrec = XLogRecord::from_buf(postgres_rec).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"could not deserialize WAL record for relation {} at LSN {}: {}",
tag.rel, lsn, e
),
)
})?;
// WAL records always have a checksum, check it before sending to redo process.
// It doesn't do these checks itself.
if !wal_record_verify_checksum(&xlogrec, postgres_rec) {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"WAL record for relation {} at LSN {} is invalid",
tag.rel, lsn
),
));
}
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(Error::new(

View File

@@ -2,16 +2,18 @@ use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;
use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use postgres_ffi::xlog_utils::{
XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;
@@ -450,41 +452,45 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
pub async fn read_object(
file_path: PathBuf,
offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
let download = match REMOTE_STORAGE
.get()
.context("Failed to get remote storage")?
.as_ref()
.context("No remote storage configured")?
{
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;
) -> (impl AsyncRead, JoinHandle<Result<()>>) {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None)
.await
let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);
let copy_result = tokio::spawn(async move {
let res = match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None, &mut pipe_writer)
.await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage
.download_byte_range(&s3key, offset, None, &mut pipe_writer)
.await
}
};
if let Err(e) = res {
error!("failed to download WAL segment from remote storage: {}", e);
Err(e)
} else {
Ok(())
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;
});
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage.download_byte_range(&s3key, offset, None).await
}
}
.with_context(|| {
format!(
"Failed to open WAL segment download stream for local storage path {}",
file_path.display()
)
})?;
Ok(download.download_stream)
(pipe_reader, copy_result)
}
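
The new read_object wires the storage download to the caller through an in-process pipe: one end is handed back as an AsyncRead, the other is filled by a spawned task. A hedged generic sketch of that pattern; the producer closure stands in for the download_byte_range call and is hypothetical:

use tokio::io::{duplex, AsyncRead, DuplexStream};
use tokio::task::JoinHandle;

// Spawn a producer that writes into one end of a duplex pipe and hand the
// other end back to the caller as a reader it can consume immediately.
fn spawn_streaming_download<F, Fut>(
    buf_size: usize,
    produce: F,
) -> (impl AsyncRead, JoinHandle<anyhow::Result<()>>)
where
    F: FnOnce(DuplexStream) -> Fut + Send + 'static,
    Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let (writer, reader) = duplex(buf_size);
    // The producer runs concurrently; errors surface through the JoinHandle.
    let handle = tokio::spawn(async move { produce(writer).await });
    (reader, handle)
}

A caller can then pin the returned reader, as WalReader does in the next file with Box::pin(reader).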

View File

@@ -604,7 +604,8 @@ impl WalReader {
// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
return read_object(wal_file_path, xlogoff as u64).await;
let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
return Ok(Box::pin(reader));
}
bail!("WAL segment is not found")

View File

@@ -0,0 +1,222 @@
#
# Simple script to export nodes from one pageserver
# and import them into another page server
#
from os import path
import os
import requests
import uuid
import subprocess
import argparse
from pathlib import Path
# directory to save exported tar files to
basepath = path.dirname(path.abspath(__file__))
class NeonPageserverApiException(Exception):
pass
class NeonPageserverHttpClient(requests.Session):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()['msg']
except:
msg = ''
raise NeonPageserverApiException(msg) from e
def check_status(self):
self.get(f"http://{self.host}:{self.port}/v1/status").raise_for_status()
def tenant_list(self):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: uuid.UUID, ok_if_exists):
res = self.post(
f"http://{self.host}:{self.port}/v1/tenant",
json={
'new_tenant_id': new_tenant_id.hex,
},
)
if res.status_code == 409:
if ok_if_exists:
print(f'could not create tenant: already exists for id {new_tenant_id}')
else:
res.raise_for_status()
elif res.status_code == 201:
print(f'created tenant {new_tenant_id}')
else:
self.verbose_error(res)
return new_tenant_id
def timeline_list(self, tenant_id: uuid.UUID):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def main(args: argparse.Namespace):
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host
tenants = args.tenants
old_http_client = NeonPageserverHttpClient(old_pageserver_host, args.old_pageserver_http_port)
old_http_client.check_status()
old_pageserver_connstr = f"postgresql://{old_pageserver_host}:{args.old_pageserver_pg_port}"
new_http_client = NeonPageserverHttpClient(new_pageserver_host, args.new_pageserver_http_port)
new_http_client.check_status()
new_pageserver_connstr = f"postgresql://{new_pageserver_host}:{args.new_pageserver_pg_port}"
psql_env = {**os.environ, 'LD_LIBRARY_PATH': '/usr/local/lib/'}
for tenant_id in tenants:
print(f"Tenant: {tenant_id}")
timelines = old_http_client.timeline_list(uuid.UUID(tenant_id))
print(f"Timelines: {timelines}")
# Create tenant in new pageserver
if args.only_import is False:
new_http_client.tenant_create(uuid.UUID(tenant_id), args.ok_if_exists)
for timeline in timelines:
# Export timelines from old pageserver
if args.only_import is False:
query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"
cmd = ["psql", "--no-psqlrc", old_pageserver_connstr, "-c", query]
print(f"Running: {cmd}")
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
stderr_filename = path.join(
basepath, f"{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
with open(tar_filename, 'w') as stdout_f:
with open(stderr_filename, 'w') as stderr_f:
print(f"(capturing output to {tar_filename})")
subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env, check=True)
print(f"Done export: {tar_filename}")
# Import timelines to new pageserver
psql_path = Path(args.psql_path)
import_cmd = f"import basebackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']} {timeline['local']['last_record_lsn']}"
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
full_cmd = rf"""cat {tar_filename} | {psql_path} {new_pageserver_connstr} -c '{import_cmd}' """
stderr_filename2 = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
stdout_filename = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stdout")
print(f"Running: {full_cmd}")
with open(stdout_filename, 'w') as stdout_f:
with open(stderr_filename2, 'w') as stderr_f:
print(f"(capturing output to {stdout_filename})")
subprocess.run(full_cmd,
stdout=stdout_f,
stderr=stderr_f,
env=psql_env,
shell=True,
check=True)
print(f"Done import")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--tenant-id',
dest='tenants',
required=True,
nargs='+',
help='ID of the tenant to migrate. Multiple tenant IDs can be passed.',
)
parser.add_argument(
'--from-host',
dest='old_pageserver_host',
required=True,
help='Host of the pageserver to migrate data from',
)
parser.add_argument(
'--from-http-port',
dest='old_pageserver_http_port',
required=False,
type=int,
default=9898,
help='HTTP port of the pageserver to migrate data from. Default: 9898',
)
parser.add_argument(
'--from-pg-port',
dest='old_pageserver_pg_port',
required=False,
type=int,
default=6400,
help='pg port of the pageserver to migrate data from. Default: 6400',
)
parser.add_argument(
'--to-host',
dest='new_pageserver_host',
required=True,
help='Host of the pageserver to migrate data to',
)
parser.add_argument(
'--to-http-port',
dest='new_pageserver_http_port',
required=False,
default=9898,
type=int,
help='HTTP port of the pageserver to migrate data to. Default: 9898',
)
parser.add_argument(
'--to-pg-port',
dest='new_pageserver_pg_port',
required=False,
default=6400,
type=int,
help='pg port of the pageserver to migrate data to. Default: 6400',
)
parser.add_argument(
'--ignore-tenant-exists',
dest='ok_if_exists',
required=False,
action='store_true',
help=
'Ignore the error if the tenant we are trying to create already exists. This can be dangerous if the existing tenant already contains data.',
)
parser.add_argument(
'--psql-path',
dest='psql_path',
required=False,
default='/usr/local/bin/psql',
help='Path to the psql binary. Default: /usr/local/bin/psql',
)
parser.add_argument(
'--only-import',
dest='only_import',
required=False,
default=False,
action='store_true',
help='Skip export and tenant creation part',
)
args = parser.parse_args()
main(args)
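
For reference, a minimal usage sketch of this script, mirroring the invocation used by the tenant relocation test further below. The host names, ports, and tenant id are illustrative placeholders, not values taken from this change:

# Hedged usage sketch: migrate a single tenant between two pageservers.
# Hosts, ports, and the tenant id below are placeholders.
import subprocess

cmd = [
    "python",
    "scripts/export_import_between_pageservers.py",
    "--tenant-id", "0123456789abcdef0123456789abcdef",
    "--from-host", "old-pageserver.example",
    "--from-http-port", "9898",
    "--from-pg-port", "6400",
    "--to-host", "new-pageserver.example",
    "--to-http-port", "9898",
    "--to-pg-port", "6400",
    "--psql-path", "/usr/local/bin/psql",
]
subprocess.run(cmd, check=True)

The exported tar files and psql stderr/stdout captures land next to the script itself (basepath), so no extra working directory is needed for this sketch.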

View File

@@ -37,7 +37,7 @@ You can run all the tests with:
If you want to run all the tests in a particular file:
`./scripts/pytest test_runner/batch_others/test_restart_compute.py`
`./scripts/pytest test_pgbench.py`
If you want to run all tests that have the string "bench" in their names:

View File

@@ -0,0 +1,74 @@
import pytest
from contextlib import closing
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
#
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_restart_compute(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
neon_env_builder.auth_enabled = True
if with_safekeepers:
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_restart_compute')
pg = env.postgres.create_start('test_restart_compute')
log.info("postgres is running on 'test_restart_compute' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
log.info(f"res = {r}")
# Remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the row
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
log.info(f"res = {r}")
# Insert another row
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")
# Again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')
# That select causes lots of FPIs and increases the probability of safekeepers
# lagging behind after query completion
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")
# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
log.info(f"res = {r}")

View File

@@ -10,8 +10,8 @@ from typing import Optional
import signal
import pytest
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir
from fixtures.utils import lsn_from_hex, subprocess_capture
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
from fixtures.utils import lsn_from_hex
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -101,23 +101,13 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')
@pytest.mark.parametrize(
'method',
[
# A minor migration involves no storage breaking changes.
# It is done by attaching the tenant to a new pageserver.
'minor',
# A major migration involves exporting a postgres datadir
# basebackup and importing it into the new pageserver.
# This kind of migration can tolerate breaking changes
# to storage format
pytest.param('major', marks=pytest.mark.xfail(reason="Not implemented")),
])
@pytest.mark.skip(
reason=
"needs to replace callmemaybe call with better idea how to migrate timelines between pageservers"
)
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
port_distributor: PortDistributor,
test_output_dir,
method: str,
with_load: str):
neon_env_builder.enable_local_fs_remote_storage()
@@ -167,11 +157,8 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
load_stop_event = threading.Event()
load_ok_event = threading.Event()
load_thread = threading.Thread(
target=load,
args=(tenant_pg, load_stop_event, load_ok_event),
daemon=True, # To make sure the child dies when the parent errors
)
load_thread = threading.Thread(target=load,
args=(tenant_pg, load_stop_event, load_ok_event))
load_thread.start()
# run checkpoint manually to be sure that data landed in remote storage
@@ -201,47 +188,30 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
new_pageserver_http_port,
neon_env_builder.broker):
# Migrate either by attaching from s3 or import/export basebackup
if method == "major":
cmd = [
"python",
os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
"--tenant-id",
tenant.hex,
"--from-host",
"localhost",
"--from-http-port",
str(pageserver_http.port),
"--from-pg-port",
str(env.pageserver.service_port.pg),
"--to-host",
"localhost",
"--to-http-port",
str(new_pageserver_http_port),
"--to-pg-port",
str(new_pageserver_pg_port),
"--psql-path",
os.path.join(pg_distrib_dir, "bin", "psql"),
"--work-dir",
os.path.join(test_output_dir),
]
subprocess_capture(str(env.repo_dir), cmd, check=True)
elif method == "minor":
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(
lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
# callmemaybe to start replication from safekeeper to the new pageserver
# when there is no load there is a clean checkpoint and no wal delta
# needs to be streamed to the new pageserver
# TODO (rodionov) use attach to start replication
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
cur.execute("callmemaybe {} {} {}".format(tenant.hex,
timeline.hex,
safekeeper_connstring))
tenant_pg.stop()

View File

@@ -682,7 +682,7 @@ class ProposerPostgres(PgProtocol):
def initdb(self):
""" Run initdb """
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path(), "--data-checksums"]
args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path()]
self.pg_bin.run(args)
def start(self):

View File

@@ -1,6 +1,5 @@
import asyncio
import uuid
import asyncpg
import random
import time
@@ -8,7 +7,7 @@ import time
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List, Optional
from typing import List
log = getLogger('root.safekeeper_async')
@@ -235,156 +234,3 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
# we try to simulate a large (flush_lsn - truncate_lsn) lag, to test that WAL segments
# are not removed before being broadcast to all safekeepers, with the help of a replication slot
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5))
def postgres_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
pg = Postgres(
env,
tenant_id=env.initial_tenant,
port=env.port_distributor.get_port(),
# In these tests compute has high probability of terminating on its own
# before our stop() due to lost consensus leadership.
check_stop_result=False)
# embed current time in node name
node_name = pgdir_name or f'pg_node_{time.time()}'
return pg.create_start(branch_name=branch,
node_name=node_name,
config_lines=['log_statement=all'])
async def exec_compute_query(env: NeonEnv,
branch: str,
query: str,
pgdir_name: Optional[str] = None):
with postgres_create_start(env, branch=branch, pgdir_name=pgdir_name) as pg:
before_conn = time.time()
conn = await pg.connect_async()
res = await conn.fetch(query)
await conn.close()
after_conn = time.time()
log.info(f'{query} took {after_conn - before_conn}s')
return res
async def run_compute_restarts(env: NeonEnv,
queries=16,
batch_insert=10000,
branch='test_compute_restarts'):
cnt = 0
sum = 0
await exec_compute_query(env, branch, 'CREATE TABLE t (i int)')
for i in range(queries):
if i % 4 == 0:
await exec_compute_query(
env, branch, f'INSERT INTO t SELECT 1 FROM generate_series(1, {batch_insert})')
sum += batch_insert
cnt += batch_insert
elif (i % 4 == 1) or (i % 4 == 3):
# Note that select causes lots of FPI's and increases probability of safekeepers
# standing at different LSNs after compute termination.
actual_sum = (await exec_compute_query(env, branch, 'SELECT SUM(i) FROM t'))[0][0]
assert actual_sum == sum, f'Expected sum={sum}, actual={actual_sum}'
elif i % 4 == 2:
await exec_compute_query(env, branch, 'UPDATE t SET i = i + 1')
sum += cnt
# Add a test which creates compute for every query, and then destroys it right after.
def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_compute_restarts')
asyncio.run(run_compute_restarts(env))
class BackgroundCompute(object):
def __init__(self, index: int, env: NeonEnv, branch: str):
self.index = index
self.env = env
self.branch = branch
self.running = False
self.stopped = False
self.total_tries = 0
self.successful_queries: List[int] = []
async def run(self):
if self.running:
raise Exception('BackgroundCompute is already running')
self.running = True
i = 0
while not self.stopped:
try:
verify_key = (self.index << 16) + i
i += 1
self.total_tries += 1
res = await exec_compute_query(
self.env,
self.branch,
f'INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key',
pgdir_name=f'bgcompute{self.index}_key{verify_key}',
)
log.info(f'result: {res}')
if len(res) != 1:
raise Exception('No result returned')
if res[0][0] != verify_key:
raise Exception('Wrong result returned')
self.successful_queries.append(verify_key)
except Exception as e:
log.info(f'BackgroundCompute {self.index} query failed: {e}')
# With less sleep, there is a very big chance of not committing
# anything or only 1 xact during test run.
await asyncio.sleep(2 * random.random())
self.running = False
async def run_concurrent_computes(env: NeonEnv,
num_computes=10,
run_seconds=20,
branch='test_concurrent_computes'):
await exec_compute_query(
env,
branch,
'CREATE TABLE query_log (t timestamp default now(), index int, verify_key int)')
computes = [BackgroundCompute(i, env, branch) for i in range(num_computes)]
background_tasks = [asyncio.create_task(compute.run()) for compute in computes]
await asyncio.sleep(run_seconds)
for compute in computes[1:]:
compute.stopped = True
log.info("stopped all tasks but one")
# work for some time with only one compute -- it should be able to make some xacts
await asyncio.sleep(8)
computes[0].stopped = True
await asyncio.gather(*background_tasks)
result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
# we should have inserted something while single compute was running
assert len(result) >= 4
log.info(f'Executed {len(result)} queries')
for row in result:
log.info(f'{row[0]} {row[1]} {row[2]}')
# ensure everything reported as committed wasn't lost
for compute in computes:
for verify_key in compute.successful_queries:
assert verify_key in [row[2] for row in result]
# Run multiple computes concurrently, creating-destroying them after single
# query. Ensure we don't lose any xacts reported as committed and be able to
# progress once only one compute remains.
def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_concurrent_computes')
asyncio.run(run_concurrent_computes(env))

View File

@@ -1160,7 +1160,6 @@ class NeonCli:
node_name: str,
tenant_id: Optional[uuid.UUID] = None,
destroy=False,
check_return_code=True,
) -> 'subprocess.CompletedProcess[str]':
args = [
'pg',
@@ -1173,7 +1172,7 @@ class NeonCli:
if node_name is not None:
args.append(node_name)
return self.raw_cli(args, check_return_code=check_return_code)
return self.raw_cli(args)
def raw_cli(self,
arguments: List[str],
@@ -1189,8 +1188,6 @@ class NeonCli:
>>> result = env.neon_cli.raw_cli(...)
>>> assert result.stderr == ""
>>> log.info(result.stdout)
If `check_return_code`, on non-zero exit code logs failure and raises.
"""
assert type(arguments) == list
@@ -1216,27 +1213,27 @@ class NeonCli:
env_vars[var] = val
# Intercept CalledProcessError and print more info
res = subprocess.run(args,
env=env_vars,
check=False,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if not res.returncode:
try:
res = subprocess.run(args,
env=env_vars,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
log.info(f"Run success: {res.stdout}")
elif check_return_code:
except subprocess.CalledProcessError as exc:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
Run {res.args} failed:
stdout: {res.stdout}
stderr: {res.stderr}
Run failed: {exc}
stdout: {exc.stdout}
stderr: {exc.stderr}
"""
log.info(msg)
raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
res.args,
res.stdout,
res.stderr)
raise Exception(msg) from exc
if check_return_code:
res.check_returncode()
return res
@@ -1529,11 +1526,7 @@ def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self,
env: NeonEnv,
tenant_id: uuid.UUID,
port: int,
check_stop_result: bool = True):
def __init__(self, env: NeonEnv, tenant_id: uuid.UUID, port: int):
super().__init__(host='localhost', port=port, user='cloud_admin', dbname='postgres')
self.env = env
self.running = False
@@ -1541,7 +1534,6 @@ class Postgres(PgProtocol):
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.check_stop_result = check_stop_result
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
def create(
@@ -1593,6 +1585,8 @@ class Postgres(PgProtocol):
port=self.port)
self.running = True
log.info(f"stdout: {run_result.stdout}")
return self
def pg_data_dir_path(self) -> str:
@@ -1656,9 +1650,7 @@ class Postgres(PgProtocol):
if self.running:
assert self.node_name is not None
self.env.neon_cli.pg_stop(self.node_name,
self.tenant_id,
check_return_code=self.check_stop_result)
self.env.neon_cli.pg_stop(self.node_name, self.tenant_id)
self.running = False
return self
@@ -1670,10 +1662,7 @@ class Postgres(PgProtocol):
"""
assert self.node_name is not None
self.env.neon_cli.pg_stop(self.node_name,
self.tenant_id,
True,
check_return_code=self.check_stop_result)
self.env.neon_cli.pg_stop(self.node_name, self.tenant_id, True)
self.node_name = None
self.running = False
@@ -1692,8 +1681,6 @@ class Postgres(PgProtocol):
Returns self.
"""
started_at = time.time()
self.create(
branch_name=branch_name,
node_name=node_name,
@@ -1701,8 +1688,6 @@ class Postgres(PgProtocol):
lsn=lsn,
).start()
log.info(f"Postgres startup took {time.time() - started_at} seconds")
return self
def __enter__(self):