Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-12 15:10:38 +00:00.

Compare commits: projects_m...projects-m (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | f7efbb2d42 | |
| | 0aff7c9ee9 | |
| | 4a2a55d9b2 | |
| | 50821c0a3c | |
| | 68adfe0fc8 | |
| | cfdf79aceb | |
| | 99a0a5a19b | |
| | 32560e75d2 | |
@@ -12,6 +12,7 @@ pageservers
 safekeepers

 [storage:vars]
+env_name = neon-stress
 console_mgmt_base_url = http://neon-stress-console.local
 bucket_name = neon-storage-ireland
 bucket_region = eu-west-1
@@ -12,8 +12,10 @@ use std::{
     borrow::Cow,
     collections::HashMap,
     ffi::OsStr,
+    fmt::Debug,
     num::{NonZeroU32, NonZeroUsize},
     path::{Path, PathBuf},
+    pin::Pin,
 };

 use anyhow::{bail, Context};

@@ -70,11 +72,7 @@ pub trait RemoteStorage: Send + Sync {

     /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
     /// Returns the metadata, if any was stored with the file previously.
-    async fn download(
-        &self,
-        from: &Self::RemoteObjectId,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>>;
+    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;

     /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
     /// Returns the metadata, if any was stored with the file previously.

@@ -83,12 +81,49 @@ pub trait RemoteStorage: Send + Sync {
         from: &Self::RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>>;
+    ) -> Result<Download, DownloadError>;

     async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
 }

+pub struct Download {
+    pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send>>,
+    /// Extra key-value data, associated with the current remote file.
+    pub metadata: Option<StorageMetadata>,
+}
+
+impl Debug for Download {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Download")
+            .field("metadata", &self.metadata)
+            .finish()
+    }
+}
+
+#[derive(Debug)]
+pub enum DownloadError {
+    /// Validation or other error happened due to user input.
+    BadInput(anyhow::Error),
+    /// The file was not found in the remote storage.
+    NotFound,
+    /// The file was found in the remote storage, but the download failed.
+    Other(anyhow::Error),
+}
+
+impl std::fmt::Display for DownloadError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DownloadError::BadInput(e) => {
+                write!(f, "Failed to download a remote file due to user input: {e}")
+            }
+            DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
+            DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
+        }
+    }
+}
+
+impl std::error::Error for DownloadError {}
+
 /// Every storage, currently supported.
 /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
 pub enum GenericRemoteStorage {

@@ -180,7 +215,7 @@ pub struct S3Config {
     pub concurrency_limit: NonZeroUsize,
 }

-impl std::fmt::Debug for S3Config {
+impl Debug for S3Config {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("S3Config")
            .field("bucket_name", &self.bucket_name)
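The hunks above replace the writer-based `download` with one that returns a `Download` whose `download_stream` the caller drains itself, and introduce `DownloadError` so a missing object is a typed `NotFound` rather than a string to match on. A minimal caller-side sketch of the new shape (the function, its name, and the `read_to_end` usage are illustrative and not part of the commit; it assumes the crate's existing `tokio` and `anyhow` dependencies):

```rust
// Illustrative only: one way a caller might consume the new download API.
async fn fetch_to_vec<S: RemoteStorage>(
    storage: &S,
    object: &S::RemoteObjectId,
) -> anyhow::Result<Option<Vec<u8>>> {
    use tokio::io::AsyncReadExt;

    match storage.download(object).await {
        Ok(mut download) => {
            // The caller now decides how and when to drain the stream,
            // instead of passing a writer into the storage layer.
            let mut buf = Vec::new();
            download.download_stream.read_to_end(&mut buf).await?;
            Ok(Some(buf))
        }
        // Missing objects surface as a dedicated variant.
        Err(DownloadError::NotFound) => Ok(None),
        Err(other) => Err(anyhow::anyhow!("remote download failed: {other}")),
    }
}
```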
@@ -17,7 +17,7 @@ use tokio::{
 };
 use tracing::*;

-use crate::path_with_suffix_extension;
+use crate::{path_with_suffix_extension, Download, DownloadError};

 use super::{strip_path_prefix, RemoteStorage, StorageMetadata};


@@ -192,15 +192,12 @@ impl RemoteStorage for LocalFs {
         Ok(())
     }

-    async fn download(
-        &self,
-        from: &Self::RemoteObjectId,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
-        let file_path = self.resolve_in_storage(from)?;
-
-        if file_path.exists() && file_path.is_file() {
-            let mut source = io::BufReader::new(
+    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
+        let file_path = self
+            .resolve_in_storage(from)
+            .map_err(DownloadError::BadInput)?;
+        if file_exists(&file_path).map_err(DownloadError::BadInput)? {
+            let source = io::BufReader::new(
                 fs::OpenOptions::new()
                     .read(true)
                     .open(&file_path)

@@ -210,22 +207,20 @@ impl RemoteStorage for LocalFs {
                         "Failed to open source file '{}' to use in the download",
                         file_path.display()
                     )
-                })?,
+                })
+                .map_err(DownloadError::Other)?,
             );
-            io::copy(&mut source, to).await.with_context(|| {
-                format!(
-                    "Failed to download file '{}' from the local storage",
-                    file_path.display()
-                )
-            })?;
-            source.flush().await?;

-            self.read_storage_metadata(&file_path).await
+            let metadata = self
+                .read_storage_metadata(&file_path)
+                .await
+                .map_err(DownloadError::Other)?;
+            Ok(Download {
+                metadata,
+                download_stream: Box::pin(source),
+            })
         } else {
-            bail!(
-                "File '{}' either does not exist or is not a file",
-                file_path.display()
-            )
+            Err(DownloadError::NotFound)
         }
     }


@@ -234,22 +229,19 @@ impl RemoteStorage for LocalFs {
         from: &Self::RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
+    ) -> Result<Download, DownloadError> {
         if let Some(end_exclusive) = end_exclusive {
-            ensure!(
-                end_exclusive > start_inclusive,
-                "Invalid range, start ({}) is bigger then end ({:?})",
-                start_inclusive,
-                end_exclusive
-            );
+            if end_exclusive <= start_inclusive {
+                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
+            };
             if start_inclusive == end_exclusive.saturating_sub(1) {
-                return Ok(None);
+                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
             }
         }
-        let file_path = self.resolve_in_storage(from)?;
-
-        if file_path.exists() && file_path.is_file() {
+        let file_path = self
+            .resolve_in_storage(from)
+            .map_err(DownloadError::BadInput)?;
+        if file_exists(&file_path).map_err(DownloadError::BadInput)? {
             let mut source = io::BufReader::new(
                 fs::OpenOptions::new()
                     .read(true)

@@ -260,31 +252,31 @@ impl RemoteStorage for LocalFs {
                         "Failed to open source file '{}' to use in the download",
                         file_path.display()
                     )
-                })?,
+                })
+                .map_err(DownloadError::Other)?,
             );
             source
                 .seek(io::SeekFrom::Start(start_inclusive))
                 .await
-                .context("Failed to seek to the range start in a local storage file")?;
-            match end_exclusive {
-                Some(end_exclusive) => {
-                    io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
-                }
-                None => io::copy(&mut source, to).await,
-            }
-            .with_context(|| {
-                format!(
-                    "Failed to download file '{}' range from the local storage",
-                    file_path.display()
-                )
-            })?;
+                .context("Failed to seek to the range start in a local storage file")
+                .map_err(DownloadError::Other)?;
+            let metadata = self
+                .read_storage_metadata(&file_path)
+                .await
+                .map_err(DownloadError::Other)?;

-            self.read_storage_metadata(&file_path).await
+            Ok(match end_exclusive {
+                Some(end_exclusive) => Download {
+                    metadata,
+                    download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
+                },
+                None => Download {
+                    metadata,
+                    download_stream: Box::pin(source),
+                },
+            })
         } else {
-            bail!(
-                "File '{}' either does not exist or is not a file",
-                file_path.display()
-            )
+            Err(DownloadError::NotFound)
         }
     }


@@ -352,6 +344,19 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
     Ok(())
 }

+fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
+    if file_path.exists() {
+        ensure!(
+            file_path.is_file(),
+            "file path '{}' is not a file",
+            file_path.display()
+        );
+        Ok(true)
+    } else {
+        Ok(false)
+    }
+}
+
 #[cfg(test)]
 mod pure_tests {
     use tempfile::tempdir;
@@ -518,6 +523,31 @@ mod fs_tests {
     use std::{collections::HashMap, io::Write};
     use tempfile::tempdir;

+    async fn read_and_assert_remote_file_contents(
+        storage: &LocalFs,
+        #[allow(clippy::ptr_arg)]
+        // have to use &PathBuf due to `storage.local_path` parameter requirements
+        remote_storage_path: &PathBuf,
+        expected_metadata: Option<&StorageMetadata>,
+    ) -> anyhow::Result<String> {
+        let mut download = storage
+            .download(remote_storage_path)
+            .await
+            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
+        ensure!(
+            download.metadata.as_ref() == expected_metadata,
+            "Unexpected metadata returned for the downloaded file"
+        );
+
+        let mut contents = String::new();
+        download
+            .download_stream
+            .read_to_string(&mut contents)
+            .await
+            .context("Failed to read remote file contents into string")?;
+        Ok(contents)
+    }
+
     #[tokio::test]
     async fn upload_file() -> anyhow::Result<()> {
         let workdir = tempdir()?.path().to_owned();
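The new test helper drains `download_stream` with `read_to_string`, which comes from `tokio::io::AsyncReadExt` (presumably already in scope elsewhere in this test module; the import is outside the captured hunks). A standalone equivalent with the import spelled out, purely illustrative:

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

// Illustrative only: drain any AsyncRead (such as Download::download_stream) into a String.
// Assumes an anyhow dependency, as in the surrounding crate.
async fn stream_to_string(mut stream: impl AsyncRead + Unpin) -> anyhow::Result<String> {
    let mut contents = String::new();
    stream.read_to_string(&mut contents).await?;
    Ok(contents)
}
```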
@@ -568,15 +598,7 @@ mod fs_tests {
         let upload_name = "upload_1";
         let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;

-        let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage.download(&upload_target, &mut content_bytes).await?;
-        assert!(
-            metadata.is_none(),
-            "No metadata should be returned for no metadata upload"
-        );
-
-        content_bytes.flush().await?;
-        let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
+        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
         assert_eq!(
             dummy_contents(upload_name),
             contents,

@@ -584,13 +606,9 @@ mod fs_tests {
         );

         let non_existing_path = PathBuf::from("somewhere").join("else");
-        match storage.download(&non_existing_path, &mut io::sink()).await {
-            Ok(_) => panic!("Should not allow downloading non-existing storage files"),
-            Err(e) => {
-                let error_string = e.to_string();
-                assert!(error_string.contains("does not exist"));
-                assert!(error_string.contains(&non_existing_path.display().to_string()));
-            }
+        match storage.download(&non_existing_path).await {
+            Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
+            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
         }
         Ok(())
     }

@@ -603,58 +621,31 @@ mod fs_tests {
         let upload_name = "upload_1";
         let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;

-        let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage
-            .download_byte_range(&upload_target, 0, None, &mut full_range_bytes)
-            .await?;
-        assert!(
-            metadata.is_none(),
-            "No metadata should be returned for no metadata upload"
-        );
-        full_range_bytes.flush().await?;
+        let full_range_download_contents =
+            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
         assert_eq!(
             dummy_contents(upload_name),
-            String::from_utf8(full_range_bytes.into_inner().into_inner())?,
+            full_range_download_contents,
             "Download full range should return the whole upload"
         );

-        let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let same_byte = 1_000_000_000;
-        let metadata = storage
-            .download_byte_range(
-                &upload_target,
-                same_byte,
-                Some(same_byte + 1), // exclusive end
-                &mut zero_range_bytes,
-            )
-            .await?;
-        assert!(
-            metadata.is_none(),
-            "No metadata should be returned for no metadata upload"
-        );
-        zero_range_bytes.flush().await?;
-        assert!(
-            zero_range_bytes.into_inner().into_inner().is_empty(),
-            "Zero byte range should not download any part of the file"
-        );
-
         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

-        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage
-            .download_byte_range(
-                &upload_target,
-                0,
-                Some(first_part_local.len() as u64),
-                &mut first_part_remote,
-            )
+        let mut first_part_download = storage
+            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
             .await?;
         assert!(
-            metadata.is_none(),
+            first_part_download.metadata.is_none(),
             "No metadata should be returned for no metadata upload"
         );

+        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        io::copy(
+            &mut first_part_download.download_stream,
+            &mut first_part_remote,
+        )
+        .await?;
         first_part_remote.flush().await?;
         let first_part_remote = first_part_remote.into_inner().into_inner();
         assert_eq!(

@@ -663,20 +654,24 @@ mod fs_tests {
             "First part bytes should be returned when requested"
         );

-        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let metadata = storage
+        let mut second_part_download = storage
             .download_byte_range(
                 &upload_target,
                 first_part_local.len() as u64,
                 Some((first_part_local.len() + second_part_local.len()) as u64),
-                &mut second_part_remote,
             )
             .await?;
         assert!(
-            metadata.is_none(),
+            second_part_download.metadata.is_none(),
             "No metadata should be returned for no metadata upload"
         );

+        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        io::copy(
+            &mut second_part_download.download_stream,
+            &mut second_part_remote,
+        )
+        .await?;
         second_part_remote.flush().await?;
         let second_part_remote = second_part_remote.into_inner().into_inner();
         assert_eq!(

@@ -696,11 +691,30 @@ mod fs_tests {
         let upload_name = "upload_1";
         let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;

+        let start = 1_000_000_000;
+        let end = start + 1;
+        match storage
+            .download_byte_range(
+                &upload_target,
+                start,
+                Some(end), // exclusive end
+            )
+            .await
+        {
+            Ok(_) => panic!("Should not allow downloading wrong ranges"),
+            Err(e) => {
+                let error_string = e.to_string();
+                assert!(error_string.contains("zero bytes"));
+                assert!(error_string.contains(&start.to_string()));
+                assert!(error_string.contains(&end.to_string()));
+            }
+        }
+
         let start = 10000;
         let end = 234;
         assert!(start > end, "Should test an incorrect range");
         match storage
-            .download_byte_range(&upload_target, start, Some(end), &mut io::sink())
+            .download_byte_range(&upload_target, start, Some(end))
             .await
         {
             Ok(_) => panic!("Should not allow downloading wrong ranges"),

@@ -712,18 +726,6 @@ mod fs_tests {
             }
         }

-        let non_existing_path = PathBuf::from("somewhere").join("else");
-        match storage
-            .download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink())
-            .await
-        {
-            Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
-            Err(e) => {
-                let error_string = e.to_string();
-                assert!(error_string.contains("does not exist"));
-                assert!(error_string.contains(&non_existing_path.display().to_string()));
-            }
-        }
         Ok(())
     }


@@ -762,35 +764,26 @@ mod fs_tests {
         let upload_target =
             upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;

-        let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
-
-        content_bytes.flush().await?;
-        let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
+        let full_range_download_contents =
+            read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
         assert_eq!(
             dummy_contents(upload_name),
-            contents,
+            full_range_download_contents,
             "We should upload and download the same contents"
         );

-        assert_eq!(
-            full_download_metadata.as_ref(),
-            Some(&metadata),
-            "We should get the same metadata back for full download"
-        );
-
         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
         let (first_part_local, _) = uploaded_bytes.split_at(3);

-        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        let partial_download_metadata = storage
-            .download_byte_range(
-                &upload_target,
-                0,
-                Some(first_part_local.len() as u64),
-                &mut first_part_remote,
-            )
+        let mut partial_download_with_metadata = storage
+            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
             .await?;
+        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        io::copy(
+            &mut partial_download_with_metadata.download_stream,
+            &mut first_part_remote,
+        )
+        .await?;
         first_part_remote.flush().await?;
         let first_part_remote = first_part_remote.into_inner().into_inner();
         assert_eq!(

@@ -800,8 +793,8 @@ mod fs_tests {
         );

         assert_eq!(
-            partial_download_metadata.as_ref(),
-            Some(&metadata),
+            partial_download_with_metadata.metadata,
+            Some(metadata),
             "We should get the same metadata back for partial download"
         );


@@ -843,7 +836,7 @@ mod fs_tests {
     }

     fn dummy_contents(name: &str) -> String {
-        format!("contents for {}", name)
+        format!("contents for {name}")
     }

     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {
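Several of the tests above collect a download stream into memory by copying it into a `tokio::io::BufWriter` wrapped around a `std::io::Cursor`, then unwrapping both layers to get the bytes back. The same pattern pulled out on its own (the helper name is made up; the types are the ones the tests use):

```rust
use tokio::io::{self, AsyncRead, AsyncWriteExt};

// Illustrative only: the in-memory sink pattern the tests use to inspect a stream.
async fn collect_stream(mut stream: impl AsyncRead + Unpin) -> anyhow::Result<Vec<u8>> {
    let mut sink = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
    io::copy(&mut stream, &mut sink).await?;
    sink.flush().await?; // BufWriter buffers, so flush before unwrapping
    Ok(sink.into_inner().into_inner())
}
```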
@@ -9,17 +9,17 @@ use std::path::{Path, PathBuf};
 use anyhow::Context;
 use rusoto_core::{
     credential::{InstanceMetadataProvider, StaticProvider},
-    HttpClient, Region,
+    HttpClient, Region, RusotoError,
 };
 use rusoto_s3::{
-    DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
-    StreamingBody, S3,
+    DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
+    S3Client, StreamingBody, S3,
 };
 use tokio::{io, sync::Semaphore};
 use tokio_util::io::ReaderStream;
 use tracing::debug;

-use crate::{strip_path_prefix, RemoteStorage, S3Config};
+use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};

 use super::StorageMetadata;


@@ -187,6 +187,39 @@ impl S3Bucket {
             concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
         })
     }

+    async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
+        let _guard = self
+            .concurrency_limiter
+            .acquire()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 download")
+            .map_err(DownloadError::Other)?;
+
+        metrics::inc_get_object();
+
+        match self.client.get_object(request).await {
+            Ok(object_output) => match object_output.body {
+                None => {
+                    metrics::inc_get_object_fail();
+                    Err(DownloadError::Other(anyhow::anyhow!(
+                        "Got no body for the S3 object given"
+                    )))
+                }
+                Some(body) => Ok(Download {
+                    metadata: object_output.metadata.map(StorageMetadata),
+                    download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
+                }),
+            },
+            Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
+            Err(e) => {
+                metrics::inc_get_object_fail();
+                Err(DownloadError::Other(anyhow::anyhow!(
+                    "Failed to download S3 object: {e}"
+                )))
+            }
+        }
+    }
 }

 #[async_trait::async_trait]

@@ -283,38 +316,13 @@ impl RemoteStorage for S3Bucket {
         Ok(())
     }

-    async fn download(
-        &self,
-        from: &Self::RemoteObjectId,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 download")?;
-
-        metrics::inc_get_object();
-
-        let object_output = self
-            .client
-            .get_object(GetObjectRequest {
-                bucket: self.bucket_name.clone(),
-                key: from.key().to_owned(),
-                ..GetObjectRequest::default()
-            })
-            .await
-            .map_err(|e| {
-                metrics::inc_get_object_fail();
-                e
-            })?;
-
-        if let Some(body) = object_output.body {
-            let mut from = io::BufReader::new(body.into_async_read());
-            io::copy(&mut from, to).await?;
-        }
-
-        Ok(object_output.metadata.map(StorageMetadata))
+    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
+        self.download_object(GetObjectRequest {
+            bucket: self.bucket_name.clone(),
+            key: from.key().to_owned(),
+            ..GetObjectRequest::default()
+        })
+        .await
     }

     async fn download_byte_range(

@@ -322,8 +330,7 @@ impl RemoteStorage for S3Bucket {
         from: &Self::RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
-        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<Option<StorageMetadata>> {
+    ) -> Result<Download, DownloadError> {
         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
         // and needs both ends to be exclusive
         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));

@@ -331,34 +338,14 @@ impl RemoteStorage for S3Bucket {
             Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
             None => format!("bytes={}-", start_inclusive),
         });
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 range download")?;
-
-        metrics::inc_get_object();
-
-        let object_output = self
-            .client
-            .get_object(GetObjectRequest {
-                bucket: self.bucket_name.clone(),
-                key: from.key().to_owned(),
-                range,
-                ..GetObjectRequest::default()
-            })
-            .await
-            .map_err(|e| {
-                metrics::inc_get_object_fail();
-                e
-            })?;
-
-        if let Some(body) = object_output.body {
-            let mut from = io::BufReader::new(body.into_async_read());
-            io::copy(&mut from, to).await?;
-        }
-
-        Ok(object_output.metadata.map(StorageMetadata))
+        self.download_object(GetObjectRequest {
+            bucket: self.bucket_name.clone(),
+            key: from.key().to_owned(),
+            range,
+            ..GetObjectRequest::default()
+        })
+        .await
     }

     async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {
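Both `download` and `download_byte_range` now funnel through the new `download_object` helper; the range variant only differs by the `Range` header, whose end byte is inclusive while the API takes an exclusive end, hence the `saturating_sub(1)` above. A small standalone illustration of that conversion (not code from the commit; the function name is made up):

```rust
// Illustrative only: building the S3 `Range` header value from an exclusive end.
fn range_header(start_inclusive: u64, end_exclusive: Option<u64>) -> Option<String> {
    let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
    Some(match end_inclusive {
        Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
        None => format!("bytes={}-", start_inclusive),
    })
}

// range_header(0, Some(10)) -> Some("bytes=0-9")  : the first ten bytes
// range_header(100, None)   -> Some("bytes=100-") : everything from offset 100
```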
@@ -232,23 +232,32 @@ impl Repository for LayeredRepository {

     fn create_empty_timeline(
         &self,
-        timelineid: ZTimelineId,
+        timeline_id: ZTimelineId,
         initdb_lsn: Lsn,
     ) -> Result<Arc<LayeredTimeline>> {
         let mut timelines = self.timelines.lock().unwrap();
+        let vacant_timeline_entry = match timelines.entry(timeline_id) {
+            Entry::Occupied(_) => bail!("Timeline already exists"),
+            Entry::Vacant(vacant_entry) => vacant_entry,
+        };
+
+        let timeline_path = self.conf.timeline_path(&timeline_id, &self.tenant_id);
+        if timeline_path.exists() {
+            bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
+        }
+
         // Create the timeline directory, and write initial metadata to file.
-        crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenant_id))?;
+        crashsafe_dir::create_dir_all(timeline_path)?;

         let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
-        Self::save_metadata(self.conf, timelineid, self.tenant_id, &metadata, true)?;
+        Self::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?;

         let timeline = LayeredTimeline::new(
             self.conf,
             Arc::clone(&self.tenant_conf),
             metadata,
             None,
-            timelineid,
+            timeline_id,
             self.tenant_id,
             Arc::clone(&self.walredo_mgr),
             self.upload_layers,

@@ -257,12 +266,7 @@ impl Repository for LayeredRepository {

         // Insert if not exists
         let timeline = Arc::new(timeline);
-        match timelines.entry(timelineid) {
-            Entry::Occupied(_) => bail!("Timeline already exists"),
-            Entry::Vacant(vacant) => {
-                vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
-            }
-        };
+        vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)));

         Ok(timeline)
     }
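`create_empty_timeline` now reserves the map slot through the `Entry` API before doing any filesystem work, so a duplicate timeline id fails up front with "Timeline already exists" instead of after the directory has been created. The reserve-then-insert shape in isolation, with stand-in types rather than the repository's own (illustrative, assumes an anyhow dependency):

```rust
use std::collections::{hash_map::Entry, HashMap};

// Illustrative only: grab the vacant slot first, do the fallible setup, then insert.
fn create_once(map: &mut HashMap<u64, String>, id: u64) -> anyhow::Result<()> {
    let vacant = match map.entry(id) {
        Entry::Occupied(_) => anyhow::bail!("Timeline already exists"),
        Entry::Vacant(vacant) => vacant,
    };
    // ... any fallible setup (directory creation, metadata write) goes here ...
    vacant.insert(format!("timeline-{id}"));
    Ok(())
}
```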
@@ -225,7 +225,7 @@ pub trait Repository: Send + Sync {
     /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
     fn create_empty_timeline(
         &self,
-        timelineid: ZTimelineId,
+        timeline_id: ZTimelineId,
         initdb_lsn: Lsn,
     ) -> Result<Arc<Self::Timeline>>;


@@ -636,6 +636,19 @@ mod tests {
         Ok(())
     }

+    #[test]
+    fn no_duplicate_timelines() -> Result<()> {
+        let repo = RepoHarness::create("no_duplicate_timelines")?.load();
+        let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
+
+        match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
+            Ok(_) => panic!("duplicate timeline creation should fail"),
+            Err(e) => assert_eq!(e.to_string(), "Timeline already exists"),
+        }
+
+        Ok(())
+    }
+
     /// Convenience function to create a page image with given string as the only content
     pub fn test_value(s: &str) -> Value {
         let mut buf = BytesMut::new();
@@ -44,13 +44,23 @@ where
             index_part_path.display()
         )
     })?;

+    let mut index_part_download =
+        storage
+            .download(&part_storage_path)
+            .await
+            .with_context(|| {
+                format!("Failed to open download stream for for storage path {part_storage_path:?}")
+            })?;
     let mut index_part_bytes = Vec::new();
-    storage
-        .download(&part_storage_path, &mut index_part_bytes)
-        .await
-        .with_context(|| {
-            format!("Failed to download an index part from storage path {part_storage_path:?}")
-        })?;
+    io::copy(
+        &mut index_part_download.download_stream,
+        &mut index_part_bytes,
+    )
+    .await
+    .with_context(|| {
+        format!("Failed to download an index part from storage path {part_storage_path:?}")
+    })?;

     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
         format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")

@@ -162,15 +172,19 @@ where
             temp_file_path.display()
         )
     })?;
-    storage
-        .download(&layer_storage_path, &mut destination_file)
+    let mut download = storage
+        .download(&layer_storage_path)
         .await
         .with_context(|| {
             format!(
-                "Failed to download a layer from storage path '{layer_storage_path:?}'"
+                "Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
             )
         })?;
+    io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
+        format!(
+            "Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
+        )
+    })?;

     // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     // A file will not be closed immediately when it goes out of scope if there are any IO operations
@@ -2,18 +2,16 @@ use anyhow::{Context, Result};
 use etcd_broker::subscription_key::{
     NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
 };
-use tokio::io::AsyncRead;
 use tokio::task::JoinHandle;

 use std::cmp::min;
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;

-use postgres_ffi::xlog_utils::{
-    XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
-};
+use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
 use remote_storage::{GenericRemoteStorage, RemoteStorage};
 use tokio::fs::File;
 use tokio::runtime::Builder;

@@ -452,45 +450,41 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
 pub async fn read_object(
     file_path: PathBuf,
     offset: u64,
-) -> (impl AsyncRead, JoinHandle<Result<()>>) {
-    let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
-
-    let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);
-
-    let copy_result = tokio::spawn(async move {
-        let res = match storage.as_ref().unwrap() {
-            GenericRemoteStorage::Local(local_storage) => {
-                let source = local_storage.remote_object_id(&file_path)?;
-
-                info!(
-                    "local download about to start from {} at offset {}",
-                    source.display(),
-                    offset
-                );
-                local_storage
-                    .download_byte_range(&source, offset, None, &mut pipe_writer)
-                    .await
-            }
-            GenericRemoteStorage::S3(s3_storage) => {
-                let s3key = s3_storage.remote_object_id(&file_path)?;
-
-                info!(
-                    "S3 download about to start from {:?} at offset {}",
-                    s3key, offset
-                );
-                s3_storage
-                    .download_byte_range(&s3key, offset, None, &mut pipe_writer)
-                    .await
-            }
-        };
-
-        if let Err(e) = res {
-            error!("failed to download WAL segment from remote storage: {}", e);
-            Err(e)
-        } else {
-            Ok(())
-        }
-    });
-
-    (pipe_reader, copy_result)
+) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
+    let download = match REMOTE_STORAGE
+        .get()
+        .context("Failed to get remote storage")?
+        .as_ref()
+        .context("No remote storage configured")?
+    {
+        GenericRemoteStorage::Local(local_storage) => {
+            let source = local_storage.remote_object_id(&file_path)?;
+
+            info!(
+                "local download about to start from {} at offset {}",
+                source.display(),
+                offset
+            );
+            local_storage
+                .download_byte_range(&source, offset, None)
+                .await
+        }
+        GenericRemoteStorage::S3(s3_storage) => {
+            let s3key = s3_storage.remote_object_id(&file_path)?;
+
+            info!(
+                "S3 download about to start from {:?} at offset {}",
+                s3key, offset
+            );
+            s3_storage.download_byte_range(&s3key, offset, None).await
+        }
+    }
+    .with_context(|| {
+        format!(
+            "Failed to open WAL segment download stream for local storage path {}",
+            file_path.display()
+        )
+    })?;
+
+    Ok(download.download_stream)
 }

@@ -604,8 +604,7 @@ impl WalReader {

         // Try to open remote file, if remote reads are enabled
         if self.enable_remote_read {
-            let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
-            return Ok(Box::pin(reader));
+            return read_object(wal_file_path, xlogoff as u64).await;
         }

         bail!("WAL segment is not found")
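`read_object` now hands back the download stream directly instead of a `(reader, JoinHandle)` pair fed through an in-process duplex pipe, so the spawned copy task and the `MAX_SEND_SIZE` buffer go away. A sketch of how a caller of the new signature might stream a segment into a local file (the caller, its name, and its arguments are hypothetical; only `read_object` comes from the hunk above):

```rust
// Illustrative only: drain the returned stream into an already-open tokio::fs::File.
async fn copy_segment_to(
    path: std::path::PathBuf,
    offset: u64,
    out: &mut tokio::fs::File,
) -> anyhow::Result<u64> {
    let mut reader = read_object(path, offset).await?;
    let copied = tokio::io::copy(&mut reader, out).await?;
    Ok(copied)
}
```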
438
scripts/add_missing_rels.py
Normal file
438
scripts/add_missing_rels.py
Normal file
@@ -0,0 +1,438 @@
|
|||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
import tempfile
|
||||||
|
from contextlib import closing
|
||||||
|
import psycopg2
|
||||||
|
import subprocess
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
### utils copied from test fixtures
|
||||||
|
from typing import Any, List
|
||||||
|
from psycopg2.extensions import connection as PgConnection
|
||||||
|
import asyncpg
|
||||||
|
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
|
||||||
|
|
||||||
|
Env = Dict[str, str]
|
||||||
|
|
||||||
|
_global_counter = 0
|
||||||
|
|
||||||
|
|
||||||
|
def global_counter() -> int:
|
||||||
|
""" A really dumb global counter.
|
||||||
|
|
||||||
|
This is useful for giving output files a unique number, so if we run the
|
||||||
|
same command multiple times we can keep their output separate.
|
||||||
|
"""
|
||||||
|
global _global_counter
|
||||||
|
_global_counter += 1
|
||||||
|
return _global_counter
|
||||||
|
|
||||||
|
|
||||||
|
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
|
||||||
|
""" Run a process and capture its output
|
||||||
|
|
||||||
|
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
|
||||||
|
where "cmd" is the name of the program and NNN is an incrementing
|
||||||
|
counter.
|
||||||
|
|
||||||
|
If those files already exist, we will overwrite them.
|
||||||
|
Returns basepath for files with captured output.
|
||||||
|
"""
|
||||||
|
assert type(cmd) is list
|
||||||
|
base = os.path.basename(cmd[0]) + '_{}'.format(global_counter())
|
||||||
|
basepath = os.path.join(capture_dir, base)
|
||||||
|
stdout_filename = basepath + '.stdout'
|
||||||
|
stderr_filename = basepath + '.stderr'
|
||||||
|
|
||||||
|
with open(stdout_filename, 'w') as stdout_f:
|
||||||
|
with open(stderr_filename, 'w') as stderr_f:
|
||||||
|
print('(capturing output to "{}.stdout")'.format(base))
|
||||||
|
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
|
||||||
|
|
||||||
|
return basepath
|
||||||
|
|
||||||
|
|
||||||
|
class PgBin:
|
||||||
|
""" A helper class for executing postgres binaries """
|
||||||
|
def __init__(self, log_dir: Path, pg_distrib_dir):
|
||||||
|
self.log_dir = log_dir
|
||||||
|
self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin')
|
||||||
|
self.env = os.environ.copy()
|
||||||
|
self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib')
|
||||||
|
|
||||||
|
def _fixpath(self, command: List[str]):
|
||||||
|
if '/' not in command[0]:
|
||||||
|
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||||
|
|
||||||
|
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||||
|
if env_add is None:
|
||||||
|
return self.env
|
||||||
|
env = self.env.copy()
|
||||||
|
env.update(env_add)
|
||||||
|
return env
|
||||||
|
|
||||||
|
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Run one of the postgres binaries.
|
||||||
|
|
||||||
|
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
||||||
|
|
||||||
|
All the necessary environment variables will be set.
|
||||||
|
|
||||||
|
If the first argument (the command name) doesn't include a path (no '/'
|
||||||
|
characters present), then it will be edited to include the correct path.
|
||||||
|
|
||||||
|
If you want stdout/stderr captured to files, use `run_capture` instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._fixpath(command)
|
||||||
|
print('Running command "{}"'.format(' '.join(command)))
|
||||||
|
env = self._build_env(env)
|
||||||
|
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||||
|
|
||||||
|
def run_capture(self,
|
||||||
|
command: List[str],
|
||||||
|
env: Optional[Env] = None,
|
||||||
|
cwd: Optional[str] = None,
|
||||||
|
**kwargs: Any) -> str:
|
||||||
|
"""
|
||||||
|
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||||
|
|
||||||
|
This is just like `run`, but for chatty programs. Returns basepath for files
|
||||||
|
with captured output.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._fixpath(command)
|
||||||
|
print('Running command "{}"'.format(' '.join(command)))
|
||||||
|
env = self._build_env(env)
|
||||||
|
return subprocess_capture(str(self.log_dir),
|
||||||
|
command,
|
||||||
|
env=env,
|
||||||
|
cwd=cwd,
|
||||||
|
check=True,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
|
class PgProtocol:
|
||||||
|
""" Reusable connection logic """
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.default_options = kwargs
|
||||||
|
|
||||||
|
def connstr(self, **kwargs) -> str:
|
||||||
|
"""
|
||||||
|
Build a libpq connection string for the Postgres instance.
|
||||||
|
"""
|
||||||
|
return str(make_dsn(**self.conn_options(**kwargs)))
|
||||||
|
|
||||||
|
def conn_options(self, **kwargs):
|
||||||
|
conn_options = self.default_options.copy()
|
||||||
|
if 'dsn' in kwargs:
|
||||||
|
conn_options.update(parse_dsn(kwargs['dsn']))
|
||||||
|
conn_options.update(kwargs)
|
||||||
|
|
||||||
|
# Individual statement timeout in seconds. 2 minutes should be
|
||||||
|
# enough for our tests, but if you need a longer, you can
|
||||||
|
# change it by calling "SET statement_timeout" after
|
||||||
|
# connecting.
|
||||||
|
if 'options' in conn_options:
|
||||||
|
conn_options['options'] = f"-cstatement_timeout=120s " + conn_options['options']
|
||||||
|
else:
|
||||||
|
conn_options['options'] = "-cstatement_timeout=120s"
|
||||||
|
return conn_options
|
||||||
|
|
||||||
|
# autocommit=True here by default because that's what we need most of the time
|
||||||
|
def connect(self, autocommit=True, **kwargs) -> PgConnection:
|
||||||
|
"""
|
||||||
|
Connect to the node.
|
||||||
|
Returns psycopg2's connection object.
|
||||||
|
This method passes all extra params to connstr.
|
||||||
|
"""
|
||||||
|
conn = psycopg2.connect(**self.conn_options(**kwargs))
|
||||||
|
|
||||||
|
# WARNING: this setting affects *all* tests!
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
return conn
|
||||||
|
|
||||||
|
async def connect_async(self, **kwargs) -> asyncpg.Connection:
|
||||||
|
"""
|
||||||
|
Connect to the node from async python.
|
||||||
|
Returns asyncpg's connection object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# asyncpg takes slightly different options than psycopg2. Try
|
||||||
|
# to convert the defaults from the psycopg2 format.
|
||||||
|
|
||||||
|
# The psycopg2 option 'dbname' is called 'database' is asyncpg
|
||||||
|
conn_options = self.conn_options(**kwargs)
|
||||||
|
if 'dbname' in conn_options:
|
||||||
|
conn_options['database'] = conn_options.pop('dbname')
|
||||||
|
|
||||||
|
# Convert options='-c<key>=<val>' to server_settings
|
||||||
|
if 'options' in conn_options:
|
||||||
|
options = conn_options.pop('options')
|
||||||
|
for match in re.finditer('-c(\w*)=(\w*)', options):
|
||||||
|
key = match.group(1)
|
||||||
|
val = match.group(2)
|
||||||
|
if 'server_options' in conn_options:
|
||||||
|
conn_options['server_settings'].update({key: val})
|
||||||
|
else:
|
||||||
|
conn_options['server_settings'] = {key: val}
|
||||||
|
return await asyncpg.connect(**conn_options)
|
||||||
|
|
||||||
|
def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
|
||||||
|
"""
|
||||||
|
Execute query against the node and return all rows.
|
||||||
|
This method passes all extra params to connstr.
|
||||||
|
"""
|
||||||
|
return self.safe_psql_many([query], **kwargs)[0]
|
||||||
|
|
||||||
|
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
|
||||||
|
"""
|
||||||
|
Execute queries against the node and return all rows.
|
||||||
|
This method passes all extra params to connstr.
|
||||||
|
"""
|
||||||
|
result: List[List[Any]] = []
|
||||||
|
with closing(self.connect(**kwargs)) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
for query in queries:
|
||||||
|
print(f"Executing query: {query}")
|
||||||
|
cur.execute(query)
|
||||||
|
|
||||||
|
if cur.description is None:
|
||||||
|
result.append([]) # query didn't return data
|
||||||
|
else:
|
||||||
|
result.append(cast(List[Any], cur.fetchall()))
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
class VanillaPostgres(PgProtocol):
|
||||||
|
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
|
||||||
|
super().__init__(host='localhost', port=port, dbname='postgres')
|
||||||
|
self.pgdatadir = pgdatadir
|
||||||
|
self.pg_bin = pg_bin
|
||||||
|
self.running = False
|
||||||
|
if init:
|
||||||
|
self.pg_bin.run_capture(['initdb', '-D', str(pgdatadir)])
|
||||||
|
self.configure([f"port = {port}\n"])
|
||||||
|
|
||||||
|
def configure(self, options: List[str]):
|
||||||
|
"""Append lines into postgresql.conf file."""
|
||||||
|
assert not self.running
|
||||||
|
with open(os.path.join(self.pgdatadir, 'postgresql.conf'), 'a') as conf_file:
|
||||||
|
conf_file.write("\n".join(options))
|
||||||
|
|
||||||
|
def start(self, log_path: Optional[str] = None):
|
||||||
|
assert not self.running
|
||||||
|
self.running = True
|
||||||
|
|
||||||
|
if log_path is None:
|
||||||
|
log_path = os.path.join(self.pgdatadir, "pg.log")
|
||||||
|
|
||||||
|
self.pg_bin.run_capture(
|
||||||
|
['pg_ctl', '-w', '-D', str(self.pgdatadir), '-l', log_path, 'start'])
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
assert self.running
|
||||||
|
self.running = False
|
||||||
|
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', str(self.pgdatadir), 'stop'])
|
||||||
|
|
||||||
|
def get_subdir_size(self, subdir) -> int:
|
||||||
|
"""Return size of pgdatadir subdirectory in bytes."""
|
||||||
|
return get_dir_size(os.path.join(self.pgdatadir, subdir))
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb):
|
||||||
|
if self.running:
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
|
||||||
|
### actual code
|
||||||
|
|
||||||
|
|
||||||
|
def get_rel_paths(log_dir, pg_bin, base_tar):
|
||||||
|
"""Yeild list of relation paths"""
|
||||||
|
with tempfile.TemporaryDirectory() as restored_dir:
|
||||||
|
# Unpack the base tar
|
||||||
|
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
|
||||||
|
|
||||||
|
port = "55439" # Probably free
|
||||||
|
with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
|
||||||
|
vanilla_pg.configure([f"port={port}"])
|
||||||
|
vanilla_pg.start()
|
||||||
|
|
||||||
|
# Create database based on template0 because we can't connect to template0
|
||||||
|
query = "create database template0copy template template0"
|
||||||
|
vanilla_pg.safe_psql(query, user="cloud_admin")
|
||||||
|
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
|
||||||
|
|
||||||
|
# Get all databases
|
||||||
|
query = "select oid, datname from pg_database"
|
||||||
|
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
|
||||||
|
template0_oid = [
|
||||||
|
oid
|
||||||
|
for (oid, database) in oid_dbname_pairs
|
||||||
|
if database == "template0"
|
||||||
|
][0]
|
||||||
|
|
||||||
|
# Get rel paths for each database
|
||||||
|
for oid, database in oid_dbname_pairs:
|
||||||
|
if database == "template0":
|
||||||
|
# We can't connect to template0
|
||||||
|
continue
|
||||||
|
|
||||||
|
query = "select relname, pg_relation_filepath(oid) from pg_class"
|
||||||
|
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
|
||||||
|
for relname, filepath in result:
|
||||||
|
if filepath is not None:
|
||||||
|
|
||||||
|
if database == "template0copy":
|
||||||
|
# Add all template0copy paths to template0
|
||||||
|
prefix = f"base/{oid}/"
|
||||||
|
if filepath.startswith(prefix):
|
||||||
|
suffix = filepath[len(prefix):]
|
||||||
|
yield f"base/{template0_oid}/{suffix}"
|
||||||
|
elif filepath.startswith("global"):
|
||||||
|
print(f"skipping {database} global file {filepath}")
|
||||||
|
else:
|
||||||
|
raise AssertionError
|
||||||
|
else:
|
||||||
|
yield filepath
|
||||||
|
|
||||||
|
|
||||||
|
def pack_base(log_dir, restored_dir, output_tar):
    """Tar up the contents of restored_dir and move the result to output_tar."""
    tmp_tar_name = "tmp.tar"
    tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
    cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
    subprocess_capture(log_dir, cmd, cwd=restored_dir)
    shutil.move(tmp_tar_path, output_tar)


def get_files_in_tar(log_dir, tar):
    """Yield the relative paths of all files contained in the tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the tar
        subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])

        # Yield relative paths of all files found
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                yield file_path[len(restored_dir) + 1:]

def corrupt(log_dir, base_tar, output_tar):
    """Remove all empty files and repackage. Return paths of files removed."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        # Find empty files
        empty_files = []
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                file_size = os.path.getsize(file_path)
                if file_size == 0:
                    empty_files.append(file_path)

        # Delete empty files (just to see if they get recreated)
        for empty_file in empty_files:
            os.remove(empty_file)

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)

        # Return relative paths
        return {
            empty_file[len(restored_dir) + 1:]
            for empty_file in empty_files
        }

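# Hypothetical sketch (not part of the patch): sanity-check that corrupt() really
# dropped the empty files it reports, by listing the repackaged tar. This mirrors
# the assertions in the tests further down.
def example_verify_corruption(log_dir, base_tar, corrupt_tar):
    deleted = corrupt(log_dir, base_tar, corrupt_tar)
    remaining = set(get_files_in_tar(log_dir, corrupt_tar))
    assert deleted and deleted.isdisjoint(remaining)
    return deleted
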
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
    """Unpack corrupt_tar, create any of `paths` that are missing, and repackage into output_tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])

        # Touch files that don't exist
        for path in paths:
            absolute_path = os.path.join(restored_dir, path)
            exists = os.path.exists(absolute_path)
            if not exists:
                print(f"File {absolute_path} didn't exist. Creating..")
                Path(absolute_path).touch()

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)

# TODO: this test is not currently called. It needs an ordinary base.tar path as
# input, plus the usual `test_output_dir` and `pg_bin` fixtures in scope.
def test_add_missing_rels(base_tar):
    output_tar = base_tar + ".fixed"

    # Create a new base tar with missing empty files
    corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
    deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
    assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
               set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0

    # Reconstruct paths from the corrupted tar, assert it covers everything important
    reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
    paths_missed = deleted_files - reconstructed_paths
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })

    # Recreate the correct tar by touching files, compare with the original tar
    touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
    paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
                    set(get_files_in_tar(test_output_dir, output_tar)))
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })

# Example command:
# poetry run python scripts/add_missing_rels.py \
#     --base-tar /home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/psql_2.stdout \
#     --output-tar output-base.tar \
#     --log-dir /home/bojan/tmp \
#     --pg-distrib-dir /home/bojan/src/neondatabase/neon/tmp_install/
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--base-tar',
        dest='base_tar',
        required=True,
        help='base.tar file to add missing rels to (file will not be modified)',
    )
    parser.add_argument(
        '--output-tar',
        dest='output_tar',
        required=True,
        help='path and name for the output base.tar file',
    )
    parser.add_argument(
        '--log-dir',
        dest='log_dir',
        required=True,
        help='directory to save log files in',
    )
    parser.add_argument(
        '--pg-distrib-dir',
        dest='pg_distrib_dir',
        required=True,
        help='directory where postgres is installed',
    )
    args = parser.parse_args()
    base_tar = args.base_tar
    output_tar = args.output_tar
    log_dir = args.log_dir
    pg_bin = PgBin(log_dir, args.pg_distrib_dir)

    reconstructed_paths = set(get_rel_paths(log_dir, pg_bin, base_tar))
    touch_missing_rels(log_dir, base_tar, output_tar, reconstructed_paths)

@@ -72,6 +72,14 @@ class NeonPageserverHttpClient(requests.Session):
         return res_json
 
 
+import pytest
+import os
+
+
+def add_missing_empty_rels(base_tar, output_tar):
+    os.environ['INPUT_BASE_TAR'] = base_tar
+    os.environ['OUTPUT_BASE_TAR'] = output_tar
+    pytest.main(["-s", "-k", "test_main_hack"])
+
+
 def main(args: argparse.Namespace):
     old_pageserver_host = args.old_pageserver_host
     new_pageserver_host = args.new_pageserver_host
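# A hypothetical sketch (not part of the diff) of how the helper added above can
# be called from main() in place of the commented-out call in the next hunk; the
# tar paths are illustrative only.
def example_complete_exported_tar():
    incomplete_tar = "/tmp/psql_2.stdout"        # tar produced by the fullbackup export
    completed_tar = "/tmp/psql_2-completed.tar"  # same tar with missing empty rel files added
    add_missing_empty_rels(incomplete_tar, completed_tar)
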
@@ -102,7 +110,7 @@ def main(args: argparse.Namespace):
     if args.only_import is False:
         query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"
 
-        cmd = ["psql", "--no-psqlrc", old_pageserver_connstr, "-c", query]
+        cmd = [args.psql_path, "--no-psqlrc", old_pageserver_connstr, "-c", query]
         print(f"Running: {cmd}")
 
         tar_filename = path.join(basepath,
@@ -115,6 +123,8 @@ def main(args: argparse.Namespace):
             print(f"(capturing output to {tar_filename})")
             subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env)
 
+        # add_missing_empty_rels(incomplete_tar_filename, tar_filename)
+
         print(f"Done export: {tar_filename}")
 
         # Import timelines to new pageserver
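# For reference, a minimal sketch of the export step shown above, assuming the
# psql path, pageserver connection string, tenant/timeline ids, LSN and output
# filename are already known (in the real script they come from main() and its
# arguments). Illustrative only, not part of the diff.
import subprocess

def example_export_fullbackup(psql_path, connstr, tenant_id, timeline_id, lsn, tar_filename):
    query = f"fullbackup {tenant_id} {timeline_id} {lsn}"
    cmd = [psql_path, "--no-psqlrc", connstr, "-c", query]
    # psql prints the basebackup tarball to stdout; capture it into a file
    with open(tar_filename, "wb") as stdout_f:
        subprocess.run(cmd, stdout=stdout_f, check=True)
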
test_runner/batch_others/test_complete_basebackup.py (new file, 167 lines)
@@ -0,0 +1,167 @@
from fixtures.neon_fixtures import VanillaPostgres
from fixtures.utils import subprocess_capture
import os
import shutil
from pathlib import Path
import tempfile


def get_rel_paths(log_dir, pg_bin, base_tar):
    """Yield relation paths found in the base tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        port = "55439"  # Probably free
        with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
            vanilla_pg.configure([f"port={port}"])
            vanilla_pg.start()

            # Create a database based on template0 because we can't connect to template0
            query = "create database template0copy template template0"
            vanilla_pg.safe_psql(query, user="cloud_admin")
            vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")

            # Get all databases
            query = "select oid, datname from pg_database"
            oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
            template0_oid = [
                oid
                for (oid, database) in oid_dbname_pairs
                if database == "template0"
            ][0]

            # Get rel paths for each database
            for oid, database in oid_dbname_pairs:
                if database == "template0":
                    # We can't connect to template0
                    continue

                query = "select relname, pg_relation_filepath(oid) from pg_class"
                result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
                for relname, filepath in result:
                    if filepath is not None:
                        if database == "template0copy":
                            # Add all template0copy paths to template0
                            prefix = f"base/{oid}/"
                            if filepath.startswith(prefix):
                                suffix = filepath[len(prefix):]
                                yield f"base/{template0_oid}/{suffix}"
                            elif filepath.startswith("global"):
                                print(f"skipping {database} global file {filepath}")
                            else:
                                raise AssertionError
                        else:
                            yield filepath


def pack_base(log_dir, restored_dir, output_tar):
    """Tar up the contents of restored_dir and move the result to output_tar."""
    tmp_tar_name = "tmp.tar"
    tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
    cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
    subprocess_capture(log_dir, cmd, cwd=restored_dir)
    shutil.move(tmp_tar_path, output_tar)


def get_files_in_tar(log_dir, tar):
    """Yield the relative paths of all files contained in the tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the tar
        subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])

        # Yield relative paths of all files found
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                yield file_path[len(restored_dir) + 1:]


def corrupt(log_dir, base_tar, output_tar):
    """Remove all empty files and repackage. Return paths of files removed."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        # Find empty files
        empty_files = []
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                file_size = os.path.getsize(file_path)
                if file_size == 0:
                    empty_files.append(file_path)

        # Delete empty files (just to see if they get recreated)
        for empty_file in empty_files:
            os.remove(empty_file)

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)

        # Return relative paths
        return {
            empty_file[len(restored_dir) + 1:]
            for empty_file in empty_files
        }


def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
    """Unpack corrupt_tar, create any of `paths` that are missing, and repackage into output_tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])

        # Touch files that don't exist
        for path in paths:
            absolute_path = os.path.join(restored_dir, path)
            exists = os.path.exists(absolute_path)
            if not exists:
                print(f"File {absolute_path} didn't exist. Creating..")
                Path(absolute_path).touch()

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)


def test_complete(test_output_dir, pg_bin):
    # Specify directories
    # TODO: make a basebackup instead of using one from another test
    work_dir = "/home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/"
    base_tar = os.path.join(work_dir, "psql_2.stdout")
    output_tar = os.path.join(work_dir, "psql_2-completed.stdout")

    # Create a new base tar with missing empty files
    corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
    deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
    assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
               set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0

    # Reconstruct paths from the corrupted tar, assert it covers everything important
    reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
    paths_missed = deleted_files - reconstructed_paths
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })

    # Recreate the correct tar by touching files, compare with the original tar
    touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
    paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
                    set(get_files_in_tar(test_output_dir, output_tar)))
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })


# HACK: this script relies on test fixtures, but you can run it with
# `poetry run pytest -k test_main_hack` and pass inputs via env vars.
#
# The script takes a base tar, infers what empty rel files might be missing,
# and creates a new base tar with those files included. It does not modify
# the original file.
def test_main_hack(test_output_dir, pg_bin, pytestconfig):
    base_tar = os.environ['INPUT_BASE_TAR']
    output_tar = os.environ['OUTPUT_BASE_TAR']

    reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, base_tar))
    touch_missing_rels(test_output_dir, base_tar, output_tar, reconstructed_paths)

@@ -11,7 +11,7 @@ import signal
 import pytest
 
 from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
-from fixtures.utils import lsn_from_hex
+from fixtures.utils import lsn_from_hex, subprocess_capture
 
 
 def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -101,10 +101,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
     log.info('load thread stopped')
 
 
-@pytest.mark.skip(
-    reason=
-    "needs to replace callmemaybe call with better idea how to migrate timelines between pageservers"
-)
 @pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
 def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
                            port_distributor: PortDistributor,
@@ -188,30 +184,38 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
                                   new_pageserver_http_port,
                                   neon_env_builder.broker):
 
-        # call to attach timeline to new pageserver
-        new_pageserver_http.timeline_attach(tenant, timeline)
-        # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
-        new_timeline_detail = wait_until(
-            number_of_iterations=5,
-            interval=1,
-            func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+        # Migrate either by attaching from s3 or import/export basebackup
+        relocation_method = "import"
+        if relocation_method == "import":
+            scripts_dir = "/home/bojan/src/neondatabase/neon/scripts/"
+            cmd = [
+                "python",
+                os.path.join(scripts_dir, "export_import_betwen_pageservers.py"),
+                "--tenant-id", tenant.hex,
+                "--from-host", "localhost",
+                "--from-http-port", str(pageserver_http.port),
+                "--from-pg-port", str(env.pageserver.service_port.pg),
+                "--to-host", "localhost",
+                "--to-http-port", str(new_pageserver_http_port),
+                "--to-pg-port", str(new_pageserver_pg_port),
+                "--psql-path", os.path.join(pg_distrib_dir, "bin", "psql"),
+            ]
+            subprocess_capture(env.repo_dir, cmd, check=True)
+        elif relocation_method == "attach":
+            # call to attach timeline to new pageserver
+            new_pageserver_http.timeline_attach(tenant, timeline)
 
-        # when load is active these checks can break because lsns are not static
-        # so lets check with some margin
+        # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
+        new_timeline_detail = wait_until(
+            number_of_iterations=5,
+            interval=1,
+            func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+
+        # when load is active these checks can break because lsns are not static
+        # so lets check with some margin
         assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
                                 lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
                                 0.03)
 
-        # callmemaybe to start replication from safekeeper to the new pageserver
-        # when there is no load there is a clean checkpoint and no wal delta
-        # needs to be streamed to the new pageserver
-        # TODO (rodionov) use attach to start replication
-        with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
-            # "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
-            safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
-            cur.execute("callmemaybe {} {} {}".format(tenant.hex,
-                                                      timeline.hex,
-                                                      safekeeper_connstring))
-
         tenant_pg.stop()