Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-20 03:42:55 +00:00

Compare commits: projects_m ... projects-m (8 commits)

| Author | SHA1 | Date |
|---|---|---|
| | f7efbb2d42 | |
| | 0aff7c9ee9 | |
| | 4a2a55d9b2 | |
| | 50821c0a3c | |
| | 68adfe0fc8 | |
| | cfdf79aceb | |
| | 99a0a5a19b | |
| | 32560e75d2 | |
@@ -12,6 +12,7 @@ pageservers
safekeepers

[storage:vars]
env_name = neon-stress
console_mgmt_base_url = http://neon-stress-console.local
bucket_name = neon-storage-ireland
bucket_region = eu-west-1

@@ -12,8 +12,10 @@ use std::{
    borrow::Cow,
    collections::HashMap,
    ffi::OsStr,
    fmt::Debug,
    num::{NonZeroU32, NonZeroUsize},
    path::{Path, PathBuf},
    pin::Pin,
};

use anyhow::{bail, Context};

@@ -70,11 +72,7 @@ pub trait RemoteStorage: Send + Sync {

    /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
    /// Returns the metadata, if any was stored with the file previously.
    async fn download(
        &self,
        from: &Self::RemoteObjectId,
        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
    ) -> anyhow::Result<Option<StorageMetadata>>;
    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;

    /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
    /// Returns the metadata, if any was stored with the file previously.
@@ -83,12 +81,49 @@ pub trait RemoteStorage: Send + Sync {
        from: &Self::RemoteObjectId,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
    ) -> anyhow::Result<Option<StorageMetadata>>;
    ) -> Result<Download, DownloadError>;

    async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
}

pub struct Download {
    pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send>>,
    /// Extra key-value data, associated with the current remote file.
    pub metadata: Option<StorageMetadata>,
}

impl Debug for Download {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Download")
            .field("metadata", &self.metadata)
            .finish()
    }
}

#[derive(Debug)]
pub enum DownloadError {
    /// Validation or other error happened due to user input.
    BadInput(anyhow::Error),
    /// The file was not found in the remote storage.
    NotFound,
    /// The file was found in the remote storage, but the download failed.
    Other(anyhow::Error),
}

impl std::fmt::Display for DownloadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DownloadError::BadInput(e) => {
                write!(f, "Failed to download a remote file due to user input: {e}")
            }
            DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
            DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
        }
    }
}

impl std::error::Error for DownloadError {}

/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
pub enum GenericRemoteStorage {
@@ -180,7 +215,7 @@ pub struct S3Config {
    pub concurrency_limit: NonZeroUsize,
}

impl std::fmt::Debug for S3Config {
impl Debug for S3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Config")
            .field("bucket_name", &self.bucket_name)

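For orientation, here is a minimal, hypothetical sketch of how a caller consumes the new streaming `download` API above. The `read_whole_object` helper, the `PathBuf` object id, and the `storage` handle are illustrative assumptions, not code from this change.

    use tokio::io::AsyncReadExt;

    // Illustrative only: drain a `Download` stream into memory and map the error cases.
    async fn read_whole_object(
        storage: &impl RemoteStorage<RemoteObjectId = std::path::PathBuf>,
        id: &std::path::PathBuf,
    ) -> anyhow::Result<Vec<u8>> {
        match storage.download(id).await {
            Ok(mut download) => {
                let mut buf = Vec::new();
                // The boxed stream is `AsyncRead + Unpin`, so `AsyncReadExt` methods apply directly.
                download.download_stream.read_to_end(&mut buf).await?;
                Ok(buf)
            }
            Err(DownloadError::NotFound) => anyhow::bail!("remote object not found"),
            Err(e) => Err(anyhow::anyhow!("download failed: {e}")),
        }
    }

Callers that want a file on disk could instead `tokio::io::copy` the stream into an open file, which is how the pageserver hunks later in this diff use it.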
@@ -17,7 +17,7 @@ use tokio::{
};
use tracing::*;

use crate::path_with_suffix_extension;
use crate::{path_with_suffix_extension, Download, DownloadError};

use super::{strip_path_prefix, RemoteStorage, StorageMetadata};

@@ -192,15 +192,12 @@ impl RemoteStorage for LocalFs {
        Ok(())
    }

    async fn download(
        &self,
        from: &Self::RemoteObjectId,
        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let file_path = self.resolve_in_storage(from)?;

        if file_path.exists() && file_path.is_file() {
            let mut source = io::BufReader::new(
    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
        let file_path = self
            .resolve_in_storage(from)
            .map_err(DownloadError::BadInput)?;
        if file_exists(&file_path).map_err(DownloadError::BadInput)? {
            let source = io::BufReader::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&file_path)
@@ -210,22 +207,20 @@ impl RemoteStorage for LocalFs {
                            "Failed to open source file '{}' to use in the download",
                            file_path.display()
                        )
                    })?,
                    })
                    .map_err(DownloadError::Other)?,
            );
            io::copy(&mut source, to).await.with_context(|| {
                format!(
                    "Failed to download file '{}' from the local storage",
                    file_path.display()
                )
            })?;
            source.flush().await?;

            self.read_storage_metadata(&file_path).await
            let metadata = self
                .read_storage_metadata(&file_path)
                .await
                .map_err(DownloadError::Other)?;
            Ok(Download {
                metadata,
                download_stream: Box::pin(source),
            })
        } else {
            bail!(
                "File '{}' either does not exist or is not a file",
                file_path.display()
            )
            Err(DownloadError::NotFound)
        }
    }

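The shape of the new provider side can be summarized with a rough, stand-alone sketch (a hypothetical helper, not the crate's code): open the local file, wrap it in a buffered reader, and type-erase it behind the boxed stream that `Download` expects.

    use std::pin::Pin;
    use tokio::{fs, io};

    // Hypothetical helper illustrating the pattern used by the LocalFs hunk above.
    async fn file_download(path: &std::path::Path) -> Result<Download, DownloadError> {
        let file = fs::OpenOptions::new()
            .read(true)
            .open(path)
            .await
            .map_err(|e| DownloadError::Other(anyhow::anyhow!("failed to open {}: {e}", path.display())))?;
        // Buffer reads and erase the concrete type behind `Pin<Box<dyn AsyncRead + Unpin + Send>>`.
        let download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send>> = Box::pin(io::BufReader::new(file));
        Ok(Download {
            download_stream,
            // A real implementation would look up any sidecar metadata here.
            metadata: None,
        })
    }
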
@@ -234,22 +229,19 @@ impl RemoteStorage for LocalFs {
|
||||
from: &Self::RemoteObjectId,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
|
||||
) -> anyhow::Result<Option<StorageMetadata>> {
|
||||
) -> Result<Download, DownloadError> {
|
||||
if let Some(end_exclusive) = end_exclusive {
|
||||
ensure!(
|
||||
end_exclusive > start_inclusive,
|
||||
"Invalid range, start ({}) is bigger then end ({:?})",
|
||||
start_inclusive,
|
||||
end_exclusive
|
||||
);
|
||||
if end_exclusive <= start_inclusive {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
|
||||
};
|
||||
if start_inclusive == end_exclusive.saturating_sub(1) {
|
||||
return Ok(None);
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
|
||||
}
|
||||
}
|
||||
let file_path = self.resolve_in_storage(from)?;
|
||||
|
||||
if file_path.exists() && file_path.is_file() {
|
||||
let file_path = self
|
||||
.resolve_in_storage(from)
|
||||
.map_err(DownloadError::BadInput)?;
|
||||
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
|
||||
let mut source = io::BufReader::new(
|
||||
fs::OpenOptions::new()
|
||||
.read(true)
|
||||
@@ -260,31 +252,31 @@ impl RemoteStorage for LocalFs {
|
||||
"Failed to open source file '{}' to use in the download",
|
||||
file_path.display()
|
||||
)
|
||||
})?,
|
||||
})
|
||||
.map_err(DownloadError::Other)?,
|
||||
);
|
||||
source
|
||||
.seek(io::SeekFrom::Start(start_inclusive))
|
||||
.await
|
||||
.context("Failed to seek to the range start in a local storage file")?;
|
||||
match end_exclusive {
|
||||
Some(end_exclusive) => {
|
||||
io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
|
||||
}
|
||||
None => io::copy(&mut source, to).await,
|
||||
}
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to download file '{}' range from the local storage",
|
||||
file_path.display()
|
||||
)
|
||||
})?;
|
||||
.context("Failed to seek to the range start in a local storage file")
|
||||
.map_err(DownloadError::Other)?;
|
||||
let metadata = self
|
||||
.read_storage_metadata(&file_path)
|
||||
.await
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
self.read_storage_metadata(&file_path).await
|
||||
Ok(match end_exclusive {
|
||||
Some(end_exclusive) => Download {
|
||||
metadata,
|
||||
download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
|
||||
},
|
||||
None => Download {
|
||||
metadata,
|
||||
download_stream: Box::pin(source),
|
||||
},
|
||||
})
|
||||
} else {
|
||||
bail!(
|
||||
"File '{}' either does not exist or is not a file",
|
||||
file_path.display()
|
||||
)
|
||||
Err(DownloadError::NotFound)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -352,6 +344,19 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
    Ok(())
}

fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
    if file_path.exists() {
        ensure!(
            file_path.is_file(),
            "file path '{}' is not a file",
            file_path.display()
        );
        Ok(true)
    } else {
        Ok(false)
    }
}

#[cfg(test)]
|
||||
mod pure_tests {
|
||||
use tempfile::tempdir;
|
||||
@@ -518,6 +523,31 @@ mod fs_tests {
|
||||
use std::{collections::HashMap, io::Write};
|
||||
use tempfile::tempdir;
|
||||
|
||||
async fn read_and_assert_remote_file_contents(
|
||||
storage: &LocalFs,
|
||||
#[allow(clippy::ptr_arg)]
|
||||
// have to use &PathBuf due to `storage.local_path` parameter requirements
|
||||
remote_storage_path: &PathBuf,
|
||||
expected_metadata: Option<&StorageMetadata>,
|
||||
) -> anyhow::Result<String> {
|
||||
let mut download = storage
|
||||
.download(remote_storage_path)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
|
||||
ensure!(
|
||||
download.metadata.as_ref() == expected_metadata,
|
||||
"Unexpected metadata returned for the downloaded file"
|
||||
);
|
||||
|
||||
let mut contents = String::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_string(&mut contents)
|
||||
.await
|
||||
.context("Failed to read remote file contents into string")?;
|
||||
Ok(contents)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn upload_file() -> anyhow::Result<()> {
|
||||
let workdir = tempdir()?.path().to_owned();
|
||||
@@ -568,15 +598,7 @@ mod fs_tests {
|
||||
let upload_name = "upload_1";
|
||||
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
|
||||
|
||||
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
let metadata = storage.download(&upload_target, &mut content_bytes).await?;
|
||||
assert!(
|
||||
metadata.is_none(),
|
||||
"No metadata should be returned for no metadata upload"
|
||||
);
|
||||
|
||||
content_bytes.flush().await?;
|
||||
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
|
||||
let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
|
||||
assert_eq!(
|
||||
dummy_contents(upload_name),
|
||||
contents,
|
||||
@@ -584,13 +606,9 @@ mod fs_tests {
|
||||
);
|
||||
|
||||
let non_existing_path = PathBuf::from("somewhere").join("else");
|
||||
match storage.download(&non_existing_path, &mut io::sink()).await {
|
||||
Ok(_) => panic!("Should not allow downloading non-existing storage files"),
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("does not exist"));
|
||||
assert!(error_string.contains(&non_existing_path.display().to_string()));
|
||||
}
|
||||
match storage.download(&non_existing_path).await {
|
||||
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
|
||||
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -603,58 +621,31 @@ mod fs_tests {
|
||||
let upload_name = "upload_1";
|
||||
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
|
||||
|
||||
let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
let metadata = storage
|
||||
.download_byte_range(&upload_target, 0, None, &mut full_range_bytes)
|
||||
.await?;
|
||||
assert!(
|
||||
metadata.is_none(),
|
||||
"No metadata should be returned for no metadata upload"
|
||||
);
|
||||
full_range_bytes.flush().await?;
|
||||
let full_range_download_contents =
|
||||
read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
|
||||
assert_eq!(
|
||||
dummy_contents(upload_name),
|
||||
String::from_utf8(full_range_bytes.into_inner().into_inner())?,
|
||||
full_range_download_contents,
|
||||
"Download full range should return the whole upload"
|
||||
);
|
||||
|
||||
let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
let same_byte = 1_000_000_000;
|
||||
let metadata = storage
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
same_byte,
|
||||
Some(same_byte + 1), // exclusive end
|
||||
&mut zero_range_bytes,
|
||||
)
|
||||
.await?;
|
||||
assert!(
|
||||
metadata.is_none(),
|
||||
"No metadata should be returned for no metadata upload"
|
||||
);
|
||||
zero_range_bytes.flush().await?;
|
||||
assert!(
|
||||
zero_range_bytes.into_inner().into_inner().is_empty(),
|
||||
"Zero byte range should not download any part of the file"
|
||||
);
|
||||
|
||||
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
|
||||
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
|
||||
|
||||
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
let metadata = storage
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
0,
|
||||
Some(first_part_local.len() as u64),
|
||||
&mut first_part_remote,
|
||||
)
|
||||
let mut first_part_download = storage
|
||||
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
|
||||
.await?;
|
||||
assert!(
|
||||
metadata.is_none(),
|
||||
first_part_download.metadata.is_none(),
|
||||
"No metadata should be returned for no metadata upload"
|
||||
);
|
||||
|
||||
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
io::copy(
|
||||
&mut first_part_download.download_stream,
|
||||
&mut first_part_remote,
|
||||
)
|
||||
.await?;
|
||||
first_part_remote.flush().await?;
|
||||
let first_part_remote = first_part_remote.into_inner().into_inner();
|
||||
assert_eq!(
|
||||
@@ -663,20 +654,24 @@ mod fs_tests {
|
||||
"First part bytes should be returned when requested"
|
||||
);
|
||||
|
||||
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
let metadata = storage
|
||||
let mut second_part_download = storage
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
first_part_local.len() as u64,
|
||||
Some((first_part_local.len() + second_part_local.len()) as u64),
|
||||
&mut second_part_remote,
|
||||
)
|
||||
.await?;
|
||||
assert!(
|
||||
metadata.is_none(),
|
||||
second_part_download.metadata.is_none(),
|
||||
"No metadata should be returned for no metadata upload"
|
||||
);
|
||||
|
||||
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
io::copy(
|
||||
&mut second_part_download.download_stream,
|
||||
&mut second_part_remote,
|
||||
)
|
||||
.await?;
|
||||
second_part_remote.flush().await?;
|
||||
let second_part_remote = second_part_remote.into_inner().into_inner();
|
||||
assert_eq!(
|
||||
@@ -696,11 +691,30 @@ mod fs_tests {
|
||||
let upload_name = "upload_1";
|
||||
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
|
||||
|
||||
let start = 1_000_000_000;
|
||||
let end = start + 1;
|
||||
match storage
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
start,
|
||||
Some(end), // exclusive end
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("Should not allow downloading wrong ranges"),
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("zero bytes"));
|
||||
assert!(error_string.contains(&start.to_string()));
|
||||
assert!(error_string.contains(&end.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let start = 10000;
|
||||
let end = 234;
|
||||
assert!(start > end, "Should test an incorrect range");
|
||||
match storage
|
||||
.download_byte_range(&upload_target, start, Some(end), &mut io::sink())
|
||||
.download_byte_range(&upload_target, start, Some(end))
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("Should not allow downloading wrong ranges"),
|
||||
@@ -712,18 +726,6 @@ mod fs_tests {
|
||||
}
|
||||
}
|
||||
|
||||
let non_existing_path = PathBuf::from("somewhere").join("else");
|
||||
match storage
|
||||
.download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink())
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("does not exist"));
|
||||
assert!(error_string.contains(&non_existing_path.display().to_string()));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -762,35 +764,26 @@ mod fs_tests {
|
||||
let upload_target =
|
||||
upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;
|
||||
|
||||
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
|
||||
|
||||
content_bytes.flush().await?;
|
||||
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
|
||||
let full_range_download_contents =
|
||||
read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
|
||||
assert_eq!(
|
||||
dummy_contents(upload_name),
|
||||
contents,
|
||||
full_range_download_contents,
|
||||
"We should upload and download the same contents"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
full_download_metadata.as_ref(),
|
||||
Some(&metadata),
|
||||
"We should get the same metadata back for full download"
|
||||
);
|
||||
|
||||
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
|
||||
let (first_part_local, _) = uploaded_bytes.split_at(3);
|
||||
|
||||
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
let partial_download_metadata = storage
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
0,
|
||||
Some(first_part_local.len() as u64),
|
||||
&mut first_part_remote,
|
||||
)
|
||||
let mut partial_download_with_metadata = storage
|
||||
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
|
||||
.await?;
|
||||
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
|
||||
io::copy(
|
||||
&mut partial_download_with_metadata.download_stream,
|
||||
&mut first_part_remote,
|
||||
)
|
||||
.await?;
|
||||
first_part_remote.flush().await?;
|
||||
let first_part_remote = first_part_remote.into_inner().into_inner();
|
||||
assert_eq!(
|
||||
@@ -800,8 +793,8 @@ mod fs_tests {
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
partial_download_metadata.as_ref(),
|
||||
Some(&metadata),
|
||||
partial_download_with_metadata.metadata,
|
||||
Some(metadata),
|
||||
"We should get the same metadata back for partial download"
|
||||
);
|
||||
|
||||
@@ -843,7 +836,7 @@ mod fs_tests {
|
||||
}
|
||||
|
||||
fn dummy_contents(name: &str) -> String {
|
||||
format!("contents for {}", name)
|
||||
format!("contents for {name}")
|
||||
}
|
||||
|
||||
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {
|
||||
|
||||
@@ -9,17 +9,17 @@ use std::path::{Path, PathBuf};
use anyhow::Context;
use rusoto_core::{
    credential::{InstanceMetadataProvider, StaticProvider},
    HttpClient, Region,
    HttpClient, Region, RusotoError,
};
use rusoto_s3::{
    DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
    StreamingBody, S3,
    DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
    S3Client, StreamingBody, S3,
};
use tokio::{io, sync::Semaphore};
use tokio_util::io::ReaderStream;
use tracing::debug;

use crate::{strip_path_prefix, RemoteStorage, S3Config};
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};

use super::StorageMetadata;

@@ -187,6 +187,39 @@ impl S3Bucket {
            concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
        })
    }

    async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
        let _guard = self
            .concurrency_limiter
            .acquire()
            .await
            .context("Concurrency limiter semaphore got closed during S3 download")
            .map_err(DownloadError::Other)?;

        metrics::inc_get_object();

        match self.client.get_object(request).await {
            Ok(object_output) => match object_output.body {
                None => {
                    metrics::inc_get_object_fail();
                    Err(DownloadError::Other(anyhow::anyhow!(
                        "Got no body for the S3 object given"
                    )))
                }
                Some(body) => Ok(Download {
                    metadata: object_output.metadata.map(StorageMetadata),
                    download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
                }),
            },
            Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
            Err(e) => {
                metrics::inc_get_object_fail();
                Err(DownloadError::Other(anyhow::anyhow!(
                    "Failed to download S3 object: {e}"
                )))
            }
        }
    }
}

#[async_trait::async_trait]
|
||||
@@ -283,38 +316,13 @@ impl RemoteStorage for S3Bucket {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download(
|
||||
&self,
|
||||
from: &Self::RemoteObjectId,
|
||||
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
|
||||
) -> anyhow::Result<Option<StorageMetadata>> {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 download")?;
|
||||
|
||||
metrics::inc_get_object();
|
||||
|
||||
let object_output = self
|
||||
.client
|
||||
.get_object(GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: from.key().to_owned(),
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_get_object_fail();
|
||||
e
|
||||
})?;
|
||||
|
||||
if let Some(body) = object_output.body {
|
||||
let mut from = io::BufReader::new(body.into_async_read());
|
||||
io::copy(&mut from, to).await?;
|
||||
}
|
||||
|
||||
Ok(object_output.metadata.map(StorageMetadata))
|
||||
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
|
||||
self.download_object(GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: from.key().to_owned(),
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
@@ -322,8 +330,7 @@ impl RemoteStorage for S3Bucket {
|
||||
from: &Self::RemoteObjectId,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
|
||||
) -> anyhow::Result<Option<StorageMetadata>> {
|
||||
) -> Result<Download, DownloadError> {
|
||||
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
|
||||
// and needs both ends to be inclusive
|
||||
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
|
||||
@@ -331,34 +338,14 @@ impl RemoteStorage for S3Bucket {
|
||||
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
|
||||
None => format!("bytes={}-", start_inclusive),
|
||||
});
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 range download")?;
|
||||
|
||||
metrics::inc_get_object();
|
||||
|
||||
let object_output = self
|
||||
.client
|
||||
.get_object(GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: from.key().to_owned(),
|
||||
range,
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_get_object_fail();
|
||||
e
|
||||
})?;
|
||||
|
||||
if let Some(body) = object_output.body {
|
||||
let mut from = io::BufReader::new(body.into_async_read());
|
||||
io::copy(&mut from, to).await?;
|
||||
}
|
||||
|
||||
Ok(object_output.metadata.map(StorageMetadata))
|
||||
self.download_object(GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: from.key().to_owned(),
|
||||
range,
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {
|
||||
|
||||
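The S3 byte-range hunk above converts the trait's half-open [start, end) range into the inclusive form HTTP and S3 expect. A minimal stand-alone sketch of that conversion (a hypothetical helper, not part of the diff):

    // Build an HTTP `Range` header value from a half-open [start, end) byte range.
    fn http_range_header(start_inclusive: u64, end_exclusive: Option<u64>) -> String {
        // The exclusive end is decremented by one to become the inclusive HTTP end.
        match end_exclusive.map(|end| end.saturating_sub(1)) {
            Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
            None => format!("bytes={start_inclusive}-"),
        }
    }

    // http_range_header(0, Some(10)) yields "bytes=0-9"; http_range_header(5, None) yields "bytes=5-".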
@@ -232,23 +232,32 @@ impl Repository for LayeredRepository {
|
||||
|
||||
fn create_empty_timeline(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
timeline_id: ZTimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
) -> Result<Arc<LayeredTimeline>> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let vacant_timeline_entry = match timelines.entry(timeline_id) {
|
||||
Entry::Occupied(_) => bail!("Timeline already exists"),
|
||||
Entry::Vacant(vacant_entry) => vacant_entry,
|
||||
};
|
||||
|
||||
let timeline_path = self.conf.timeline_path(&timeline_id, &self.tenant_id);
|
||||
if timeline_path.exists() {
|
||||
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
|
||||
}
|
||||
|
||||
// Create the timeline directory, and write initial metadata to file.
|
||||
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenant_id))?;
|
||||
crashsafe_dir::create_dir_all(timeline_path)?;
|
||||
|
||||
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
|
||||
Self::save_metadata(self.conf, timelineid, self.tenant_id, &metadata, true)?;
|
||||
Self::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?;
|
||||
|
||||
let timeline = LayeredTimeline::new(
|
||||
self.conf,
|
||||
Arc::clone(&self.tenant_conf),
|
||||
metadata,
|
||||
None,
|
||||
timelineid,
|
||||
timeline_id,
|
||||
self.tenant_id,
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
self.upload_layers,
|
||||
@@ -257,12 +266,7 @@ impl Repository for LayeredRepository {
|
||||
|
||||
// Insert if not exists
|
||||
let timeline = Arc::new(timeline);
|
||||
match timelines.entry(timelineid) {
|
||||
Entry::Occupied(_) => bail!("Timeline already exists"),
|
||||
Entry::Vacant(vacant) => {
|
||||
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
|
||||
}
|
||||
};
|
||||
vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)));
|
||||
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
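The create_empty_timeline hunk above replaces the late duplicate check with an up-front `HashMap::entry` lookup, so the vacant slot is reserved before any directory or metadata work happens. A minimal sketch of that pattern, with hypothetical key and value types:

    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    // Illustrative only: reject duplicates first, then do fallible setup, then insert.
    fn create_once(map: &mut HashMap<u64, String>, key: u64, value: String) -> Result<(), String> {
        let vacant = match map.entry(key) {
            Entry::Occupied(_) => return Err(format!("entry {key} already exists")),
            Entry::Vacant(vacant) => vacant,
        };
        // ... create directories, write metadata, etc. ...
        vacant.insert(value);
        Ok(())
    }
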
@@ -225,7 +225,7 @@ pub trait Repository: Send + Sync {
|
||||
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
|
||||
fn create_empty_timeline(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
timeline_id: ZTimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
) -> Result<Arc<Self::Timeline>>;
|
||||
|
||||
@@ -636,6 +636,19 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_duplicate_timelines() -> Result<()> {
|
||||
let repo = RepoHarness::create("no_duplicate_timelines")?.load();
|
||||
let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
|
||||
match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
Err(e) => assert_eq!(e.to_string(), "Timeline already exists"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Convenience function to create a page image with given string as the only content
|
||||
pub fn test_value(s: &str) -> Value {
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
@@ -44,13 +44,23 @@ where
|
||||
index_part_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut index_part_download =
|
||||
storage
|
||||
.download(&part_storage_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open download stream for for storage path {part_storage_path:?}")
|
||||
})?;
|
||||
let mut index_part_bytes = Vec::new();
|
||||
storage
|
||||
.download(&part_storage_path, &mut index_part_bytes)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to download an index part from storage path {part_storage_path:?}")
|
||||
})?;
|
||||
io::copy(
|
||||
&mut index_part_download.download_stream,
|
||||
&mut index_part_bytes,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to download an index part from storage path {part_storage_path:?}")
|
||||
})?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
|
||||
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
|
||||
@@ -162,15 +172,19 @@ where
|
||||
temp_file_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
storage
|
||||
.download(&layer_storage_path, &mut destination_file)
|
||||
let mut download = storage
|
||||
.download(&layer_storage_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to download a layer from storage path '{layer_storage_path:?}'"
|
||||
"Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
|
||||
)
|
||||
})?;
|
||||
io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
|
||||
// A file will not be closed immediately when it goes out of scope if there are any IO operations
|
||||
|
||||
@@ -2,18 +2,16 @@ use anyhow::{Context, Result};
|
||||
use etcd_broker::subscription_key::{
|
||||
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
|
||||
};
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use postgres_ffi::xlog_utils::{
|
||||
XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
|
||||
};
|
||||
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorage};
|
||||
use tokio::fs::File;
|
||||
use tokio::runtime::Builder;
|
||||
@@ -452,45 +450,41 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
|
||||
pub async fn read_object(
|
||||
file_path: PathBuf,
|
||||
offset: u64,
|
||||
) -> (impl AsyncRead, JoinHandle<Result<()>>) {
|
||||
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
|
||||
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
|
||||
let download = match REMOTE_STORAGE
|
||||
.get()
|
||||
.context("Failed to get remote storage")?
|
||||
.as_ref()
|
||||
.context("No remote storage configured")?
|
||||
{
|
||||
GenericRemoteStorage::Local(local_storage) => {
|
||||
let source = local_storage.remote_object_id(&file_path)?;
|
||||
|
||||
let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);
|
||||
|
||||
let copy_result = tokio::spawn(async move {
|
||||
let res = match storage.as_ref().unwrap() {
|
||||
GenericRemoteStorage::Local(local_storage) => {
|
||||
let source = local_storage.remote_object_id(&file_path)?;
|
||||
|
||||
info!(
|
||||
"local download about to start from {} at offset {}",
|
||||
source.display(),
|
||||
offset
|
||||
);
|
||||
local_storage
|
||||
.download_byte_range(&source, offset, None, &mut pipe_writer)
|
||||
.await
|
||||
}
|
||||
GenericRemoteStorage::S3(s3_storage) => {
|
||||
let s3key = s3_storage.remote_object_id(&file_path)?;
|
||||
|
||||
info!(
|
||||
"S3 download about to start from {:?} at offset {}",
|
||||
s3key, offset
|
||||
);
|
||||
s3_storage
|
||||
.download_byte_range(&s3key, offset, None, &mut pipe_writer)
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
error!("failed to download WAL segment from remote storage: {}", e);
|
||||
Err(e)
|
||||
} else {
|
||||
Ok(())
|
||||
info!(
|
||||
"local download about to start from {} at offset {}",
|
||||
source.display(),
|
||||
offset
|
||||
);
|
||||
local_storage
|
||||
.download_byte_range(&source, offset, None)
|
||||
.await
|
||||
}
|
||||
});
|
||||
GenericRemoteStorage::S3(s3_storage) => {
|
||||
let s3key = s3_storage.remote_object_id(&file_path)?;
|
||||
|
||||
(pipe_reader, copy_result)
|
||||
info!(
|
||||
"S3 download about to start from {:?} at offset {}",
|
||||
s3key, offset
|
||||
);
|
||||
s3_storage.download_byte_range(&s3key, offset, None).await
|
||||
}
|
||||
}
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to open WAL segment download stream for local storage path {}",
|
||||
file_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(download.download_stream)
|
||||
}
|
||||
|
||||
@@ -604,8 +604,7 @@ impl WalReader {
|
||||
|
||||
// Try to open remote file, if remote reads are enabled
|
||||
if self.enable_remote_read {
|
||||
let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
|
||||
return Ok(Box::pin(reader));
|
||||
return read_object(wal_file_path, xlogoff as u64).await;
|
||||
}
|
||||
|
||||
bail!("WAL segment is not found")
|
||||
|
||||
scripts/add_missing_rels.py (new file, 438 lines)
@@ -0,0 +1,438 @@
import os
import re
import shutil
from pathlib import Path
import tempfile
from contextlib import closing
import psycopg2
import subprocess
import argparse

### utils copied from test fixtures
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import make_dsn, parse_dsn
import asyncpg

Env = Dict[str, str]
|
||||
|
||||
_global_counter = 0
|
||||
|
||||
|
||||
def global_counter() -> int:
|
||||
""" A really dumb global counter.
|
||||
|
||||
This is useful for giving output files a unique number, so if we run the
|
||||
same command multiple times we can keep their output separate.
|
||||
"""
|
||||
global _global_counter
|
||||
_global_counter += 1
|
||||
return _global_counter
|
||||
|
||||
|
||||
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
|
||||
""" Run a process and capture its output
|
||||
|
||||
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
|
||||
where "cmd" is the name of the program and NNN is an incrementing
|
||||
counter.
|
||||
|
||||
If those files already exist, we will overwrite them.
|
||||
Returns basepath for files with captured output.
|
||||
"""
|
||||
assert type(cmd) is list
|
||||
base = os.path.basename(cmd[0]) + '_{}'.format(global_counter())
|
||||
basepath = os.path.join(capture_dir, base)
|
||||
stdout_filename = basepath + '.stdout'
|
||||
stderr_filename = basepath + '.stderr'
|
||||
|
||||
with open(stdout_filename, 'w') as stdout_f:
|
||||
with open(stderr_filename, 'w') as stderr_f:
|
||||
print('(capturing output to "{}.stdout")'.format(base))
|
||||
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
|
||||
|
||||
return basepath
|
||||
|
||||
|
||||
class PgBin:
|
||||
""" A helper class for executing postgres binaries """
|
||||
def __init__(self, log_dir: Path, pg_distrib_dir):
|
||||
self.log_dir = log_dir
|
||||
self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin')
|
||||
self.env = os.environ.copy()
|
||||
self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib')
|
||||
|
||||
def _fixpath(self, command: List[str]):
|
||||
if '/' not in command[0]:
|
||||
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||
|
||||
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||
if env_add is None:
|
||||
return self.env
|
||||
env = self.env.copy()
|
||||
env.update(env_add)
|
||||
return env
|
||||
|
||||
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
|
||||
"""
|
||||
Run one of the postgres binaries.
|
||||
|
||||
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
||||
|
||||
All the necessary environment variables will be set.
|
||||
|
||||
If the first argument (the command name) doesn't include a path (no '/'
|
||||
characters present), then it will be edited to include the correct path.
|
||||
|
||||
If you want stdout/stderr captured to files, use `run_capture` instead.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||
|
||||
def run_capture(self,
|
||||
command: List[str],
|
||||
env: Optional[Env] = None,
|
||||
cwd: Optional[str] = None,
|
||||
**kwargs: Any) -> str:
|
||||
"""
|
||||
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||
|
||||
This is just like `run`, but for chatty programs. Returns basepath for files
|
||||
with captured output.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
return subprocess_capture(str(self.log_dir),
|
||||
command,
|
||||
env=env,
|
||||
cwd=cwd,
|
||||
check=True,
|
||||
**kwargs)
|
||||
|
||||
class PgProtocol:
|
||||
""" Reusable connection logic """
|
||||
def __init__(self, **kwargs):
|
||||
self.default_options = kwargs
|
||||
|
||||
def connstr(self, **kwargs) -> str:
|
||||
"""
|
||||
Build a libpq connection string for the Postgres instance.
|
||||
"""
|
||||
return str(make_dsn(**self.conn_options(**kwargs)))
|
||||
|
||||
def conn_options(self, **kwargs):
|
||||
conn_options = self.default_options.copy()
|
||||
if 'dsn' in kwargs:
|
||||
conn_options.update(parse_dsn(kwargs['dsn']))
|
||||
conn_options.update(kwargs)
|
||||
|
||||
# Individual statement timeout in seconds. 2 minutes should be
|
||||
# enough for our tests, but if you need a longer, you can
|
||||
# change it by calling "SET statement_timeout" after
|
||||
# connecting.
|
||||
if 'options' in conn_options:
|
||||
conn_options['options'] = f"-cstatement_timeout=120s " + conn_options['options']
|
||||
else:
|
||||
conn_options['options'] = "-cstatement_timeout=120s"
|
||||
return conn_options
|
||||
|
||||
# autocommit=True here by default because that's what we need most of the time
|
||||
def connect(self, autocommit=True, **kwargs) -> PgConnection:
|
||||
"""
|
||||
Connect to the node.
|
||||
Returns psycopg2's connection object.
|
||||
This method passes all extra params to connstr.
|
||||
"""
|
||||
conn = psycopg2.connect(**self.conn_options(**kwargs))
|
||||
|
||||
# WARNING: this setting affects *all* tests!
|
||||
conn.autocommit = autocommit
|
||||
return conn
|
||||
|
||||
async def connect_async(self, **kwargs) -> asyncpg.Connection:
|
||||
"""
|
||||
Connect to the node from async python.
|
||||
Returns asyncpg's connection object.
|
||||
"""
|
||||
|
||||
# asyncpg takes slightly different options than psycopg2. Try
|
||||
# to convert the defaults from the psycopg2 format.
|
||||
|
||||
# The psycopg2 option 'dbname' is called 'database' in asyncpg
|
||||
conn_options = self.conn_options(**kwargs)
|
||||
if 'dbname' in conn_options:
|
||||
conn_options['database'] = conn_options.pop('dbname')
|
||||
|
||||
# Convert options='-c<key>=<val>' to server_settings
|
||||
if 'options' in conn_options:
|
||||
options = conn_options.pop('options')
|
||||
for match in re.finditer(r'-c(\w*)=(\w*)', options):
|
||||
key = match.group(1)
|
||||
val = match.group(2)
|
||||
if 'server_settings' in conn_options:
|
||||
conn_options['server_settings'].update({key: val})
|
||||
else:
|
||||
conn_options['server_settings'] = {key: val}
|
||||
return await asyncpg.connect(**conn_options)
|
||||
|
||||
def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
|
||||
"""
|
||||
Execute query against the node and return all rows.
|
||||
This method passes all extra params to connstr.
|
||||
"""
|
||||
return self.safe_psql_many([query], **kwargs)[0]
|
||||
|
||||
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
|
||||
"""
|
||||
Execute queries against the node and return all rows.
|
||||
This method passes all extra params to connstr.
|
||||
"""
|
||||
result: List[List[Any]] = []
|
||||
with closing(self.connect(**kwargs)) as conn:
|
||||
with conn.cursor() as cur:
|
||||
for query in queries:
|
||||
print(f"Executing query: {query}")
|
||||
cur.execute(query)
|
||||
|
||||
if cur.description is None:
|
||||
result.append([]) # query didn't return data
|
||||
else:
|
||||
result.append(cast(List[Any], cur.fetchall()))
|
||||
return result
|
||||
|
||||
|
||||
class VanillaPostgres(PgProtocol):
|
||||
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
|
||||
super().__init__(host='localhost', port=port, dbname='postgres')
|
||||
self.pgdatadir = pgdatadir
|
||||
self.pg_bin = pg_bin
|
||||
self.running = False
|
||||
if init:
|
||||
self.pg_bin.run_capture(['initdb', '-D', str(pgdatadir)])
|
||||
self.configure([f"port = {port}\n"])
|
||||
|
||||
def configure(self, options: List[str]):
|
||||
"""Append lines into postgresql.conf file."""
|
||||
assert not self.running
|
||||
with open(os.path.join(self.pgdatadir, 'postgresql.conf'), 'a') as conf_file:
|
||||
conf_file.write("\n".join(options))
|
||||
|
||||
def start(self, log_path: Optional[str] = None):
|
||||
assert not self.running
|
||||
self.running = True
|
||||
|
||||
if log_path is None:
|
||||
log_path = os.path.join(self.pgdatadir, "pg.log")
|
||||
|
||||
self.pg_bin.run_capture(
|
||||
['pg_ctl', '-w', '-D', str(self.pgdatadir), '-l', log_path, 'start'])
|
||||
|
||||
def stop(self):
|
||||
assert self.running
|
||||
self.running = False
|
||||
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', str(self.pgdatadir), 'stop'])
|
||||
|
||||
def get_subdir_size(self, subdir) -> int:
|
||||
"""Return size of pgdatadir subdirectory in bytes."""
|
||||
return get_dir_size(os.path.join(self.pgdatadir, subdir))
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
if self.running:
|
||||
self.stop()
|
||||
|
||||
|
||||
### actual code
|
||||
|
||||
|
||||
def get_rel_paths(log_dir, pg_bin, base_tar):
|
||||
"""Yeild list of relation paths"""
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
|
||||
|
||||
port = "55439" # Probably free
|
||||
with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
|
||||
vanilla_pg.configure([f"port={port}"])
|
||||
vanilla_pg.start()
|
||||
|
||||
# Create database based on template0 because we can't connect to template0
|
||||
query = "create database template0copy template template0"
|
||||
vanilla_pg.safe_psql(query, user="cloud_admin")
|
||||
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
|
||||
|
||||
# Get all databases
|
||||
query = "select oid, datname from pg_database"
|
||||
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
|
||||
template0_oid = [
|
||||
oid
|
||||
for (oid, database) in oid_dbname_pairs
|
||||
if database == "template0"
|
||||
][0]
|
||||
|
||||
# Get rel paths for each database
|
||||
for oid, database in oid_dbname_pairs:
|
||||
if database == "template0":
|
||||
# We can't connect to template0
|
||||
continue
|
||||
|
||||
query = "select relname, pg_relation_filepath(oid) from pg_class"
|
||||
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
|
||||
for relname, filepath in result:
|
||||
if filepath is not None:
|
||||
|
||||
if database == "template0copy":
|
||||
# Add all template0copy paths to template0
|
||||
prefix = f"base/{oid}/"
|
||||
if filepath.startswith(prefix):
|
||||
suffix = filepath[len(prefix):]
|
||||
yield f"base/{template0_oid}/{suffix}"
|
||||
elif filepath.startswith("global"):
|
||||
print(f"skipping {database} global file {filepath}")
|
||||
else:
|
||||
raise AssertionError
|
||||
else:
|
||||
yield filepath
|
||||
|
||||
|
||||
def pack_base(log_dir, restored_dir, output_tar):
|
||||
tmp_tar_name = "tmp.tar"
|
||||
tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
|
||||
cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
|
||||
subprocess_capture(log_dir, cmd, cwd=restored_dir)
|
||||
shutil.move(tmp_tar_path, output_tar)
|
||||
|
||||
|
||||
def get_files_in_tar(log_dir, tar):
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])
|
||||
|
||||
# Find empty files
|
||||
empty_files = []
|
||||
for root, dirs, files in os.walk(restored_dir):
|
||||
for name in files:
|
||||
file_path = os.path.join(root, name)
|
||||
yield file_path[len(restored_dir) + 1:]
|
||||
|
||||
|
||||
def corrupt(log_dir, base_tar, output_tar):
|
||||
"""Remove all empty files and repackage. Return paths of files removed."""
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
|
||||
|
||||
# Find empty files
|
||||
empty_files = []
|
||||
for root, dirs, files in os.walk(restored_dir):
|
||||
for name in files:
|
||||
file_path = os.path.join(root, name)
|
||||
file_size = os.path.getsize(file_path)
|
||||
if file_size == 0:
|
||||
empty_files.append(file_path)
|
||||
|
||||
# Delete empty files (just to see if they get recreated)
|
||||
for empty_file in empty_files:
|
||||
os.remove(empty_file)
|
||||
|
||||
# Repackage
|
||||
pack_base(log_dir, restored_dir, output_tar)
|
||||
|
||||
# Return relative paths
|
||||
return {
|
||||
empty_file[len(restored_dir) + 1:]
|
||||
for empty_file in empty_files
|
||||
}
|
||||
|
||||
|
||||
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])
|
||||
|
||||
# Touch files that don't exist
|
||||
for path in paths:
|
||||
absolute_path = os.path.join(restored_dir, path)
|
||||
exists = os.path.exists(absolute_path)
|
||||
if not exists:
|
||||
print("File {absolute_path} didn't exist. Creating..")
|
||||
Path(absolute_path).touch()
|
||||
|
||||
# Repackage
|
||||
pack_base(log_dir, restored_dir, output_tar)
|
||||
|
||||
|
||||
# TODO this test is not currently called. It needs any ordinary base.tar path as input
|
||||
def test_add_missing_rels(base_tar):
|
||||
output_tar = base_tar + ".fixed"
|
||||
|
||||
# Create new base tar with missing empty files
|
||||
corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
|
||||
deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
|
||||
assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
|
||||
set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0
|
||||
|
||||
# Reconstruct paths from the corrupted tar, assert it covers everything important
|
||||
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
|
||||
paths_missed = deleted_files - reconstructed_paths
|
||||
assert paths_missed.issubset({
|
||||
"postgresql.auto.conf",
|
||||
"pg_ident.conf",
|
||||
})
|
||||
|
||||
# Recreate the correct tar by touching files, compare with original tar
|
||||
touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
|
||||
paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
|
||||
set(get_files_in_tar(test_output_dir, output_tar)))
|
||||
assert paths_missed.issubset({
|
||||
"postgresql.auto.conf",
|
||||
"pg_ident.conf",
|
||||
})
|
||||
|
||||
|
||||
# Example command:
|
||||
# poetry run python scripts/add_missing_rels.py \
|
||||
# --base-tar /home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/psql_2.stdout \
|
||||
# --output-tar output-base.tar \
|
||||
# --log-dir /home/bojan/tmp
|
||||
# --pg-distrib-dir /home/bojan/src/neondatabase/neon/tmp_install/
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'--base-tar',
|
||||
dest='base_tar',
|
||||
required=True,
|
||||
help='base.tar file to add missing rels to (file will not be modified)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--output-tar',
|
||||
dest='output_tar',
|
||||
required=True,
|
||||
help='path and name for the output base.tar file',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--log-dir',
|
||||
dest='log_dir',
|
||||
required=True,
|
||||
help='directory to save log files in',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--pg-distrib-dir',
|
||||
dest='pg_distrib_dir',
|
||||
required=True,
|
||||
help='directory where postgres is installed',
|
||||
)
|
||||
args = parser.parse_args()
|
||||
base_tar = args.base_tar
|
||||
output_tar = args.output_tar
|
||||
log_dir = args.log_dir
|
||||
pg_bin = PgBin(log_dir, args.pg_distrib_dir)
|
||||
|
||||
reconstructed_paths = set(get_rel_paths(log_dir, pg_bin, base_tar))
|
||||
touch_missing_rels(log_dir, base_tar, output_tar, reconstructed_paths)
|
||||
@@ -72,6 +72,14 @@ class NeonPageserverHttpClient(requests.Session):
|
||||
return res_json
|
||||
|
||||
|
||||
import pytest
|
||||
import os
|
||||
def add_missing_empty_rels(base_tar, output_tar):
|
||||
os.environ['INPUT_BASE_TAR'] = base_tar
|
||||
os.environ['OUTPUT_BASE_TAR'] = output_tar
|
||||
pytest.main(["-s", "-k", "test_main_hack"])
|
||||
|
||||
|
||||
def main(args: argparse.Namespace):
|
||||
old_pageserver_host = args.old_pageserver_host
|
||||
new_pageserver_host = args.new_pageserver_host
|
||||
@@ -102,7 +110,7 @@ def main(args: argparse.Namespace):
|
||||
if args.only_import is False:
|
||||
query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"
|
||||
|
||||
cmd = ["psql", "--no-psqlrc", old_pageserver_connstr, "-c", query]
|
||||
cmd = [args.psql_path, "--no-psqlrc", old_pageserver_connstr, "-c", query]
|
||||
print(f"Running: {cmd}")
|
||||
|
||||
tar_filename = path.join(basepath,
|
||||
@@ -115,6 +123,8 @@ def main(args: argparse.Namespace):
|
||||
print(f"(capturing output to {tar_filename})")
|
||||
subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env)
|
||||
|
||||
# add_missing_empty_rels(incomplete_tar_filename, tar_filename)
|
||||
|
||||
print(f"Done export: {tar_filename}")
|
||||
|
||||
# Import timelines to new pageserver
|
||||
|
||||
test_runner/batch_others/test_complete_basebackup.py (new file, 167 lines)
@@ -0,0 +1,167 @@
from fixtures.neon_fixtures import VanillaPostgres
|
||||
from fixtures.utils import subprocess_capture
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
|
||||
def get_rel_paths(log_dir, pg_bin, base_tar):
|
||||
"""Yeild list of relation paths"""
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
|
||||
|
||||
port = "55439" # Probably free
|
||||
with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
|
||||
vanilla_pg.configure([f"port={port}"])
|
||||
vanilla_pg.start()
|
||||
|
||||
# Create database based on template0 because we can't connect to template0
|
||||
query = "create database template0copy template template0"
|
||||
vanilla_pg.safe_psql(query, user="cloud_admin")
|
||||
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
|
||||
|
||||
# Get all databases
|
||||
query = "select oid, datname from pg_database"
|
||||
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
|
||||
template0_oid = [
|
||||
oid
|
||||
for (oid, database) in oid_dbname_pairs
|
||||
if database == "template0"
|
||||
][0]
|
||||
|
||||
# Get rel paths for each database
|
||||
for oid, database in oid_dbname_pairs:
|
||||
if database == "template0":
|
||||
# We can't connect to template0
|
||||
continue
|
||||
|
||||
query = "select relname, pg_relation_filepath(oid) from pg_class"
|
||||
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
|
||||
for relname, filepath in result:
|
||||
if filepath is not None:
|
||||
|
||||
if database == "template0copy":
|
||||
# Add all template0copy paths to template0
|
||||
prefix = f"base/{oid}/"
|
||||
if filepath.startswith(prefix):
|
||||
suffix = filepath[len(prefix):]
|
||||
yield f"base/{template0_oid}/{suffix}"
|
||||
elif filepath.startswith("global"):
|
||||
print(f"skipping {database} global file {filepath}")
|
||||
else:
|
||||
raise AssertionError
|
||||
else:
|
||||
yield filepath
|
||||
|
||||
|
||||
def pack_base(log_dir, restored_dir, output_tar):
|
||||
tmp_tar_name = "tmp.tar"
|
||||
tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
|
||||
cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
|
||||
subprocess_capture(log_dir, cmd, cwd=restored_dir)
|
||||
shutil.move(tmp_tar_path, output_tar)
|
||||
|
||||
|
||||
def get_files_in_tar(log_dir, tar):
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])
|
||||
|
||||
# Find empty files
|
||||
empty_files = []
|
||||
for root, dirs, files in os.walk(restored_dir):
|
||||
for name in files:
|
||||
file_path = os.path.join(root, name)
|
||||
yield file_path[len(restored_dir) + 1:]
|
||||
|
||||
|
||||
def corrupt(log_dir, base_tar, output_tar):
|
||||
"""Remove all empty files and repackage. Return paths of files removed."""
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
|
||||
|
||||
# Find empty files
|
||||
empty_files = []
|
||||
for root, dirs, files in os.walk(restored_dir):
|
||||
for name in files:
|
||||
file_path = os.path.join(root, name)
|
||||
file_size = os.path.getsize(file_path)
|
||||
if file_size == 0:
|
||||
empty_files.append(file_path)
|
||||
|
||||
# Delete empty files (just to see if they get recreated)
|
||||
for empty_file in empty_files:
|
||||
os.remove(empty_file)
|
||||
|
||||
# Repackage
|
||||
pack_base(log_dir, restored_dir, output_tar)
|
||||
|
||||
# Return relative paths
|
||||
return {
|
||||
empty_file[len(restored_dir) + 1:]
|
||||
for empty_file in empty_files
|
||||
}
|
||||
|
||||
|
||||
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
|
||||
with tempfile.TemporaryDirectory() as restored_dir:
|
||||
# Unpack the base tar
|
||||
subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])
|
||||
|
||||
# Touch files that don't exist
|
||||
for path in paths:
|
||||
absolute_path = os.path.join(restored_dir, path)
|
||||
exists = os.path.exists(absolute_path)
|
||||
if not exists:
|
||||
print("File {absolute_path} didn't exist. Creating..")
|
||||
Path(absolute_path).touch()
|
||||
|
||||
# Repackage
|
||||
pack_base(log_dir, restored_dir, output_tar)
|
||||
|
||||
|
||||
def test_complete(test_output_dir, pg_bin):
|
||||
# Specify directories
|
||||
# TODO make a basebackup instead of using one from another test
|
||||
work_dir = "/home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/"
|
||||
base_tar = os.path.join(work_dir, "psql_2.stdout")
|
||||
output_tar = os.path.join(work_dir, "psql_2-completed.stdout")
|
||||
|
||||
# Create new base tar with missing empty files
|
||||
corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
|
||||
deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
|
||||
assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
|
||||
set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0
|
||||
|
||||
# Reconstruct paths from the corrupted tar, assert it covers everything important
|
||||
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
|
||||
paths_missed = deleted_files - reconstructed_paths
|
||||
assert paths_missed.issubset({
|
||||
"postgresql.auto.conf",
|
||||
"pg_ident.conf",
|
||||
})
|
||||
|
||||
# Recreate the correct tar by touching files, compare with original tar
|
||||
touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
|
||||
paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
|
||||
set(get_files_in_tar(test_output_dir, output_tar)))
|
||||
assert paths_missed.issubset({
|
||||
"postgresql.auto.conf",
|
||||
"pg_ident.conf",
|
||||
})
|
||||
|
||||
# HACK this script relies on test fixtures, but you can run it with
|
||||
# poetry run pytest -k test_main_hack and pass inputs via envvars
|
||||
#
|
||||
# The script takes a base tar, infers what empty rel files might be missing
|
||||
# and creates a new base tar with those files included. It does not modify
|
||||
# the original file.
|
||||
def test_main_hack(test_output_dir, pg_bin, pytestconfig):
|
||||
base_tar = os.environ['INPUT_BASE_TAR']
|
||||
output_tar = os.environ['OUTPUT_BASE_TAR']
|
||||
|
||||
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, base_tar))
|
||||
touch_missing_rels(test_output_dir, base_tar, output_tar, reconstructed_paths)
|
||||
@@ -11,7 +11,7 @@ import signal
|
||||
import pytest
|
||||
|
||||
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
|
||||
from fixtures.utils import lsn_from_hex
|
||||
from fixtures.utils import lsn_from_hex, subprocess_capture
|
||||
|
||||
|
||||
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
|
||||
@@ -101,10 +101,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
|
||||
log.info('load thread stopped')
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason=
|
||||
"needs to replace callmemaybe call with better idea how to migrate timelines between pageservers"
|
||||
)
|
||||
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
|
||||
def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
|
||||
port_distributor: PortDistributor,
|
||||
@@ -188,30 +184,38 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
|
||||
new_pageserver_http_port,
|
||||
neon_env_builder.broker):
|
||||
|
||||
# call to attach timeline to new pageserver
|
||||
new_pageserver_http.timeline_attach(tenant, timeline)
|
||||
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
|
||||
new_timeline_detail = wait_until(
|
||||
number_of_iterations=5,
|
||||
interval=1,
|
||||
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
|
||||
# Migrate either by attaching from s3 or by import/export basebackup
|
||||
relocation_method = "import"
|
||||
if relocation_method == "import":
|
||||
scripts_dir = "/home/bojan/src/neondatabase/neon/scripts/"
|
||||
cmd = [
|
||||
"python",
|
||||
os.path.join(scripts_dir, "export_import_betwen_pageservers.py"),
|
||||
"--tenant-id", tenant.hex,
|
||||
"--from-host", "localhost",
|
||||
"--from-http-port", str(pageserver_http.port),
|
||||
"--from-pg-port", str(env.pageserver.service_port.pg),
|
||||
"--to-host", "localhost",
|
||||
"--to-http-port", str(new_pageserver_http_port),
|
||||
"--to-pg-port", str(new_pageserver_pg_port),
|
||||
"--psql-path", os.path.join(pg_distrib_dir, "bin", "psql"),
|
||||
]
|
||||
subprocess_capture(env.repo_dir, cmd, check=True)
|
||||
elif relocation_method == "attach":
|
||||
# call to attach timeline to new pageserver
|
||||
new_pageserver_http.timeline_attach(tenant, timeline)
|
||||
|
||||
# when load is active these checks can break because lsns are not static
|
||||
# so lets check with some margin
|
||||
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
|
||||
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
|
||||
0.03)
|
||||
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
|
||||
new_timeline_detail = wait_until(
|
||||
number_of_iterations=5,
|
||||
interval=1,
|
||||
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
|
||||
|
||||
# callmemaybe to start replication from safekeeper to the new pageserver
|
||||
# when there is no load there is a clean checkpoint and no wal delta
|
||||
# needs to be streamed to the new pageserver
|
||||
# TODO (rodionov) use attach to start replication
|
||||
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
|
||||
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
|
||||
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
|
||||
cur.execute("callmemaybe {} {} {}".format(tenant.hex,
|
||||
timeline.hex,
|
||||
safekeeper_connstring))
|
||||
# when load is active these checks can break because lsns are not static
|
||||
# so lets check with some margin
|
||||
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
|
||||
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
|
||||
0.03)
|
||||
|
||||
tenant_pg.stop()
|
||||
|
||||
|
||||