Mirror of https://github.com/neondatabase/neon.git, synced 2026-05-16 20:50:37 +00:00

Compare commits: release-57 ... vlad/updat (12 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 24a3a549c1 |  |
|  | 219e78f885 |  |
|  | 1ea5d8b132 |  |
|  | 3d760938e1 |  |
|  | 9211de0df7 |  |
|  | d8ffe662a9 |  |
|  | a4db2af1f0 |  |
|  | 47fdf93cf0 |  |
|  | de05f90735 |  |
|  | 188797f048 |  |
|  | 5446e08891 |  |
|  | 78d9059fc7 |  |
Cargo.lock (generated): 20 lines changed
@@ -2993,9 +2993,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"

 [[package]]
 name = "measured"
-version = "0.0.21"
+version = "0.0.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "652bc741286361c06de8cb4d89b21a6437f120c508c51713663589eeb9928ac5"
+checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
 dependencies = [
  "bytes",
  "crossbeam-utils",
@@ -3011,9 +3011,9 @@ dependencies = [

 [[package]]
 name = "measured-derive"
-version = "0.0.21"
+version = "0.0.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ea497f33e1e856a376c32ad916f69a0bd3c597db1f912a399f842b01a4a685d"
+checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
 dependencies = [
  "heck 0.5.0",
  "proc-macro2",
@@ -3023,9 +3023,9 @@ dependencies = [

 [[package]]
 name = "measured-process"
-version = "0.0.21"
+version = "0.0.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b364ccb66937a814b6b2ad751d1a2f7a9d5a78c761144036825fb36bb0771000"
+checksum = "7c4b80445aeb08e832d87bf1830049a924cdc1d6b7ef40b6b9b365bff17bf8ec"
 dependencies = [
  "libc",
  "measured",
@@ -4005,7 +4005,7 @@ dependencies = [

 [[package]]
 name = "postgres"
 version = "0.19.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
 dependencies = [
  "bytes",
  "fallible-iterator",
@@ -4018,7 +4018,7 @@ dependencies = [

 [[package]]
 name = "postgres-protocol"
 version = "0.6.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
 dependencies = [
  "base64 0.20.0",
  "byteorder",
@@ -4037,7 +4037,7 @@ dependencies = [

 [[package]]
 name = "postgres-types"
 version = "0.2.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
 dependencies = [
  "bytes",
  "fallible-iterator",
@@ -6210,7 +6210,7 @@ dependencies = [

 [[package]]
 name = "tokio-postgres"
 version = "0.7.7"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
 dependencies = [
  "async-trait",
  "byteorder",
@@ -111,8 +111,8 @@ lasso = "0.7"
 leaky-bucket = "1.0.1"
 libc = "0.2"
 md5 = "0.7.0"
-measured = { version = "0.0.21", features=["lasso"] }
-measured-process = { version = "0.0.21" }
+measured = { version = "0.0.22", features=["lasso"] }
+measured-process = { version = "0.0.22" }
 memoffset = "0.8"
 nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
 notify = "6.0.0"
@@ -1,5 +1,5 @@
 use futures::StreamExt;
-use std::{collections::HashMap, str::FromStr, time::Duration};
+use std::{str::FromStr, time::Duration};

 use clap::{Parser, Subcommand};
 use pageserver_api::{
@@ -21,7 +21,7 @@ use utils::id::{NodeId, TenantId};

 use pageserver_api::controller_api::{
     NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
-    TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
+    TenantShardMigrateRequest, TenantShardMigrateResponse,
 };

 #[derive(Subcommand, Debug)]
@@ -110,12 +110,6 @@ enum Command {
         #[arg(long)]
         config: String,
     },
-    /// Attempt to balance the locations for a tenant across pageservers. This is a client-side
-    /// alternative to the storage controller's scheduling optimization behavior.
-    TenantScatter {
-        #[arg(long)]
-        tenant_id: TenantId,
-    },
     /// Print details about a particular tenant, including all its shards' states.
     TenantDescribe {
         #[arg(long)]
@@ -498,88 +492,6 @@ async fn main() -> anyhow::Result<()> {
             })
             .await?;
         }
-        Command::TenantScatter { tenant_id } => {
-            // Find the shards
-            let locate_response = storcon_client
-                .dispatch::<(), TenantLocateResponse>(
-                    Method::GET,
-                    format!("control/v1/tenant/{tenant_id}/locate"),
-                    None,
-                )
-                .await?;
-            let shards = locate_response.shards;
-
-            let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
-            let shard_count = shards.len();
-            for s in shards {
-                let entry = node_to_shards.entry(s.node_id).or_default();
-                entry.push(s.shard_id);
-            }
-
-            // Load list of available nodes
-            let nodes_resp = storcon_client
-                .dispatch::<(), Vec<NodeDescribeResponse>>(
-                    Method::GET,
-                    "control/v1/node".to_string(),
-                    None,
-                )
-                .await?;
-
-            for node in nodes_resp {
-                if matches!(node.availability, NodeAvailabilityWrapper::Active) {
-                    node_to_shards.entry(node.id).or_default();
-                }
-            }
-
-            let max_shard_per_node = shard_count / node_to_shards.len();
-
-            loop {
-                let mut migrate_shard = None;
-                for shards in node_to_shards.values_mut() {
-                    if shards.len() > max_shard_per_node {
-                        // Pick the emptiest
-                        migrate_shard = Some(shards.pop().unwrap());
-                    }
-                }
-                let Some(migrate_shard) = migrate_shard else {
-                    break;
-                };
-
-                // Pick the emptiest node to migrate to
-                let mut destinations = node_to_shards
-                    .iter()
-                    .map(|(k, v)| (k, v.len()))
-                    .collect::<Vec<_>>();
-                destinations.sort_by_key(|i| i.1);
-                let (destination_node, destination_count) = *destinations.first().unwrap();
-                if destination_count + 1 > max_shard_per_node {
-                    // Even the emptiest destination doesn't have space: we're done
-                    break;
-                }
-                let destination_node = *destination_node;
-
-                node_to_shards
-                    .get_mut(&destination_node)
-                    .unwrap()
-                    .push(migrate_shard);
-
-                println!("Migrate {} -> {} ...", migrate_shard, destination_node);
-
-                storcon_client
-                    .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
-                        Method::PUT,
-                        format!("control/v1/tenant/{migrate_shard}/migrate"),
-                        Some(TenantShardMigrateRequest {
-                            tenant_shard_id: migrate_shard,
-                            node_id: destination_node,
-                        }),
-                    )
-                    .await?;
-                println!("Migrate {} -> {} OK", migrate_shard, destination_node);
-            }
-
-            // Spread the shards across the nodes
-        }
         Command::TenantDescribe { tenant_id } => {
             let describe_response = storcon_client
                 .dispatch::<(), TenantDescribeResponse>(
@@ -13,11 +13,7 @@ use std::{

 use measured::{
     label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
-    metric::{
-        group::{Encoding, MetricValue},
-        name::MetricNameEncoder,
-        Metric, MetricType, MetricVec,
-    },
+    metric::{gauge::write_gauge, name::MetricNameEncoder, Metric, MetricType, MetricVec},
     text::TextEncoder,
     LabelGroup,
 };
@@ -182,12 +178,13 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
             .into_iter()
             .enumerate()
             .try_for_each(|(hll_shard, val)| {
-                enc.write_metric_value(
+                write_gauge(
+                    enc,
                     name.by_ref(),
                     labels.by_ref().compose_with(HllShardLabel {
                         hll_shard: hll_shard as i64,
                     }),
-                    MetricValue::Int(val as i64),
+                    val as i64,
                 )
             })
     }
@@ -8,8 +8,8 @@ use measured::{
     label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels},
     metric::{
         counter::CounterState,
-        gauge::GaugeState,
-        group::{Encoding, MetricValue},
+        gauge::{write_gauge, GaugeState},
+        group::Encoding,
         name::{MetricName, MetricNameEncoder},
         MetricEncoding, MetricFamilyEncoding,
     },
@@ -165,15 +165,6 @@ pub struct LibMetrics {
     serve_count: CollectionCounter,
 }

-fn write_gauge<Enc: Encoding>(
-    x: i64,
-    labels: impl LabelGroup,
-    name: impl MetricNameEncoder,
-    enc: &mut Enc,
-) -> Result<(), Enc::Err> {
-    enc.write_metric_value(name, labels, MetricValue::Int(x))
-}
-
 #[derive(Default)]
 struct Rusage;

@@ -199,12 +190,12 @@ where
         "Bytes written and read from disk, grouped by the operation (read|write)",
     )?;
     GaugeState::write_type(DISK_IO, enc)?;
-    write_gauge(ru.ru_inblock * BYTES_IN_BLOCK, IoOp::Read, DISK_IO, enc)?;
-    write_gauge(ru.ru_oublock * BYTES_IN_BLOCK, IoOp::Write, DISK_IO, enc)?;
+    write_gauge(enc, DISK_IO, IoOp::Read, ru.ru_inblock * BYTES_IN_BLOCK)?;
+    write_gauge(enc, DISK_IO, IoOp::Write, ru.ru_oublock * BYTES_IN_BLOCK)?;

     enc.write_help(MAXRSS, "Memory usage (Maximum Resident Set Size)")?;
     GaugeState::write_type(MAXRSS, enc)?;
-    write_gauge(ru.ru_maxrss, IoOp::Read, MAXRSS, enc)?;
+    write_gauge(enc, MAXRSS, IoOp::Read, ru.ru_maxrss)?;

     Ok(())
 }
@@ -543,15 +534,6 @@ impl<T: Encoding> Encoding for Inc<T> {
     fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
         self.0.write_help(name, help)
     }
-
-    fn write_metric_value(
-        &mut self,
-        name: impl MetricNameEncoder,
-        labels: impl LabelGroup,
-        value: MetricValue,
-    ) -> Result<(), Self::Err> {
-        self.0.write_metric_value(name, labels, value)
-    }
 }

 impl<T: Encoding> MetricEncoding<Inc<T>> for MeasuredCounterPairState
@@ -578,15 +560,6 @@ impl<T: Encoding> Encoding for Dec<T> {
     fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
         self.0.write_help(name, help)
     }
-
-    fn write_metric_value(
-        &mut self,
-        name: impl MetricNameEncoder,
-        labels: impl LabelGroup,
-        value: MetricValue,
-    ) -> Result<(), Self::Err> {
-        self.0.write_metric_value(name, labels, value)
-    }
 }

 /// Write the dec counter to the encoder
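The deleted local helper took the value first and the encoder last; `measured` 0.0.22 ships its own `gauge::write_gauge` with the encoder first, which both call sites above now use. A minimal sketch of the new calling convention, using only names visible in this diff (the exact 0.0.22 signature is inferred from these call sites, not from the crate docs):

```rust
use measured::{
    label::LabelGroup,
    metric::{gauge::write_gauge, group::Encoding, name::MetricNameEncoder},
};

// Sketch: the 0.0.22 convention is write_gauge(encoder, name, labels, value),
// replacing the hand-rolled helper's (value, labels, name, encoder) order and
// its explicit MetricValue::Int wrapping.
fn emit_gauge<Enc: Encoding>(
    enc: &mut Enc,
    name: impl MetricNameEncoder,
    labels: impl LabelGroup,
    value: i64,
) -> Result<(), Enc::Err> {
    write_gauge(enc, name, labels, value)
}
```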
@@ -144,20 +144,7 @@ impl PgConnectionConfig {
         // implement and this function is hardly a bottleneck. The function is only called around
         // establishing a new connection.
-        #[allow(unstable_name_collisions)]
-        config.options(
-            &self
-                .options
-                .iter()
-                .map(|s| {
-                    if s.contains(['\\', ' ']) {
-                        Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ "))
-                    } else {
-                        Cow::Borrowed(s.as_str())
-                    }
-                })
-                .intersperse(Cow::Borrowed(" ")) // TODO: use impl from std once it's stabilized
-                .collect::<String>(),
-        );
+        config.options(&encode_options(&self.options));
     }
     config
 }
@@ -178,6 +165,21 @@ impl PgConnectionConfig {
     }
 }

+#[allow(unstable_name_collisions)]
+fn encode_options(options: &[String]) -> String {
+    options
+        .iter()
+        .map(|s| {
+            if s.contains(['\\', ' ']) {
+                Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ "))
+            } else {
+                Cow::Borrowed(s.as_str())
+            }
+        })
+        .intersperse(Cow::Borrowed(" ")) // TODO: use impl from std once it's stabilized
+        .collect::<String>()
+}
+
 impl fmt::Display for PgConnectionConfig {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         // The password is intentionally hidden and not part of this display string.
@@ -206,7 +208,7 @@ impl fmt::Debug for PgConnectionConfig {

 #[cfg(test)]
 mod tests_pg_connection_config {
-    use crate::PgConnectionConfig;
+    use crate::{encode_options, PgConnectionConfig};
     use once_cell::sync::Lazy;
     use url::Host;

@@ -255,18 +257,12 @@ mod tests_pg_connection_config {

     #[test]
     fn test_with_options() {
-        let cfg = PgConnectionConfig::new_host_port(STUB_HOST.clone(), 123).extend_options([
-            "hello",
-            "world",
-            "with space",
-            "and \\ backslashes",
+        let options = encode_options(&[
+            "hello".to_owned(),
+            "world".to_owned(),
+            "with space".to_owned(),
+            "and \\ backslashes".to_owned(),
         ]);
-        assert_eq!(cfg.host(), &*STUB_HOST);
-        assert_eq!(cfg.port(), 123);
-        assert_eq!(cfg.raw_address(), "stub.host.example:123");
-        assert_eq!(
-            cfg.to_tokio_postgres_config().get_options(),
-            Some("hello world with\\ space and\\ \\\\\\ backslashes")
-        );
+        assert_eq!(options, "hello world with\\ space and\\ \\\\\\ backslashes");
     }
 }
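Pulling the escaping into a free `encode_options` function lets the unit test exercise the escaping rules directly, without building a full `PgConnectionConfig` and round-tripping through `to_tokio_postgres_config`. The rules it pins down: options are joined with spaces, `\` becomes `\\`, and a space inside an option becomes `\ `. A usage sketch (the second call is an extra illustration, with its output derived by hand from the function body above):

```rust
// From the updated unit test:
let options = encode_options(&[
    "hello".to_owned(),
    "world".to_owned(),
    "with space".to_owned(),
    "and \\ backslashes".to_owned(),
]);
assert_eq!(options, "hello world with\\ space and\\ \\\\\\ backslashes");

// Hand-derived from the same escaping rules:
assert_eq!(encode_options(&["a b".to_owned(), "c".to_owned()]), "a\\ b c");
```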
@@ -34,7 +34,7 @@ use utils::backoff;

 use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
 use crate::{
-    error::Cancelled, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing,
+    config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing,
     ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
 };
libs/remote_storage/src/config.rs (new file): 277 lines
@@ -0,0 +1,277 @@
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};

use anyhow::bail;
use aws_sdk_s3::types::StorageClass;
use camino::Utf8PathBuf;

use serde::{Deserialize, Serialize};

use crate::{
    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};

/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct RemoteStorageConfig {
    /// The storage connection configuration.
    #[serde(flatten)]
    pub storage: RemoteStorageKind,
    /// A common timeout enforced for all requests after concurrency limiter permit has been
    /// acquired.
    #[serde(
        with = "humantime_serde",
        default = "default_timeout",
        skip_serializing_if = "is_default_timeout"
    )]
    pub timeout: Duration,
}

fn default_timeout() -> Duration {
    RemoteStorageConfig::DEFAULT_TIMEOUT
}

fn is_default_timeout(d: &Duration) -> bool {
    *d == RemoteStorageConfig::DEFAULT_TIMEOUT
}

/// A kind of a remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(untagged)]
pub enum RemoteStorageKind {
    /// Storage based on local file system.
    /// Specify a root folder to place all stored files into.
    LocalFs { local_path: Utf8PathBuf },
    /// AWS S3 based storage, storing all files in the S3 bucket
    /// specified by the config
    AwsS3(S3Config),
    /// Azure Blob based storage, storing all files in the container
    /// specified by the config
    AzureContainer(AzureConfig),
}

/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct S3Config {
    /// Name of the bucket to connect to.
    pub bucket_name: String,
    /// The region where the bucket is located at.
    pub bucket_region: String,
    /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
    pub prefix_in_bucket: Option<String>,
    /// A base URL to send S3 requests to.
    /// By default, the endpoint is derived from a region name, assuming it's
    /// an AWS S3 region name, erroring on wrong region name.
    /// Endpoint provides a way to support other S3 flavors and their regions.
    ///
    /// Example: `http://127.0.0.1:5000`
    pub endpoint: Option<String>,
    /// AWS S3 has various limits on its API calls, we need not to exceed those.
    /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
    #[serde(default = "default_remote_storage_s3_concurrency_limit")]
    pub concurrency_limit: NonZeroUsize,
    #[serde(default = "default_max_keys_per_list_response")]
    pub max_keys_per_list_response: Option<i32>,
    #[serde(
        deserialize_with = "deserialize_storage_class",
        serialize_with = "serialize_storage_class",
        default
    )]
    pub upload_storage_class: Option<StorageClass>,
}

fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
        .try_into()
        .unwrap()
}

fn default_max_keys_per_list_response() -> Option<i32> {
    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
}

impl Debug for S3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Config")
            .field("bucket_name", &self.bucket_name)
            .field("bucket_region", &self.bucket_region)
            .field("prefix_in_bucket", &self.prefix_in_bucket)
            .field("concurrency_limit", &self.concurrency_limit)
            .field(
                "max_keys_per_list_response",
                &self.max_keys_per_list_response,
            )
            .finish()
    }
}

/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AzureConfig {
    /// Name of the container to connect to.
    pub container_name: String,
    /// Name of the storage account the container is inside of
    pub storage_account: Option<String>,
    /// The region where the bucket is located at.
    pub container_region: String,
    /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
    pub prefix_in_container: Option<String>,
    /// Azure has various limits on its API calls, we need not to exceed those.
    /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
    #[serde(default = "default_remote_storage_azure_concurrency_limit")]
    pub concurrency_limit: NonZeroUsize,
    #[serde(default = "default_max_keys_per_list_response")]
    pub max_keys_per_list_response: Option<i32>,
}

fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
    NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
}

impl Debug for AzureConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AzureConfig")
            .field("bucket_name", &self.container_name)
            .field("storage_account", &self.storage_account)
            .field("bucket_region", &self.container_region)
            .field("prefix_in_container", &self.prefix_in_container)
            .field("concurrency_limit", &self.concurrency_limit)
            .field(
                "max_keys_per_list_response",
                &self.max_keys_per_list_response,
            )
            .finish()
    }
}

fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
    deserializer: D,
) -> Result<Option<StorageClass>, D::Error> {
    Option::<String>::deserialize(deserializer).and_then(|s| {
        if let Some(s) = s {
            use serde::de::Error;
            let storage_class = StorageClass::from_str(&s).expect("infallible");
            #[allow(deprecated)]
            if matches!(storage_class, StorageClass::Unknown(_)) {
                return Err(D::Error::custom(format!(
                    "Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
                    StorageClass::values()
                )));
            }
            Ok(Some(storage_class))
        } else {
            Ok(None)
        }
    })
}

fn serialize_storage_class<S: serde::Serializer>(
    val: &Option<StorageClass>,
    serializer: S,
) -> Result<S::Ok, S::Error> {
    let val = val.as_ref().map(StorageClass::as_str);
    Option::<&str>::serialize(&val, serializer)
}

impl RemoteStorageConfig {
    pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);

    pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
        let document: toml_edit::Document = match toml {
            toml_edit::Item::Table(toml) => toml.clone().into(),
            toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
                toml.clone().into_table().into()
            }
            _ => bail!("toml not a table or inline table"),
        };

        if document.is_empty() {
            return Ok(None);
        }

        Ok(Some(toml_edit::de::from_document(document)?))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
        let toml = input.parse::<toml_edit::Document>().unwrap();
        RemoteStorageConfig::from_toml(toml.as_item())
    }

    #[test]
    fn parse_localfs_config_with_timeout() {
        let input = "local_path = '.'
timeout = '5s'";

        let config = parse(input).unwrap().expect("it exists");

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::LocalFs {
                    local_path: Utf8PathBuf::from(".")
                },
                timeout: Duration::from_secs(5)
            }
        );
    }

    #[test]
    fn test_s3_parsing() {
        let toml = "\
bucket_name = 'foo-bar'
bucket_region = 'eu-central-1'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";

        let config = parse(toml).unwrap().expect("it exists");

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::AwsS3(S3Config {
                    bucket_name: "foo-bar".into(),
                    bucket_region: "eu-central-1".into(),
                    prefix_in_bucket: None,
                    endpoint: None,
                    concurrency_limit: default_remote_storage_s3_concurrency_limit(),
                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
                    upload_storage_class: Some(StorageClass::IntelligentTiering),
                }),
                timeout: Duration::from_secs(7)
            }
        );
    }

    #[test]
    fn test_azure_parsing() {
        let toml = "\
container_name = 'foo-bar'
container_region = 'westeurope'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";

        let config = parse(toml).unwrap().expect("it exists");

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::AzureContainer(AzureConfig {
                    container_name: "foo-bar".into(),
                    storage_account: None,
                    container_region: "westeurope".into(),
                    prefix_in_container: None,
                    concurrency_limit: default_remote_storage_azure_concurrency_limit(),
                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
                }),
                timeout: Duration::from_secs(7)
            }
        );
    }
}
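Because `RemoteStorageKind` is `#[serde(untagged)]`, the backend is selected purely by which keys appear in the table: `local_path` gives `LocalFs`, `bucket_name`/`bucket_region` give `AwsS3`, and `container_name`/`container_region` give `AzureContainer`. A caller-side sketch, following the pattern of the unit tests above (the helper name here is illustrative):

```rust
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};

// Illustrative helper mirroring the tests' parse() function.
fn parse_remote_storage(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
    let toml = input.parse::<toml_edit::Document>()?;
    RemoteStorageConfig::from_toml(toml.as_item())
}

fn main() -> anyhow::Result<()> {
    let cfg = parse_remote_storage(
        "bucket_name = 'foo-bar'\nbucket_region = 'eu-central-1'",
    )?
    .expect("table is non-empty");
    // The keys present picked the untagged variant for us.
    assert!(matches!(cfg.storage, RemoteStorageKind::AwsS3(_)));
    assert_eq!(cfg.timeout, RemoteStorageConfig::DEFAULT_TIMEOUT);
    Ok(())
}
```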
@@ -10,6 +10,7 @@
 #![deny(clippy::undocumented_unsafe_blocks)]

 mod azure_blob;
+mod config;
 mod error;
 mod local_fs;
 mod metrics;
@@ -18,17 +19,10 @@ mod simulate_failures;
 mod support;

 use std::{
-    collections::HashMap,
-    fmt::Debug,
-    num::{NonZeroU32, NonZeroUsize},
-    pin::Pin,
-    str::FromStr,
-    sync::Arc,
-    time::{Duration, SystemTime},
+    collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
 };

-use anyhow::{bail, Context};
-use aws_sdk_s3::types::StorageClass;
+use anyhow::Context;
 use camino::{Utf8Path, Utf8PathBuf};

 use bytes::Bytes;
@@ -44,6 +38,8 @@ pub use self::{
 };
 use s3_bucket::RequestKind;

+pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
+
 /// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here.
 pub use azure_core::Etag;

@@ -525,168 +521,6 @@ impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
     }
 }

-/// External backup storage configuration, enough for creating a client for that storage.
-#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
-pub struct RemoteStorageConfig {
-    /// The storage connection configuration.
-    #[serde(flatten)]
-    pub storage: RemoteStorageKind,
-    /// A common timeout enforced for all requests after concurrency limiter permit has been
-    /// acquired.
-    #[serde(with = "humantime_serde", default = "default_timeout")]
-    pub timeout: Duration,
-}
-
-fn default_timeout() -> Duration {
-    RemoteStorageConfig::DEFAULT_TIMEOUT
-}
-
-/// A kind of a remote storage to connect to, with its connection configuration.
-#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
-#[serde(untagged)]
-pub enum RemoteStorageKind {
-    /// Storage based on local file system.
-    /// Specify a root folder to place all stored files into.
-    LocalFs { local_path: Utf8PathBuf },
-    /// AWS S3 based storage, storing all files in the S3 bucket
-    /// specified by the config
-    AwsS3(S3Config),
-    /// Azure Blob based storage, storing all files in the container
-    /// specified by the config
-    AzureContainer(AzureConfig),
-}
-
-/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
-#[derive(Clone, PartialEq, Eq, serde::Deserialize)]
-pub struct S3Config {
-    /// Name of the bucket to connect to.
-    pub bucket_name: String,
-    /// The region where the bucket is located at.
-    pub bucket_region: String,
-    /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
-    pub prefix_in_bucket: Option<String>,
-    /// A base URL to send S3 requests to.
-    /// By default, the endpoint is derived from a region name, assuming it's
-    /// an AWS S3 region name, erroring on wrong region name.
-    /// Endpoint provides a way to support other S3 flavors and their regions.
-    ///
-    /// Example: `http://127.0.0.1:5000`
-    pub endpoint: Option<String>,
-    /// AWS S3 has various limits on its API calls, we need not to exceed those.
-    /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
-    #[serde(default = "default_remote_storage_s3_concurrency_limit")]
-    pub concurrency_limit: NonZeroUsize,
-    #[serde(default = "default_max_keys_per_list_response")]
-    pub max_keys_per_list_response: Option<i32>,
-    #[serde(deserialize_with = "deserialize_storage_class", default)]
-    pub upload_storage_class: Option<StorageClass>,
-}
-
-fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
-    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
-        .try_into()
-        .unwrap()
-}
-
-fn default_max_keys_per_list_response() -> Option<i32> {
-    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
-}
-
-impl Debug for S3Config {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("S3Config")
-            .field("bucket_name", &self.bucket_name)
-            .field("bucket_region", &self.bucket_region)
-            .field("prefix_in_bucket", &self.prefix_in_bucket)
-            .field("concurrency_limit", &self.concurrency_limit)
-            .field(
-                "max_keys_per_list_response",
-                &self.max_keys_per_list_response,
-            )
-            .finish()
-    }
-}
-
-/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
-#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-pub struct AzureConfig {
-    /// Name of the container to connect to.
-    pub container_name: String,
-    /// Name of the storage account the container is inside of
-    pub storage_account: Option<String>,
-    /// The region where the bucket is located at.
-    pub container_region: String,
-    /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
-    pub prefix_in_container: Option<String>,
-    /// Azure has various limits on its API calls, we need not to exceed those.
-    /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
-    #[serde(default = "default_remote_storage_azure_concurrency_limit")]
-    pub concurrency_limit: NonZeroUsize,
-    #[serde(default = "default_max_keys_per_list_response")]
-    pub max_keys_per_list_response: Option<i32>,
-}
-
-fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
-    NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
-}
-
-impl Debug for AzureConfig {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("AzureConfig")
-            .field("bucket_name", &self.container_name)
-            .field("storage_account", &self.storage_account)
-            .field("bucket_region", &self.container_region)
-            .field("prefix_in_container", &self.prefix_in_container)
-            .field("concurrency_limit", &self.concurrency_limit)
-            .field(
-                "max_keys_per_list_response",
-                &self.max_keys_per_list_response,
-            )
-            .finish()
-    }
-}
-
-fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
-    deserializer: D,
-) -> Result<Option<StorageClass>, D::Error> {
-    Option::<String>::deserialize(deserializer).and_then(|s| {
-        if let Some(s) = s {
-            use serde::de::Error;
-            let storage_class = StorageClass::from_str(&s).expect("infallible");
-            #[allow(deprecated)]
-            if matches!(storage_class, StorageClass::Unknown(_)) {
-                return Err(D::Error::custom(format!(
-                    "Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
-                    StorageClass::values()
-                )));
-            }
-            Ok(Some(storage_class))
-        } else {
-            Ok(None)
-        }
-    })
-}
-
-impl RemoteStorageConfig {
-    pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
-
-    pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
-        let document: toml_edit::Document = match toml {
-            toml_edit::Item::Table(toml) => toml.clone().into(),
-            toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
-                toml.clone().into_table().into()
-            }
-            _ => bail!("toml not a table or inline table"),
-        };
-
-        if document.is_empty() {
-            return Ok(None);
-        }
-
-        Ok(Some(toml_edit::de::from_document(document)?))
-    }
-}
-
 struct ConcurrencyLimiter {
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
     // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
@@ -733,11 +567,6 @@ impl ConcurrencyLimiter {
 mod tests {
     use super::*;

-    fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
-        let toml = input.parse::<toml_edit::Document>().unwrap();
-        RemoteStorageConfig::from_toml(toml.as_item())
-    }
-
     #[test]
     fn test_object_name() {
         let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
@@ -759,77 +588,4 @@ mod tests {
         let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
         assert_eq!(err.to_string(), "Path \"/\" is not relative");
     }
-
-    #[test]
-    fn parse_localfs_config_with_timeout() {
-        let input = "local_path = '.'
-timeout = '5s'";
-
-        let config = parse(input).unwrap().expect("it exists");
-
-        assert_eq!(
-            config,
-            RemoteStorageConfig {
-                storage: RemoteStorageKind::LocalFs {
-                    local_path: Utf8PathBuf::from(".")
-                },
-                timeout: Duration::from_secs(5)
-            }
-        );
-    }
-
-    #[test]
-    fn test_s3_parsing() {
-        let toml = "\
-bucket_name = 'foo-bar'
-bucket_region = 'eu-central-1'
-upload_storage_class = 'INTELLIGENT_TIERING'
-timeout = '7s'
-";
-
-        let config = parse(toml).unwrap().expect("it exists");
-
-        assert_eq!(
-            config,
-            RemoteStorageConfig {
-                storage: RemoteStorageKind::AwsS3(S3Config {
-                    bucket_name: "foo-bar".into(),
-                    bucket_region: "eu-central-1".into(),
-                    prefix_in_bucket: None,
-                    endpoint: None,
-                    concurrency_limit: default_remote_storage_s3_concurrency_limit(),
-                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-                    upload_storage_class: Some(StorageClass::IntelligentTiering),
-                }),
-                timeout: Duration::from_secs(7)
-            }
-        );
-    }
-
-    #[test]
-    fn test_azure_parsing() {
-        let toml = "\
-container_name = 'foo-bar'
-container_region = 'westeurope'
-upload_storage_class = 'INTELLIGENT_TIERING'
-timeout = '7s'
-";
-
-        let config = parse(toml).unwrap().expect("it exists");
-
-        assert_eq!(
-            config,
-            RemoteStorageConfig {
-                storage: RemoteStorageKind::AzureContainer(AzureConfig {
-                    container_name: "foo-bar".into(),
-                    storage_account: None,
-                    container_region: "westeurope".into(),
-                    prefix_in_container: None,
-                    concurrency_limit: default_remote_storage_azure_concurrency_limit(),
-                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-                }),
-                timeout: Duration::from_secs(7)
-            }
-        );
-    }
 }
@@ -46,12 +46,12 @@ use utils::backoff;

 use super::StorageMetadata;
 use crate::{
+    config::S3Config,
     error::Cancelled,
     metrics::{start_counting_cancelled_wait, start_measuring_requests},
     support::PermitCarrying,
     ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
-    S3Config, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
-    REMOTE_STORAGE_PREFIX_SEPARATOR,
+    TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
 };

 use crate::metrics::AttemptOutcome;
@@ -83,10 +83,18 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
     let keys: Vec<&str> = split[0].split('-').collect();
     let mut lsns: Vec<&str> = split[1].split('-').collect();

+    // The current format of the layer file name: 000000067F0000000400000B150100000000-000000067F0000000400000D350100000000__00000000014B7AC8-v1-00000001
+
+    // Handle generation number `-00000001` part
+    if lsns.last().expect("should").len() == 8 {
+        lsns.pop();
+    }
+
+    // Handle version number `-v1` part
+    if lsns.last().expect("should").starts_with('v') {
+        lsns.pop();
+    }
+
     if lsns.len() == 1 {
         lsns.push(lsns[0]);
     }
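The LSN part of a layer file name (after `__`) can now carry a trailing `-v1` version tag and an 8-hex-digit generation, as in the example name in the comment above. A standalone restatement of just the suffix handling (the function name here is illustrative, not the real parser):

```rust
// Strip the optional generation ("00000001") and version ("v1") suffixes,
// then normalize a single LSN (image layer) into a start==end pair.
fn split_lsn_part(lsn_part: &str) -> Vec<&str> {
    let mut lsns: Vec<&str> = lsn_part.split('-').collect();
    if lsns.last().expect("non-empty split").len() == 8 {
        lsns.pop(); // generation number
    }
    if lsns.last().expect("non-empty split").starts_with('v') {
        lsns.pop(); // version number
    }
    if lsns.len() == 1 {
        lsns.push(lsns[0]);
    }
    lsns
}

fn main() {
    assert_eq!(
        split_lsn_part("00000000014B7AC8-v1-00000001"),
        ["00000000014B7AC8", "00000000014B7AC8"]
    );
}
```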
@@ -33,9 +33,7 @@ use utils::{
 use crate::tenant::timeline::GetVectoredImpl;
 use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
 use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
-use crate::tenant::{
-    TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
-};
+use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
 use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
 use crate::{tenant::config::TenantConf, virtual_file};
 use crate::{
@@ -855,14 +853,6 @@ impl PageServerConf {
         )
     }

-    pub(crate) fn tenant_deleted_mark_file_path(
-        &self,
-        tenant_shard_id: &TenantShardId,
-    ) -> Utf8PathBuf {
-        self.tenant_path(tenant_shard_id)
-            .join(TENANT_DELETED_MARKER_FILE_NAME)
-    }
-
     pub fn traces_path(&self) -> Utf8PathBuf {
         self.workdir.join("traces")
     }
@@ -236,6 +236,13 @@ paths:
             type: string
             format: date-time
           description: A timestamp to get the LSN
+        - name: with_lease
+          in: query
+          required: false
+          schema:
+            type: boolean
+          description: Whether to grant a lease to the corresponding LSN. Default to false.

       responses:
         "200":
           description: OK
@@ -1029,6 +1036,10 @@ components:
         kind:
           type: string
           enum: [past, present, future, nodata]
+        valid_until:
+          type: string
+          format: date-time
+          description: The expiration time of the granted lease.

     LsnLease:
       type: object
@@ -21,6 +21,7 @@ use pageserver_api::models::IngestAuxFilesRequest;
 use pageserver_api::models::ListAuxFilesRequest;
 use pageserver_api::models::LocationConfig;
 use pageserver_api::models::LocationConfigListResponse;
+use pageserver_api::models::LsnLease;
 use pageserver_api::models::ShardParameters;
 use pageserver_api::models::TenantDetails;
 use pageserver_api::models::TenantLocationConfigResponse;
@@ -329,14 +330,11 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
     }
 }

-impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
-    fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
-        use crate::tenant::delete::DeleteTenantError::*;
+impl From<crate::tenant::mgr::DeleteTenantError> for ApiError {
+    fn from(value: crate::tenant::mgr::DeleteTenantError) -> Self {
+        use crate::tenant::mgr::DeleteTenantError::*;
         match value {
-            Get(g) => ApiError::from(g),
-            Timeline(t) => ApiError::from(t),
             SlotError(e) => e.into(),
             SlotUpsertError(e) => e.into(),
             Other(o) => ApiError::InternalServerError(o),
             Cancelled => ApiError::ShuttingDown,
         }
@@ -731,6 +729,8 @@ async fn get_lsn_by_timestamp_handler(
         .map_err(ApiError::BadRequest)?;
     let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);

+    let with_lease = parse_query_param(&request, "with_lease")?.unwrap_or(false);
+
     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

     let timeline =
@@ -739,10 +739,15 @@ async fn get_lsn_by_timestamp_handler(
     let result = timeline
         .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
         .await?;
+
     #[derive(serde::Serialize, Debug)]
     struct Result {
         lsn: Lsn,
         kind: &'static str,
+        #[serde(default)]
+        #[serde(skip_serializing_if = "Option::is_none")]
+        #[serde(flatten)]
+        lease: Option<LsnLease>,
     }
     let (lsn, kind) = match result {
         LsnForTimestamp::Present(lsn) => (lsn, "present"),
@@ -750,11 +755,28 @@ async fn get_lsn_by_timestamp_handler(
         LsnForTimestamp::Past(lsn) => (lsn, "past"),
         LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
     };
-    let result = Result { lsn, kind };
+
+    let lease = if with_lease {
+        timeline
+            .make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
+            .inspect_err(|_| {
+                warn!("fail to grant a lease to {}", lsn);
+            })
+            .ok()
+    } else {
+        None
+    };
+
+    let result = Result { lsn, kind, lease };
+    let valid_until = result
+        .lease
+        .as_ref()
+        .map(|l| humantime::format_rfc3339_millis(l.valid_until).to_string());
+    tracing::info!(
+        lsn=?result.lsn,
+        kind=%result.kind,
+        timestamp=%timestamp_raw,
+        valid_until=?valid_until,
+        "lsn_by_timestamp finished"
+    );
     json_response(StatusCode::OK, result)
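When `with_lease=true` and the lease grant succeeds, `#[serde(flatten)]` splices the `LsnLease` fields into the top-level JSON object, which is why the OpenAPI change above adds `valid_until` next to `lsn` and `kind` rather than as a nested object. A sketch of the response shape a client would deserialize (struct name and field types are assumptions based on the spec, not taken from the API crate):

```rust
use serde::Deserialize;

// Assumed client-side mirror of the handler's flattened response.
#[derive(Deserialize, Debug)]
struct LsnByTimestampResponse {
    lsn: String,  // LSNs serialize as strings, e.g. "0/169F3C8"
    kind: String, // "past" | "present" | "future" | "nodata"
    // Present only when a lease was requested and granted (RFC 3339 date-time).
    valid_until: Option<String>,
}
```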
@@ -55,11 +55,9 @@ use self::config::AttachedLocationConfig;
|
||||
use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::mgr::GetActiveTenantError;
|
||||
use self::mgr::GetTenantError;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::upload::upload_index_part;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineCreateGuard;
|
||||
@@ -137,7 +135,6 @@ pub mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
pub mod config;
|
||||
pub mod delete;
|
||||
pub mod mgr;
|
||||
pub mod secondary;
|
||||
pub mod tasks;
|
||||
@@ -161,8 +158,6 @@ pub const TENANTS_SEGMENT_NAME: &str = "tenants";
|
||||
/// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
|
||||
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
|
||||
|
||||
pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
|
||||
|
||||
/// References to shared objects that are passed into each tenant, such
|
||||
/// as the shared remote storage client and process initialization state.
|
||||
#[derive(Clone)]
|
||||
@@ -207,7 +202,6 @@ struct TimelinePreload {
|
||||
}
|
||||
|
||||
pub(crate) struct TenantPreload {
|
||||
deleting: bool,
|
||||
timelines: HashMap<TimelineId, TimelinePreload>,
|
||||
}
|
||||
|
||||
@@ -286,8 +280,6 @@ pub struct Tenant {
|
||||
/// background warmup.
|
||||
pub(crate) activate_now_sem: tokio::sync::Semaphore,
|
||||
|
||||
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
|
||||
|
||||
// Cancellation token fires when we have entered shutdown(). This is a parent of
|
||||
// Timelines' cancellation token.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
@@ -654,7 +646,6 @@ impl Tenant {
|
||||
attached_conf: AttachedTenantConf,
|
||||
shard_identity: ShardIdentity,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
mode: SpawnMode,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
@@ -828,52 +819,6 @@ impl Tenant {
|
||||
// Remote preload is complete.
|
||||
drop(remote_load_completion);
|
||||
|
||||
let pending_deletion = {
|
||||
match DeleteTenantFlow::should_resume_deletion(
|
||||
conf,
|
||||
preload.as_ref().map(|p| p.deleting).unwrap_or(false),
|
||||
&tenant_clone,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(should_resume_deletion) => should_resume_deletion,
|
||||
Err(err) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(err), BrokenVerbosity::Error);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
info!("pending_deletion {}", pending_deletion.is_some());
|
||||
|
||||
if let Some(deletion) = pending_deletion {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
let background_jobs_can_start =
|
||||
init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
if let Some(background) = background_jobs_can_start {
|
||||
info!("waiting for backgound jobs barrier");
|
||||
background.clone().wait().await;
|
||||
info!("ready for backgound jobs barrier");
|
||||
}
|
||||
|
||||
let deleted = DeleteTenantFlow::resume_from_attach(
|
||||
deletion,
|
||||
&tenant_clone,
|
||||
preload,
|
||||
tenants,
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(e) = deleted {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We will time the duration of the attach phase unless this is a creation (attach will do no work)
|
||||
let attached = {
|
||||
let _attach_timer = match mode {
|
||||
@@ -931,21 +876,13 @@ impl Tenant {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
|
||||
info!(
|
||||
"found {} timelines, deleting={}",
|
||||
remote_timeline_ids.len(),
|
||||
deleting
|
||||
);
|
||||
info!("found {} timelines", remote_timeline_ids.len(),);
|
||||
|
||||
for k in other_keys {
|
||||
if k != TENANT_DELETED_MARKER_FILE_NAME {
|
||||
warn!("Unexpected non timeline key {k}");
|
||||
}
|
||||
warn!("Unexpected non timeline key {k}");
|
||||
}
|
||||
|
||||
Ok(TenantPreload {
|
||||
deleting,
|
||||
timelines: Self::load_timeline_metadata(
|
||||
self,
|
||||
remote_timeline_ids,
|
||||
@@ -974,7 +911,6 @@ impl Tenant {
|
||||
let preload = match (preload, mode) {
|
||||
(Some(p), _) => p,
|
||||
(None, SpawnMode::Create) => TenantPreload {
|
||||
deleting: false,
|
||||
timelines: HashMap::new(),
|
||||
},
|
||||
(None, _) => {
|
||||
@@ -2215,6 +2151,7 @@ impl Tenant {
|
||||
// Upload an index from the parent: this is partly to provide freshness for the
|
||||
// child tenants that will copy it, and partly for general ease-of-debugging: there will
|
||||
// always be a parent shard index in the same generation as we wrote the child shard index.
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_file_changes()?;
|
||||
@@ -2222,12 +2159,14 @@ impl Tenant {
|
||||
|
||||
// Shut down the timeline's remote client: this means that the indices we write
|
||||
// for child shards will not be invalidated by the parent shard deleting layers.
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
|
||||
timeline.remote_client.shutdown().await;
|
||||
|
||||
// Download methods can still be used after shutdown, as they don't flow through the remote client's
|
||||
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
|
||||
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
|
||||
// we use here really is the remotely persistent one).
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
|
||||
let result = timeline.remote_client
|
||||
.download_index_file(&self.cancel)
|
||||
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
|
||||
@@ -2240,6 +2179,7 @@ impl Tenant {
|
||||
};
|
||||
|
||||
for child_shard in child_shards {
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
|
||||
upload_index_part(
|
||||
&self.remote_storage,
|
||||
child_shard,
|
||||
@@ -2628,7 +2568,6 @@ impl Tenant {
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::default(),
|
||||
timeline_get_throttle: Arc::new(throttle::Throttle::new(
|
||||
@@ -4068,6 +4007,7 @@ mod tests {
|
||||
use storage_layer::PersistentLayerKey;
|
||||
use tests::storage_layer::ValuesReconstructState;
|
||||
use tests::timeline::{GetVectoredError, ShutdownMode};
|
||||
use timeline::GcInfo;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::TenantId;
|
||||
|
||||
@@ -6745,49 +6685,48 @@ mod tests {
|
||||
|
||||
// img layer at 0x10
|
||||
let img_layer = (0..10)
|
||||
.map(|id| (get_key(id), test_img(&format!("value {id}@0x10"))))
|
||||
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
|
||||
.collect_vec();
|
||||
|
||||
let delta1 = vec![
|
||||
// TODO: we should test a real delta record here, which requires us to add a variant of NeonWalRecord for testing purpose.
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x20),
|
||||
Value::Image(test_img("value 1@0x20")),
|
||||
Value::Image(Bytes::from("value 1@0x20")),
|
||||
),
|
||||
(
|
||||
get_key(2),
|
||||
Lsn(0x30),
|
||||
Value::Image(test_img("value 2@0x30")),
|
||||
Value::Image(Bytes::from("value 2@0x30")),
|
||||
),
|
||||
(
|
||||
get_key(3),
|
||||
Lsn(0x40),
|
||||
Value::Image(test_img("value 3@0x40")),
|
||||
Value::Image(Bytes::from("value 3@0x40")),
|
||||
),
|
||||
];
|
||||
let delta2 = vec![
|
||||
(
|
||||
get_key(5),
|
||||
Lsn(0x20),
|
||||
Value::Image(test_img("value 5@0x20")),
|
||||
Value::Image(Bytes::from("value 5@0x20")),
|
||||
),
|
||||
(
|
||||
get_key(6),
|
||||
Lsn(0x20),
|
||||
Value::Image(test_img("value 6@0x20")),
|
||||
Value::Image(Bytes::from("value 6@0x20")),
|
||||
),
|
||||
];
|
||||
let delta3 = vec![
|
||||
(
|
||||
get_key(8),
|
||||
Lsn(0x40),
|
||||
Value::Image(test_img("value 8@0x40")),
|
||||
Value::Image(Bytes::from("value 8@0x40")),
|
||||
),
|
||||
(
|
||||
get_key(9),
|
||||
Lsn(0x40),
|
||||
Value::Image(test_img("value 9@0x40")),
|
||||
Value::Image(Bytes::from("value 9@0x40")),
|
||||
),
|
||||
];
|
||||
|
||||
@@ -6809,9 +6748,42 @@ mod tests {
|
||||
guard.cutoffs.horizon = Lsn(0x30);
|
||||
}
|
||||
|
||||
let expected_result = [
|
||||
Bytes::from_static(b"value 0@0x10"),
|
||||
Bytes::from_static(b"value 1@0x20"),
|
||||
Bytes::from_static(b"value 2@0x30"),
|
||||
Bytes::from_static(b"value 3@0x40"),
|
||||
Bytes::from_static(b"value 4@0x10"),
|
||||
Bytes::from_static(b"value 5@0x20"),
|
||||
Bytes::from_static(b"value 6@0x20"),
|
||||
Bytes::from_static(b"value 7@0x10"),
|
||||
Bytes::from_static(b"value 8@0x40"),
|
||||
Bytes::from_static(b"value 9@0x40"),
|
||||
];
|
||||
|
||||
for (idx, expected) in expected_result.iter().enumerate() {
|
||||
assert_eq!(
|
||||
tline
|
||||
.get(get_key(idx as u32), Lsn(0x50), &ctx)
|
||||
.await
|
||||
.unwrap(),
|
||||
expected
|
||||
);
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
|
||||
for (idx, expected) in expected_result.iter().enumerate() {
|
||||
assert_eq!(
|
||||
tline
|
||||
.get(get_key(idx as u32), Lsn(0x50), &ctx)
|
||||
.await
|
||||
.unwrap(),
|
||||
expected
|
||||
);
|
||||
}
|
||||
|
||||
// Check if the image layer at the GC horizon contains exactly what we want
|
||||
let image_at_gc_horizon = tline
|
||||
.inspect_image_layers(Lsn(0x30), &ctx)
|
||||
@@ -6822,14 +6794,22 @@ mod tests {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(image_at_gc_horizon.len(), 10);
|
||||
let expected_lsn = [0x10, 0x20, 0x30, 0x10, 0x10, 0x20, 0x20, 0x10, 0x10, 0x10];
|
||||
let expected_result = [
|
||||
Bytes::from_static(b"value 0@0x10"),
|
||||
Bytes::from_static(b"value 1@0x20"),
|
||||
Bytes::from_static(b"value 2@0x30"),
|
||||
Bytes::from_static(b"value 3@0x10"),
|
||||
Bytes::from_static(b"value 4@0x10"),
|
||||
Bytes::from_static(b"value 5@0x20"),
|
||||
Bytes::from_static(b"value 6@0x20"),
|
||||
Bytes::from_static(b"value 7@0x10"),
|
||||
Bytes::from_static(b"value 8@0x10"),
|
||||
Bytes::from_static(b"value 9@0x10"),
|
||||
];
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
image_at_gc_horizon[idx],
|
||||
(
|
||||
get_key(idx as u32),
|
||||
test_img(&format!("value {idx}@{:#x}", expected_lsn[idx]))
|
||||
)
|
||||
(get_key(idx as u32), expected_result[idx].clone())
|
||||
);
|
||||
}
|
||||
|
||||
@@ -6862,7 +6842,7 @@ mod tests {
|
||||
},
|
||||
// The delta layer that is cut in the middle
|
||||
PersistentLayerKey {
|
||||
key_range: Key::MIN..get_key(9),
|
||||
key_range: get_key(3)..get_key(4),
|
||||
lsn_range: Lsn(0x30)..Lsn(0x41),
|
||||
is_delta: true
|
||||
},
|
||||
@@ -6947,6 +6927,9 @@ mod tests {
|
||||
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
|
||||
Bytes::from_static(b"0x10,0x20,0x30")
|
||||
);
|
||||
|
||||
// Need to remove the limit of "Neon WAL redo requires base image".
|
||||
|
||||
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
|
||||
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
|
||||
|
||||
@@ -7041,4 +7024,164 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_bottom_most_compaction_deltas() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
fn get_key(id: u32) -> Key {
|
||||
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
|
||||
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
|
||||
//
|
||||
// | D1 | | D3 |
|
||||
// -| |-- gc horizon -----------------
|
||||
// | | | D2 |
|
||||
// --------- img layer ------------------
|
||||
//
|
||||
// What we should expact from this compaction is:
|
||||
// | Part of D1 | | D3 |
|
||||
// --------- img layer with D1+D2 at GC horizon------------------

        // img layer at 0x10
        let img_layer = (0..10)
            .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
            .collect_vec();

        let delta1 = vec![
            (
                get_key(1),
                Lsn(0x20),
                Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
            ),
            (
                get_key(2),
                Lsn(0x30),
                Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
            ),
            (
                get_key(3),
                Lsn(0x40),
                Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
            ),
        ];
        let delta2 = vec![
            (
                get_key(5),
                Lsn(0x20),
                Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
            ),
            (
                get_key(6),
                Lsn(0x20),
                Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
            ),
        ];
        let delta3 = vec![
            (
                get_key(8),
                Lsn(0x40),
                Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
            ),
            (
                get_key(9),
                Lsn(0x40),
                Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
            ),
        ];

        let tline = tenant
            .create_test_timeline_with_layers(
                TIMELINE_ID,
                Lsn(0x10),
                DEFAULT_PG_VERSION,
                &ctx,
                vec![delta1, delta2, delta3], // delta layers
                vec![(Lsn(0x10), img_layer)], // image layers
                Lsn(0x50),
            )
            .await?;
        {
            // Update GC info
            let mut guard = tline.gc_info.write().unwrap();
            *guard = GcInfo {
                retain_lsns: vec![],
                cutoffs: GcCutoffs {
                    pitr: Lsn(0x30),
                    horizon: Lsn(0x30),
                },
                leases: Default::default(),
            };
        }

        let expected_result = [
            Bytes::from_static(b"value 0@0x10"),
            Bytes::from_static(b"value 1@0x10@0x20"),
            Bytes::from_static(b"value 2@0x10@0x30"),
            Bytes::from_static(b"value 3@0x10@0x40"),
            Bytes::from_static(b"value 4@0x10"),
            Bytes::from_static(b"value 5@0x10@0x20"),
            Bytes::from_static(b"value 6@0x10@0x20"),
            Bytes::from_static(b"value 7@0x10"),
            Bytes::from_static(b"value 8@0x10@0x40"),
            Bytes::from_static(b"value 9@0x10@0x40"),
        ];

        let expected_result_at_gc_horizon = [
            Bytes::from_static(b"value 0@0x10"),
            Bytes::from_static(b"value 1@0x10@0x20"),
            Bytes::from_static(b"value 2@0x10@0x30"),
            Bytes::from_static(b"value 3@0x10"),
            Bytes::from_static(b"value 4@0x10"),
            Bytes::from_static(b"value 5@0x10@0x20"),
            Bytes::from_static(b"value 6@0x10@0x20"),
            Bytes::from_static(b"value 7@0x10"),
            Bytes::from_static(b"value 8@0x10"),
            Bytes::from_static(b"value 9@0x10"),
        ];

        for idx in 0..10 {
            assert_eq!(
                tline
                    .get(get_key(idx as u32), Lsn(0x50), &ctx)
                    .await
                    .unwrap(),
                &expected_result[idx]
            );
            assert_eq!(
                tline
                    .get(get_key(idx as u32), Lsn(0x30), &ctx)
                    .await
                    .unwrap(),
                &expected_result_at_gc_horizon[idx]
            );
        }

        let cancel = CancellationToken::new();
        tline.compact_with_gc(&cancel, &ctx).await.unwrap();

        for idx in 0..10 {
            assert_eq!(
                tline
                    .get(get_key(idx as u32), Lsn(0x50), &ctx)
                    .await
                    .unwrap(),
                &expected_result[idx]
            );
            assert_eq!(
                tline
                    .get(get_key(idx as u32), Lsn(0x30), &ctx)
                    .await
                    .unwrap(),
                &expected_result_at_gc_horizon[idx]
            );
        }

        Ok(())
    }
}

@@ -1,426 +0,0 @@
use std::sync::Arc;

use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::{models::TenantState, shard::TenantShardId};
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, Instrument};

use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId, pausable_failpoint};

use crate::{
    config::PageServerConf,
    context::RequestContext,
    task_mgr::{self},
    tenant::{
        mgr::{TenantSlot, TenantsMapRemoveResult},
        remote_timeline_client::remote_heatmap_path,
    },
};

use super::{
    mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap},
    remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
    timeline::delete::DeleteTimelineFlow,
    tree_sort_timelines, DeleteTimelineError, Tenant, TenantPreload,
};

#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
    #[error("GetTenant {0}")]
    Get(#[from] GetTenantError),

    #[error("Tenant map slot error {0}")]
    SlotError(#[from] TenantSlotError),

    #[error("Tenant map slot upsert error {0}")]
    SlotUpsertError(#[from] TenantSlotUpsertError),

    #[error("Timeline {0}")]
    Timeline(#[from] DeleteTimelineError),

    #[error("Cancelled")]
    Cancelled,

    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;

fn remote_tenant_delete_mark_path(
    conf: &PageServerConf,
    tenant_shard_id: &TenantShardId,
) -> anyhow::Result<RemotePath> {
    let tenant_remote_path = conf
        .tenant_path(tenant_shard_id)
        .strip_prefix(&conf.workdir)
        .context("Failed to strip workdir prefix")
        .and_then(RemotePath::new)
        .context("tenant path")?;
    Ok(tenant_remote_path.join(Utf8Path::new("timelines/deleted")))
}

async fn schedule_ordered_timeline_deletions(
    tenant: &Arc<Tenant>,
) -> Result<Vec<(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>, TimelineId)>, DeleteTenantError> {
    // Tenant is stopping at this point. We know it will be deleted.
    // No new timelines should be created.
    // Tree sort timelines to delete from leafs to the root.
    // NOTE: by calling clone we release the mutex, which creates a possibility for a race: a pending deletion
    // can complete and remove the timeline from the map in between our call to clone
    // and `DeleteTimelineFlow::run`, so `run` won't find the timeline in the `timelines` map.
    // timelines.lock is currently synchronous, so we can't hold it across an await point.
    // So just ignore the NotFound error if we get it from `run`.
    // Beware: in case it becomes async and we try to hold it here, `run` also locks it, which can create a deadlock.
    let timelines = tenant.timelines.lock().unwrap().clone();
    let sorted =
        tree_sort_timelines(timelines, |t| t.get_ancestor_timeline_id()).context("tree sort")?;

    let mut already_running_deletions = vec![];

    for (timeline_id, _) in sorted.into_iter().rev() {
        let span = tracing::info_span!("timeline_delete", %timeline_id);
        let res = DeleteTimelineFlow::run(tenant, timeline_id, true)
            .instrument(span)
            .await;
        if let Err(e) = res {
            match e {
                DeleteTimelineError::NotFound => {
                    // Timeline deletion finished after the call to clone above but before the call
                    // to `DeleteTimelineFlow::run` and removed the timeline from the map.
                    continue;
                }
                DeleteTimelineError::AlreadyInProgress(guard) => {
                    already_running_deletions.push((guard, timeline_id));
                    continue;
                }
                e => return Err(DeleteTenantError::Timeline(e)),
            }
        }
    }

    Ok(already_running_deletions)
}

async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), DeleteTenantError> {
    // Assert timelines dir is empty.
    if !fs_ext::is_directory_empty(timelines_path).await? {
        // Display first 10 items in directory
        let list = fs_ext::list_dir(timelines_path).await.context("list_dir")?;
        let list = &list.into_iter().take(10).collect::<Vec<_>>();
        return Err(DeleteTenantError::Other(anyhow::anyhow!(
            "Timelines directory is not empty after all timelines deletion: {list:?}"
        )));
    }

    Ok(())
}

async fn remove_tenant_remote_delete_mark(
    conf: &PageServerConf,
    remote_storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
    let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
    backoff::retry(
        || async { remote_storage.delete(&path, cancel).await },
        TimeoutOrCancel::caused_by_cancel,
        FAILED_UPLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        "remove_tenant_remote_delete_mark",
        cancel,
    )
    .await
    .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
    .and_then(|x| x)
    .context("remove_tenant_remote_delete_mark")?;
    Ok(())
}

// Cleanup fs traces: tenant config, timelines dir local delete mark, tenant dir
async fn cleanup_remaining_fs_traces(
    conf: &PageServerConf,
    tenant_shard_id: &TenantShardId,
) -> Result<(), DeleteTenantError> {
    let rm = |p: Utf8PathBuf, is_dir: bool| async move {
        if is_dir {
            tokio::fs::remove_dir(&p).await
        } else {
            tokio::fs::remove_file(&p).await
        }
        .or_else(fs_ext::ignore_not_found)
        .with_context(|| format!("failed to delete {p}"))
    };

    rm(conf.tenant_config_path(tenant_shard_id), false).await?;
    rm(conf.tenant_location_config_path(tenant_shard_id), false).await?;

    fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
        Err(anyhow::anyhow!(
            "failpoint: tenant-delete-before-remove-timelines-dir"
        ))?
    });

    rm(conf.timelines_path(tenant_shard_id), true).await?;

    fail::fail_point!("tenant-delete-before-remove-deleted-mark", |_| {
        Err(anyhow::anyhow!(
            "failpoint: tenant-delete-before-remove-deleted-mark"
        ))?
    });

    // Make sure previous deletions are ordered before mark removal.
    // Otherwise there is no guarantee that they reach the disk before mark deletion.
    // So it's possible for the mark to reach disk first and for other deletions
    // to be reordered later and thus missed if a crash occurs.
    // Note that we don't need to sync after the mark file is removed,
    // because we can tolerate the case when the mark file reappears on startup.
    let tenant_path = &conf.tenant_path(tenant_shard_id);
    if tenant_path.exists() {
        crashsafe::fsync_async(&conf.tenant_path(tenant_shard_id))
            .await
            .context("fsync_pre_mark_remove")?;
    }

    rm(conf.tenant_deleted_mark_file_path(tenant_shard_id), false).await?;

    rm(conf.tenant_heatmap_path(tenant_shard_id), false).await?;

    fail::fail_point!("tenant-delete-before-remove-tenant-dir", |_| {
        Err(anyhow::anyhow!(
            "failpoint: tenant-delete-before-remove-tenant-dir"
        ))?
    });

    rm(conf.tenant_path(tenant_shard_id), true).await?;

    Ok(())
}

#[derive(Default)]
pub enum DeleteTenantFlow {
    #[default]
    NotStarted,
    InProgress,
    Finished,
}

impl DeleteTenantFlow {
    pub(crate) async fn should_resume_deletion(
        conf: &'static PageServerConf,
        remote_mark_exists: bool,
        tenant: &Tenant,
    ) -> Result<Option<DeletionGuard>, DeleteTenantError> {
        let acquire = |t: &Tenant| {
            Some(
                Arc::clone(&t.delete_progress)
                    .try_lock_owned()
                    .expect("we're the only owner during init"),
            )
        };

        if remote_mark_exists {
            return Ok(acquire(tenant));
        }

        // Check the local mark first; if it's there, there is no need to go to S3 to check whether the remote one exists.
        if conf
            .tenant_deleted_mark_file_path(&tenant.tenant_shard_id)
            .exists()
        {
            Ok(acquire(tenant))
        } else {
            Ok(None)
        }
    }

    pub(crate) async fn resume_from_attach(
        guard: DeletionGuard,
        tenant: &Arc<Tenant>,
        preload: Option<TenantPreload>,
        tenants: &'static std::sync::RwLock<TenantsMap>,
        ctx: &RequestContext,
    ) -> Result<(), DeleteTenantError> {
        let (_, progress) = completion::channel();

        tenant
            .set_stopping(progress, false, true)
            .await
            .expect("cant be stopping or broken");

        tenant
            .attach(preload, super::SpawnMode::Eager, ctx)
            .await
            .context("attach")?;

        Self::background(
            guard,
            tenant.conf,
            tenant.remote_storage.clone(),
            tenants,
            tenant,
        )
        .await
    }

    async fn background(
        mut guard: OwnedMutexGuard<Self>,
        conf: &PageServerConf,
        remote_storage: GenericRemoteStorage,
        tenants: &'static std::sync::RwLock<TenantsMap>,
        tenant: &Arc<Tenant>,
    ) -> Result<(), DeleteTenantError> {
        // Tree sort timelines, schedule delete for them. Mention retries from the console side.
        // Note that if deletion fails we don't mark timelines as broken;
        // the whole tenant will become broken per `Self::schedule_background` logic.
        let already_running_timeline_deletions = schedule_ordered_timeline_deletions(tenant)
            .await
            .context("schedule_ordered_timeline_deletions")?;

        fail::fail_point!("tenant-delete-before-polling-ongoing-deletions", |_| {
            Err(anyhow::anyhow!(
                "failpoint: tenant-delete-before-polling-ongoing-deletions"
            ))?
        });

        // Wait for deletions that were already running at the moment when tenant deletion was requested.
        // When we can lock the deletion guard, it means that the corresponding timeline deletion finished.
        for (guard, timeline_id) in already_running_timeline_deletions {
            let flow = guard.lock().await;
            if !flow.is_finished() {
                return Err(DeleteTenantError::Other(anyhow::anyhow!(
                    "already running timeline deletion failed: {timeline_id}"
                )));
            }
        }

        // Remove top-level tenant objects that don't belong to a timeline, such as heatmap
        let heatmap_path = remote_heatmap_path(&tenant.tenant_shard_id());
        if let Some(Err(e)) = backoff::retry(
            || async {
                remote_storage
                    .delete(&heatmap_path, &task_mgr::shutdown_token())
                    .await
            },
            TimeoutOrCancel::caused_by_cancel,
            FAILED_UPLOAD_WARN_THRESHOLD,
            FAILED_REMOTE_OP_RETRIES,
            "remove_remote_tenant_heatmap",
            &task_mgr::shutdown_token(),
        )
        .await
        {
            tracing::warn!("Failed to delete heatmap at {heatmap_path}: {e}");
        }

        let timelines_path = conf.timelines_path(&tenant.tenant_shard_id);
        // May not exist if we fail in cleanup_remaining_fs_traces after removing it
        if timelines_path.exists() {
            // sanity check to guard against layout changes
            ensure_timelines_dir_empty(&timelines_path)
                .await
                .context("timelines dir not empty")?;
        }

        remove_tenant_remote_delete_mark(
            conf,
            &remote_storage,
            &tenant.tenant_shard_id,
            &task_mgr::shutdown_token(),
        )
        .await?;

        pausable_failpoint!("tenant-delete-before-cleanup-remaining-fs-traces-pausable");
        fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| {
            Err(anyhow::anyhow!(
                "failpoint: tenant-delete-before-cleanup-remaining-fs-traces"
            ))?
        });

        cleanup_remaining_fs_traces(conf, &tenant.tenant_shard_id)
            .await
            .context("cleanup_remaining_fs_traces")?;

        {
            // This block is simply removing the TenantSlot for this tenant. It requires a loop because
            // we might conflict with a TenantSlot::InProgress marker and need to wait for it.
            //
            // This complexity will go away when we simplify how deletion works:
            // https://github.com/neondatabase/neon/issues/5080
            loop {
                // Under the TenantMap lock, try to remove the tenant. We usually succeed, but if
                // we encounter an InProgress marker, yield the barrier it contains and wait on it.
                let barrier = {
                    let mut locked = tenants.write().unwrap();
                    let removed = locked.remove(tenant.tenant_shard_id);

                    // FIXME: we should not be modifying this from outside of mgr.rs.
                    // This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080)

                    // Update stats
                    match &removed {
                        TenantsMapRemoveResult::Occupied(slot) => {
                            crate::metrics::TENANT_MANAGER.slot_removed(slot);
                        }
                        TenantsMapRemoveResult::InProgress(barrier) => {
                            crate::metrics::TENANT_MANAGER
                                .slot_removed(&TenantSlot::InProgress(barrier.clone()));
                        }
                        TenantsMapRemoveResult::Vacant => {
                            // Nothing changed in map, no metric update
                        }
                    }

                    match removed {
                        TenantsMapRemoveResult::Occupied(TenantSlot::Attached(tenant)) => {
                            match tenant.current_state() {
                                TenantState::Stopping { .. } | TenantState::Broken { .. } => {
                                    // Expected: we put the tenant into stopping state before we start deleting it
                                }
                                state => {
                                    // Unexpected state
                                    tracing::warn!(
                                        "Tenant in unexpected state {state} after deletion"
                                    );
                                }
                            }
                            break;
                        }
                        TenantsMapRemoveResult::Occupied(TenantSlot::Secondary(_)) => {
                            // This is unexpected: this secondary tenant should not have been created, and we
                            // are not in a position to shut it down from here.
                            tracing::warn!("Tenant transitioned to secondary mode while deleting!");
                            break;
                        }
                        TenantsMapRemoveResult::Occupied(TenantSlot::InProgress(_)) => {
                            unreachable!("TenantsMap::remove handles InProgress separately, should never return it here");
                        }
                        TenantsMapRemoveResult::Vacant => {
                            tracing::warn!(
                                "Tenant removed from TenantsMap before deletion completed"
                            );
                            break;
                        }
                        TenantsMapRemoveResult::InProgress(barrier) => {
                            // An InProgress entry was found, we must wait on its barrier
                            barrier
                        }
                    }
                };

                tracing::info!(
                    "Waiting for competing operation to complete before deleting state for tenant"
                );
                barrier.wait().await;
            }
        }

        *guard = Self::Finished;

        Ok(())
    }
}
@@ -51,7 +51,6 @@ use utils::fs_ext::PathExt;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};

use super::delete::DeleteTenantError;
use super::remote_timeline_client::remote_tenant_path;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
@@ -109,12 +108,6 @@ pub(crate) enum TenantsMap {
    ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}

pub(crate) enum TenantsMapRemoveResult {
    Occupied(TenantSlot),
    Vacant,
    InProgress(utils::completion::Barrier),
}

/// When resolving a TenantId to a shard, we may be looking for the 0th
/// shard, or we might be looking for whichever shard holds a particular page.
#[derive(Copy, Clone)]
@@ -191,26 +184,6 @@ impl TenantsMap {
        }
    }

    /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
    ///
    /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
    /// slot if the enclosed tenant is shut down.
    pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
        use std::collections::btree_map::Entry;
        match self {
            TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
                Entry::Occupied(entry) => match entry.get() {
                    TenantSlot::InProgress(barrier) => {
                        TenantsMapRemoveResult::InProgress(barrier.clone())
                    }
                    _ => TenantsMapRemoveResult::Occupied(entry.remove()),
                },
                Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
            },
        }
    }

    #[cfg(all(debug_assertions, not(test)))]
    pub(crate) fn len(&self) -> usize {
        match self {
@@ -460,6 +433,18 @@ async fn init_load_tenant_configs(
    Ok(configs)
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
    #[error("Tenant map slot error {0}")]
    SlotError(#[from] TenantSlotError),

    #[error("Cancelled")]
    Cancelled,

    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
@@ -629,7 +614,6 @@ pub async fn init_tenant_mgr(
            AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
            shard_identity,
            Some(init_order.clone()),
            &TENANTS,
            SpawnMode::Lazy,
            &ctx,
        ) {
@@ -685,7 +669,6 @@ fn tenant_spawn(
    location_conf: AttachedTenantConf,
    shard_identity: ShardIdentity,
    init_order: Option<InitializationOrder>,
    tenants: &'static std::sync::RwLock<TenantsMap>,
    mode: SpawnMode,
    ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -712,7 +695,6 @@ fn tenant_spawn(
        location_conf,
        shard_identity,
        init_order,
        tenants,
        mode,
        ctx,
    ) {
@@ -1161,7 +1143,6 @@ impl TenantManager {
            attached_conf,
            shard_identity,
            None,
            self.tenants,
            spawn_mode,
            ctx,
        )?;
@@ -1283,7 +1264,6 @@ impl TenantManager {
            AttachedTenantConf::try_from(config)?,
            shard_identity,
            None,
            self.tenants,
            SpawnMode::Eager,
            ctx,
        )?;
@@ -1634,7 +1614,7 @@ impl TenantManager {
        for child_shard_id in &child_shards {
            let child_shard_id = *child_shard_id;
            let child_shard = {
                let locked = TENANTS.read().unwrap();
                let locked = self.tenants.read().unwrap();
                let peek_slot =
                    tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
                peek_slot.and_then(|s| s.get_attached()).cloned()
@@ -1735,6 +1715,7 @@ impl TenantManager {
        let timelines = parent_shard.timelines.lock().unwrap().clone();
        let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
        for timeline in timelines.values() {
            tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
            let timeline_layers = timeline
                .layers
                .read()
@@ -1774,7 +1755,12 @@ impl TenantManager {

        // Since we will do a large number of small filesystem metadata operations, batch them into
        // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
        let span = tracing::Span::current();
        let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
            // Run this synchronous code in the same log context as the outer function that spawned it.
            let _span = span.enter();

            tracing::info!("Creating {} directories", create_dirs.len());
            for dir in &create_dirs {
                if let Err(e) = std::fs::create_dir_all(dir) {
                    // Ignore AlreadyExists errors, drop out on all other errors
@@ -1788,6 +1774,11 @@ impl TenantManager {
            }

            for child_prefix in child_prefixes {
                tracing::info!(
                    "Hard-linking {} parent layers into child path {}",
                    parent_layers.len(),
                    child_prefix
                );
                for relative_layer in &parent_layers {
                    let parent_path = parent_path.join(relative_layer);
                    let child_path = child_prefix.join(relative_layer);
@@ -1813,6 +1804,7 @@ impl TenantManager {
            // Durability is not required for correctness, but if we crashed during split and
            // then restarted with empty timeline dirs, it would be very inefficient to
            // re-populate from remote storage.
            tracing::info!("fsyncing {} directories", create_dirs.len());
            for dir in create_dirs {
                if let Err(e) = crashsafe::fsync(&dir) {
                    // Something removed a newly created timeline dir out from underneath us? Extremely
@@ -1866,7 +1858,7 @@ impl TenantManager {
        deletion_queue_client: &DeletionQueueClient,
    ) -> Result<(), TenantStateError> {
        let tmp_path = self
            .detach_tenant0(conf, &TENANTS, tenant_shard_id, deletion_queue_client)
            .detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
            .await?;
        spawn_background_purge(tmp_path);

@@ -1876,7 +1868,6 @@ impl TenantManager {
    async fn detach_tenant0(
        &self,
        conf: &'static PageServerConf,
        tenants: &std::sync::RwLock<TenantsMap>,
        tenant_shard_id: TenantShardId,
        deletion_queue_client: &DeletionQueueClient,
    ) -> Result<Utf8PathBuf, TenantStateError> {
@@ -1890,7 +1881,7 @@ impl TenantManager {
        };

        let removal_result = remove_tenant_from_memory(
            tenants,
            self.tenants,
            tenant_shard_id,
            tenant_dir_rename_operation(tenant_shard_id),
        )
@@ -1906,7 +1897,7 @@ impl TenantManager {
    pub(crate) fn list_tenants(
        &self,
    ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
        let tenants = TENANTS.read().unwrap();
        let tenants = self.tenants.read().unwrap();
        let m = match &*tenants {
            TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
@@ -2007,7 +1998,6 @@ impl TenantManager {
            AttachedTenantConf::try_from(config)?,
            shard_identity,
            None,
            self.tenants,
            SpawnMode::Eager,
            ctx,
        )?;

@@ -965,6 +965,8 @@ impl Timeline {
        _cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> Result<(), CompactionError> {
        use std::collections::BTreeSet;

        use crate::tenant::storage_layer::ValueReconstructState;
        // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
        // The layer selection has the following properties:
@@ -986,20 +988,30 @@ impl Timeline {
            (selected_layers, gc_cutoff)
        };
        // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
        // Also, collect the layer information to decide when to split the new delta layers.
        let mut all_key_values = Vec::new();
        let mut delta_split_points = BTreeSet::new();
        for layer in &layer_selection {
            all_key_values.extend(layer.load_key_values(ctx).await?);
            let desc = layer.layer_desc();
            if desc.is_delta() {
                // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
                // so that we can avoid having too many small delta layers.
                let key_range = desc.get_key_range();
                delta_split_points.insert(key_range.start);
                delta_split_points.insert(key_range.end);
            }
        }
        // Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and
        // image layers, make image appear later than delta.
        // image layers, make image appear before delta.
        struct ValueWrapper<'a>(&'a crate::repository::Value);
        impl Ord for ValueWrapper<'_> {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                use crate::repository::Value;
                use std::cmp::Ordering;
                match (self.0, other.0) {
                    (Value::Image(_), Value::WalRecord(_)) => Ordering::Greater,
                    (Value::WalRecord(_), Value::Image(_)) => Ordering::Less,
                    (Value::Image(_), Value::WalRecord(_)) => Ordering::Less,
                    (Value::WalRecord(_), Value::Image(_)) => Ordering::Greater,
                    _ => Ordering::Equal,
                }
            }
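            // Hedged reading of the ordering above: after the sort, if a key
            // carries both a Value::Image and a Value::WalRecord at the same
            // LSN, the image now sorts first, so the dedup pass further down
            // keeps the image and drops the delta at that LSN.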
@@ -1018,13 +1030,6 @@ impl Timeline {
        all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
            (k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
        });
        let max_lsn = all_key_values
            .iter()
            .map(|(_, lsn, _)| lsn)
            .max()
            .copied()
            .unwrap()
            + 1;
        // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
        // Data of the same key.
        let mut accumulated_values = Vec::new();
@@ -1043,7 +1048,19 @@ impl Timeline {
            // We have a list of deltas/images. We want to create image layers while collecting garbage.
            for (key, lsn, val) in accumulated_values.iter().rev() {
                if *lsn > horizon {
                    keys_above_horizon.push((*key, *lsn, val.clone())); // TODO: ensure one LSN corresponds to either delta or image instead of both
                    if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
                        if *prev_lsn == *lsn {
                            // The case that we have an LSN with both data from the delta layer and the image layer. As
                            // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
                            // drop this delta and keep the image.
                            //
                            // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
                            // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
                            // dropped.
                            continue;
                        }
                    }
                    keys_above_horizon.push((*key, *lsn, val.clone()));
                } else if *lsn <= horizon {
                    match val {
                        crate::repository::Value::Image(image) => {
@@ -1068,15 +1085,59 @@ impl Timeline {
            Ok((keys_above_horizon, img))
        }

        let mut delta_layer_writer = DeltaLayerWriter::new(
            self.conf,
            self.timeline_id,
            self.tenant_shard_id,
            all_key_values.first().unwrap().0,
            gc_cutoff..max_lsn, // TODO: off by one?
            ctx,
        )
        .await?;
        async fn flush_deltas(
            deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
            last_key: Key,
            delta_split_points: &[Key],
            current_delta_split_point: &mut usize,
            tline: &Arc<Timeline>,
            gc_cutoff: Lsn,
            ctx: &RequestContext,
        ) -> anyhow::Result<Option<ResidentLayer>> {
            // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
            // overlapping layers.
            //
            // If we have a structure like this:
            //
            // | Delta 1 |         | Delta 4 |
            // |---------| Delta 2 |---------|
            // | Delta 3 |         | Delta 5 |
            //
            // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
            // A simple solution here is to split the delta layers using the original boundary, while this
            // might produce a lot of small layers. This should be improved and fixed in the future.
            let mut need_split = false;
            while *current_delta_split_point < delta_split_points.len()
                && last_key >= delta_split_points[*current_delta_split_point]
            {
                *current_delta_split_point += 1;
                need_split = true;
            }
            if !need_split {
                return Ok(None);
            }
            let deltas = std::mem::take(deltas);
            if deltas.is_empty() {
                return Ok(None);
            }
            let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
            let mut delta_layer_writer = DeltaLayerWriter::new(
                tline.conf,
                tline.timeline_id,
                tline.tenant_shard_id,
                deltas.first().unwrap().0,
                gc_cutoff..end_lsn,
                ctx,
            )
            .await?;
            let key_end = deltas.last().unwrap().0.next();
            for (key, lsn, val) in deltas {
                delta_layer_writer.put_value(key, lsn, val, ctx).await?;
            }
            let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
            Ok(Some(delta_layer))
        }

        let mut image_layer_writer = ImageLayerWriter::new(
            self.conf,
            self.timeline_id,
@@ -1087,6 +1148,10 @@ impl Timeline {
        )
        .await?;

        let mut delta_values = Vec::new();
        let delta_split_points = delta_split_points.into_iter().collect_vec();
        let mut current_delta_split_point = 0;
        let mut delta_layers = Vec::new();
        for item @ (key, _, _) in &all_key_values {
            if &last_key == key {
                accumulated_values.push(item);
@@ -1094,33 +1159,54 @@ impl Timeline {
                let (deltas, image) =
                    flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
                        .await?;
                // Put the image into the image layer. Currently we have a single big layer for the compaction.
                image_layer_writer.put_image(last_key, image, ctx).await?;
                for (key, lsn, val) in deltas {
                    delta_layer_writer.put_value(key, lsn, val, ctx).await?;
                }
                delta_values.extend(deltas);
                delta_layers.extend(
                    flush_deltas(
                        &mut delta_values,
                        last_key,
                        &delta_split_points,
                        &mut current_delta_split_point,
                        self,
                        gc_cutoff,
                        ctx,
                    )
                    .await?,
                );
                accumulated_values.clear();
                accumulated_values.push(item);
                last_key = *key;
            }
        }

        // TODO: move this part to the loop body
        let (deltas, image) =
            flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
        // Put the image into the image layer. Currently we have a single big layer for the compaction.
        image_layer_writer.put_image(last_key, image, ctx).await?;
        for (key, lsn, val) in deltas {
            delta_layer_writer.put_value(key, lsn, val, ctx).await?;
        }
        accumulated_values.clear();
        // TODO: split layers
        let delta_layer = delta_layer_writer.finish(last_key, self, ctx).await?;
        delta_values.extend(deltas);
        delta_layers.extend(
            flush_deltas(
                &mut delta_values,
                last_key,
                &delta_split_points,
                &mut current_delta_split_point,
                self,
                gc_cutoff,
                ctx,
            )
            .await?,
        );

        let image_layer = image_layer_writer.finish(self, ctx).await?;
        let mut compact_to = Vec::new();
        compact_to.extend(delta_layers);
        compact_to.push(image_layer);
        // Step 3: Place back to the layer map.
        {
            let mut guard = self.layers.write().await;
            guard.finish_gc_compaction(
                &layer_selection,
                &[delta_layer.clone(), image_layer.clone()],
                &self.metrics,
            )
            guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
        };
        Ok(())
    }

@@ -255,7 +255,6 @@ impl DeleteTimelineFlow {
    }

    /// Shortcut to create Timeline in stopping state and spawn deletion task.
    /// See corresponding parts of [`crate::tenant::delete::DeleteTenantFlow`]
    #[instrument(skip_all, fields(%timeline_id))]
    pub async fn resume_deletion(
        tenant: Arc<Tenant>,
@@ -420,10 +419,6 @@ impl DeleteTimelineFlow {
        Ok(())
    }

    pub(crate) fn is_finished(&self) -> bool {
        matches!(self, Self::Finished)
    }

    pub(crate) fn is_not_started(&self) -> bool {
        matches!(self, Self::NotStarted)
    }

@@ -103,12 +103,8 @@ impl ConnCfg {

    /// Reuse password or auth keys from the other config.
    pub fn reuse_password(&mut self, other: Self) {
        if let Some(password) = other.get_password() {
            self.password(password);
        }

        if let Some(keys) = other.get_auth_keys() {
            self.auth_keys(keys);
        if let Some(password) = other.get_auth() {
            self.auth(password);
        }
    }

@@ -124,48 +120,64 @@ impl ConnCfg {

    /// Apply startup message params to the connection config.
    pub fn set_startup_params(&mut self, params: &StartupMessageParams) {
        // Only set `user` if it's not present in the config.
        // Link auth flow takes username from the console's response.
        if let (None, Some(user)) = (self.get_user(), params.get("user")) {
            self.user(user);
        }

        // Only set `dbname` if it's not present in the config.
        // Link auth flow takes dbname from the console's response.
        if let (None, Some(dbname)) = (self.get_dbname(), params.get("database")) {
            self.dbname(dbname);
        }

        // Don't add `options` if they were only used for specifying a project.
        // Connection pools don't support `options`, because they affect backend startup.
        if let Some(options) = filtered_options(params) {
            self.options(&options);
        }

        if let Some(app_name) = params.get("application_name") {
            self.application_name(app_name);
        }

        // TODO: This is especially ugly...
        if let Some(replication) = params.get("replication") {
            use tokio_postgres::config::ReplicationMode;
            match replication {
                "true" | "on" | "yes" | "1" => {
                    self.replication_mode(ReplicationMode::Physical);
        let mut client_encoding = false;
        for (k, v) in params.iter() {
            match k {
                "user" => {
                    // Only set `user` if it's not present in the config.
                    // Link auth flow takes username from the console's response.
                    if self.get_user().is_none() {
                        self.user(v);
                    }
                }
                "database" => {
                    self.replication_mode(ReplicationMode::Logical);
                    // Only set `dbname` if it's not present in the config.
                    // Link auth flow takes dbname from the console's response.
                    if self.get_dbname().is_none() {
                        self.dbname(v);
                    }
                }
                "options" => {
                    // Don't add `options` if they were only used for specifying a project.
                    // Connection pools don't support `options`, because they affect backend startup.
                    if let Some(options) = filtered_options(v) {
                        self.options(&options);
                    }
                }

                // the special ones in tokio-postgres that we don't want to be set by the user
                "dbname" => {}
                "password" => {}
                "sslmode" => {}
                "host" => {}
                "port" => {}
                "connect_timeout" => {}
                "keepalives" => {}
                "keepalives_idle" => {}
                "keepalives_interval" => {}
                "keepalives_retries" => {}
                "target_session_attrs" => {}
                "channel_binding" => {}
                "max_backend_message_size" => {}

                "client_encoding" => {
                    client_encoding = true;
                    // only error should be from bad null bytes,
                    // but we've already checked for those.
                    _ = self.param("client_encoding", v);
                }

                _ => {
                    // only error should be from bad null bytes,
                    // but we've already checked for those.
                    _ = self.param(k, v);
                }
                _other => {}
            }
        }

        // TODO: extend the list of the forwarded startup parameters.
        // Currently, tokio-postgres doesn't allow us to pass
        // arbitrary parameters, but the ones above are a good start.
        //
        // This and the reverse params problem can be better addressed
        // in a bespoke connection machinery (a new library for that sake).
        if !client_encoding {
            // for compatibility since we removed it from tokio-postgres
            self.param("client_encoding", "UTF8").unwrap();
        }
    }

@@ -338,10 +350,9 @@ impl ConnCfg {
}

/// Retrieve `options` from a startup message, dropping all proxy-specific flags.
fn filtered_options(params: &StartupMessageParams) -> Option<String> {
fn filtered_options(options: &str) -> Option<String> {
    #[allow(unstable_name_collisions)]
    let options: String = params
        .options_raw()?
    let options: String = StartupMessageParams::parse_options_raw(options)
        .filter(|opt| parse_endpoint_param(opt).is_none() && neon_option(opt).is_none())
        .intersperse(" ") // TODO: use impl from std once it's stabilized
        .collect();
@@ -413,27 +424,23 @@ mod tests {
    #[test]
    fn test_filtered_options() {
        // An empty options string is unlikely to be useful anyway.
        let params = StartupMessageParams::new([("options", "")]);
        assert_eq!(filtered_options(&params), None);
        assert_eq!(filtered_options(""), None);

        // It's likely that clients will only use options to specify endpoint/project.
        let params = StartupMessageParams::new([("options", "project=foo")]);
        assert_eq!(filtered_options(&params), None);
        let params = "project=foo";
        assert_eq!(filtered_options(params), None);

        // Same, because unescaped whitespaces are no-op.
        let params = StartupMessageParams::new([("options", " project=foo ")]);
        assert_eq!(filtered_options(&params).as_deref(), None);
        let params = " project=foo ";
        assert_eq!(filtered_options(params), None);

        let params = StartupMessageParams::new([("options", r"\ project=foo \ ")]);
        assert_eq!(filtered_options(&params).as_deref(), Some(r"\ \ "));
        let params = r"\ project=foo \ ";
        assert_eq!(filtered_options(params).as_deref(), Some(r"\ \ "));

        let params = StartupMessageParams::new([("options", "project = foo")]);
        assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
        let params = "project = foo";
        assert_eq!(filtered_options(params).as_deref(), Some("project = foo"));

        let params = StartupMessageParams::new([(
            "options",
            "project = foo neon_endpoint_type:read_write neon_lsn:0/2",
        )]);
        assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
        let params = "project = foo neon_endpoint_type:read_write neon_lsn:0/2";
        assert_eq!(filtered_options(params).as_deref(), Some("project = foo"));
    }
}

@@ -231,6 +231,10 @@ impl ConnectMechanism for TokioMechanism {
            .dbname(&self.conn_info.dbname)
            .connect_timeout(timeout);

        config
            .param("client_encoding", "UTF8")
            .expect("client encoding UTF8 is always valid");

        let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
        let res = config.connect(tokio_postgres::NoTls).await;
        drop(pause);

@@ -202,6 +202,7 @@ fn get_conn_info(
            options = Some(NeonOptions::parse_options_raw(&value));
        }
    }
    ctx.set_db_options(params.freeze());

    let user_info = ComputeUserInfo {
        endpoint,

@@ -231,11 +231,7 @@ impl PhysicalStorage {
            // half initialized segment, first bake it under tmp filename and
            // then rename.
            let tmp_path = self.timeline_dir.join("waltmp");
            #[allow(clippy::suspicious_open_options)]
            let mut file = OpenOptions::new()
                .create(true)
                .write(true)
                .open(&tmp_path)
            let mut file = File::create(&tmp_path)
                .await
                .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;


@@ -619,13 +619,15 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
        tenant_id: Union[TenantId, TenantShardId],
        timeline_id: TimelineId,
        timestamp: datetime,
        with_lease: bool = False,
        **kwargs,
    ):
        log.info(
            f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
            f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}, {with_lease=}"
        )
        with_lease_query = f"{with_lease=}".lower()
        res = self.get(
            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z&{with_lease_query}",
            **kwargs,
        )
        self.verbose_error(res)

@@ -48,7 +48,16 @@ def test_storage_controller_many_tenants(

    # We will intentionally stress reconciler concurrency, which triggers a warning when lots
    # of shards are hitting the delayed path.
    env.storage_controller.allowed_errors.append(".*Many shards are waiting to reconcile")
    env.storage_controller.allowed_errors.extend(
        [
            # We will intentionally stress reconciler concurrency, which triggers a warning when lots
            # of shards are hitting the delayed path.
            ".*Many shards are waiting to reconcile",
            # We will create many timelines concurrently, so they might get slow enough to trip the warning
            # that timeline creation is holding a lock too long.
            ".*Shared lock by TimelineCreate.*was held.*",
        ]
    )

    for ps in env.pageservers:
        # This can happen because when we do a loop over all pageservers and mark them offline/active,

@@ -12,10 +12,24 @@ from fixtures.utils import query_scalar, wait_until
from requests.exceptions import ReadTimeout


#
# Test pageserver get_lsn_by_timestamp API
#
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
def assert_lsn_lease_granted(result, with_lease: bool):
    """
    Asserts an LSN lease is granted when `with_lease` flag is turned on.
    Always asserts no LSN lease is granted when `with_lease` flag is off.
    """
    if with_lease:
        assert result.get("valid_until")
    else:
        assert result.get("valid_until") is None


@pytest.mark.parametrize("with_lease", [True, False])
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder, with_lease: bool):
    """
    Test pageserver get_lsn_by_timestamp API.

    :param with_lease: Whether to get a lease associated with returned LSN.
    """
    env = neon_env_builder.init_start()

    tenant_id, _ = env.neon_cli.create_tenant(
@@ -67,23 +81,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
    # Check edge cases
    # Timestamp is in the future
    probe_timestamp = tbl[-1][1] + timedelta(hours=1)
    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
    result = client.timeline_get_lsn_by_timestamp(
        tenant_id, timeline_id, probe_timestamp, with_lease=with_lease
    )
    assert result["kind"] == "future"
    assert_lsn_lease_granted(result, with_lease)
    # make sure that we return a well-advanced lsn here
    assert Lsn(result["lsn"]) > start_lsn

    # Timestamp is in the unreachable past
    probe_timestamp = tbl[0][1] - timedelta(hours=10)
    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
    result = client.timeline_get_lsn_by_timestamp(
        tenant_id, timeline_id, probe_timestamp, with_lease=with_lease
    )
    assert result["kind"] == "past"
    assert_lsn_lease_granted(result, with_lease)

    # make sure that we return the minimum lsn here at the start of the range
    assert Lsn(result["lsn"]) < start_lsn

    # Probe a bunch of timestamps in the valid range
    for i in range(1, len(tbl), 100):
        probe_timestamp = tbl[i][1]
        result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
        result = client.timeline_get_lsn_by_timestamp(
            tenant_id, timeline_id, probe_timestamp, with_lease=with_lease
        )
        assert result["kind"] not in ["past", "nodata"]
        assert_lsn_lease_granted(result, with_lease)
        lsn = result["lsn"]
        # Call get_lsn_by_timestamp to get the LSN
        # Launch a new read-only node at that LSN, and check that only the rows
@@ -105,8 +129,11 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):

    # Timestamp is in the unreachable past
    probe_timestamp = tbl[0][1] - timedelta(hours=10)
    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
    result = client.timeline_get_lsn_by_timestamp(
        tenant_id, timeline_id_child, probe_timestamp, with_lease=with_lease
    )
    assert result["kind"] == "past"
    assert_lsn_lease_granted(result, with_lease)
    # make sure that we return the minimum lsn here at the start of the range
    assert Lsn(result["lsn"]) >= last_flush_lsn


@@ -53,6 +53,25 @@ def test_proxy_select_1(static_proxy: NeonProxy):
    assert out[0][0] == 42


def test_proxy_server_params(static_proxy: NeonProxy):
    """
    Test that server params are passed through to postgres
    """

    out = static_proxy.safe_psql(
        "select to_json('0 seconds'::interval)", options="-c intervalstyle=iso_8601"
    )
    assert out[0][0] == "PT0S"
    out = static_proxy.safe_psql(
        "select to_json('0 seconds'::interval)", options="-c intervalstyle=sql_standard"
    )
    assert out[0][0] == "0"
    out = static_proxy.safe_psql(
        "select to_json('0 seconds'::interval)", options="-c intervalstyle=postgres"
    )
    assert out[0][0] == "00:00:00"


def test_password_hack(static_proxy: NeonProxy):
    """
    Check the PasswordHack auth flow: an alternative to SCRAM auth for

@@ -190,19 +190,20 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
    """
    Test that after a split, we clean up parent layer data in the child shards via compaction.
    """

    TENANT_CONF = {
        # small checkpointing and compaction targets to ensure we generate many upload operations
        "checkpoint_distance": f"{128 * 1024}",
        "compaction_threshold": "1",
        "compaction_target_size": f"{128 * 1024}",
        "checkpoint_distance": 128 * 1024,
        "compaction_threshold": 1,
        "compaction_target_size": 128 * 1024,
        # no PITR horizon, we specify the horizon when we request on-demand GC
        "pitr_interval": "3600s",
        # disable background compaction and GC. We invoke it manually when we want it to happen.
        "gc_period": "0s",
        "compaction_period": "0s",
        # create image layers eagerly, so that GC can remove some layers
        "image_creation_threshold": "1",
        "image_layer_creation_check_threshold": "0",
        "image_creation_threshold": 1,
        "image_layer_creation_check_threshold": 0,
    }

    neon_env_builder.storage_controller_config = {
@@ -261,7 +262,9 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
    env.pageserver.start()

    # Cleanup part 2: once layers are outside the PITR window, they will be rewritten if they are partially redundant
    env.storage_controller.pageserver_api().set_tenant_config(tenant_id, {"pitr_interval": "0s"})
    updated_conf = TENANT_CONF.copy()
    updated_conf["pitr_interval"] = "0s"
    env.storage_controller.pageserver_api().set_tenant_config(tenant_id, updated_conf)
    env.storage_controller.reconcile_until_idle()

    for shard in shards:

@@ -31,8 +31,12 @@ def error_tolerant_delete(ps_http, tenant_id):
        if e.status_code == 500:
            # This test uses failure injection, which can produce 500s as the pageserver expects
            # the object store to always be available, and the ListObjects during deletion is generally
            # an infallible operation
            assert "simulated failure of remote operation" in e.message
            # an infallible operation. This can show up as a clear simulated error, or as a general
            # error during delete_objects()
            assert (
                "simulated failure of remote operation" in e.message
                or "failed to delete" in e.message
            )
        else:
            raise
    else: