diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 3e558a7d3c..285be56264 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1484,6 +1484,28 @@ LIMIT 100",
info!("Pageserver config changed");
}
}
+
+ // Gather info about installed extensions
+ pub fn get_installed_extensions(&self) -> Result<()> {
+ let connstr = self.connstr.clone();
+
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .expect("failed to create runtime");
+ let result = rt
+ .block_on(crate::installed_extensions::get_installed_extensions(
+ connstr,
+ ))
+ .expect("failed to get installed extensions");
+
+ info!(
+ "{}",
+ serde_json::to_string(&result).expect("failed to serialize extensions list")
+ );
+
+ Ok(())
+ }
}
pub fn forward_termination_signal() {
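
The `get_installed_extensions` wrapper above bridges a synchronous call site to the async helper by building a throwaway current-thread Tokio runtime and blocking on it. A minimal sketch of that pattern, assuming the `tokio` and `anyhow` crates; `fetch_data` is a hypothetical stand-in for the async helper:

```rust
// Sketch of the sync-to-async bridge used above: build a single-threaded
// Tokio runtime and drive a future to completion from synchronous code.
// `fetch_data` is a hypothetical stand-in for an async helper.
async fn fetch_data() -> anyhow::Result<String> {
    Ok("extensions".to_string())
}

fn blocking_caller() -> anyhow::Result<String> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all() // enable the I/O and time drivers
        .build()?;
    // block_on runs the future on the current thread until it completes
    rt.block_on(fetch_data())
}
```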
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index fade3bbe6d..79e6158081 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -165,6 +165,32 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
+ // get the list of installed extensions
+ (&Method::GET, "/installed_extensions") => {
+ info!("serving /installed_extensions GET request");
+ let status = compute.get_status();
+ if status != ComputeStatus::Running {
+ let msg = format!(
+ "invalid compute status for extensions request: {:?}",
+ status
+ );
+ error!(msg);
+ return Response::new(Body::from(msg));
+ }
+
+ let connstr = compute.connstr.clone();
+ let res = crate::installed_extensions::get_installed_extensions(connstr).await;
+ match res {
+ Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
+ Err(e) => render_json_error(
+ &format!("could not get list of installed extensions: {}", e),
+ StatusCode::INTERNAL_SERVER_ERROR,
+ ),
+ }
+ }
+
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml
index b0ddaeae2b..e9fa66b323 100644
--- a/compute_tools/src/http/openapi_spec.yaml
+++ b/compute_tools/src/http/openapi_spec.yaml
@@ -53,6 +53,20 @@ paths:
schema:
$ref: "#/components/schemas/ComputeInsights"
+ /installed_extensions:
+ get:
+ tags:
+ - Info
+ summary: Get installed extensions.
+ description: ""
+ operationId: getInstalledExtensions
+ responses:
+ 200:
+ description: List of installed extensions
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/InstalledExtensions"
/info:
get:
tags:
@@ -395,6 +409,24 @@ components:
- configuration
example: running
+ InstalledExtensions:
+ type: object
+ properties:
+ extensions:
+ description: Contains the list of installed extensions.
+ type: array
+ items:
+ type: object
+ properties:
+ extname:
+ type: string
+ versions:
+ type: array
+ items:
+ type: string
+ n_databases:
+ type: integer
+
#
# Errors
#
diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs
new file mode 100644
index 0000000000..3d8b22a8a3
--- /dev/null
+++ b/compute_tools/src/installed_extensions.rs
@@ -0,0 +1,80 @@
+use compute_api::responses::{InstalledExtension, InstalledExtensions};
+use std::collections::HashMap;
+use std::collections::HashSet;
+use url::Url;
+
+use anyhow::Result;
+use postgres::{Client, NoTls};
+use tokio::task;
+
+/// We deliberately don't reuse get_existing_dbs() here, for code clarity
+/// and to make the database listing query more explicit.
+///
+/// Limit the number of databases to 500 to avoid excessive load.
+fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
+ // `pg_database.datconnlimit = -2` means that the database is in the
+ // invalid state
+ let databases = client
+ .query(
+ "SELECT datname FROM pg_catalog.pg_database
+ WHERE datallowconn
+ AND datconnlimit <> - 2
+ LIMIT 500",
+ &[],
+ )?
+ .iter()
+ .map(|row| {
+ let db: String = row.get("datname");
+ db
+ })
+ .collect();
+
+ Ok(databases)
+}
+
+/// Connect to every database (see list_dbs above) and get the list of installed extensions.
+/// The same extension can be installed in multiple databases with different versions;
+/// we collect the set of distinct versions across all databases.
+pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtensions> {
+ let mut connstr = connstr.clone();
+
+ task::spawn_blocking(move || {
+ let mut client = Client::connect(connstr.as_str(), NoTls)?;
+ let databases: Vec<String> = list_dbs(&mut client)?;
+
+ let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
+ for db in databases.iter() {
+ connstr.set_path(db);
+ let mut db_client = Client::connect(connstr.as_str(), NoTls)?;
+ let extensions: Vec<(String, String)> = db_client
+ .query(
+ "SELECT extname, extversion FROM pg_catalog.pg_extension;",
+ &[],
+ )?
+ .iter()
+ .map(|row| (row.get("extname"), row.get("extversion")))
+ .collect();
+
+ for (extname, v) in extensions.iter() {
+ let version = v.to_string();
+ extensions_map
+ .entry(extname.to_string())
+ .and_modify(|e| {
+ e.versions.insert(version.clone());
+ // count the number of databases where the extension is installed
+ e.n_databases += 1;
+ })
+ .or_insert(InstalledExtension {
+ extname: extname.to_string(),
+ versions: HashSet::from([version.clone()]),
+ n_databases: 1,
+ });
+ }
+ }
+
+ Ok(InstalledExtensions {
+ extensions: extensions_map.values().cloned().collect(),
+ })
+ })
+ .await?
+}
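
For context, a hypothetical caller of this new module might look like the sketch below. The connection string is a placeholder, and the JSON shape in the comment follows the `InstalledExtensions` serialization from this diff:

```rust
// Hypothetical usage sketch of the new module (placeholder connstr).
use url::Url;

async fn report_extensions() -> anyhow::Result<()> {
    let connstr = Url::parse("postgresql://cloud_admin@localhost:5432/postgres")?;
    let exts = crate::installed_extensions::get_installed_extensions(connstr).await?;
    // Serializes to something like:
    // {"extensions":[{"extname":"plpgsql","versions":["1.0"],"n_databases":2}]}
    println!("{}", serde_json::to_string(&exts)?);
    Ok(())
}
```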
diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs
index 477f423aa2..d27ae58fa2 100644
--- a/compute_tools/src/lib.rs
+++ b/compute_tools/src/lib.rs
@@ -15,6 +15,7 @@ pub mod catalog;
pub mod compute;
pub mod disk_quota;
pub mod extension_server;
+pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
mod migration;
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index 3f055b914a..5023fce003 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -1,5 +1,6 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
+use std::collections::HashSet;
use std::fmt::Display;
use chrono::{DateTime, Utc};
@@ -155,3 +156,15 @@ pub enum ControlPlaneComputeStatus {
// should be able to start with provided spec.
Attached,
}
+
+#[derive(Clone, Debug, Default, Serialize)]
+pub struct InstalledExtension {
+ pub extname: String,
+ pub versions: HashSet<String>,
+ pub n_databases: u32, // Number of databases using this extension
+}
+
+#[derive(Clone, Debug, Default, Serialize)]
+pub struct InstalledExtensions {
+ pub extensions: Vec<InstalledExtension>,
+}
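
One serialization caveat worth noting: `versions` is a `HashSet<String>`, so it serializes to a JSON array in unspecified order, which callers should not rely on. A small sketch, assuming the `compute_api` and `serde_json` crates are on the dependency list:

```rust
use std::collections::HashSet;
use compute_api::responses::InstalledExtension;

fn demo() {
    let ext = InstalledExtension {
        extname: "pg_stat_statements".to_string(),
        versions: HashSet::from(["1.9".to_string(), "1.10".to_string()]),
        n_databases: 3,
    };
    // `versions` serializes as an array in unspecified order, e.g.
    // {"extname":"pg_stat_statements","versions":["1.10","1.9"],"n_databases":3}
    println!("{}", serde_json::to_string(&ext).unwrap());
}
```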
diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index e113a987a5..f98d16789c 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -496,26 +496,12 @@ impl RemoteStorage for AzureBlobStorage {
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
}
- self.download_for_builder(builder, cancel).await
- }
-
- async fn download_byte_range(
- &self,
- from: &RemotePath,
- start_inclusive: u64,
- end_exclusive: Option<u64>,
- cancel: &CancellationToken,
- ) -> Result<Download, DownloadError> {
- let blob_client = self.client.blob_client(self.relative_path_to_name(from));
-
- let mut builder = blob_client.get();
-
- let range: Range = if let Some(end_exclusive) = end_exclusive {
- (start_inclusive..end_exclusive).into()
- } else {
- (start_inclusive..).into()
- };
- builder = builder.range(range);
+ if let Some((start, end)) = opts.byte_range() {
+ builder = builder.range(match end {
+ Some(end) => Range::Range(start..end),
+ None => Range::RangeFrom(start..),
+ });
+ }
self.download_for_builder(builder, cancel).await
}
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 0ff0f1c878..c6466237bf 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -19,7 +19,8 @@ mod simulate_failures;
mod support;
use std::{
- collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
+ collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc,
+ time::SystemTime,
};
use anyhow::Context;
@@ -162,11 +163,60 @@ pub struct Listing {
}
/// Options for downloads. The default value is a plain GET.
-#[derive(Default)]
pub struct DownloadOpts {
/// If given, returns [`DownloadError::Unmodified`] if the object still has
/// the same ETag (using If-None-Match).
pub etag: Option<Etag>,
+ /// The start of the byte range to download, or unbounded.
+ pub byte_start: Bound<u64>,
+ /// The end of the byte range to download, or unbounded. Must be after the
+ /// start bound.
+ pub byte_end: Bound<u64>,
+}
+
+impl Default for DownloadOpts {
+ fn default() -> Self {
+ Self {
+ etag: Default::default(),
+ byte_start: Bound::Unbounded,
+ byte_end: Bound::Unbounded,
+ }
+ }
+}
+
+impl DownloadOpts {
+ /// Returns the byte range with inclusive start and exclusive end, or None
+ /// if unbounded.
+ pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
+ if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
+ return None;
+ }
+ let start = match self.byte_start {
+ Bound::Excluded(i) => i + 1,
+ Bound::Included(i) => i,
+ Bound::Unbounded => 0,
+ };
+ let end = match self.byte_end {
+ Bound::Excluded(i) => Some(i),
+ Bound::Included(i) => Some(i + 1),
+ Bound::Unbounded => None,
+ };
+ if let Some(end) = end {
+ assert!(start < end, "range end {end} at or before start {start}");
+ }
+ Some((start, end))
+ }
+
+ /// Returns the byte range as an RFC 2616 Range header value with inclusive
+ /// bounds, or None if unbounded.
+ pub fn byte_range_header(&self) -> Option<String> {
+ self.byte_range()
+ .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
+ .map(|(start, end)| match end {
+ Some(end) => format!("bytes={start}-{end}"),
+ None => format!("bytes={start}-"),
+ })
+ }
}
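
To illustrate the conversions `byte_range()` performs, here is a small sketch using the same `std::ops::Bound` values a caller would set on `DownloadOpts` (inclusive start and exclusive end internally, inclusive end in the header), mirroring the unit tests later in this diff:

```rust
use std::ops::Bound;

fn demo() {
    let opts = DownloadOpts {
        byte_start: Bound::Included(512),
        byte_end: Bound::Included(1023), // inclusive end becomes exclusive 1024
        ..Default::default()
    };
    assert_eq!(opts.byte_range(), Some((512, Some(1024))));
    // The HTTP Range header form uses an inclusive end again:
    assert_eq!(opts.byte_range_header().as_deref(), Some("bytes=512-1023"));
}
```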
/// Storage (potentially remote) API to manage its state.
@@ -257,21 +307,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
- /// Streams a given byte range of the remote storage entry contents.
- ///
- /// The returned download stream will obey initial timeout and cancellation signal by erroring
- /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
- /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
- ///
- /// Returns the metadata, if any was stored with the file previously.
- async fn download_byte_range(
- &self,
- from: &RemotePath,
- start_inclusive: u64,
- end_exclusive: Option<u64>,
- cancel: &CancellationToken,
- ) -> Result<Download, DownloadError>;
-
/// Delete a single path from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -425,33 +460,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
- pub async fn download_byte_range(
- &self,
- from: &RemotePath,
- start_inclusive: u64,
- end_exclusive: Option<u64>,
- cancel: &CancellationToken,
- ) -> Result<Download, DownloadError> {
- match self {
- Self::LocalFs(s) => {
- s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
- .await
- }
- Self::AwsS3(s) => {
- s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
- .await
- }
- Self::AzureBlob(s) => {
- s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
- .await
- }
- Self::Unreliable(s) => {
- s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
- .await
- }
- }
- }
-
/// See [`RemoteStorage::delete`]
pub async fn delete(
&self,
@@ -573,20 +581,6 @@ impl GenericRemoteStorage {
})
}
- /// Downloads the storage object into the `to_path` provided.
- /// `byte_range` could be specified to dowload only a part of the file, if needed.
- pub async fn download_storage_object(
- &self,
- byte_range: Option<(u64, Option<u64>)>,
- from: &RemotePath,
- cancel: &CancellationToken,
- ) -> Result<Download, DownloadError> {
- match byte_range {
- Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
- None => self.download(from, &DownloadOpts::default(), cancel).await,
- }
- }
-
/// The name of the bucket/container/etc.
pub fn bucket_name(&self) -> Option<&str> {
match self {
@@ -660,6 +654,76 @@ impl ConcurrencyLimiter {
mod tests {
use super::*;
+ /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
+ /// with optional end bound, or None when unbounded.
+ #[test]
+ fn download_opts_byte_range() {
+ // Consider using test_case or a similar table-driven test framework.
+ let cases = [
+ // (byte_start, byte_end, expected)
+ (Bound::Unbounded, Bound::Unbounded, None),
+ (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
+ (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
+ (Bound::Included(3), Bound::Unbounded, Some((3, None))),
+ (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
+ (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
+ (Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
+ (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
+ (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
+ // 1-sized ranges are fine; 0-sized ones aren't and will panic (separate test).
+ (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
+ (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
+ ];
+
+ for (byte_start, byte_end, expect) in cases {
+ let opts = DownloadOpts {
+ byte_start,
+ byte_end,
+ ..Default::default()
+ };
+ let result = opts.byte_range();
+ assert_eq!(
+ result, expect,
+ "byte_start={byte_start:?} byte_end={byte_end:?}"
+ );
+
+ // Check generated HTTP header, which uses an inclusive range.
+ let expect_header = expect.map(|(start, end)| match end {
+ Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
+ None => format!("bytes={start}-"),
+ });
+ assert_eq!(
+ opts.byte_range_header(),
+ expect_header,
+ "byte_start={byte_start:?} byte_end={byte_end:?}"
+ );
+ }
+ }
+
+ /// DownloadOpts::byte_range() should panic on a zero-sized byte range.
+ #[test]
+ #[should_panic]
+ fn download_opts_byte_range_zero() {
+ DownloadOpts {
+ byte_start: Bound::Included(3),
+ byte_end: Bound::Excluded(3),
+ ..Default::default()
+ }
+ .byte_range();
+ }
+
+ /// DownloadOpts::byte_range() should panic on a negative byte range.
+ #[test]
+ #[should_panic]
+ fn download_opts_byte_range_negative() {
+ DownloadOpts {
+ byte_start: Bound::Included(3),
+ byte_end: Bound::Included(2),
+ ..Default::default()
+ }
+ .byte_range();
+ }
+
#[test]
fn test_object_name() {
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index d912b94c74..93a052139b 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -506,54 +506,7 @@ impl RemoteStorage for LocalFs {
return Err(DownloadError::Unmodified);
}
- let source = ReaderStream::new(
- fs::OpenOptions::new()
- .read(true)
- .open(&target_path)
- .await
- .with_context(|| {
- format!("Failed to open source file {target_path:?} to use in the download")
- })
- .map_err(DownloadError::Other)?,
- );
-
- let metadata = self
- .read_storage_metadata(&target_path)
- .await
- .map_err(DownloadError::Other)?;
-
- let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
- let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
-
- Ok(Download {
- metadata,
- last_modified: file_metadata
- .modified()
- .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
- etag,
- download_stream: Box::pin(source),
- })
- }
-
- async fn download_byte_range(
- &self,
- from: &RemotePath,
- start_inclusive: u64,
- end_exclusive: Option<u64>,
- cancel: &CancellationToken,
- ) -> Result<Download, DownloadError> {
- if let Some(end_exclusive) = end_exclusive {
- if end_exclusive <= start_inclusive {
- return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
- };
- if start_inclusive == end_exclusive.saturating_sub(1) {
- return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
- }
- }
-
- let target_path = from.with_base(&self.storage_root);
- let file_metadata = file_metadata(&target_path).await?;
- let mut source = tokio::fs::OpenOptions::new()
+ let mut file = fs::OpenOptions::new()
.read(true)
.open(&target_path)
.await
@@ -562,31 +515,29 @@ impl RemoteStorage for LocalFs {
})
.map_err(DownloadError::Other)?;
- let len = source
- .metadata()
- .await
- .context("query file length")
- .map_err(DownloadError::Other)?
- .len();
+ let mut take = file_metadata.len();
+ if let Some((start, end)) = opts.byte_range() {
+ if start > 0 {
+ file.seek(io::SeekFrom::Start(start))
+ .await
+ .context("Failed to seek to the range start in a local storage file")
+ .map_err(DownloadError::Other)?;
+ }
+ if let Some(end) = end {
+ take = end - start;
+ }
+ }
- source
- .seek(io::SeekFrom::Start(start_inclusive))
- .await
- .context("Failed to seek to the range start in a local storage file")
- .map_err(DownloadError::Other)?;
+ let source = ReaderStream::new(file.take(take));
let metadata = self
.read_storage_metadata(&target_path)
.await
.map_err(DownloadError::Other)?;
- let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
- let source = ReaderStream::new(source);
-
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
- let etag = mock_etag(&file_metadata);
Ok(Download {
metadata,
last_modified: file_metadata
@@ -688,7 +639,7 @@ mod fs_tests {
use super::*;
use camino_tempfile::tempdir;
- use std::{collections::HashMap, io::Write};
+ use std::{collections::HashMap, io::Write, ops::Bound};
async fn read_and_check_metadata(
storage: &LocalFs,
@@ -804,10 +755,12 @@ mod fs_tests {
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let first_part_download = storage
- .download_byte_range(
+ .download(
&upload_target,
- 0,
- Some(first_part_local.len() as u64),
+ &DownloadOpts {
+ byte_end: Bound::Excluded(first_part_local.len() as u64),
+ ..Default::default()
+ },
&cancel,
)
.await?;
@@ -823,10 +776,15 @@ mod fs_tests {
);
let second_part_download = storage
- .download_byte_range(
+ .download(
&upload_target,
- first_part_local.len() as u64,
- Some((first_part_local.len() + second_part_local.len()) as u64),
+ &DownloadOpts {
+ byte_start: Bound::Included(first_part_local.len() as u64),
+ byte_end: Bound::Excluded(
+ (first_part_local.len() + second_part_local.len()) as u64,
+ ),
+ ..Default::default()
+ },
&cancel,
)
.await?;
@@ -842,7 +800,14 @@ mod fs_tests {
);
let suffix_bytes = storage
- .download_byte_range(&upload_target, 13, None, &cancel)
+ .download(
+ &upload_target,
+ &DownloadOpts {
+ byte_start: Bound::Included(13),
+ ..Default::default()
+ },
+ &cancel,
+ )
.await?
.download_stream;
let suffix_bytes = aggregate(suffix_bytes).await?;
@@ -850,7 +815,7 @@ mod fs_tests {
assert_eq!(upload_name, suffix);
let all_bytes = storage
- .download_byte_range(&upload_target, 0, None, &cancel)
+ .download(&upload_target, &DownloadOpts::default(), &cancel)
.await?
.download_stream;
let all_bytes = aggregate(all_bytes).await?;
@@ -861,48 +826,26 @@ mod fs_tests {
}
#[tokio::test]
- async fn download_file_range_negative() -> anyhow::Result<()> {
- let (storage, cancel) = create_storage()?;
+ #[should_panic(expected = "at or before start")]
+ async fn download_file_range_negative() {
+ let (storage, cancel) = create_storage().unwrap();
let upload_name = "upload_1";
- let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
+ let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel)
+ .await
+ .unwrap();
- let start = 1_000_000_000;
- let end = start + 1;
- match storage
- .download_byte_range(
+ storage
+ .download(
&upload_target,
- start,
- Some(end), // exclusive end
+ &DownloadOpts {
+ byte_start: Bound::Included(10),
+ byte_end: Bound::Excluded(10),
+ ..Default::default()
+ },
&cancel,
)
.await
- {
- Ok(_) => panic!("Should not allow downloading wrong ranges"),
- Err(e) => {
- let error_string = e.to_string();
- assert!(error_string.contains("zero bytes"));
- assert!(error_string.contains(&start.to_string()));
- assert!(error_string.contains(&end.to_string()));
- }
- }
-
- let start = 10000;
- let end = 234;
- assert!(start > end, "Should test an incorrect range");
- match storage
- .download_byte_range(&upload_target, start, Some(end), &cancel)
- .await
- {
- Ok(_) => panic!("Should not allow downloading wrong ranges"),
- Err(e) => {
- let error_string = e.to_string();
- assert!(error_string.contains("Invalid range"));
- assert!(error_string.contains(&start.to_string()));
- assert!(error_string.contains(&end.to_string()));
- }
- }
-
- Ok(())
+ .unwrap();
}
#[tokio::test]
@@ -945,10 +888,12 @@ mod fs_tests {
let (first_part_local, _) = uploaded_bytes.split_at(3);
let partial_download_with_metadata = storage
- .download_byte_range(
+ .download(
&upload_target,
- 0,
- Some(first_part_local.len() as u64),
+ &DownloadOpts {
+ byte_end: Bound::Excluded(first_part_local.len() as u64),
+ ..Default::default()
+ },
&cancel,
)
.await?;
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index ec7c047565..f950f2886c 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -804,34 +804,7 @@ impl RemoteStorage for S3Bucket {
bucket: self.bucket_name.clone(),
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
- range: None,
- },
- cancel,
- )
- .await
- }
-
- async fn download_byte_range(
- &self,
- from: &RemotePath,
- start_inclusive: u64,
- end_exclusive: Option<u64>,
- cancel: &CancellationToken,
- ) -> Result<Download, DownloadError> {
- // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
- // and needs both ends to be exclusive
- let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
- let range = Some(match end_inclusive {
- Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
- None => format!("bytes={start_inclusive}-"),
- });
-
- self.download_object(
- GetObjectRequest {
- bucket: self.bucket_name.clone(),
- key: self.relative_path_to_s3_object(from),
- etag: None,
- range,
+ range: opts.byte_range_header(),
},
cancel,
)
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 05f82b5a5a..10db53971c 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -170,28 +170,13 @@ impl RemoteStorage for UnreliableWrapper {
opts: &DownloadOpts,
cancel: &CancellationToken,
) -> Result {
+ // Note: We treat any byte range as an "attempt" of the same operation.
+ // We don't pay attention to the ranges. That's good enough for now.
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner.download(from, opts, cancel).await
}
- async fn download_byte_range(
- &self,
- from: &RemotePath,
- start_inclusive: u64,
- end_exclusive: Option<u64>,
- cancel: &CancellationToken,
- ) -> Result<Download, DownloadError> {
- // Note: We treat any download_byte_range as an "attempt" of the same
- // operation. We don't pay attention to the ranges. That's good enough
- // for now.
- self.attempt(RemoteOp::Download(from.clone()))
- .map_err(DownloadError::Other)?;
- self.inner
- .download_byte_range(from, start_inclusive, end_exclusive, cancel)
- .await
- }
-
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}
diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs
index e38cfb3ef0..e6f33fc3f8 100644
--- a/libs/remote_storage/tests/common/tests.rs
+++ b/libs/remote_storage/tests/common/tests.rs
@@ -2,6 +2,7 @@ use anyhow::Context;
use camino::Utf8Path;
use futures::StreamExt;
use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
+use std::ops::Bound;
use std::sync::Arc;
use std::{collections::HashSet, num::NonZeroU32};
use test_context::test_context;
@@ -293,7 +294,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// Full range (end specified)
let dl = ctx
.client
- .download_byte_range(&path, 0, Some(len as u64), &cancel)
+ .download(
+ &path,
+ &DownloadOpts {
+ byte_start: Bound::Included(0),
+ byte_end: Bound::Excluded(len as u64),
+ ..Default::default()
+ },
+ &cancel,
+ )
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
@@ -301,7 +310,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// partial range (end specified)
let dl = ctx
.client
- .download_byte_range(&path, 4, Some(10), &cancel)
+ .download(
+ &path,
+ &DownloadOpts {
+ byte_start: Bound::Included(4),
+ byte_end: Bound::Excluded(10),
+ ..Default::default()
+ },
+ &cancel,
+ )
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[4..10]);
@@ -309,7 +326,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// partial range (end beyond real end)
let dl = ctx
.client
- .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
+ .download(
+ &path,
+ &DownloadOpts {
+ byte_start: Bound::Included(8),
+ byte_end: Bound::Excluded(len as u64 * 100),
+ ..Default::default()
+ },
+ &cancel,
+ )
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[8..]);
@@ -317,7 +342,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// Partial range (end unspecified)
let dl = ctx
.client
- .download_byte_range(&path, 4, None, &cancel)
+ .download(
+ &path,
+ &DownloadOpts {
+ byte_start: Bound::Included(4),
+ ..Default::default()
+ },
+ &cancel,
+ )
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[4..]);
@@ -325,7 +357,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// Full range (end unspecified)
let dl = ctx
.client
- .download_byte_range(&path, 0, None, &cancel)
+ .download(
+ &path,
+ &DownloadOpts {
+ byte_start: Bound::Included(0),
+ ..Default::default()
+ },
+ &cancel,
+ )
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index a5dcb430de..2985ab1efb 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -704,6 +704,8 @@ async fn timeline_archival_config_handler(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+ let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+
let request_data: TimelineArchivalConfigRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
@@ -714,7 +716,7 @@ async fn timeline_archival_config_handler(
.get_attached_tenant_shard(tenant_shard_id)?;
tenant
- .apply_timeline_archival_config(timeline_id, request_data.state)
+ .apply_timeline_archival_config(timeline_id, request_data.state, ctx)
.await?;
Ok::<_, ApiError>(())
}
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 29f682c62a..d2818d04dc 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -38,6 +38,7 @@ use std::future::Future;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
+use timeline::offload::offload_timeline;
use tokio::io::BufReader;
use tokio::sync::watch;
use tokio::task::JoinSet;
@@ -287,9 +288,13 @@ pub struct Tenant {
/// During timeline creation, we first insert the TimelineId to the
/// creating map, then `timelines`, then remove it from the creating map.
- /// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
+ /// **Lock order**: if acquiring both, acquire `timelines` before `timelines_creating`
timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
+ /// Possibly offloaded and archived timelines
+ /// **Lock order**: if acquiring both, acquire `timelines` before `timelines_offloaded`
+ timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
+
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
// `timelines` mutex during all GC iteration
@@ -484,6 +489,65 @@ impl WalRedoManager {
}
}
+pub struct OffloadedTimeline {
+ pub tenant_shard_id: TenantShardId,
+ pub timeline_id: TimelineId,
+ pub ancestor_timeline_id: Option<TimelineId>,
+
+ // TODO: once we persist offloaded state, make this lazily constructed
+ pub remote_client: Arc<RemoteTimelineClient>,
+
+ /// Prevent two tasks from deleting the timeline at the same time. If held, the
+ /// timeline is being deleted. If 'true', the timeline has already been deleted.
+ pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
+}
+
+impl OffloadedTimeline {
+ fn from_timeline(timeline: &Timeline) -> Self {
+ Self {
+ tenant_shard_id: timeline.tenant_shard_id,
+ timeline_id: timeline.timeline_id,
+ ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
+
+ remote_client: timeline.remote_client.clone(),
+ delete_progress: timeline.delete_progress.clone(),
+ }
+ }
+}
+
+#[derive(Clone)]
+pub enum TimelineOrOffloaded {
+ Timeline(Arc<Timeline>),
+ Offloaded(Arc<OffloadedTimeline>),
+}
+
+impl TimelineOrOffloaded {
+ pub fn tenant_shard_id(&self) -> TenantShardId {
+ match self {
+ TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
+ TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
+ }
+ }
+ pub fn timeline_id(&self) -> TimelineId {
+ match self {
+ TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
+ TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
+ }
+ }
+ pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
+ match self {
+ TimelineOrOffloaded::Timeline(timeline) => &timeline.delete_progress,
+ TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
+ }
+ }
+ pub fn remote_client(&self) -> &Arc<RemoteTimelineClient> {
+ match self {
+ TimelineOrOffloaded::Timeline(timeline) => &timeline.remote_client,
+ TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.remote_client,
+ }
+ }
+}
+
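
`TimelineOrOffloaded` follows a simple delegation pattern: an enum over the two timeline states that forwards the accessors common to both, so the deletion path below doesn't branch at every use site. A minimal self-contained sketch of the same pattern, with `Loaded`/`Parked` as hypothetical stand-ins:

```rust
use std::sync::Arc;

struct Loaded { id: u64 }
struct Parked { id: u64 }

enum Handle {
    Loaded(Arc<Loaded>),
    Parked(Arc<Parked>),
}

impl Handle {
    // One accessor per shared field; callers stay state-agnostic.
    fn id(&self) -> u64 {
        match self {
            Handle::Loaded(t) => t.id,
            Handle::Parked(t) => t.id,
        }
    }
}
```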
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline is shutting down")]
@@ -1406,52 +1470,192 @@ impl Tenant {
}
}
- pub(crate) async fn apply_timeline_archival_config(
- &self,
+ fn check_to_be_archived_has_no_unarchived_children(
timeline_id: TimelineId,
- state: TimelineArchivalState,
+ timelines: &std::sync::MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>>,
+ ) -> Result<(), TimelineArchivalError> {
+ let children: Vec<TimelineId> = timelines
+ .iter()
+ .filter_map(|(id, entry)| {
+ if entry.get_ancestor_timeline_id() != Some(timeline_id) {
+ return None;
+ }
+ if entry.is_archived() == Some(true) {
+ return None;
+ }
+ Some(*id)
+ })
+ .collect();
+
+ if !children.is_empty() {
+ return Err(TimelineArchivalError::HasUnarchivedChildren(children));
+ }
+ Ok(())
+ }
+
+ fn check_ancestor_of_to_be_unarchived_is_not_archived(
+ ancestor_timeline_id: TimelineId,
+ timelines: &std::sync::MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>>,
+ offloaded_timelines: &std::sync::MutexGuard<
+ '_,
+ HashMap<TimelineId, Arc<OffloadedTimeline>>,
+ >,
+ ) -> Result<(), TimelineArchivalError> {
+ let has_archived_parent =
+ if let Some(ancestor_timeline) = timelines.get(&ancestor_timeline_id) {
+ ancestor_timeline.is_archived() == Some(true)
+ } else if offloaded_timelines.contains_key(&ancestor_timeline_id) {
+ true
+ } else {
+ error!("ancestor timeline {ancestor_timeline_id} not found");
+ if cfg!(debug_assertions) {
+ panic!("ancestor timeline {ancestor_timeline_id} not found");
+ }
+ return Err(TimelineArchivalError::NotFound);
+ };
+ if has_archived_parent {
+ return Err(TimelineArchivalError::HasArchivedParent(
+ ancestor_timeline_id,
+ ));
+ }
+ Ok(())
+ }
+
+ fn check_to_be_unarchived_timeline_has_no_archived_parent(
+ timeline: &Arc<Timeline>,
+ ) -> Result<(), TimelineArchivalError> {
+ if let Some(ancestor_timeline) = timeline.ancestor_timeline() {
+ if ancestor_timeline.is_archived() == Some(true) {
+ return Err(TimelineArchivalError::HasArchivedParent(
+ ancestor_timeline.timeline_id,
+ ));
+ }
+ }
+ Ok(())
+ }
+
+ /// Loads the specified (offloaded) timeline from S3 and attaches it as a loaded timeline
+ async fn unoffload_timeline(
+ self: &Arc<Self>,
+ timeline_id: TimelineId,
+ ctx: RequestContext,
+ ) -> Result<Arc<Timeline>, TimelineArchivalError> {
+ let cancel = self.cancel.clone();
+ let timeline_preload = self
+ .load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel)
+ .await;
+
+ let index_part = match timeline_preload.index_part {
+ Ok(index_part) => {
+ debug!("remote index part exists for timeline {timeline_id}");
+ index_part
+ }
+ Err(DownloadError::NotFound) => {
+ error!(%timeline_id, "index_part not found on remote");
+ return Err(TimelineArchivalError::NotFound);
+ }
+ Err(e) => {
+ // Some (possibly ephemeral) error happened during index_part download.
+ warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
+ return Err(TimelineArchivalError::Other(
+ anyhow::Error::new(e).context("downloading index_part from remote storage"),
+ ));
+ }
+ };
+ let index_part = match index_part {
+ MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
+ MaybeDeletedIndexPart::Deleted(_index_part) => {
+ info!("timeline is deleted according to index_part.json");
+ return Err(TimelineArchivalError::NotFound);
+ }
+ };
+ let remote_metadata = index_part.metadata.clone();
+ let timeline_resources = self.build_timeline_resources(timeline_id);
+ self.load_remote_timeline(
+ timeline_id,
+ index_part,
+ remote_metadata,
+ timeline_resources,
+ &ctx,
+ )
+ .await
+ .with_context(|| {
+ format!(
+ "failed to load remote timeline {} for tenant {}",
+ timeline_id, self.tenant_shard_id
+ )
+ })?;
+ let timelines = self.timelines.lock().unwrap();
+ if let Some(timeline) = timelines.get(&timeline_id) {
+ let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
+ if offloaded_timelines.remove(&timeline_id).is_none() {
+ warn!("timeline already removed from offloaded timelines");
+ }
+ Ok(Arc::clone(timeline))
+ } else {
+ warn!("timeline not available directly after attach");
+ Err(TimelineArchivalError::Other(anyhow::anyhow!(
+ "timeline not available directly after attach"
+ )))
+ }
+ }
+
+ pub(crate) async fn apply_timeline_archival_config(
+ self: &Arc<Self>,
+ timeline_id: TimelineId,
+ new_state: TimelineArchivalState,
+ ctx: RequestContext,
) -> Result<(), TimelineArchivalError> {
info!("setting timeline archival config");
- let timeline = {
+ // First part: figure out what is needed to do, and do validation
+ let timeline_or_unarchive_offloaded = 'outer: {
let timelines = self.timelines.lock().unwrap();
let Some(timeline) = timelines.get(&timeline_id) else {
- return Err(TimelineArchivalError::NotFound);
+ let offloaded_timelines = self.timelines_offloaded.lock().unwrap();
+ let Some(offloaded) = offloaded_timelines.get(&timeline_id) else {
+ return Err(TimelineArchivalError::NotFound);
+ };
+ if new_state == TimelineArchivalState::Archived {
+ // It's offloaded already, so nothing to do
+ return Ok(());
+ }
+ if let Some(ancestor_timeline_id) = offloaded.ancestor_timeline_id {
+ Self::check_ancestor_of_to_be_unarchived_is_not_archived(
+ ancestor_timeline_id,
+ &timelines,
+ &offloaded_timelines,
+ )?;
+ }
+ break 'outer None;
};
- if state == TimelineArchivalState::Unarchived {
- if let Some(ancestor_timeline) = timeline.ancestor_timeline() {
- if ancestor_timeline.is_archived() == Some(true) {
- return Err(TimelineArchivalError::HasArchivedParent(
- ancestor_timeline.timeline_id,
- ));
- }
+ // Do some validation. We release the timelines lock below, so there is potential
+ // for race conditions: these checks exist more to prevent misunderstandings of the
+ // API's capabilities than to serve as the sole defense of its invariants.
+ match new_state {
+ TimelineArchivalState::Unarchived => {
+ Self::check_to_be_unarchived_timeline_has_no_archived_parent(timeline)?
+ }
+ TimelineArchivalState::Archived => {
+ Self::check_to_be_archived_has_no_unarchived_children(timeline_id, &timelines)?
}
}
-
- // Ensure that there are no non-archived child timelines
- let children: Vec<TimelineId> = timelines
- .iter()
- .filter_map(|(id, entry)| {
- if entry.get_ancestor_timeline_id() != Some(timeline_id) {
- return None;
- }
- if entry.is_archived() == Some(true) {
- return None;
- }
- Some(*id)
- })
- .collect();
-
- if !children.is_empty() && state == TimelineArchivalState::Archived {
- return Err(TimelineArchivalError::HasUnarchivedChildren(children));
- }
- Arc::clone(timeline)
+ Some(Arc::clone(timeline))
};
+ // Second part: unarchive timeline (if needed)
+ let timeline = if let Some(timeline) = timeline_or_unarchive_offloaded {
+ timeline
+ } else {
+ // Turn offloaded timeline into a non-offloaded one
+ self.unoffload_timeline(timeline_id, ctx).await?
+ };
+
+ // Third part: upload new timeline archival state and block until it is present in S3
let upload_needed = timeline
.remote_client
- .schedule_index_upload_for_timeline_archival_state(state)?;
+ .schedule_index_upload_for_timeline_archival_state(new_state)?;
if upload_needed {
info!("Uploading new state");
@@ -1884,7 +2088,7 @@ impl Tenant {
///
/// Returns whether we have pending compaction task.
async fn compaction_iteration(
- &self,
+ self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result {
@@ -1905,21 +2109,28 @@ impl Tenant {
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
// compaction runs.
- let timelines_to_compact = {
+ let timelines_to_compact_or_offload;
+ {
let timelines = self.timelines.lock().unwrap();
- let timelines_to_compact = timelines
+ timelines_to_compact_or_offload = timelines
.iter()
.filter_map(|(timeline_id, timeline)| {
- if timeline.is_active() {
- Some((*timeline_id, timeline.clone()))
- } else {
+ let (is_active, can_offload) = (timeline.is_active(), timeline.can_offload());
+ let has_no_unoffloaded_children = {
+ !timelines
+ .iter()
+ .any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(*timeline_id))
+ };
+ let can_offload = can_offload && has_no_unoffloaded_children;
+ if (is_active, can_offload) == (false, false) {
None
+ } else {
+ Some((*timeline_id, timeline.clone(), (is_active, can_offload)))
}
})
.collect::<Vec<_>>();
drop(timelines);
- timelines_to_compact
- };
+ }
// Before doing any I/O work, check our circuit breaker
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
@@ -1929,20 +2140,34 @@ impl Tenant {
let mut has_pending_task = false;
- for (timeline_id, timeline) in &timelines_to_compact {
- has_pending_task |= timeline
- .compact(cancel, EnumSet::empty(), ctx)
- .instrument(info_span!("compact_timeline", %timeline_id))
- .await
- .inspect_err(|e| match e {
- timeline::CompactionError::ShuttingDown => (),
- timeline::CompactionError::Other(e) => {
- self.compaction_circuit_breaker
- .lock()
- .unwrap()
- .fail(&CIRCUIT_BREAKERS_BROKEN, e);
- }
- })?;
+ for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
+ {
+ let pending_task_left = if *can_compact {
+ Some(
+ timeline
+ .compact(cancel, EnumSet::empty(), ctx)
+ .instrument(info_span!("compact_timeline", %timeline_id))
+ .await
+ .inspect_err(|e| match e {
+ timeline::CompactionError::ShuttingDown => (),
+ timeline::CompactionError::Other(e) => {
+ self.compaction_circuit_breaker
+ .lock()
+ .unwrap()
+ .fail(&CIRCUIT_BREAKERS_BROKEN, e);
+ }
+ })?,
+ )
+ } else {
+ None
+ };
+ has_pending_task |= pending_task_left.unwrap_or(false);
+ if pending_task_left == Some(false) && *can_offload {
+ offload_timeline(self, timeline)
+ .instrument(info_span!("offload_timeline", %timeline_id))
+ .await
+ .map_err(timeline::CompactionError::Other)?;
+ }
}
self.compaction_circuit_breaker
@@ -2852,6 +3077,7 @@ impl Tenant {
constructed_at: Instant::now(),
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
+ timelines_offloaded: Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
diff --git a/pageserver/src/tenant/gc_block.rs b/pageserver/src/tenant/gc_block.rs
index f7a7836a12..373779ddb8 100644
--- a/pageserver/src/tenant/gc_block.rs
+++ b/pageserver/src/tenant/gc_block.rs
@@ -141,14 +141,14 @@ impl GcBlock {
Ok(())
}
- pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
+ pub(crate) fn before_delete(&self, timeline_id: &super::TimelineId) {
let unblocked = {
let mut g = self.reasons.lock().unwrap();
if g.is_empty() {
return;
}
- g.remove(&timeline.timeline_id);
+ g.remove(timeline_id);
BlockingReasons::clean_and_summarize(g).is_none()
};
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index 9f7447a9ac..82c5702686 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -950,6 +950,7 @@ impl<'a> TenantDownloader<'a> {
let cancel = &self.secondary_state.cancel;
let opts = DownloadOpts {
etag: prev_etag.cloned(),
+ ..Default::default()
};
backoff::retry(
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 1d79b2b74b..2fd4e699cf 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -7,6 +7,7 @@ pub(crate) mod handle;
mod init;
pub mod layer_manager;
pub(crate) mod logical_size;
+pub mod offload;
pub mod span;
pub mod uninit;
mod walreceiver;
@@ -1556,6 +1557,17 @@ impl Timeline {
}
}
+ /// Checks if the internal state of the timeline is consistent with it being able to be offloaded.
+ /// This is necessary but not sufficient for offloading of the timeline as it might have
+ /// child timelines that are not offloaded yet.
+ pub(crate) fn can_offload(&self) -> bool {
+ if self.remote_client.is_archived() != Some(true) {
+ return false;
+ }
+
+ true
+ }
+
/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
/// compaction tasks.
pub(crate) async fn compact(
@@ -1818,7 +1830,6 @@ impl Timeline {
self.current_state() == TimelineState::Active
}
- #[allow(unused)]
pub(crate) fn is_archived(&self) -> Option {
self.remote_client.is_archived()
}
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index 90db08ea81..305c5758cc 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -15,7 +15,7 @@ use crate::{
tenant::{
metadata::TimelineMetadata,
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
- CreateTimelineCause, DeleteTimelineError, Tenant,
+ CreateTimelineCause, DeleteTimelineError, Tenant, TimelineOrOffloaded,
},
};
@@ -24,12 +24,14 @@ use super::{Timeline, TimelineResources};
/// Mark timeline as deleted in S3 so we won't pick it up next time
/// during attach or pageserver restart.
/// See comment in persist_index_part_with_deleted_flag.
-async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
- match timeline
- .remote_client
+async fn set_deleted_in_remote_index(
+ timeline: &TimelineOrOffloaded,
+) -> Result<(), DeleteTimelineError> {
+ let res = timeline
+ .remote_client()
.persist_index_part_with_deleted_flag()
- .await
- {
+ .await;
+ match res {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
@@ -127,9 +129,9 @@ pub(super) async fn delete_local_timeline_directory(
}
/// Removes remote layers and an index file after them.
-async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
+async fn delete_remote_layers_and_index(timeline: &TimelineOrOffloaded) -> anyhow::Result<()> {
timeline
- .remote_client
+ .remote_client()
.delete_all()
.await
.context("delete_all")
@@ -137,27 +139,41 @@ async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<(
/// It is important that this gets called when DeletionGuard is being held.
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
-async fn remove_timeline_from_tenant(
+async fn remove_maybe_offloaded_timeline_from_tenant(
tenant: &Tenant,
- timeline: &Timeline,
+ timeline: &TimelineOrOffloaded,
_: &DeletionGuard, // using it as a witness
) -> anyhow::Result<()> {
// Remove the timeline from the map.
+ // This observes the locking order between timelines and timelines_offloaded
let mut timelines = tenant.timelines.lock().unwrap();
+ let mut timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
+ let offloaded_children_exist = timelines_offloaded
+ .iter()
+ .any(|(_, entry)| entry.ancestor_timeline_id == Some(timeline.timeline_id()));
let children_exist = timelines
.iter()
- .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id));
- // XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
- // We already deleted the layer files, so it's probably best to panic.
- // (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
- if children_exist {
+ .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id()));
+ // XXX this can happen because of race conditions with branch creation.
+ // We already deleted the remote layer files, so it's probably best to panic.
+ if children_exist || offloaded_children_exist {
panic!("Timeline grew children while we removed layer files");
}
- timelines
- .remove(&timeline.timeline_id)
- .expect("timeline that we were deleting was concurrently removed from 'timelines' map");
+ match timeline {
+ TimelineOrOffloaded::Timeline(timeline) => {
+ timelines.remove(&timeline.timeline_id).expect(
+ "timeline that we were deleting was concurrently removed from 'timelines' map",
+ );
+ }
+ TimelineOrOffloaded::Offloaded(timeline) => {
+ timelines_offloaded
+ .remove(&timeline.timeline_id)
+ .expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
+ }
+ }
+ drop(timelines_offloaded);
drop(timelines);
Ok(())
@@ -207,9 +223,11 @@ impl DeleteTimelineFlow {
guard.mark_in_progress()?;
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
- timeline.shutdown(super::ShutdownMode::Hard).await;
+ if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
+ timeline.shutdown(super::ShutdownMode::Hard).await;
+ }
- tenant.gc_block.before_delete(&timeline);
+ tenant.gc_block.before_delete(&timeline.timeline_id());
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
Err(anyhow::anyhow!(
@@ -285,15 +303,16 @@ impl DeleteTimelineFlow {
guard.mark_in_progress()?;
+ let timeline = TimelineOrOffloaded::Timeline(timeline);
Self::schedule_background(guard, tenant.conf, tenant, timeline);
Ok(())
}
- fn prepare(
+ pub(super) fn prepare(
tenant: &Tenant,
timeline_id: TimelineId,
- ) -> Result<(Arc<Timeline>, DeletionGuard), DeleteTimelineError> {
+ ) -> Result<(TimelineOrOffloaded, DeletionGuard), DeleteTimelineError> {
// Note the interaction between this guard and deletion guard.
// Here we attempt to lock deletion guard when we're holding a lock on timelines.
// This is important because when you take into account `remove_timeline_from_tenant`
@@ -307,8 +326,14 @@ impl DeleteTimelineFlow {
let timelines = tenant.timelines.lock().unwrap();
let timeline = match timelines.get(&timeline_id) {
- Some(t) => t,
- None => return Err(DeleteTimelineError::NotFound),
+ Some(t) => TimelineOrOffloaded::Timeline(Arc::clone(t)),
+ None => {
+ let offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
+ match offloaded_timelines.get(&timeline_id) {
+ Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
+ None => return Err(DeleteTimelineError::NotFound),
+ }
+ }
};
// Ensure that there are no child timelines **attached to that pageserver**,
@@ -334,30 +359,32 @@ impl DeleteTimelineFlow {
// to remove the timeline from it.
// Always if you have two locks that are taken in different order this can result in a deadlock.
- let delete_progress = Arc::clone(&timeline.delete_progress);
+ let delete_progress = Arc::clone(timeline.delete_progress());
let delete_lock_guard = match delete_progress.try_lock_owned() {
Ok(guard) => DeletionGuard(guard),
Err(_) => {
// Unfortunately if lock fails arc is consumed.
return Err(DeleteTimelineError::AlreadyInProgress(Arc::clone(
- &timeline.delete_progress,
+ timeline.delete_progress(),
)));
}
};
- timeline.set_state(TimelineState::Stopping);
+ if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
+ timeline.set_state(TimelineState::Stopping);
+ }
- Ok((Arc::clone(timeline), delete_lock_guard))
+ Ok((timeline, delete_lock_guard))
}
fn schedule_background(
guard: DeletionGuard,
conf: &'static PageServerConf,
tenant: Arc<Tenant>,
- timeline: Arc<Timeline>,
+ timeline: TimelineOrOffloaded,
) {
- let tenant_shard_id = timeline.tenant_shard_id;
- let timeline_id = timeline.timeline_id;
+ let tenant_shard_id = timeline.tenant_shard_id();
+ let timeline_id = timeline.timeline_id();
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
@@ -368,7 +395,9 @@ impl DeleteTimelineFlow {
async move {
if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await {
error!("Error: {err:#}");
- timeline.set_broken(format!("{err:#}"))
+ if let TimelineOrOffloaded::Timeline(timeline) = timeline {
+ timeline.set_broken(format!("{err:#}"))
+ }
};
Ok(())
}
@@ -380,15 +409,19 @@ impl DeleteTimelineFlow {
mut guard: DeletionGuard,
conf: &PageServerConf,
tenant: &Tenant,
- timeline: &Timeline,
+ timeline: &TimelineOrOffloaded,
) -> Result<(), DeleteTimelineError> {
- delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
+ // Offloaded timelines have no local state
+ // TODO: once we persist offloaded information, delete the timeline from there, too
+ if let TimelineOrOffloaded::Timeline(timeline) = timeline {
+ delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
+ }
delete_remote_layers_and_index(timeline).await?;
pausable_failpoint!("in_progress_delete");
- remove_timeline_from_tenant(tenant, timeline, &guard).await?;
+ remove_maybe_offloaded_timeline_from_tenant(tenant, timeline, &guard).await?;
*guard = Self::Finished;
@@ -400,7 +433,7 @@ impl DeleteTimelineFlow {
}
}
-struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);
+pub(super) struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);
impl Deref for DeletionGuard {
type Target = DeleteTimelineFlow;
diff --git a/pageserver/src/tenant/timeline/offload.rs b/pageserver/src/tenant/timeline/offload.rs
new file mode 100644
index 0000000000..fb906d906b
--- /dev/null
+++ b/pageserver/src/tenant/timeline/offload.rs
@@ -0,0 +1,69 @@
+use std::sync::Arc;
+
+use crate::tenant::{OffloadedTimeline, Tenant, TimelineOrOffloaded};
+
+use super::{
+ delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard},
+ Timeline,
+};
+
+pub(crate) async fn offload_timeline(
+ tenant: &Tenant,
+ timeline: &Arc<Timeline>,
+) -> anyhow::Result<()> {
+ tracing::info!("offloading archived timeline");
+ let (timeline, guard) = DeleteTimelineFlow::prepare(tenant, timeline.timeline_id)?;
+
+ let TimelineOrOffloaded::Timeline(timeline) = timeline else {
+ tracing::error!("timeline already offloaded, but given timeline object");
+ return Ok(());
+ };
+
+ // TODO extend guard mechanism above with method
+ // to make deletions possible while offloading is in progress
+
+ // TODO mark timeline as offloaded in S3
+
+ let conf = &tenant.conf;
+ delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await?;
+
+ remove_timeline_from_tenant(tenant, &timeline, &guard).await?;
+
+ {
+ let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
+ offloaded_timelines.insert(
+ timeline.timeline_id,
+ Arc::new(OffloadedTimeline::from_timeline(&timeline)),
+ );
+ }
+
+ Ok(())
+}
+
+/// It is important that this gets called when DeletionGuard is being held.
+/// For more context see comments in [`DeleteTimelineFlow::prepare`]
+async fn remove_timeline_from_tenant(
+ tenant: &Tenant,
+ timeline: &Timeline,
+ _: &DeletionGuard, // using it as a witness
+) -> anyhow::Result<()> {
+ // Remove the timeline from the map.
+ let mut timelines = tenant.timelines.lock().unwrap();
+ let children_exist = timelines
+ .iter()
+ .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id));
+ // XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
+ // We already deleted the layer files, so it's probably best to panic.
+ // (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
+ if children_exist {
+ panic!("Timeline grew children while we removed layer files");
+ }
+
+ timelines
+ .remove(&timeline.timeline_id)
+ .expect("timeline that we were deleting was concurrently removed from 'timelines' map");
+
+ drop(timelines);
+
+ Ok(())
+}
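
Putting the pieces together, the compaction loop in tenant.rs is the sole call site: a timeline is offloaded once it is archived, compaction reports no pending work, and it has no unoffloaded children. A hedged sketch of that gating, reusing types from this diff (the driver function itself is hypothetical):

```rust
// Sketch only: mirrors the gating in Tenant::compaction_iteration.
async fn maybe_offload(tenant: &Tenant, timeline: &Arc<Timeline>) -> anyhow::Result<()> {
    // can_offload() currently only requires the timeline to be archived;
    // the real caller additionally checks for unoffloaded children.
    if timeline.can_offload() {
        offload_timeline(tenant, timeline).await?;
    }
    Ok(())
}
```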
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index 11e3ae9c3f..3dd4ea4b96 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -1366,6 +1366,7 @@ pub(crate) type IoBufferMut = aligned_buffer::AlignedBufferMut<{ get_io_buffer_a
pub(crate) type IoPageSlice<'a> =
aligned_buffer::AlignedSlice<'a, { get_io_buffer_alignment() }, PAGE_SZ>;
+
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
diff --git a/poetry.lock b/poetry.lock
index 07f30d10e7..00fe2505c9 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2095,6 +2095,7 @@ files = [
{file = "psycopg2_binary-2.9.9-cp311-cp311-win32.whl", hash = "sha256:dc4926288b2a3e9fd7b50dc6a1909a13bbdadfc67d93f3374d984e56f885579d"},
{file = "psycopg2_binary-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:b76bedd166805480ab069612119ea636f5ab8f8771e640ae103e05a4aae3e417"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8532fd6e6e2dc57bcb3bc90b079c60de896d2128c5d9d6f24a63875a95a088cf"},
+ {file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b0605eaed3eb239e87df0d5e3c6489daae3f7388d455d0c0b4df899519c6a38d"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8f8544b092a29a6ddd72f3556a9fcf249ec412e10ad28be6a0c0d948924f2212"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2d423c8d8a3c82d08fe8af900ad5b613ce3632a1249fd6a223941d0735fce493"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2e5afae772c00980525f6d6ecf7cbca55676296b580c0e6abb407f15f3706996"},
@@ -2103,6 +2104,8 @@ files = [
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:cb16c65dcb648d0a43a2521f2f0a2300f40639f6f8c1ecbc662141e4e3e1ee07"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:911dda9c487075abd54e644ccdf5e5c16773470a6a5d3826fda76699410066fb"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:57fede879f08d23c85140a360c6a77709113efd1c993923c59fde17aa27599fe"},
+ {file = "psycopg2_binary-2.9.9-cp312-cp312-win32.whl", hash = "sha256:64cf30263844fa208851ebb13b0732ce674d8ec6a0c86a4e160495d299ba3c93"},
+ {file = "psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab"},
{file = "psycopg2_binary-2.9.9-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2293b001e319ab0d869d660a704942c9e2cce19745262a8aba2115ef41a0a42a"},
{file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:03ef7df18daf2c4c07e2695e8cfd5ee7f748a1d54d802330985a78d2a5a6dca9"},
{file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a602ea5aff39bb9fac6308e9c9d82b9a35c2bf288e184a816002c9fae930b77"},
@@ -2584,6 +2587,7 @@ files = [
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
+ {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
@@ -2729,21 +2733,22 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "responses"
-version = "0.21.0"
+version = "0.25.3"
description = "A utility library for mocking out the `requests` Python library."
optional = false
-python-versions = ">=3.7"
+python-versions = ">=3.8"
files = [
- {file = "responses-0.21.0-py3-none-any.whl", hash = "sha256:2dcc863ba63963c0c3d9ee3fa9507cbe36b7d7b0fccb4f0bdfd9e96c539b1487"},
- {file = "responses-0.21.0.tar.gz", hash = "sha256:b82502eb5f09a0289d8e209e7bad71ef3978334f56d09b444253d5ad67bf5253"},
+ {file = "responses-0.25.3-py3-none-any.whl", hash = "sha256:521efcbc82081ab8daa588e08f7e8a64ce79b91c39f6e62199b19159bea7dbcb"},
+ {file = "responses-0.25.3.tar.gz", hash = "sha256:617b9247abd9ae28313d57a75880422d55ec63c29d33d629697590a034358dba"},
]
[package.dependencies]
-requests = ">=2.0,<3.0"
-urllib3 = ">=1.25.10"
+pyyaml = "*"
+requests = ">=2.30.0,<3.0"
+urllib3 = ">=1.25.10,<3.0"
[package.extras]
-tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-localserver", "types-mock", "types-requests"]
+tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-PyYAML", "types-requests"]
[[package]]
name = "rfc3339-validator"
@@ -3137,6 +3142,16 @@ files = [
{file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
{file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
{file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
+ {file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
+ {file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
+ {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
+ {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
+ {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
+ {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
+ {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
+ {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
+ {file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
+ {file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},
diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml
index ae9b2531aa..c995ac3660 100644
--- a/proxy/Cargo.toml
+++ b/proxy/Cargo.toml
@@ -39,7 +39,7 @@ http.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
hyper0.workspace = true
-hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
+hyper = { workspace = true, features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
http-body-util = { version = "0.1" }
indexmap.workspace = true
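
A note on the dependency swap above: Cargo's `package = "..."` key lets one crate live under several import names, so after this change hyper 1.x owns the bare `hyper` name while hyper 0.14 remains reachable through the `hyper0` alias, and the `hyper1::` prefixes throughout the proxy can be dropped. A minimal sketch of what call sites look like under those names (illustrative only):

    // hyper 1.x is now imported as plain `hyper`...
    use hyper::StatusCode;
    // ...while legacy hyper 0.14 stays available under the `hyper0` alias.
    use hyper0::Body;

    fn empty_ok() -> (StatusCode, Body) {
        (StatusCode::OK, Body::empty())
    }
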
diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs
index b62a11ccb2..0c66fe5381 100644
--- a/proxy/src/auth/backend/jwt.rs
+++ b/proxy/src/auth/backend/jwt.rs
@@ -571,7 +571,7 @@ mod tests {
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
- use hyper1::service::service_fn;
+ use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use rand::rngs::OsRng;
use rsa::pkcs8::DecodePrivateKey;
@@ -736,7 +736,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
});
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
- let server = hyper1::server::conn::http1::Builder::new();
+ let server = hyper::server::conn::http1::Builder::new();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
diff --git a/proxy/src/http/health_server.rs b/proxy/src/http/health_server.rs
index cae9eb5b97..d0352351d5 100644
--- a/proxy/src/http/health_server.rs
+++ b/proxy/src/http/health_server.rs
@@ -1,5 +1,5 @@
use anyhow::{anyhow, bail};
-use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
+use hyper0::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use measured::{text::BufferedTextEncoder, MetricGroup};
use metrics::NeonMetrics;
use std::{
@@ -21,7 +21,7 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, "")
}
-fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
+fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
let state = Arc::new(Mutex::new(PrometheusHandler {
encoder: BufferedTextEncoder::new(),
metrics,
@@ -45,7 +45,7 @@ pub async fn task_main(
let service = || RouterService::new(make_router(metrics).build()?);
- hyper::Server::from_tcp(http_listener)?
+ hyper0::Server::from_tcp(http_listener)?
.serve(service().map_err(|e| anyhow!(e))?)
.await?;
diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs
index 14720b5c6b..d8676d5b50 100644
--- a/proxy/src/http/mod.rs
+++ b/proxy/src/http/mod.rs
@@ -9,7 +9,7 @@ use std::time::Duration;
use anyhow::bail;
use bytes::Bytes;
use http_body_util::BodyExt;
-use hyper1::body::Body;
+use hyper::body::Body;
use serde::de::DeserializeOwned;
pub(crate) use reqwest::{Request, Response};
diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs
index 79f9760461..8d274baa10 100644
--- a/proxy/src/lib.rs
+++ b/proxy/src/lib.rs
@@ -90,8 +90,6 @@ use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::warn;
-extern crate hyper0 as hyper;
-
pub mod auth;
pub mod cache;
pub mod cancellation;
diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs
index 4dfee0656d..ba674f5d0d 100644
--- a/proxy/src/proxy/wake_compute.rs
+++ b/proxy/src/proxy/wake_compute.rs
@@ -7,7 +7,7 @@ use crate::metrics::{
WakeupFailureKind,
};
use crate::proxy::retry::{retry_after, should_retry};
-use hyper1::StatusCode;
+use hyper::StatusCode;
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;
diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs
index 4e758e6eda..764b97fb7b 100644
--- a/proxy/src/serverless/backend.rs
+++ b/proxy/src/serverless/backend.rs
@@ -257,7 +257,7 @@ pub(crate) enum LocalProxyConnError {
#[error("error with connection to local-proxy")]
Io(#[source] std::io::Error),
#[error("could not establish h2 connection")]
- H2(#[from] hyper1::Error),
+ H2(#[from] hyper::Error),
}
impl ReportableError for HttpConnError {
@@ -481,7 +481,7 @@ async fn connect_http2(
};
};
- let (client, connection) = hyper1::client::conn::http2::Builder::new(TokioExecutor::new())
+ let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
.timer(TokioTimer::new())
.keep_alive_interval(Duration::from_secs(20))
.keep_alive_while_idle(true)
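
For context on the hyper 1.x client API used above: `handshake` returns a `(SendRequest, Connection)` pair, and the `Connection` future has to be spawned and polled or every request on the channel stalls. A minimal sketch under those assumptions (the address argument and `Full<Bytes>` body type are illustrative):

    use std::time::Duration;

    use bytes::Bytes;
    use http_body_util::Full;
    use hyper::client::conn::http2::SendRequest;
    use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
    use tokio::net::TcpStream;

    async fn connect_h2(addr: &str) -> anyhow::Result<SendRequest<Full<Bytes>>> {
        let stream = TcpStream::connect(addr).await?;
        let (send_request, connection) =
            hyper::client::conn::http2::Builder::new(TokioExecutor::new())
                .timer(TokioTimer::new())
                .keep_alive_interval(Duration::from_secs(20))
                .keep_alive_while_idle(true)
                .handshake(TokioIo::new(stream))
                .await?;
        // Drive the connection in the background; dropping this future
        // would stall all in-flight requests on `send_request`.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                tracing::error!("h2 connection error: {e}");
            }
        });
        Ok(send_request)
    }
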
diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs
index 4e6f8cf55c..6d61536f1a 100644
--- a/proxy/src/serverless/http_conn_pool.rs
+++ b/proxy/src/serverless/http_conn_pool.rs
@@ -1,5 +1,5 @@
use dashmap::DashMap;
-use hyper1::client::conn::http2;
+use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use rand::Rng;
@@ -18,9 +18,9 @@ use tracing::{info, info_span, Instrument};
use super::conn_pool::ConnInfo;
-pub(crate) type Send = http2::SendRequest<hyper1::body::Incoming>;
+pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect =
- http2::Connection<TokioIo<TcpStream>, hyper1::body::Incoming, TokioExecutor>;
+ http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
#[derive(Clone)]
struct ConnPoolEntry {
diff --git a/proxy/src/serverless/http_util.rs b/proxy/src/serverless/http_util.rs
index d766a46577..87a72ec5f0 100644
--- a/proxy/src/serverless/http_util.rs
+++ b/proxy/src/serverless/http_util.rs
@@ -11,7 +11,7 @@ use serde::Serialize;
use utils::http::error::ApiError;
/// Like [`ApiError::into_response`]
-pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper1::Error>> {
+pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper::Error>> {
match this {
ApiError::BadRequest(err) => HttpErrorBody::response_from_msg_and_status(
format!("{err:#?}"), // use debug printing so that we give the cause
@@ -67,12 +67,12 @@ impl HttpErrorBody {
fn response_from_msg_and_status(
msg: String,
status: StatusCode,
- ) -> Response<BoxBody<Bytes, hyper1::Error>> {
+ ) -> Response<BoxBody<Bytes, hyper::Error>> {
HttpErrorBody { msg }.to_response(status)
}
/// Same as [`utils::http::error::HttpErrorBody::to_response`]
- fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper1::Error>> {
+ fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper::Error>> {
Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "application/json")
@@ -90,7 +90,7 @@ impl HttpErrorBody {
pub(crate) fn json_response<T: Serialize>(
status: StatusCode,
data: T,
-) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
+) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let json = serde_json::to_string(&data)
.context("Failed to serialize JSON response")
.map_err(ApiError::InternalServerError)?;
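
The signatures above thread `hyper::Error` through `BoxBody` even though the concrete bodies built here (`Full`) can never fail; the usual pattern, and presumably what these helpers do internally, is to map the `Infallible` error into the required type before boxing. A sketch of that conversion (helper name is hypothetical):

    use bytes::Bytes;
    use http_body_util::{combinators::BoxBody, BodyExt, Full};

    // Erase a never-failing `Full` body into `BoxBody<Bytes, hyper::Error>`.
    fn boxed_body(payload: Bytes) -> BoxBody<Bytes, hyper::Error> {
        Full::new(payload)
            // `Full`'s error type is `Infallible`; the empty match converts
            // it into any error type, including `hyper::Error`.
            .map_err(|never| match never {})
            .boxed()
    }
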
diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs
index a7e3fa709b..5987776c28 100644
--- a/proxy/src/serverless/mod.rs
+++ b/proxy/src/serverless/mod.rs
@@ -22,7 +22,7 @@ use futures::TryFutureExt;
use http::{Method, Response, StatusCode};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty};
-use hyper1::body::Incoming;
+use hyper::body::Incoming;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;
use rand::rngs::StdRng;
@@ -302,7 +302,7 @@ async fn connection_handler(
let server = Builder::new(TokioExecutor::new());
let conn = server.serve_connection_with_upgrades(
hyper_util::rt::TokioIo::new(conn),
- hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {
+ hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
// First HTTP request shares the same session ID
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
@@ -355,7 +355,7 @@ async fn connection_handler(
#[allow(clippy::too_many_arguments)]
async fn request_handler(
- mut request: hyper1::Request<Incoming>,
+ mut request: hyper::Request<Incoming>,
config: &'static ProxyConfig,
backend: Arc<PoolingBackend>,
ws_connections: TaskTracker,
@@ -365,7 +365,7 @@ async fn request_handler(
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
-) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
+) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let host = request
.headers()
.get("host")
diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs
index f3a7ed9329..34c19157e6 100644
--- a/proxy/src/serverless/sql_over_http.rs
+++ b/proxy/src/serverless/sql_over_http.rs
@@ -12,14 +12,14 @@ use http::Method;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use http_body_util::Full;
-use hyper1::body::Body;
-use hyper1::body::Incoming;
-use hyper1::header;
-use hyper1::http::HeaderName;
-use hyper1::http::HeaderValue;
-use hyper1::Response;
-use hyper1::StatusCode;
-use hyper1::{HeaderMap, Request};
+use hyper::body::Body;
+use hyper::body::Incoming;
+use hyper::header;
+use hyper::http::HeaderName;
+use hyper::http::HeaderValue;
+use hyper::Response;
+use hyper::StatusCode;
+use hyper::{HeaderMap, Request};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use serde_json::Value;
@@ -272,7 +272,7 @@ pub(crate) async fn handle(
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
cancel: CancellationToken,
-) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
+) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let result = handle_inner(cancel, config, &ctx, request, backend).await;
let mut response = match result {
@@ -435,7 +435,7 @@ impl UserFacingError for SqlOverHttpError {
#[derive(Debug, thiserror::Error)]
pub(crate) enum ReadPayloadError {
#[error("could not read the HTTP request body: {0}")]
- Read(#[from] hyper1::Error),
+ Read(#[from] hyper::Error),
#[error("could not parse the HTTP request body: {0}")]
Parse(#[from] serde_json::Error),
}
@@ -476,7 +476,7 @@ struct HttpHeaders {
}
impl HttpHeaders {
- fn try_parse(headers: &hyper1::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
+ fn try_parse(headers: &hyper::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' assumed to be false.
let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
@@ -529,7 +529,7 @@ async fn handle_inner(
ctx: &RequestMonitoring,
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
-) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
+) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
let _requeset_gauge = Metrics::get()
.proxy
.connection_requests
@@ -577,7 +577,7 @@ async fn handle_db_inner(
conn_info: ConnInfo,
auth: AuthData,
backend: Arc<PoolingBackend>,
-) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
+) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
//
// Determine the destination and connection params
//
@@ -744,7 +744,7 @@ async fn handle_auth_broker_inner(
conn_info: ConnInfo,
jwt: String,
backend: Arc<PoolingBackend>,
-) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
+) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
backend
.authenticate_with_jwt(
ctx,
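
An aside on `HttpHeaders::try_parse` above: the option headers follow a strict-equality convention where only the exact value `true` enables a flag. A small sketch of that check against a hyper 1.x `HeaderMap` (function name hypothetical):

    use hyper::http::HeaderMap;

    // Only a literal "true" turns the flag on; anything else is false.
    fn header_flag(headers: &HeaderMap, name: &str) -> bool {
        headers
            .get(name)
            .is_some_and(|value| value.as_bytes() == b"true")
    }
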
diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs
index 3d257223b8..08d5da9bef 100644
--- a/proxy/src/serverless/websocket.rs
+++ b/proxy/src/serverless/websocket.rs
@@ -12,7 +12,7 @@ use anyhow::Context as _;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use framed_websockets::{Frame, OpCode, WebSocketServer};
use futures::{Sink, Stream};
-use hyper1::upgrade::OnUpgrade;
+use hyper::upgrade::OnUpgrade;
use hyper_util::rt::TokioIo;
use pin_project_lite::pin_project;
diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs
index fd8599bcb3..bd3e62bc12 100644
--- a/proxy/src/usage_metrics.rs
+++ b/proxy/src/usage_metrics.rs
@@ -485,49 +485,51 @@ async fn upload_events_chunk(
#[cfg(test)]
mod tests {
- use std::{
- net::TcpListener,
- sync::{Arc, Mutex},
- };
+ use super::*;
+ use crate::{http, BranchId, EndpointId};
use anyhow::Error;
use chrono::Utc;
use consumption_metrics::{Event, EventChunk};
- use hyper::{
- service::{make_service_fn, service_fn},
- Body, Response,
- };
+ use http_body_util::BodyExt;
+ use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request, Response};
+ use hyper_util::rt::TokioIo;
+ use std::sync::{Arc, Mutex};
+ use tokio::net::TcpListener;
use url::Url;
- use super::*;
- use crate::{http, BranchId, EndpointId};
-
#[tokio::test]
async fn metrics() {
- let listener = TcpListener::bind("0.0.0.0:0").unwrap();
+ type Report = EventChunk<'static, Event>;
+ let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
- let reports = Arc::new(Mutex::new(vec![]));
- let reports2 = reports.clone();
-
- let server = hyper::server::Server::from_tcp(listener)
- .unwrap()
- .serve(make_service_fn(move |_| {
- let reports = reports.clone();
- async move {
- Ok::<_, Error>(service_fn(move |req| {
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ tokio::spawn({
+ let reports = reports.clone();
+ async move {
+ loop {
+ if let Ok((stream, _addr)) = listener.accept().await {
let reports = reports.clone();
- async move {
- let bytes = hyper::body::to_bytes(req.into_body()).await?;
- let events: EventChunk<'static, Event> =
- serde_json::from_slice(&bytes)?;
- reports.lock().unwrap().push(events);
- Ok::<_, Error>(Response::new(Body::from(vec![])))
- }
- }))
+ http1::Builder::new()
+ .serve_connection(
+ TokioIo::new(stream),
+ service_fn(move |req: Request<Incoming>| {
+ let reports = reports.clone();
+ async move {
+ let bytes = req.into_body().collect().await?.to_bytes();
+ let events = serde_json::from_slice(&bytes)?;
+ reports.lock().unwrap().push(events);
+ Ok::<_, Error>(Response::new(String::new()))
+ }
+ }),
+ )
+ .await
+ .unwrap();
+ }
}
- }));
- let addr = server.local_addr();
- tokio::spawn(server);
+ }
+ });
let metrics = Metrics::default();
let client = http::new_client();
@@ -536,7 +538,7 @@ mod tests {
// no counters have been registered
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
- let r = std::mem::take(&mut *reports2.lock().unwrap());
+ let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
// register a new counter
@@ -548,7 +550,7 @@ mod tests {
// the counter should be observed despite 0 egress
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
- let r = std::mem::take(&mut *reports2.lock().unwrap());
+ let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 0);
@@ -558,7 +560,7 @@ mod tests {
// egress should be observered
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
- let r = std::mem::take(&mut *reports2.lock().unwrap());
+ let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 1);
@@ -568,7 +570,7 @@ mod tests {
// we do not observe the counter
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
- let r = std::mem::take(&mut *reports2.lock().unwrap());
+ let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
// counter is unregistered
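
The rewritten test shows the general hyper 1.x replacement for the removed `hyper::server::Server::from_tcp`: an explicit accept loop where each connection is served by `http1::Builder`. A condensed, standalone sketch of the pattern (the handler body is illustrative):

    use http_body_util::BodyExt;
    use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request, Response};
    use hyper_util::rt::TokioIo;
    use tokio::net::TcpListener;

    async fn run_test_server() -> anyhow::Result<()> {
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        loop {
            let (stream, _peer) = listener.accept().await?;
            tokio::spawn(async move {
                let service = service_fn(|req: Request<Incoming>| async move {
                    // Collect the request body, then reply with an empty 200.
                    let _bytes = req.into_body().collect().await?.to_bytes();
                    Ok::<_, hyper::Error>(Response::new(String::new()))
                });
                if let Err(e) = http1::Builder::new()
                    .serve_connection(TokioIo::new(stream), service)
                    .await
                {
                    eprintln!("connection error: {e}");
                }
            });
        }
    }
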
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index aa2bafbe92..e8fdddcdc1 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -12,8 +12,8 @@ use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
register_histogram_vec, register_int_counter, register_int_counter_pair,
- register_int_counter_pair_vec, register_int_counter_vec, Gauge, HistogramVec, IntCounter,
- IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
+ register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge,
+ HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge,
+ IntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -231,6 +231,14 @@ pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(||
.expect("Failed to register metric")
});
+pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
+ register_int_gauge!(
+ "safekeeper_evicted_timelines",
+ "Number of currently evicted timelines"
+ )
+ .expect("Failed to register metric")
+});
+
pub const LABEL_UNKNOWN: &str = "unknown";
/// Labels for traffic metrics.
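
`register_int_gauge!` registers the gauge with the default registry the first time the `Lazy` is dereferenced; afterwards it behaves like a process-global atomic that the scraper reads. A usage sketch, assuming the prometheus-style macros re-exported by the local `metrics` crate (metric name hypothetical):

    use metrics::{register_int_gauge, IntGauge};
    use once_cell::sync::Lazy;

    static EXAMPLE_EVICTED: Lazy<IntGauge> = Lazy::new(|| {
        register_int_gauge!("example_evicted_timelines", "Example gauge")
            .expect("failed to register metric")
    });

    fn on_evict() {
        EXAMPLE_EVICTED.inc(); // one more evicted timeline
    }

    fn on_restore() {
        EXAMPLE_EVICTED.dec(); // back to resident
    }
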
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index fb98534768..3494b0b764 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -631,13 +631,19 @@ impl Timeline {
return Err(e);
}
- self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
+ self.bootstrap(
+ shared_state,
+ conf,
+ broker_active_set,
+ partial_backup_rate_limiter,
+ );
Ok(())
}
/// Bootstrap new or existing timeline starting background tasks.
pub fn bootstrap(
self: &Arc<Self>,
+ _shared_state: &mut WriteGuardSharedState<'_>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
diff --git a/safekeeper/src/timeline_eviction.rs b/safekeeper/src/timeline_eviction.rs
index 5aa4921a92..fae6571277 100644
--- a/safekeeper/src/timeline_eviction.rs
+++ b/safekeeper/src/timeline_eviction.rs
@@ -15,7 +15,9 @@ use tracing::{debug, info, instrument, warn};
use utils::crashsafe::durable_rename;
use crate::{
- metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
+ metrics::{
+ EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, NUM_EVICTED_TIMELINES,
+ },
rate_limit::rand_duration,
timeline_manager::{Manager, StateSnapshot},
wal_backup,
@@ -93,6 +95,7 @@ impl Manager {
}
info!("successfully evicted timeline");
+ NUM_EVICTED_TIMELINES.inc();
}
/// Attempt to restore evicted timeline from remote storage; it must be
@@ -128,6 +131,7 @@ impl Manager {
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
info!("successfully restored evicted timeline");
+ NUM_EVICTED_TIMELINES.dec();
}
}
diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs
index f5535c0cea..2129e86baa 100644
--- a/safekeeper/src/timeline_manager.rs
+++ b/safekeeper/src/timeline_manager.rs
@@ -25,7 +25,10 @@ use utils::lsn::Lsn;
use crate::{
control_file::{FileStorage, Storage},
- metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
+ metrics::{
+ MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS,
+ NUM_EVICTED_TIMELINES,
+ },
rate_limit::{rand_duration, RateLimiter},
recovery::recovery_main,
remove_wal::calc_horizon_lsn,
@@ -251,6 +254,11 @@ pub async fn main_task(
mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
}
+ // If timeline is evicted, reflect that in the metric.
+ if mgr.is_offloaded {
+ NUM_EVICTED_TIMELINES.inc();
+ }
+
let last_state = 'outer: loop {
MANAGER_ITERATIONS_TOTAL.inc();
@@ -367,6 +375,11 @@ pub async fn main_task(
mgr.update_wal_removal_end(res);
}
+ // If the timeline is deleted while evicted, decrement the gauge.
+ if mgr.tli.is_cancelled() && mgr.is_offloaded {
+ NUM_EVICTED_TIMELINES.dec();
+ }
+
mgr.set_status(Status::Finished);
}
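
Together with the eviction hooks in `timeline_eviction.rs`, the gauge maintains a simple invariant: one increment per currently-evicted timeline, however the timeline got there. A schematic of the two bookkeeping points added in this file (simplified types, illustrative only):

    use metrics::IntGauge;

    struct TimelineFlags { offloaded: bool, cancelled: bool }

    fn on_manager_start(flags: &TimelineFlags, gauge: &IntGauge) {
        // A timeline can already be evicted when its manager task starts
        // (e.g. right after a safekeeper restart): count it exactly once.
        if flags.offloaded {
            gauge.inc();
        }
    }

    fn on_manager_exit(flags: &TimelineFlags, gauge: &IntGauge) {
        // A timeline deleted while evicted would otherwise leak one unit.
        if flags.cancelled && flags.offloaded {
            gauge.dec();
        }
    }
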
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index 6662e18817..866cde3339 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -165,12 +165,14 @@ impl GlobalTimelines {
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
+ let mut shared_state = tli.write_shared_state().await;
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(
+ &mut shared_state,
&conf,
broker_active_set.clone(),
partial_backup_rate_limiter.clone(),
@@ -213,6 +215,7 @@ impl GlobalTimelines {
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
+ let mut shared_state = tli.write_shared_state().await;
// TODO: prevent concurrent timeline creation/loading
{
@@ -227,8 +230,13 @@ impl GlobalTimelines {
state.timelines.insert(ttid, tli.clone());
}
- tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
-
+ tli.bootstrap(
+ &mut shared_state,
+ &conf,
+ broker_active_set,
+ partial_backup_rate_limiter,
+ );
+ drop(shared_state);
Ok(tli)
}
// If we can't load a timeline, it's bad. Caller will figure it out.
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index ef26ac99c5..6c87e5a926 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -17,7 +17,9 @@ use std::time::Duration;
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
-use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
+use remote_storage::{
+ DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata,
+};
use tokio::fs::File;
use tokio::select;
@@ -503,8 +505,12 @@ pub async fn read_object(
let cancel = CancellationToken::new();
+ let opts = DownloadOpts {
+ byte_start: std::ops::Bound::Included(offset),
+ ..Default::default()
+ };
let download = storage
- .download_storage_object(Some((offset, None)), file_path, &cancel)
+ .download(file_path, &opts, &cancel)
.await
.with_context(|| {
format!("Failed to open WAL segment download stream for remote path {file_path:?}")
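
The replacement for the old `Some((offset, None))` tuple is a `DownloadOpts` whose range edges are `std::ops::Bound` values; `Included(offset)` with the default unbounded end means "from `offset` to the end of the segment". A std-only sketch of how such a start bound decodes (helper is illustrative):

    use std::ops::Bound;

    // Translate a start bound into the first byte index to read,
    // mirroring the open-ended range built above.
    fn first_byte(start: Bound<u64>) -> u64 {
        match start {
            Bound::Included(n) => n,     // begin exactly at byte `n` (the case above)
            Bound::Excluded(n) => n + 1, // begin just past byte `n`
            Bound::Unbounded => 0,       // begin at the start of the object
        }
    }
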
diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index 4864a021fe..9d2182d44c 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -22,7 +22,7 @@ use utils::sync::gate::GateGuard;
use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
-use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
+use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
@@ -45,8 +45,15 @@ pub(super) struct Reconciler {
pub(crate) reconciler_config: ReconcilerConfig,
pub(crate) config: TenantConfig,
+
+ /// Observed state from the point of view of the reconciler.
+ /// This gets updated as the reconciliation makes progress.
pub(crate) observed: ObservedState,
+ /// Snapshot of the observed state at the point when the reconciler
+ /// was spawned.
+ pub(crate) original_observed: ObservedState,
+
pub(crate) service_config: service::Config,
/// A hook to notify the running postgres instances when we change the location
@@ -846,6 +853,39 @@ impl Reconciler {
}
}
+ /// Compare the observed state snapshot from when the reconcile was created
+ /// with the final observed state in order to generate observed state deltas.
+ pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
+ let mut deltas = Vec::default();
+
+ for (node_id, location) in &self.observed.locations {
+ let previous_location = self.original_observed.locations.get(node_id);
+ let do_upsert = match previous_location {
+ // Location config changed for node
+ Some(prev) if location.conf != prev.conf => true,
+ // New location config for node
+ None => true,
+ // Location config has not changed for node
+ _ => false,
+ };
+
+ if do_upsert {
+ deltas.push(ObservedStateDelta::Upsert(Box::new((
+ *node_id,
+ location.clone(),
+ ))));
+ }
+ }
+
+ for node_id in self.original_observed.locations.keys() {
+ if !self.observed.locations.contains_key(node_id) {
+ deltas.push(ObservedStateDelta::Delete(*node_id));
+ }
+ }
+
+ deltas
+ }
+
/// Keep trying to notify the compute indefinitely, only dropping out if:
/// - the node `origin` becomes unavailable -> Ok(())
/// - the node `origin` no longer has our tenant shard attached -> Ok(())
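
`observed_deltas` is a plain two-pass map diff: upserts for keys that are new or whose value changed, deletes for keys that vanished. The same technique in miniature over generic maps (types simplified for illustration):

    use std::collections::HashMap;
    use std::hash::Hash;

    #[derive(Debug, PartialEq)]
    enum Delta<K, V> {
        Upsert(K, V),
        Delete(K),
    }

    fn diff<K: Hash + Eq + Clone, V: PartialEq + Clone>(
        old: &HashMap<K, V>,
        new: &HashMap<K, V>,
    ) -> Vec<Delta<K, V>> {
        let mut deltas = Vec::new();
        // Upsert anything that is new or whose value changed.
        for (k, v) in new {
            if old.get(k) != Some(v) {
                deltas.push(Delta::Upsert(k.clone(), v.clone()));
            }
        }
        // Delete anything that disappeared.
        for k in old.keys() {
            if !new.contains_key(k) {
                deltas.push(Delta::Delete(k.clone()));
            }
        }
        deltas
    }
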
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index c349e2b9bf..cc735dc27e 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -28,8 +28,8 @@ use crate::{
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
- MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
- ScheduleOptimizationAction,
+ MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
+ ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
@@ -1072,7 +1072,7 @@ impl Service {
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
sequence=%result.sequence
))]
- fn process_result(&self, mut result: ReconcileResult) {
+ fn process_result(&self, result: ReconcileResult) {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
let Some(tenant) = tenants.get_mut(&result.tenant_shard_id) else {
@@ -1094,22 +1094,27 @@ impl Service {
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
// make to the tenant
- result
- .observed
- .locations
- .retain(|node_id, _loc| nodes.contains_key(node_id));
+ let deltas = result.observed_deltas.into_iter().flat_map(|delta| {
+ // In case a node was deleted while this reconcile is in flight, filter it out of the update we will
+ // make to the tenant
+ let node = nodes.get(delta.node_id())?;
+
+ if node.is_available() {
+ return Some(delta);
+ }
+
+ // In case a node became unavailable concurrently with the reconcile, observed
+ // locations on it are now uncertain. By convention, set them to None in order
+ // for them to get refreshed when the node comes back online.
+ Some(ObservedStateDelta::Upsert(Box::new((
+ node.get_id(),
+ ObservedStateLocation { conf: None },
+ ))))
+ });
match result.result {
Ok(()) => {
- for (node_id, loc) in &result.observed.locations {
- if let Some(conf) = &loc.conf {
- tracing::info!("Updating observed location {}: {:?}", node_id, conf);
- } else {
- tracing::info!("Setting observed location {} to None", node_id,)
- }
- }
-
- tenant.observed = result.observed;
+ tenant.apply_observed_deltas(deltas);
tenant.waiter.advance(result.sequence);
}
Err(e) => {
@@ -1131,9 +1136,10 @@ impl Service {
// so that waiters will see the correct error after waiting.
tenant.set_last_error(result.sequence, e);
- for (node_id, o) in result.observed.locations {
- tenant.observed.locations.insert(node_id, o);
- }
+ // Skip deletions on reconcile failures
+ let upsert_deltas =
+ deltas.filter(|delta| matches!(delta, ObservedStateDelta::Upsert(_)));
+ tenant.apply_observed_deltas(upsert_deltas);
}
}
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 2e85580e08..8a7ff866e6 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -425,6 +425,22 @@ pub(crate) enum ReconcileNeeded {
Yes,
}
+/// Pending modification to the observed state of a tenant shard.
+/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
+pub(crate) enum ObservedStateDelta {
+ Upsert(Box<(NodeId, ObservedStateLocation)>),
+ Delete(NodeId),
+}
+
+impl ObservedStateDelta {
+ pub(crate) fn node_id(&self) -> &NodeId {
+ match self {
+ Self::Upsert(up) => &up.0,
+ Self::Delete(nid) => nid,
+ }
+ }
+}
+
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
@@ -437,7 +453,7 @@ pub(crate) struct ReconcileResult {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) generation: Option<Generation>,
- pub(crate) observed: ObservedState,
+ pub(crate) observed_deltas: Vec<ObservedStateDelta>,
/// Set [`TenantShard::pending_compute_notification`] from this flag
pub(crate) pending_compute_notification: bool,
@@ -1123,7 +1139,7 @@ impl TenantShard {
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
- observed: reconciler.observed,
+ observed_deltas: reconciler.observed_deltas(),
pending_compute_notification: reconciler.compute_notify_failure,
}
}
@@ -1177,6 +1193,7 @@ impl TenantShard {
reconciler_config,
config: self.config.clone(),
observed: self.observed.clone(),
+ original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
_gate_guard: gate_guard,
@@ -1437,6 +1454,62 @@ impl TenantShard {
.map(|(node_id, gen)| (node_id, Generation::new(gen)))
.collect()
}
+
+ /// Update the observed state of the tenant by applying incremental deltas
+ ///
+ /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
+ /// They are then filtered in [`crate::service::Service::process_result`].
+ pub(crate) fn apply_observed_deltas(
+ &mut self,
+ deltas: impl Iterator<Item = ObservedStateDelta>,
+ ) {
+ for delta in deltas {
+ match delta {
+ ObservedStateDelta::Upsert(ups) => {
+ let (node_id, loc) = *ups;
+
+ // If the generation of the observed location in the delta is lagging
+ // behind the current one, then we have a race condition and cannot
+ // be certain about the true observed state. Set the observed state
+ // to None in order to reflect this.
+ let crnt_gen = self
+ .observed
+ .locations
+ .get(&node_id)
+ .and_then(|loc| loc.conf.as_ref())
+ .and_then(|conf| conf.generation);
+ let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
+ match (crnt_gen, new_gen) {
+ (Some(crnt), Some(new)) if crnt > new => {
+ tracing::warn!(
+ "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
+ node_id, loc, crnt, new
+ );
+
+ self.observed
+ .locations
+ .insert(node_id, ObservedStateLocation { conf: None });
+
+ continue;
+ }
+ _ => {}
+ }
+
+ if let Some(conf) = &loc.conf {
+ tracing::info!("Updating observed location {}: {:?}", node_id, conf);
+ } else {
+ tracing::info!("Setting observed location {} to None", node_id,)
+ }
+
+ self.observed.locations.insert(node_id, loc);
+ }
+ ObservedStateDelta::Delete(node_id) => {
+ tracing::info!("Deleting observed location {}", node_id);
+ self.observed.locations.remove(&node_id);
+ }
+ }
+ }
+ }
}
#[cfg(test)]
diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py
index aedd711dbd..26895df8a6 100644
--- a/test_runner/fixtures/endpoint/http.py
+++ b/test_runner/fixtures/endpoint/http.py
@@ -23,3 +23,8 @@ class EndpointHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/database_schema?database={database}")
res.raise_for_status()
return res.text
+
+ def installed_extensions(self):
+ res = self.get(f"http://localhost:{self.port}/installed_extensions")
+ res.raise_for_status()
+ return res.json()
diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py
index 2d499fd982..0d3dcd1671 100644
--- a/test_runner/fixtures/neon_cli.py
+++ b/test_runner/fixtures/neon_cli.py
@@ -446,7 +446,6 @@ class NeonLocalCli(AbstractNeonCli):
if immediate:
cmd.extend(["-m", "immediate"])
- log.info(f"Stopping pageserver with {cmd}")
return self.raw_cli(cmd)
def safekeeper_start(
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index bbc29653a2..5cb9821476 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -3486,8 +3486,6 @@ class Endpoint(PgProtocol, LogUtils):
if safekeepers is not None:
self.active_safekeepers = safekeepers
- log.info(f"Starting postgres endpoint {self.endpoint_id}")
-
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers=self.active_safekeepers,
@@ -3667,8 +3665,6 @@ class Endpoint(PgProtocol, LogUtils):
Returns self.
"""
- started_at = time.time()
-
self.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
@@ -3684,8 +3680,6 @@ class Endpoint(PgProtocol, LogUtils):
basebackup_request_tries=basebackup_request_tries,
)
- log.info(f"Postgres startup took {time.time() - started_at} seconds")
-
return self
def __enter__(self) -> Endpoint:
@@ -3921,7 +3915,6 @@ class Safekeeper(LogUtils):
return self
def stop(self, immediate: bool = False) -> Safekeeper:
- log.info(f"Stopping safekeeper {self.id}")
self.env.neon_cli.safekeeper_stop(self.id, immediate)
self.running = False
return self
diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py
index a527b810df..20e6bd9318 100644
--- a/test_runner/fixtures/remote_storage.py
+++ b/test_runner/fixtures/remote_storage.py
@@ -5,13 +5,13 @@ import hashlib
import json
import os
import re
-import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Union
import boto3
import toml
+from moto.server import ThreadedMotoServer
from mypy_boto3_s3 import S3Client
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -43,7 +43,6 @@ class RemoteStorageUser(str, enum.Enum):
class MockS3Server:
"""
Starts a mock S3 server for testing on a port given, errors if the server fails to start or exits prematurely.
- Relies that `poetry` and `moto` server are installed, since it's the way the tests are run.
Also provides a set of methods to derive the connection properties from and the method to kill the underlying server.
"""
@@ -53,22 +52,8 @@ class MockS3Server:
port: int,
):
self.port = port
-
- # XXX: do not use `shell=True` or add `exec ` to the command here otherwise.
- # We use `self.subprocess.kill()` to shut down the server, which would not "just" work in Linux
- # if a process is started from the shell process.
- self.subprocess = subprocess.Popen(["poetry", "run", "moto_server", f"-p{port}"])
- error = None
- try:
- return_code = self.subprocess.poll()
- if return_code is not None:
- error = f"expected mock s3 server to run but it exited with code {return_code}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
- except Exception as e:
- error = f"expected mock s3 server to start but it failed with exception: {e}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
- if error is not None:
- log.error(error)
- self.kill()
- raise RuntimeError("failed to start s3 mock server")
+ self.server = ThreadedMotoServer(port=port)
+ self.server.start()
def endpoint(self) -> str:
return f"http://127.0.0.1:{self.port}"
@@ -83,7 +68,7 @@ class MockS3Server:
return "test"
def kill(self):
- self.subprocess.kill()
+ self.server.stop()
@dataclass
diff --git a/test_runner/performance/test_perf_olap.py b/test_runner/performance/test_perf_olap.py
index 6dcde91b76..bc4ab64105 100644
--- a/test_runner/performance/test_perf_olap.py
+++ b/test_runner/performance/test_perf_olap.py
@@ -192,7 +192,7 @@ def tpch_queuies() -> tuple[ParameterSet, ...]:
- querues in returning tuple are ordered by the query number
- pytest parameters id is adjusted to match the query id (the numbering starts from 1)
"""
- queries_dir = Path(__file__).parent / "performance" / "tpc-h" / "queries"
+ queries_dir = Path(__file__).parent / "tpc-h" / "queries"
assert queries_dir.exists(), f"TPC-H queries dir not found: {queries_dir}"
return tuple(
diff --git a/test_runner/regress/test_installed_extensions.py b/test_runner/regress/test_installed_extensions.py
new file mode 100644
index 0000000000..4700db85ee
--- /dev/null
+++ b/test_runner/regress/test_installed_extensions.py
@@ -0,0 +1,87 @@
+from logging import info
+
+from fixtures.neon_fixtures import NeonEnv
+
+
+def test_installed_extensions(neon_simple_env: NeonEnv):
+ """basic test for the endpoint that returns the list of installed extensions"""
+
+ env = neon_simple_env
+
+ env.create_branch("test_installed_extensions")
+
+ endpoint = env.endpoints.create_start("test_installed_extensions")
+
+ endpoint.safe_psql("CREATE DATABASE test_installed_extensions")
+ endpoint.safe_psql("CREATE DATABASE test_installed_extensions_2")
+
+ client = endpoint.http_client()
+ res = client.installed_extensions()
+
+ info("Extensions list: %s", res)
+ info("Extensions: %s", res["extensions"])
+ # 'plpgsql' is a default extension that is always installed.
+ assert any(
+ ext["extname"] == "plpgsql" and ext["versions"] == ["1.0"] for ext in res["extensions"]
+ ), "The 'plpgsql' extension is missing"
+
+ # check that the neon_test_utils extension is not installed
+ assert not any(
+ ext["extname"] == "neon_test_utils" for ext in res["extensions"]
+ ), "The 'neon_test_utils' extension is installed"
+
+ pg_conn = endpoint.connect(dbname="test_installed_extensions")
+ with pg_conn.cursor() as cur:
+ cur.execute("CREATE EXTENSION neon_test_utils")
+ cur.execute(
+ "SELECT default_version FROM pg_available_extensions WHERE name = 'neon_test_utils'"
+ )
+ res = cur.fetchone()
+ neon_test_utils_version = res[0]
+
+ with pg_conn.cursor() as cur:
+ cur.execute("CREATE EXTENSION neon version '1.1'")
+
+ pg_conn_2 = endpoint.connect(dbname="test_installed_extensions_2")
+ with pg_conn_2.cursor() as cur:
+ cur.execute("CREATE EXTENSION neon version '1.2'")
+
+ res = client.installed_extensions()
+
+ info("Extensions list: %s", res)
+ info("Extensions: %s", res["extensions"])
+
+ # check that the neon_test_utils extension is installed only in 1 database
+ # and has the expected version
+ assert any(
+ ext["extname"] == "neon_test_utils"
+ and ext["versions"] == [neon_test_utils_version]
+ and ext["n_databases"] == 1
+ for ext in res["extensions"]
+ )
+
+ # check that the plpgsql extension is installed in all databases
+ # this is a default extension that is always installed
+ assert any(ext["extname"] == "plpgsql" and ext["n_databases"] == 4 for ext in res["extensions"])
+
+ # check that the neon extension is installed and has expected versions
+ for ext in res["extensions"]:
+ if ext["extname"] == "neon":
+ assert ext["n_databases"] == 2
+ ext["versions"].sort()
+ assert ext["versions"] == ["1.1", "1.2"]
+
+ with pg_conn.cursor() as cur:
+ cur.execute("ALTER EXTENSION neon UPDATE TO '1.3'")
+
+ res = client.installed_extensions()
+
+ info("Extensions list: %s", res)
+ info("Extensions: %s", res["extensions"])
+
+ # check that the neon extension is updated and has the expected versions
+ for ext in res["extensions"]:
+ if ext["extname"] == "neon":
+ assert ext["n_databases"] == 2
+ ext["versions"].sort()
+ assert ext["versions"] == ["1.2", "1.3"]
diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py
index d13cf0019d..577a3a25ca 100644
--- a/test_runner/regress/test_pageserver_generations.py
+++ b/test_runner/regress/test_pageserver_generations.py
@@ -666,14 +666,17 @@ def test_upgrade_generationless_local_file_paths(
pageserver.stop()
timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
files_renamed = 0
+ log.info(f"Renaming files in {timeline_dir}")
for filename in os.listdir(timeline_dir):
- path = os.path.join(timeline_dir, filename)
- log.info(f"Found file {path}")
- if path.endswith("-v1-00000001"):
- new_path = path[:-12]
- os.rename(path, new_path)
- log.info(f"Renamed {path} -> {new_path}")
+ if filename.endswith("-v1-00000001"):
+ new_filename = filename[:-12]
+ os.rename(
+ os.path.join(timeline_dir, filename), os.path.join(timeline_dir, new_filename)
+ )
+ log.info(f"Renamed {filename} -> {new_filename}")
files_renamed += 1
+ else:
+ log.info(f"Keeping {filename}")
assert files_renamed > 0
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index d372e2d461..d803cd7c78 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -2314,12 +2314,12 @@ def test_s3_eviction(
]
if delete_offloaded_wal:
neon_env_builder.safekeeper_extra_opts.append("--delete-offloaded-wal")
-
- env = neon_env_builder.init_start(
- initial_tenant_conf={
- "checkpoint_timeout": "100ms",
- }
- )
+ # make lagging_wal_timeout small to force the pageserver to quickly forget
+ # about a safekeeper after it stops sending updates (i.e. when the timeline
+ # is deactivated), which makes the test faster. Won't be needed once
+ # https://github.com/neondatabase/neon/issues/8148 is fixed.
+ initial_tenant_conf = {"lagging_wal_timeout": "1s", "checkpoint_timeout": "100ms"}
+ env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
n_timelines = 5
@@ -2407,9 +2407,37 @@ def test_s3_eviction(
and sk.log_contains("successfully restored evicted timeline")
for sk in env.safekeepers
)
-
assert event_metrics_seen
+ # test safekeeper_evicted_timelines metric
+ log.info("testing safekeeper_evicted_timelines metric")
+ # checkpoint pageserver to force remote_consistent_lsn update
+ for i in range(n_timelines):
+ ps_client.timeline_checkpoint(env.initial_tenant, timelines[i], wait_until_uploaded=True)
+ for ep in endpoints:
+ log.info(ep.is_running())
+ sk = env.safekeepers[0]
+
+ # all timelines must be evicted eventually
+ def all_evicted():
+ n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
+ assert n_evicted # make mypy happy
+ assert int(n_evicted) == n_timelines
+
+ wait_until(60, 0.5, all_evicted)
+ # restart should preserve the metric value
+ sk.stop().start()
+ wait_until(60, 0.5, all_evicted)
+ # and endpoint start should reduce it
+ endpoints[0].start()
+
+ def one_unevicted():
+ n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
+ assert n_evicted # make mypy happy
+ assert int(n_evicted) < n_timelines
+
+ wait_until(60, 0.5, one_unevicted)
+
# Test resetting uploaded partial segment state.
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):