mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
Compare commits
7 Commits
enable-pg_
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b9a12a3a5 | ||
|
|
52eafba2cc | ||
|
|
4cfedee06d | ||
|
|
80ae39311e | ||
|
|
144cac71d8 | ||
|
|
4f2d8894ac | ||
|
|
1755fb4ef3 |
@@ -890,8 +890,6 @@ impl ComputeNode {
|
||||
.context("apply_config handle_grants")?;
|
||||
handle_extensions(spec, &mut client).context("apply_config handle_extensions")?;
|
||||
handle_extension_neon(&mut client).context("apply_config handle_extension_neon")?;
|
||||
handle_jwt_extension(spec, &mut client, connstr.as_str())
|
||||
.context("apply_config handle_jwt_extension")?;
|
||||
create_availability_check_data(&mut client)
|
||||
.context("apply_config create_availability_check_data")?;
|
||||
|
||||
@@ -994,7 +992,6 @@ impl ComputeNode {
|
||||
)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
handle_jwt_extension(&spec, &mut client, self.connstr.as_str())?;
|
||||
// We can skip handle_migrations here because a new migration can only appear
|
||||
// if we have a new version of the compute_ctl binary, which can only happen
|
||||
// if compute got restarted, in which case we'll end up inside of apply_config
|
||||
|
||||
@@ -731,48 +731,7 @@ pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()>
|
||||
client.simple_query(query)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create pg_session_jwt in all databases if configured
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_jwt_extension(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
|
||||
if let Some(local_proxy) = &spec.local_proxy_config {
|
||||
if let Some(jwks_list) = &local_proxy.jwks {
|
||||
if !jwks_list.is_empty() {
|
||||
info!("enabling pg_session_jwt extension");
|
||||
let existing_dbs = get_existing_dbs(client)?;
|
||||
|
||||
for db in &spec.cluster.databases {
|
||||
match existing_dbs.get(&db.name) {
|
||||
Some(pg_db) => {
|
||||
if pg_db.restrict_conn || pg_db.invalid {
|
||||
info!(
|
||||
"skipping extension for db {} (invalid: {}, connections not allowed: {})",
|
||||
db.name, pg_db.invalid, pg_db.restrict_conn
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
bail!(
|
||||
"database {} doesn't exist in Postgres after handle_databases()",
|
||||
db.name
|
||||
);
|
||||
}
|
||||
}
|
||||
let mut conf = Config::from_str(connstr)?;
|
||||
conf.dbname(&db.name);
|
||||
|
||||
let mut db_client = conf.connect(NoTls)?;
|
||||
|
||||
let query = "CREATE EXTENSION IF NOT EXISTS pg_session_jwt";
|
||||
info!("creating pg_session_jwt extension with query: {}", query);
|
||||
db_client.simple_query(query)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -496,12 +496,26 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
|
||||
}
|
||||
|
||||
if let Some((start, end)) = opts.byte_range() {
|
||||
builder = builder.range(match end {
|
||||
Some(end) => Range::Range(start..end),
|
||||
None => Range::RangeFrom(start..),
|
||||
});
|
||||
}
|
||||
self.download_for_builder(builder, cancel).await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
|
||||
|
||||
let mut builder = blob_client.get();
|
||||
|
||||
let range: Range = if let Some(end_exclusive) = end_exclusive {
|
||||
(start_inclusive..end_exclusive).into()
|
||||
} else {
|
||||
(start_inclusive..).into()
|
||||
};
|
||||
builder = builder.range(range);
|
||||
|
||||
self.download_for_builder(builder, cancel).await
|
||||
}
|
||||
|
||||
@@ -19,8 +19,7 @@ mod simulate_failures;
|
||||
mod support;
|
||||
|
||||
use std::{
|
||||
collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc,
|
||||
time::SystemTime,
|
||||
collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -163,60 +162,11 @@ pub struct Listing {
|
||||
}
|
||||
|
||||
/// Options for downloads. The default value is a plain GET.
|
||||
#[derive(Default)]
|
||||
pub struct DownloadOpts {
|
||||
/// If given, returns [`DownloadError::Unmodified`] if the object still has
|
||||
/// the same ETag (using If-None-Match).
|
||||
pub etag: Option<Etag>,
|
||||
/// The start of the byte range to download, or unbounded.
|
||||
pub byte_start: Bound<u64>,
|
||||
/// The end of the byte range to download, or unbounded. Must be after the
|
||||
/// start bound.
|
||||
pub byte_end: Bound<u64>,
|
||||
}
|
||||
|
||||
impl Default for DownloadOpts {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
etag: Default::default(),
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DownloadOpts {
|
||||
/// Returns the byte range with inclusive start and exclusive end, or None
|
||||
/// if unbounded.
|
||||
pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
|
||||
if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
|
||||
return None;
|
||||
}
|
||||
let start = match self.byte_start {
|
||||
Bound::Excluded(i) => i + 1,
|
||||
Bound::Included(i) => i,
|
||||
Bound::Unbounded => 0,
|
||||
};
|
||||
let end = match self.byte_end {
|
||||
Bound::Excluded(i) => Some(i),
|
||||
Bound::Included(i) => Some(i + 1),
|
||||
Bound::Unbounded => None,
|
||||
};
|
||||
if let Some(end) = end {
|
||||
assert!(start < end, "range end {end} at or before start {start}");
|
||||
}
|
||||
Some((start, end))
|
||||
}
|
||||
|
||||
/// Returns the byte range as an RFC 2616 Range header value with inclusive
|
||||
/// bounds, or None if unbounded.
|
||||
pub fn byte_range_header(&self) -> Option<String> {
|
||||
self.byte_range()
|
||||
.map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
|
||||
.map(|(start, end)| match end {
|
||||
Some(end) => format!("bytes={start}-{end}"),
|
||||
None => format!("bytes={start}-"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -307,6 +257,21 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
/// Streams a given byte range of the remote storage entry contents.
|
||||
///
|
||||
/// The returned download stream will obey initial timeout and cancellation signal by erroring
|
||||
/// on whichever happens first. Only one of the reasons will fail the stream, which is usually
|
||||
/// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
|
||||
///
|
||||
/// Returns the metadata, if any was stored with the file previously.
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
/// Delete a single path from remote storage.
|
||||
///
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
|
||||
@@ -460,6 +425,33 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
match self {
|
||||
Self::LocalFs(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
Self::AwsS3(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
Self::AzureBlob(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
Self::Unreliable(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::delete`]
|
||||
pub async fn delete(
|
||||
&self,
|
||||
@@ -581,6 +573,20 @@ impl GenericRemoteStorage {
|
||||
})
|
||||
}
|
||||
|
||||
/// Downloads the storage object into the `to_path` provided.
|
||||
/// `byte_range` can be specified to download only a part of the file, if needed.
|
||||
pub async fn download_storage_object(
|
||||
&self,
|
||||
byte_range: Option<(u64, Option<u64>)>,
|
||||
from: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
match byte_range {
|
||||
Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
|
||||
None => self.download(from, &DownloadOpts::default(), cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// The name of the bucket/container/etc.
|
||||
pub fn bucket_name(&self) -> Option<&str> {
|
||||
match self {
|
||||
@@ -654,76 +660,6 @@ impl ConcurrencyLimiter {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
|
||||
/// with optional end bound, or None when unbounded.
|
||||
#[test]
|
||||
fn download_opts_byte_range() {
|
||||
// Consider using test_case or a similar table-driven test framework.
|
||||
let cases = [
|
||||
// (byte_start, byte_end, expected)
|
||||
(Bound::Unbounded, Bound::Unbounded, None),
|
||||
(Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
|
||||
(Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
|
||||
(Bound::Included(3), Bound::Unbounded, Some((3, None))),
|
||||
(Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
|
||||
(Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
|
||||
(Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
|
||||
(Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
|
||||
(Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
|
||||
// 1-sized ranges are fine, 0 aren't and will panic (separate test).
|
||||
(Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
|
||||
(Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
|
||||
];
|
||||
|
||||
for (byte_start, byte_end, expect) in cases {
|
||||
let opts = DownloadOpts {
|
||||
byte_start,
|
||||
byte_end,
|
||||
..Default::default()
|
||||
};
|
||||
let result = opts.byte_range();
|
||||
assert_eq!(
|
||||
result, expect,
|
||||
"byte_start={byte_start:?} byte_end={byte_end:?}"
|
||||
);
|
||||
|
||||
// Check generated HTTP header, which uses an inclusive range.
|
||||
let expect_header = expect.map(|(start, end)| match end {
|
||||
Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
|
||||
None => format!("bytes={start}-"),
|
||||
});
|
||||
assert_eq!(
|
||||
opts.byte_range_header(),
|
||||
expect_header,
|
||||
"byte_start={byte_start:?} byte_end={byte_end:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// DownloadOpts::byte_range() zero-sized byte range should panic.
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn download_opts_byte_range_zero() {
|
||||
DownloadOpts {
|
||||
byte_start: Bound::Included(3),
|
||||
byte_end: Bound::Excluded(3),
|
||||
..Default::default()
|
||||
}
|
||||
.byte_range();
|
||||
}
|
||||
|
||||
/// DownloadOpts::byte_range() negative byte range should panic.
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn download_opts_byte_range_negative() {
|
||||
DownloadOpts {
|
||||
byte_start: Bound::Included(3),
|
||||
byte_end: Bound::Included(2),
|
||||
..Default::default()
|
||||
}
|
||||
.byte_range();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_object_name() {
|
||||
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
|
||||
|
||||
@@ -506,29 +506,16 @@ impl RemoteStorage for LocalFs {
|
||||
return Err(DownloadError::Unmodified);
|
||||
}
|
||||
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&target_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open source file {target_path:?} to use in the download")
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let mut take = file_metadata.len();
|
||||
if let Some((start, end)) = opts.byte_range() {
|
||||
if start > 0 {
|
||||
file.seek(io::SeekFrom::Start(start))
|
||||
.await
|
||||
.context("Failed to seek to the range start in a local storage file")
|
||||
.map_err(DownloadError::Other)?;
|
||||
}
|
||||
if let Some(end) = end {
|
||||
take = end - start;
|
||||
}
|
||||
}
|
||||
|
||||
let source = ReaderStream::new(file.take(take));
|
||||
let source = ReaderStream::new(
|
||||
fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&target_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open source file {target_path:?} to use in the download")
|
||||
})
|
||||
.map_err(DownloadError::Other)?,
|
||||
);
|
||||
|
||||
let metadata = self
|
||||
.read_storage_metadata(&target_path)
|
||||
@@ -548,6 +535,68 @@ impl RemoteStorage for LocalFs {
|
||||
})
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
if let Some(end_exclusive) = end_exclusive {
|
||||
if end_exclusive <= start_inclusive {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
|
||||
};
|
||||
if start_inclusive == end_exclusive.saturating_sub(1) {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
|
||||
}
|
||||
}
|
||||
|
||||
let target_path = from.with_base(&self.storage_root);
|
||||
let file_metadata = file_metadata(&target_path).await?;
|
||||
let mut source = tokio::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&target_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open source file {target_path:?} to use in the download")
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let len = source
|
||||
.metadata()
|
||||
.await
|
||||
.context("query file length")
|
||||
.map_err(DownloadError::Other)?
|
||||
.len();
|
||||
|
||||
source
|
||||
.seek(io::SeekFrom::Start(start_inclusive))
|
||||
.await
|
||||
.context("Failed to seek to the range start in a local storage file")
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let metadata = self
|
||||
.read_storage_metadata(&target_path)
|
||||
.await
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
|
||||
let source = ReaderStream::new(source);
|
||||
|
||||
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
|
||||
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
|
||||
|
||||
let etag = mock_etag(&file_metadata);
|
||||
Ok(Download {
|
||||
metadata,
|
||||
last_modified: file_metadata
|
||||
.modified()
|
||||
.map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
|
||||
etag,
|
||||
download_stream: Box::pin(source),
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
let file_path = path.with_base(&self.storage_root);
|
||||
match fs::remove_file(&file_path).await {
|
||||
@@ -639,7 +688,7 @@ mod fs_tests {
|
||||
use super::*;
|
||||
|
||||
use camino_tempfile::tempdir;
|
||||
use std::{collections::HashMap, io::Write, ops::Bound};
|
||||
use std::{collections::HashMap, io::Write};
|
||||
|
||||
async fn read_and_check_metadata(
|
||||
storage: &LocalFs,
|
||||
@@ -755,12 +804,10 @@ mod fs_tests {
|
||||
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
|
||||
|
||||
let first_part_download = storage
|
||||
.download(
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
&DownloadOpts {
|
||||
byte_end: Bound::Excluded(first_part_local.len() as u64),
|
||||
..Default::default()
|
||||
},
|
||||
0,
|
||||
Some(first_part_local.len() as u64),
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
@@ -776,15 +823,10 @@ mod fs_tests {
|
||||
);
|
||||
|
||||
let second_part_download = storage
|
||||
.download(
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(first_part_local.len() as u64),
|
||||
byte_end: Bound::Excluded(
|
||||
(first_part_local.len() + second_part_local.len()) as u64,
|
||||
),
|
||||
..Default::default()
|
||||
},
|
||||
first_part_local.len() as u64,
|
||||
Some((first_part_local.len() + second_part_local.len()) as u64),
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
@@ -800,14 +842,7 @@ mod fs_tests {
|
||||
);
|
||||
|
||||
let suffix_bytes = storage
|
||||
.download(
|
||||
&upload_target,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(13),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.download_byte_range(&upload_target, 13, None, &cancel)
|
||||
.await?
|
||||
.download_stream;
|
||||
let suffix_bytes = aggregate(suffix_bytes).await?;
|
||||
@@ -815,7 +850,7 @@ mod fs_tests {
|
||||
assert_eq!(upload_name, suffix);
|
||||
|
||||
let all_bytes = storage
|
||||
.download(&upload_target, &DownloadOpts::default(), &cancel)
|
||||
.download_byte_range(&upload_target, 0, None, &cancel)
|
||||
.await?
|
||||
.download_stream;
|
||||
let all_bytes = aggregate(all_bytes).await?;
|
||||
@@ -826,26 +861,48 @@ mod fs_tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "at or before start")]
|
||||
async fn download_file_range_negative() {
|
||||
let (storage, cancel) = create_storage().unwrap();
|
||||
async fn download_file_range_negative() -> anyhow::Result<()> {
|
||||
let (storage, cancel) = create_storage()?;
|
||||
let upload_name = "upload_1";
|
||||
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel)
|
||||
.await
|
||||
.unwrap();
|
||||
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
|
||||
|
||||
storage
|
||||
.download(
|
||||
let start = 1_000_000_000;
|
||||
let end = start + 1;
|
||||
match storage
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(10),
|
||||
byte_end: Bound::Excluded(10),
|
||||
..Default::default()
|
||||
},
|
||||
start,
|
||||
Some(end), // exclusive end
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
Ok(_) => panic!("Should not allow downloading wrong ranges"),
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("zero bytes"));
|
||||
assert!(error_string.contains(&start.to_string()));
|
||||
assert!(error_string.contains(&end.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let start = 10000;
|
||||
let end = 234;
|
||||
assert!(start > end, "Should test an incorrect range");
|
||||
match storage
|
||||
.download_byte_range(&upload_target, start, Some(end), &cancel)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("Should not allow downloading wrong ranges"),
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("Invalid range"));
|
||||
assert!(error_string.contains(&start.to_string()));
|
||||
assert!(error_string.contains(&end.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -888,12 +945,10 @@ mod fs_tests {
|
||||
let (first_part_local, _) = uploaded_bytes.split_at(3);
|
||||
|
||||
let partial_download_with_metadata = storage
|
||||
.download(
|
||||
.download_byte_range(
|
||||
&upload_target,
|
||||
&DownloadOpts {
|
||||
byte_end: Bound::Excluded(first_part_local.len() as u64),
|
||||
..Default::default()
|
||||
},
|
||||
0,
|
||||
Some(first_part_local.len() as u64),
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -804,7 +804,34 @@ impl RemoteStorage for S3Bucket {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: opts.byte_range_header(),
|
||||
range: None,
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
|
||||
// and needs both ends to be inclusive
|
||||
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
|
||||
let range = Some(match end_inclusive {
|
||||
Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
|
||||
None => format!("bytes={start_inclusive}-"),
|
||||
});
|
||||
|
||||
self.download_object(
|
||||
GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: None,
|
||||
range,
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
|
||||
@@ -170,13 +170,28 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
// Note: We treat any byte range as an "attempt" of the same operation.
|
||||
// We don't pay attention to the ranges. That's good enough for now.
|
||||
self.attempt(RemoteOp::Download(from.clone()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner.download(from, opts, cancel).await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
// Note: We treat any download_byte_range as an "attempt" of the same
|
||||
// operation. We don't pay attention to the ranges. That's good enough
|
||||
// for now.
|
||||
self.attempt(RemoteOp::Download(from.clone()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner
|
||||
.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_inner(path, true, cancel).await
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use futures::StreamExt;
|
||||
use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
|
||||
use std::ops::Bound;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashSet, num::NonZeroU32};
|
||||
use test_context::test_context;
|
||||
@@ -294,15 +293,7 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// Full range (end specified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(0),
|
||||
byte_end: Bound::Excluded(len as u64),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.download_byte_range(&path, 0, Some(len as u64), &cancel)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
@@ -310,15 +301,7 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// partial range (end specified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(4),
|
||||
byte_end: Bound::Excluded(10),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.download_byte_range(&path, 4, Some(10), &cancel)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig[4..10]);
|
||||
@@ -326,15 +309,7 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// partial range (end beyond real end)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(8),
|
||||
byte_end: Bound::Excluded(len as u64 * 100),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig[8..]);
|
||||
@@ -342,14 +317,7 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// Partial range (end unspecified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(4),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.download_byte_range(&path, 4, None, &cancel)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig[4..]);
|
||||
@@ -357,14 +325,7 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// Full range (end unspecified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(0),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.download_byte_range(&path, 0, None, &cancel)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
|
||||
@@ -950,7 +950,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
let cancel = &self.secondary_state.cancel;
|
||||
let opts = DownloadOpts {
|
||||
etag: prev_etag.cloned(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
backoff::retry(
|
||||
|
||||
@@ -39,7 +39,7 @@ http.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper0.workspace = true
|
||||
hyper = { workspace = true, features = ["server", "http1", "http2"] }
|
||||
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
|
||||
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
|
||||
http-body-util = { version = "0.1" }
|
||||
indexmap.workspace = true
|
||||
|
||||
@@ -571,7 +571,7 @@ mod tests {
|
||||
use bytes::Bytes;
|
||||
use http::Response;
|
||||
use http_body_util::Full;
|
||||
use hyper::service::service_fn;
|
||||
use hyper1::service::service_fn;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use rand::rngs::OsRng;
|
||||
use rsa::pkcs8::DecodePrivateKey;
|
||||
@@ -736,7 +736,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
});
|
||||
|
||||
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
|
||||
let server = hyper::server::conn::http1::Builder::new();
|
||||
let server = hyper1::server::conn::http1::Builder::new();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::{anyhow, bail};
|
||||
use hyper0::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
|
||||
use measured::{text::BufferedTextEncoder, MetricGroup};
|
||||
use metrics::NeonMetrics;
|
||||
use std::{
|
||||
@@ -21,7 +21,7 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::OK, "")
|
||||
}
|
||||
|
||||
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
|
||||
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let state = Arc::new(Mutex::new(PrometheusHandler {
|
||||
encoder: BufferedTextEncoder::new(),
|
||||
metrics,
|
||||
@@ -45,7 +45,7 @@ pub async fn task_main(
|
||||
|
||||
let service = || RouterService::new(make_router(metrics).build()?);
|
||||
|
||||
hyper0::Server::from_tcp(http_listener)?
|
||||
hyper::Server::from_tcp(http_listener)?
|
||||
.serve(service().map_err(|e| anyhow!(e))?)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::time::Duration;
|
||||
use anyhow::bail;
|
||||
use bytes::Bytes;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::body::Body;
|
||||
use hyper1::body::Body;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
pub(crate) use reqwest::{Request, Response};
|
||||
|
||||
@@ -90,6 +90,8 @@ use tokio::task::JoinError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
extern crate hyper0 as hyper;
|
||||
|
||||
pub mod auth;
|
||||
pub mod cache;
|
||||
pub mod cancellation;
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::metrics::{
|
||||
WakeupFailureKind,
|
||||
};
|
||||
use crate::proxy::retry::{retry_after, should_retry};
|
||||
use hyper::StatusCode;
|
||||
use hyper1::StatusCode;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use super::connect_compute::ComputeConnectBackend;
|
||||
|
||||
@@ -257,7 +257,7 @@ pub(crate) enum LocalProxyConnError {
|
||||
#[error("error with connection to local-proxy")]
|
||||
Io(#[source] std::io::Error),
|
||||
#[error("could not establish h2 connection")]
|
||||
H2(#[from] hyper::Error),
|
||||
H2(#[from] hyper1::Error),
|
||||
}
|
||||
|
||||
impl ReportableError for HttpConnError {
|
||||
@@ -481,7 +481,7 @@ async fn connect_http2(
|
||||
};
|
||||
};
|
||||
|
||||
let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
|
||||
let (client, connection) = hyper1::client::conn::http2::Builder::new(TokioExecutor::new())
|
||||
.timer(TokioTimer::new())
|
||||
.keep_alive_interval(Duration::from_secs(20))
|
||||
.keep_alive_while_idle(true)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use dashmap::DashMap;
|
||||
use hyper::client::conn::http2;
|
||||
use hyper1::client::conn::http2;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use parking_lot::RwLock;
|
||||
use rand::Rng;
|
||||
@@ -18,9 +18,9 @@ use tracing::{info, info_span, Instrument};
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
|
||||
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
|
||||
pub(crate) type Send = http2::SendRequest<hyper1::body::Incoming>;
|
||||
pub(crate) type Connect =
|
||||
http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
|
||||
http2::Connection<TokioIo<TcpStream>, hyper1::body::Incoming, TokioExecutor>;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ConnPoolEntry {
|
||||
|
||||
@@ -11,7 +11,7 @@ use serde::Serialize;
|
||||
use utils::http::error::ApiError;
|
||||
|
||||
/// Like [`ApiError::into_response`]
|
||||
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper1::Error>> {
|
||||
match this {
|
||||
ApiError::BadRequest(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
format!("{err:#?}"), // use debug printing so that we give the cause
|
||||
@@ -67,12 +67,12 @@ impl HttpErrorBody {
|
||||
fn response_from_msg_and_status(
|
||||
msg: String,
|
||||
status: StatusCode,
|
||||
) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
) -> Response<BoxBody<Bytes, hyper1::Error>> {
|
||||
HttpErrorBody { msg }.to_response(status)
|
||||
}
|
||||
|
||||
/// Same as [`utils::http::error::HttpErrorBody::to_response`]
|
||||
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper1::Error>> {
|
||||
Response::builder()
|
||||
.status(status)
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
@@ -90,7 +90,7 @@ impl HttpErrorBody {
|
||||
pub(crate) fn json_response<T: Serialize>(
|
||||
status: StatusCode,
|
||||
data: T,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
|
||||
let json = serde_json::to_string(&data)
|
||||
.context("Failed to serialize JSON response")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -22,7 +22,7 @@ use futures::TryFutureExt;
|
||||
use http::{Method, Response, StatusCode};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Empty};
|
||||
use hyper::body::Incoming;
|
||||
use hyper1::body::Incoming;
|
||||
use hyper_util::rt::TokioExecutor;
|
||||
use hyper_util::server::conn::auto::Builder;
|
||||
use rand::rngs::StdRng;
|
||||
@@ -302,7 +302,7 @@ async fn connection_handler(
|
||||
let server = Builder::new(TokioExecutor::new());
|
||||
let conn = server.serve_connection_with_upgrades(
|
||||
hyper_util::rt::TokioIo::new(conn),
|
||||
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
|
||||
hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {
|
||||
// First HTTP request shares the same session ID
|
||||
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
|
||||
|
||||
@@ -355,7 +355,7 @@ async fn connection_handler(
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn request_handler(
|
||||
mut request: hyper::Request<Incoming>,
|
||||
mut request: hyper1::Request<Incoming>,
|
||||
config: &'static ProxyConfig,
|
||||
backend: Arc<PoolingBackend>,
|
||||
ws_connections: TaskTracker,
|
||||
@@ -365,7 +365,7 @@ async fn request_handler(
|
||||
// used to cancel in-flight HTTP requests. not used to cancel websockets
|
||||
http_cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
|
||||
let host = request
|
||||
.headers()
|
||||
.get("host")
|
||||
|
||||
@@ -12,14 +12,14 @@ use http::Method;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::BodyExt;
|
||||
use http_body_util::Full;
|
||||
use hyper::body::Body;
|
||||
use hyper::body::Incoming;
|
||||
use hyper::header;
|
||||
use hyper::http::HeaderName;
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Response;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{HeaderMap, Request};
|
||||
use hyper1::body::Body;
|
||||
use hyper1::body::Incoming;
|
||||
use hyper1::header;
|
||||
use hyper1::http::HeaderName;
|
||||
use hyper1::http::HeaderValue;
|
||||
use hyper1::Response;
|
||||
use hyper1::StatusCode;
|
||||
use hyper1::{HeaderMap, Request};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
@@ -272,7 +272,7 @@ pub(crate) async fn handle(
|
||||
request: Request<Incoming>,
|
||||
backend: Arc<PoolingBackend>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
|
||||
let result = handle_inner(cancel, config, &ctx, request, backend).await;
|
||||
|
||||
let mut response = match result {
|
||||
@@ -435,7 +435,7 @@ impl UserFacingError for SqlOverHttpError {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum ReadPayloadError {
|
||||
#[error("could not read the HTTP request body: {0}")]
|
||||
Read(#[from] hyper::Error),
|
||||
Read(#[from] hyper1::Error),
|
||||
#[error("could not parse the HTTP request body: {0}")]
|
||||
Parse(#[from] serde_json::Error),
|
||||
}
|
||||
@@ -476,7 +476,7 @@ struct HttpHeaders {
|
||||
}
|
||||
|
||||
impl HttpHeaders {
|
||||
fn try_parse(headers: &hyper::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
|
||||
fn try_parse(headers: &hyper1::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
|
||||
// Determine the output options. Default behaviour is 'false'. Anything that is not
|
||||
// strictly 'true' assumed to be false.
|
||||
let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
|
||||
@@ -529,7 +529,7 @@ async fn handle_inner(
|
||||
ctx: &RequestMonitoring,
|
||||
request: Request<Incoming>,
|
||||
backend: Arc<PoolingBackend>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
|
||||
let _requeset_gauge = Metrics::get()
|
||||
.proxy
|
||||
.connection_requests
|
||||
@@ -577,7 +577,7 @@ async fn handle_db_inner(
|
||||
conn_info: ConnInfo,
|
||||
auth: AuthData,
|
||||
backend: Arc<PoolingBackend>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
@@ -744,7 +744,7 @@ async fn handle_auth_broker_inner(
|
||||
conn_info: ConnInfo,
|
||||
jwt: String,
|
||||
backend: Arc<PoolingBackend>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
|
||||
backend
|
||||
.authenticate_with_jwt(
|
||||
ctx,
|
||||
|
||||
@@ -12,7 +12,7 @@ use anyhow::Context as _;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use framed_websockets::{Frame, OpCode, WebSocketServer};
|
||||
use futures::{Sink, Stream};
|
||||
use hyper::upgrade::OnUpgrade;
|
||||
use hyper1::upgrade::OnUpgrade;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
|
||||
@@ -485,51 +485,49 @@ async fn upload_events_chunk(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::{
|
||||
net::TcpListener,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use crate::{http, BranchId, EndpointId};
|
||||
use anyhow::Error;
|
||||
use chrono::Utc;
|
||||
use consumption_metrics::{Event, EventChunk};
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request, Response};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::net::TcpListener;
|
||||
use hyper::{
|
||||
service::{make_service_fn, service_fn},
|
||||
Body, Response,
|
||||
};
|
||||
use url::Url;
|
||||
|
||||
use super::*;
|
||||
use crate::{http, BranchId, EndpointId};
|
||||
|
||||
#[tokio::test]
|
||||
async fn metrics() {
|
||||
type Report = EventChunk<'static, Event<Ids, String>>;
|
||||
let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
|
||||
let listener = TcpListener::bind("0.0.0.0:0").unwrap();
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn({
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
loop {
|
||||
if let Ok((stream, _addr)) = listener.accept().await {
|
||||
let reports = Arc::new(Mutex::new(vec![]));
|
||||
let reports2 = reports.clone();
|
||||
|
||||
let server = hyper::server::Server::from_tcp(listener)
|
||||
.unwrap()
|
||||
.serve(make_service_fn(move |_| {
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn(move |req| {
|
||||
let reports = reports.clone();
|
||||
http1::Builder::new()
|
||||
.serve_connection(
|
||||
TokioIo::new(stream),
|
||||
service_fn(move |req: Request<Incoming>| {
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
let bytes = req.into_body().collect().await?.to_bytes();
|
||||
let events = serde_json::from_slice(&bytes)?;
|
||||
reports.lock().unwrap().push(events);
|
||||
Ok::<_, Error>(Response::new(String::new()))
|
||||
}
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
async move {
|
||||
let bytes = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let events: EventChunk<'static, Event<Ids, String>> =
|
||||
serde_json::from_slice(&bytes)?;
|
||||
reports.lock().unwrap().push(events);
|
||||
Ok::<_, Error>(Response::new(Body::from(vec![])))
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
});
|
||||
}));
|
||||
let addr = server.local_addr();
|
||||
tokio::spawn(server);
|
||||
|
||||
let metrics = Metrics::default();
|
||||
let client = http::new_client();
|
||||
@@ -538,7 +536,7 @@ mod tests {
|
||||
|
||||
// no counters have been registered
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// register a new counter
|
||||
@@ -550,7 +548,7 @@ mod tests {
|
||||
|
||||
// the counter should be observed despite 0 egress
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
assert_eq!(r[0].events[0].value, 0);
|
||||
@@ -560,7 +558,7 @@ mod tests {
|
||||
|
||||
// egress should be observed
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
assert_eq!(r[0].events[0].value, 1);
|
||||
@@ -570,7 +568,7 @@ mod tests {
|
||||
|
||||
// we do not observe the counter
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// counter is unregistered
|
||||
|
||||
@@ -12,8 +12,8 @@ use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
|
||||
proto::MetricFamily,
|
||||
register_histogram_vec, register_int_counter, register_int_counter_pair,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge,
|
||||
HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, Gauge, HistogramVec, IntCounter,
|
||||
IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -231,14 +231,6 @@ pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(||
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"safekeeper_evicted_timelines",
|
||||
"Number of currently evicted timelines"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
/// Labels for traffic metrics.
|
||||
|
||||
@@ -631,19 +631,13 @@ impl Timeline {
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
self.bootstrap(
|
||||
shared_state,
|
||||
conf,
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
);
|
||||
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bootstrap new or existing timeline starting background tasks.
|
||||
pub fn bootstrap(
|
||||
self: &Arc<Timeline>,
|
||||
_shared_state: &mut WriteGuardSharedState<'_>,
|
||||
conf: &SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
|
||||
@@ -15,9 +15,7 @@ use tracing::{debug, info, instrument, warn};
|
||||
use utils::crashsafe::durable_rename;
|
||||
|
||||
use crate::{
|
||||
metrics::{
|
||||
EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, NUM_EVICTED_TIMELINES,
|
||||
},
|
||||
metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
|
||||
rate_limit::rand_duration,
|
||||
timeline_manager::{Manager, StateSnapshot},
|
||||
wal_backup,
|
||||
@@ -95,7 +93,6 @@ impl Manager {
|
||||
}
|
||||
|
||||
info!("successfully evicted timeline");
|
||||
NUM_EVICTED_TIMELINES.inc();
|
||||
}
|
||||
|
||||
/// Attempt to restore evicted timeline from remote storage; it must be
|
||||
@@ -131,7 +128,6 @@ impl Manager {
|
||||
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
|
||||
|
||||
info!("successfully restored evicted timeline");
|
||||
NUM_EVICTED_TIMELINES.dec();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,10 +25,7 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
control_file::{FileStorage, Storage},
|
||||
metrics::{
|
||||
MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS,
|
||||
NUM_EVICTED_TIMELINES,
|
||||
},
|
||||
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
|
||||
rate_limit::{rand_duration, RateLimiter},
|
||||
recovery::recovery_main,
|
||||
remove_wal::calc_horizon_lsn,
|
||||
@@ -254,11 +251,6 @@ pub async fn main_task(
|
||||
mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
|
||||
}
|
||||
|
||||
// If timeline is evicted, reflect that in the metric.
|
||||
if mgr.is_offloaded {
|
||||
NUM_EVICTED_TIMELINES.inc();
|
||||
}
|
||||
|
||||
let last_state = 'outer: loop {
|
||||
MANAGER_ITERATIONS_TOTAL.inc();
|
||||
|
||||
@@ -375,11 +367,6 @@ pub async fn main_task(
|
||||
mgr.update_wal_removal_end(res);
|
||||
}
|
||||
|
||||
// If timeline is deleted while evicted decrement the gauge.
|
||||
if mgr.tli.is_cancelled() && mgr.is_offloaded {
|
||||
NUM_EVICTED_TIMELINES.dec();
|
||||
}
|
||||
|
||||
mgr.set_status(Status::Finished);
|
||||
}
|
||||
|
||||
|
||||
@@ -165,14 +165,12 @@ impl GlobalTimelines {
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
tli.bootstrap(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
broker_active_set.clone(),
|
||||
partial_backup_rate_limiter.clone(),
|
||||
@@ -215,7 +213,6 @@ impl GlobalTimelines {
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
|
||||
// TODO: prevent concurrent timeline creation/loading
|
||||
{
|
||||
@@ -230,13 +227,8 @@ impl GlobalTimelines {
|
||||
state.timelines.insert(ttid, tli.clone());
|
||||
}
|
||||
|
||||
tli.bootstrap(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
);
|
||||
drop(shared_state);
|
||||
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
|
||||
|
||||
Ok(tli)
|
||||
}
|
||||
// If we can't load a timeline, it's bad. Caller will figure it out.
|
||||
|
||||
@@ -17,9 +17,7 @@ use std::time::Duration;
|
||||
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::{XLogSegNo, PG_TLI};
|
||||
use remote_storage::{
|
||||
DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata,
|
||||
};
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
|
||||
use tokio::fs::File;
|
||||
|
||||
use tokio::select;
|
||||
@@ -505,12 +503,8 @@ pub async fn read_object(
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let opts = DownloadOpts {
|
||||
byte_start: std::ops::Bound::Included(offset),
|
||||
..Default::default()
|
||||
};
|
||||
let download = storage
|
||||
.download(file_path, &opts, &cancel)
|
||||
.download_storage_object(Some((offset, None)), file_path, &cancel)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open WAL segment download stream for remote path {file_path:?}")
|
||||
|
||||
@@ -22,7 +22,7 @@ use utils::sync::gate::GateGuard;
|
||||
|
||||
use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
use crate::node::Node;
|
||||
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
|
||||
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
|
||||
|
||||
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
|
||||
|
||||
@@ -45,15 +45,8 @@ pub(super) struct Reconciler {
|
||||
pub(crate) reconciler_config: ReconcilerConfig,
|
||||
|
||||
pub(crate) config: TenantConfig,
|
||||
|
||||
/// Observed state from the point of view of the reconciler.
|
||||
/// This gets updated as the reconciliation makes progress.
|
||||
pub(crate) observed: ObservedState,
|
||||
|
||||
/// Snapshot of the observed state at the point when the reconciler
|
||||
/// was spawned.
|
||||
pub(crate) original_observed: ObservedState,
|
||||
|
||||
pub(crate) service_config: service::Config,
|
||||
|
||||
/// A hook to notify the running postgres instances when we change the location
|
||||
@@ -853,39 +846,6 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Compare the observed state snapshot from when the reconcile was created
|
||||
/// with the final observed state in order to generate observed state deltas.
|
||||
pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
|
||||
let mut deltas = Vec::default();
|
||||
|
||||
for (node_id, location) in &self.observed.locations {
|
||||
let previous_location = self.original_observed.locations.get(node_id);
|
||||
let do_upsert = match previous_location {
|
||||
// Location config changed for node
|
||||
Some(prev) if location.conf != prev.conf => true,
|
||||
// New location config for node
|
||||
None => true,
|
||||
// Location config has not changed for node
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if do_upsert {
|
||||
deltas.push(ObservedStateDelta::Upsert(Box::new((
|
||||
*node_id,
|
||||
location.clone(),
|
||||
))));
|
||||
}
|
||||
}
|
||||
|
||||
for node_id in self.original_observed.locations.keys() {
|
||||
if !self.observed.locations.contains_key(node_id) {
|
||||
deltas.push(ObservedStateDelta::Delete(*node_id));
|
||||
}
|
||||
}
|
||||
|
||||
deltas
|
||||
}
|
||||
|
||||
/// Keep trying to notify the compute indefinitely, only dropping out if:
|
||||
/// - the node `origin` becomes unavailable -> Ok(())
|
||||
/// - the node `origin` no longer has our tenant shard attached -> Ok(())
|
||||
|
||||
@@ -28,8 +28,8 @@ use crate::{
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
|
||||
ScheduleOptimization, ScheduleOptimizationAction,
|
||||
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
|
||||
ScheduleOptimizationAction,
|
||||
},
|
||||
};
|
||||
use anyhow::Context;
|
||||
@@ -1072,7 +1072,7 @@ impl Service {
|
||||
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
|
||||
sequence=%result.sequence
|
||||
))]
|
||||
fn process_result(&self, result: ReconcileResult) {
|
||||
fn process_result(&self, mut result: ReconcileResult) {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
let Some(tenant) = tenants.get_mut(&result.tenant_shard_id) else {
|
||||
@@ -1094,27 +1094,22 @@ impl Service {
|
||||
|
||||
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
|
||||
// make to the tenant
|
||||
let deltas = result.observed_deltas.into_iter().flat_map(|delta| {
|
||||
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
|
||||
// make to the tenant
|
||||
let node = nodes.get(delta.node_id())?;
|
||||
|
||||
if node.is_available() {
|
||||
return Some(delta);
|
||||
}
|
||||
|
||||
// In case a node became unavailable concurrently with the reconcile, observed
|
||||
// locations on it are now uncertain. By convention, set them to None in order
|
||||
// for them to get refreshed when the node comes back online.
|
||||
Some(ObservedStateDelta::Upsert(Box::new((
|
||||
node.get_id(),
|
||||
ObservedStateLocation { conf: None },
|
||||
))))
|
||||
});
|
||||
result
|
||||
.observed
|
||||
.locations
|
||||
.retain(|node_id, _loc| nodes.contains_key(node_id));
|
||||
|
||||
match result.result {
|
||||
Ok(()) => {
|
||||
tenant.apply_observed_deltas(deltas);
|
||||
for (node_id, loc) in &result.observed.locations {
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
}
|
||||
|
||||
tenant.observed = result.observed;
|
||||
tenant.waiter.advance(result.sequence);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1136,10 +1131,9 @@ impl Service {
|
||||
// so that waiters will see the correct error after waiting.
|
||||
tenant.set_last_error(result.sequence, e);
|
||||
|
||||
// Skip deletions on reconcile failures
|
||||
let upsert_deltas =
|
||||
deltas.filter(|delta| matches!(delta, ObservedStateDelta::Upsert(_)));
|
||||
tenant.apply_observed_deltas(upsert_deltas);
|
||||
for (node_id, o) in result.observed.locations {
|
||||
tenant.observed.locations.insert(node_id, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -425,22 +425,6 @@ pub(crate) enum ReconcileNeeded {
|
||||
Yes,
|
||||
}
|
||||
|
||||
/// Pending modification to the observed state of a tenant shard.
|
||||
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
|
||||
pub(crate) enum ObservedStateDelta {
|
||||
Upsert(Box<(NodeId, ObservedStateLocation)>),
|
||||
Delete(NodeId),
|
||||
}
|
||||
|
||||
impl ObservedStateDelta {
|
||||
pub(crate) fn node_id(&self) -> &NodeId {
|
||||
match self {
|
||||
Self::Upsert(up) => &up.0,
|
||||
Self::Delete(nid) => nid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// When a reconcile task completes, it sends this result object
|
||||
/// to be applied to the primary TenantShard.
|
||||
pub(crate) struct ReconcileResult {
|
||||
@@ -453,7 +437,7 @@ pub(crate) struct ReconcileResult {
|
||||
|
||||
pub(crate) tenant_shard_id: TenantShardId,
|
||||
pub(crate) generation: Option<Generation>,
|
||||
pub(crate) observed_deltas: Vec<ObservedStateDelta>,
|
||||
pub(crate) observed: ObservedState,
|
||||
|
||||
/// Set [`TenantShard::pending_compute_notification`] from this flag
|
||||
pub(crate) pending_compute_notification: bool,
|
||||
@@ -1139,7 +1123,7 @@ impl TenantShard {
|
||||
result,
|
||||
tenant_shard_id: reconciler.tenant_shard_id,
|
||||
generation: reconciler.generation,
|
||||
observed_deltas: reconciler.observed_deltas(),
|
||||
observed: reconciler.observed,
|
||||
pending_compute_notification: reconciler.compute_notify_failure,
|
||||
}
|
||||
}
|
||||
@@ -1193,7 +1177,6 @@ impl TenantShard {
|
||||
reconciler_config,
|
||||
config: self.config.clone(),
|
||||
observed: self.observed.clone(),
|
||||
original_observed: self.observed.clone(),
|
||||
compute_hook: compute_hook.clone(),
|
||||
service_config: service_config.clone(),
|
||||
_gate_guard: gate_guard,
|
||||
@@ -1454,62 +1437,6 @@ impl TenantShard {
|
||||
.map(|(node_id, gen)| (node_id, Generation::new(gen)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Update the observed state of the tenant by applying incremental deltas
|
||||
///
|
||||
/// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
|
||||
/// They are then filtered in [`crate::service::Service::process_result`].
|
||||
pub(crate) fn apply_observed_deltas(
|
||||
&mut self,
|
||||
deltas: impl Iterator<Item = ObservedStateDelta>,
|
||||
) {
|
||||
for delta in deltas {
|
||||
match delta {
|
||||
ObservedStateDelta::Upsert(ups) => {
|
||||
let (node_id, loc) = *ups;
|
||||
|
||||
// If the generation of the observed location in the delta is lagging
|
||||
// behind the current one, then we have a race condition and cannot
|
||||
// be certain about the true observed state. Set the observed state
|
||||
// to None in order to reflect this.
|
||||
let crnt_gen = self
|
||||
.observed
|
||||
.locations
|
||||
.get(&node_id)
|
||||
.and_then(|loc| loc.conf.as_ref())
|
||||
.and_then(|conf| conf.generation);
|
||||
let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
|
||||
match (crnt_gen, new_gen) {
|
||||
(Some(crnt), Some(new)) if crnt_gen > new_gen => {
|
||||
tracing::warn!(
|
||||
"Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
|
||||
node_id, loc, crnt, new
|
||||
);
|
||||
|
||||
self.observed
|
||||
.locations
|
||||
.insert(node_id, ObservedStateLocation { conf: None });
|
||||
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
|
||||
self.observed.locations.insert(node_id, loc);
|
||||
}
|
||||
ObservedStateDelta::Delete(node_id) => {
|
||||
tracing::info!("Deleting observed location {}", node_id);
|
||||
self.observed.locations.remove(&node_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -192,7 +192,7 @@ def tpch_queuies() -> tuple[ParameterSet, ...]:
|
||||
- queries in the returned tuple are ordered by the query number
|
||||
- pytest parameters id is adjusted to match the query id (the numbering starts from 1)
|
||||
"""
|
||||
queries_dir = Path(__file__).parent / "tpc-h" / "queries"
|
||||
queries_dir = Path(__file__).parent / "performance" / "tpc-h" / "queries"
|
||||
assert queries_dir.exists(), f"TPC-H queries dir not found: {queries_dir}"
|
||||
|
||||
return tuple(
|
||||
|
||||
@@ -666,17 +666,14 @@ def test_upgrade_generationless_local_file_paths(
|
||||
pageserver.stop()
|
||||
timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
|
||||
files_renamed = 0
|
||||
log.info(f"Renaming files in {timeline_dir}")
|
||||
for filename in os.listdir(timeline_dir):
|
||||
if filename.endswith("-v1-00000001"):
|
||||
new_filename = filename[:-12]
|
||||
os.rename(
|
||||
os.path.join(timeline_dir, filename), os.path.join(timeline_dir, new_filename)
|
||||
)
|
||||
log.info(f"Renamed {filename} -> {new_filename}")
|
||||
path = os.path.join(timeline_dir, filename)
|
||||
log.info(f"Found file {path}")
|
||||
if path.endswith("-v1-00000001"):
|
||||
new_path = path[:-12]
|
||||
os.rename(path, new_path)
|
||||
log.info(f"Renamed {path} -> {new_path}")
|
||||
files_renamed += 1
|
||||
else:
|
||||
log.info(f"Keeping {filename}")
|
||||
|
||||
assert files_renamed > 0
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from contextlib import closing
|
||||
from dataclasses import dataclass, field
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, List
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
@@ -735,7 +735,7 @@ class ProposerPostgres(PgProtocol):
|
||||
"""Path to postgresql.conf"""
|
||||
return os.path.join(self.pgdata_dir, "postgresql.conf")
|
||||
|
||||
def create_dir_config(self, safekeepers: str):
|
||||
def create_dir_config(self, safekeepers: str, additional_config_options: Optional[dict[str,str]] = None):
|
||||
"""Create dir and config for running --sync-safekeepers"""
|
||||
|
||||
Path(self.pg_data_dir_path()).mkdir(exist_ok=True)
|
||||
@@ -750,6 +750,9 @@ class ProposerPostgres(PgProtocol):
|
||||
f"listen_addresses = '{self.listen_addr}'\n",
|
||||
f"port = '{self.port}'\n",
|
||||
]
|
||||
if additional_config_options:
|
||||
for key, value in additional_config_options.items():
|
||||
cfg.append(f"{key} = '{value}'\n")
|
||||
|
||||
f.writelines(cfg)
|
||||
|
||||
@@ -1446,6 +1449,7 @@ class SafekeeperEnv:
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
num_safekeepers: int = 1,
|
||||
pg_conf_options: Optional[dict[str, str]] = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.port_distributor = port_distributor
|
||||
@@ -1457,6 +1461,7 @@ class SafekeeperEnv:
|
||||
self.postgres: Optional[ProposerPostgres] = None
|
||||
self.tenant_id: Optional[TenantId] = None
|
||||
self.timeline_id: Optional[TimelineId] = None
|
||||
self.pg_conf_options = pg_conf_options
|
||||
|
||||
def init(self) -> SafekeeperEnv:
|
||||
assert self.postgres is None, "postgres is already initialized"
|
||||
@@ -1499,6 +1504,7 @@ class SafekeeperEnv:
|
||||
str(i),
|
||||
"--broker-endpoint",
|
||||
self.fake_broker_endpoint,
|
||||
# "--no-sync",
|
||||
]
|
||||
log.info(f'Running command "{" ".join(cmd)}"')
|
||||
|
||||
@@ -1533,7 +1539,7 @@ class SafekeeperEnv:
|
||||
self.port_distributor.get_port(),
|
||||
)
|
||||
pg.initdb()
|
||||
pg.create_dir_config(self.get_safekeeper_connstrs())
|
||||
pg.create_dir_config(self.get_safekeeper_connstrs(), self.pg_conf_options)
|
||||
return pg
|
||||
|
||||
def kill_safekeeper(self, sk_dir):
|
||||
@@ -1558,6 +1564,224 @@ class SafekeeperEnv:
|
||||
self.kill_safekeeper(sk_proc.args[6])
|
||||
|
||||
|
||||
def run_pg_restore(pg_bin: PgBin, dump_file: Path, postgres: ProposerPostgres, table_names: List[str]):
|
||||
# env_vars = {
|
||||
# "PGOPTIONS": "-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7",
|
||||
# }
|
||||
|
||||
postgres.connstr
|
||||
|
||||
pg_restore_command: List[str] = [
|
||||
"pg_restore",
|
||||
"-v", # Verbose output
|
||||
"-d", postgres.connstr(options='-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7 -cstatement_timeout=0'), # Target database
|
||||
"--no-owner", # Do not restore ownership
|
||||
"--jobs=4", # Number of parallel jobs
|
||||
str(dump_file), # Dump file
|
||||
]
|
||||
|
||||
# Add table names to the command
|
||||
for table in table_names:
|
||||
pg_restore_command.insert(-2, "-t")
|
||||
pg_restore_command.insert(-2, table)
|
||||
|
||||
pg_bin.run(pg_restore_command)
|
||||
return None
|
||||
|
||||
|
||||
def download_pg_dump(database_name: str) -> Path:
|
||||
dump_file_path = Path(f"/tmp/{database_name}.pg_dump")
|
||||
if not dump_file_path.exists():
|
||||
s3_path = f"s3://neon-github-dev/performance/pgdumps/{database_name}/{database_name}.pg_dump"
|
||||
try:
|
||||
log.info(f"Downloading {s3_path} to {dump_file_path}")
|
||||
subprocess.run(["aws", "s3", "cp", s3_path, str(dump_file_path)], check=True)
|
||||
except subprocess.CalledProcessError:
|
||||
log.error("Failed to download the pg_dump file. Ensure AWS S3 credentials are set in the environment variables.")
|
||||
raise
|
||||
return dump_file_path
|
||||
|
||||
def test_safekeeper_without_pageserver_and_pg_restore(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
|
||||
# Download the pg_dump file if it doesn't exist
|
||||
database_name = "clickbench" # Replace with your database name
|
||||
dump_file_path = download_pg_dump(database_name)
|
||||
|
||||
pg_conf_options: dict[str, str] = {
|
||||
"shared_buffers": "8GB",
|
||||
}
|
||||
|
||||
env = SafekeeperEnv(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
pg_conf_options=pg_conf_options,
|
||||
)
|
||||
|
||||
with env:
|
||||
env.init()
|
||||
assert env.postgres is not None
|
||||
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
|
||||
log.info(f"shared_buffers: {shared_buffers}")
|
||||
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size before restore: {size_before}")
|
||||
|
||||
start_time = time.time()
|
||||
run_pg_restore(pg_bin, dump_file_path, env.postgres, ["hits"])
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size after restore: {size_after}")
|
||||
# Calculate the restore rate in bytes/second
|
||||
restored_size = size_after - size_before
|
||||
restore_rate = restored_size / duration
|
||||
log.info(f"pg_restore duration: {duration:.2f} seconds")
|
||||
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver_and_pg_restore_tpch(
    test_output_dir: str,
    port_distributor: PortDistributor,
    pg_bin: PgBin,
    neon_binpath: Path,
):
    """
    Time a pg_restore of the TPC-H tables into the Postgres instance managed
    by `SafekeeperEnv`.

    The test logs the effective `shared_buffers`, the size of the `postgres`
    database before and after the restore, the elapsed time of the restore and
    the resulting rate in bytes/second. It has no pass/fail threshold; it only
    fails if one of the steps raises.
    """
    # Create the environment in the test-specific output dir
    repo_dir = Path(test_output_dir) / "repo"

    # Obtain the local path of the dump to restore.
    # NOTE(review): download_pg_dump presumably skips the download when the
    # file is already present -- confirm against its implementation.
    database_name = "tpch"
    dump_file_path = download_pg_dump(database_name)

    pg_conf_options: dict[str, str] = {
        "shared_buffers": "8GB",
    }

    env = SafekeeperEnv(
        repo_dir,
        port_distributor,
        pg_bin,
        neon_binpath,
        pg_conf_options=pg_conf_options,
    )

    # Tables passed to run_pg_restore for this dump.
    table_names = [
        "customer",
        "lineitem",
        "nation",
        "orders",
        "part",
        "partsupp",
        "region",
        "supplier",
    ]

    with env:
        env.init()
        assert env.postgres is not None

        shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
        log.info(f"shared_buffers: {shared_buffers}")

        size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
        log.info(f"Database size before restore: {size_before}")

        # Use a monotonic, high-resolution clock: wall-clock time.time() can
        # jump (NTP adjustments, manual clock changes) during a long restore
        # and would skew the measured duration.
        start_time = time.perf_counter()
        run_pg_restore(pg_bin, dump_file_path, env.postgres, table_names)
        duration = time.perf_counter() - start_time

        size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
        log.info(f"Database size after restore: {size_after}")

        # Calculate the restore rate in bytes/second
        restored_size = size_after - size_before
        restore_rate = restored_size / duration
        log.info(f"pg_restore duration: {duration:.2f} seconds")
        log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver_and_waltest(
    test_output_dir: str,
    port_distributor: PortDistributor,
    pg_bin: PgBin,
    neon_binpath: Path,
):
    """
    Measure WAL write bandwidth of the Postgres instance managed by
    `SafekeeperEnv` using logical messages only.

    A plpgsql function `public.wal_bandwidth_test()` is installed that runs
    100 rounds; each round emits 10000 non-transactional logical messages of
    10486 bytes (about 100 MB of WAL) and records the WAL bytes written per
    second for that round. The per-round figures are returned as one text
    value and logged, together with the total duration and the change in
    database size (which the log messages themselves flag as irrelevant,
    because no relations are written).
    """
    # Create the environment in the test-specific output dir
    repo_dir = Path(test_output_dir) / "repo"

    pg_conf_options: dict[str, str] = {
        "shared_buffers": "8GB",
    }

    env = SafekeeperEnv(
        repo_dir,
        port_distributor,
        pg_bin,
        neon_binpath,
        pg_conf_options=pg_conf_options,
    )

    wal_test = """
    CREATE OR REPLACE FUNCTION public.wal_bandwidth_test()
    RETURNS text
    LANGUAGE plpgsql
    AS $function$
    declare
        i int;
        j int;
        lastlsn pg_lsn;
        nowlsn pg_lsn;
        lastts timestamp;
        nowts timestamp;
        bandwidth numeric;

        result text := ''; -- Initialize an empty string to capture messages
    begin
        for i in 1..100 loop

            lastlsn = pg_current_wal_insert_lsn();
            lastts = clock_timestamp();

            -- Emit 100 MB of WAL
            for j in 1..10000 loop
                perform pg_logical_emit_message(false, '', repeat('x', 10486));
            end loop;

            nowlsn = pg_current_wal_insert_lsn();
            nowts = clock_timestamp();

            bandwidth = (nowlsn - lastlsn) / (extract(epoch from nowts) - extract(epoch from lastts));

            -- Capture the message instead of raising a notice
            result := result || format('bandwidth: %s kB / s%s',
                lpad(round(bandwidth / 1024)::text, 10),
                chr(10)); -- Newline for formatting

        end loop;

        return result; -- Return the concatenated string of messages
    end;
    $function$
    ;
    """

    with env:
        env.init()
        assert env.postgres is not None

        shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
        log.info(f"shared_buffers: {shared_buffers}")

        size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
        log.info(f"Database size before test: {size_before}")

        # Install the measurement function before starting the clock so its
        # creation is not part of the measured duration.
        env.postgres.safe_psql(wal_test)

        # Use a monotonic, high-resolution clock: wall-clock time.time() can
        # jump (NTP adjustments, manual clock changes) during the run and
        # would skew the measured duration.
        start_time = time.perf_counter()
        # statement_timeout is disabled because the function runs 100 rounds
        # of ~100 MB of WAL each in a single statement.
        output = env.postgres.safe_psql("""
            SET statement_timeout = 0;
            select wal_bandwidth_test();
        """)[0][0]
        duration = time.perf_counter() - start_time

        # Per-round bandwidth lines produced by wal_bandwidth_test()
        log.info(output)

        size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
        log.info(f"Database size after test (irrelevant because no real WAL records, no relations): {size_after}")

        # Database growth per second; logged for completeness only, since
        # logical messages do not create or extend any relation.
        size_delta = size_after - size_before
        growth_rate = size_delta / duration
        log.info(f"test duration: {duration:.2f} seconds")
        log.info(f"Average ingest rate(irrelevant because no real WAL records, no relations): {growth_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
@@ -2314,12 +2538,12 @@ def test_s3_eviction(
|
||||
]
|
||||
if delete_offloaded_wal:
|
||||
neon_env_builder.safekeeper_extra_opts.append("--delete-offloaded-wal")
|
||||
# make lagging_wal_timeout small to force pageserver quickly forget about
|
||||
# safekeeper after it stops sending updates (timeline is deactivated) to
|
||||
# make test faster. Won't be needed with
|
||||
# https://github.com/neondatabase/neon/issues/8148 fixed.
|
||||
initial_tenant_conf = {"lagging_wal_timeout": "1s", "checkpoint_timeout": "100ms"}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"checkpoint_timeout": "100ms",
|
||||
}
|
||||
)
|
||||
|
||||
n_timelines = 5
|
||||
|
||||
@@ -2407,37 +2631,9 @@ def test_s3_eviction(
|
||||
and sk.log_contains("successfully restored evicted timeline")
|
||||
for sk in env.safekeepers
|
||||
)
|
||||
|
||||
assert event_metrics_seen
|
||||
|
||||
# test safekeeper_evicted_timelines metric
|
||||
log.info("testing safekeeper_evicted_timelines metric")
|
||||
# checkpoint pageserver to force remote_consistent_lsn update
|
||||
for i in range(n_timelines):
|
||||
ps_client.timeline_checkpoint(env.initial_tenant, timelines[i], wait_until_uploaded=True)
|
||||
for ep in endpoints:
|
||||
log.info(ep.is_running())
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# all timelines must be evicted eventually
|
||||
def all_evicted():
|
||||
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
|
||||
assert n_evicted # make mypy happy
|
||||
assert int(n_evicted) == n_timelines
|
||||
|
||||
wait_until(60, 0.5, all_evicted)
|
||||
# restart should preserve the metric value
|
||||
sk.stop().start()
|
||||
wait_until(60, 0.5, all_evicted)
|
||||
# and endpoint start should reduce is
|
||||
endpoints[0].start()
|
||||
|
||||
def one_unevicted():
|
||||
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
|
||||
assert n_evicted # make mypy happy
|
||||
assert int(n_evicted) < n_timelines
|
||||
|
||||
wait_until(60, 0.5, one_unevicted)
|
||||
|
||||
|
||||
# Test resetting uploaded partial segment state.
|
||||
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
Reference in New Issue
Block a user