Compare commits

...

12 Commits

Author SHA1 Message Date
Vlad Lazar
24a3a549c1 metrics: update measured crate to 0.22 2024-06-25 12:58:45 +01:00
Yuchen Liang
219e78f885 feat(pageserver): add an optional lease to the get_lsn_by_timestamp API (#8104)
Part of #7497, closes #8072.

## Problem

Currently the `get_lsn_by_timestamp` and branch creation pageserver APIs do not provide a pleasant client experience where the looked-up LSN might be GC-ed between the two API calls.

This PR attempts to prevent common races between GC and branch creation by making use of LSN leases provided in #8084. A lease can be optionally granted to a looked-up LSN. With the lease, GC will not touch layers needed to reconstruct all pages at this LSN for the duration of the lease.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-24 20:12:24 +00:00
John Spray
1ea5d8b132 tests: accomodate some messages that can fail tests (#8144)
## Problem

- `test_storage_controller_many_tenants` can fail with warnings in the
storage controller about tenant creation holding a lock for too long,
because this test stresses the machine running the test with many
concurrent timeline creations
- `test_tenant_delete_smoke` can fail when synthetic remote storage
errors show up

## Summary of changes

- tolerate warnings about slow timeline creation in
test_storage_controller_many_tenants
- tolerate both possible errors during error_tolerant_delete
2024-06-24 17:03:53 +00:00
John Spray
3d760938e1 storcon_cli: remove old tenant-scatter command (#8127)
## Problem

This command was used in the very early days of sharding, before the
storage controller had anti-affinity + scheduling optimization to spread
out shards.

## Summary of changes

- Remove `storcon_cli tenant-scatter`
2024-06-24 12:57:16 -04:00
Alex Chi Z
9211de0df7 test(pageserver): add delta records tests for gc-compaction (#8078)
Part of https://github.com/neondatabase/neon/issues/8002

This pull request adds tests for bottom-most gc-compaction with delta
records. Also fixed a bug in the compaction process that creates
overlapping delta layers by force splitting at the original delta layer
boundary.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-24 15:50:31 +00:00
Alex Chi Z
d8ffe662a9 fix(pageserver): handle version number in draw timeline (#8102)
We now have a `vX` number in the file name, i.e.,
`000000067F0000000400000B150100000000-000000067F0000000400000D350100000000__00000000014B7AC8-v1-00000001`

The related pull request for new-style path was merged a month ago
https://github.com/neondatabase/neon/pull/7660

## Summary of changes

Fixed the draw timeline dir command to handle it.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-24 15:31:06 +00:00
Arthur Petukhovsky
a4db2af1f0 Truncate waltmp file on creation (#8133)
Previously in safekeeper code, new segment file was opened without
truncate option. I don't think there is a reason to do it, this commit
replaces it with `File::create` to make it simpler and remove
`clippy::suspicious_open_options` linter warning.
2024-06-24 14:07:59 +00:00
John Spray
47fdf93cf0 tests: fix a flake in test_sharding_split_compaction (#8136)
## Problem

This test could occasionally trigger a "removing local file ... because
it has unexpected length log" when using the
`compact-shard-ancestors-persistent` failpoint is in use, which is
unexpected because that failpoint stops the process when the remote
metadata is in sync with local files.

It was because there are two shards on the same pageserver, and while
the one being compacted explicitly stops at the failpoint, another shard
was compacting in the background and failing at an unclean point. The
test intends to disable background compaction, but was mistakenly
revoking the value of `compaction_period` when it updated
`pitr_interval`.

Example failure:

https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8123/9602976462/index.html#/testresult/7dd6165da7daef40

## Summary of changes

- Update `TENANT_CONF` in the test to use properly typed values, so that
it is usable in pageserver APIs as well as via neon_local.
- When updating tenant config with `pitr_interval`, retain the overrides
from the start of the test, so that there won't be any background
compaction going on during the test.
2024-06-24 14:54:54 +01:00
John Spray
de05f90735 pageserver: add more info-level logging in shard splits (#8137)
## Problem

`test_sharding_autosplit` is occasionally failing on warnings about
shard splits taking longer than expected (`Exclusive lock by ShardSplit
was held for`...)

It's not obvious which part is taking the time (I suspect remote storage
uploads).

Example:
https://neon-github-public-dev.s3.amazonaws.com/reports/main/9618788427/index.html#testresult/b395294d5bdeb783/

## Summary of changes

- Since shard splits are infrequent events, we can afford to be very
chatty: add a bunch of info-level logging throughout the process.
2024-06-24 11:53:43 +01:00
John Spray
188797f048 pageserver: remove code that resumes tenant deletions after restarts (#8091)
#8082 removed the legacy deletion path, but retained code for completing
deletions that were started before a pageserver restart. This PR cleans
up that remaining code, and removes all the pageserver code that dealt
with tenant deletion markers and resuming tenant deletions.

The release at https://github.com/neondatabase/neon/pull/8138 contains
https://github.com/neondatabase/neon/pull/8082, so we can now merge this
to `main`
2024-06-24 11:41:11 +01:00
Arpad Müller
5446e08891 Move remote_storage config related code into dedicated module (#8132)
Moves `RemoteStorageConfig` and related structs and functions into a
dedicated module. Also implements `Serialize` for the config structs
(requested in #8126).

Follow-up of #8126
2024-06-24 12:29:54 +02:00
Conrad Ludgate
78d9059fc7 proxy: update tokio-postgres to allow arbitrary config params (#8076)
## Problem

Fixes https://github.com/neondatabase/neon/issues/1287

## Summary of changes

tokio-postgres now supports arbitrary server params through the
`param(key, value)` method. Some keys are special so we explicitly
filter them out.
2024-06-24 10:20:27 +00:00
29 changed files with 910 additions and 1108 deletions

20
Cargo.lock generated
View File

@@ -2993,9 +2993,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "measured"
version = "0.0.21"
version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "652bc741286361c06de8cb4d89b21a6437f120c508c51713663589eeb9928ac5"
checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
dependencies = [
"bytes",
"crossbeam-utils",
@@ -3011,9 +3011,9 @@ dependencies = [
[[package]]
name = "measured-derive"
version = "0.0.21"
version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea497f33e1e856a376c32ad916f69a0bd3c597db1f912a399f842b01a4a685d"
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
dependencies = [
"heck 0.5.0",
"proc-macro2",
@@ -3023,9 +3023,9 @@ dependencies = [
[[package]]
name = "measured-process"
version = "0.0.21"
version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b364ccb66937a814b6b2ad751d1a2f7a9d5a78c761144036825fb36bb0771000"
checksum = "7c4b80445aeb08e832d87bf1830049a924cdc1d6b7ef40b6b9b365bff17bf8ec"
dependencies = [
"libc",
"measured",
@@ -4005,7 +4005,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4018,7 +4018,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4037,7 +4037,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
dependencies = [
"bytes",
"fallible-iterator",
@@ -6210,7 +6210,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#cff6927e4f58b1af6ecc2ee7279df1f2ff537295"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -111,8 +111,8 @@ lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
memoffset = "0.8"
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
notify = "6.0.0"

View File

@@ -1,5 +1,5 @@
use futures::StreamExt;
use std::{collections::HashMap, str::FromStr, time::Duration};
use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
@@ -21,7 +21,7 @@ use utils::id::{NodeId, TenantId};
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
#[derive(Subcommand, Debug)]
@@ -110,12 +110,6 @@ enum Command {
#[arg(long)]
config: String,
},
/// Attempt to balance the locations for a tenant across pageservers. This is a client-side
/// alternative to the storage controller's scheduling optimization behavior.
TenantScatter {
#[arg(long)]
tenant_id: TenantId,
},
/// Print details about a particular tenant, including all its shards' states.
TenantDescribe {
#[arg(long)]
@@ -498,88 +492,6 @@ async fn main() -> anyhow::Result<()> {
})
.await?;
}
Command::TenantScatter { tenant_id } => {
// Find the shards
let locate_response = storcon_client
.dispatch::<(), TenantLocateResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}/locate"),
None,
)
.await?;
let shards = locate_response.shards;
let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
let shard_count = shards.len();
for s in shards {
let entry = node_to_shards.entry(s.node_id).or_default();
entry.push(s.shard_id);
}
// Load list of available nodes
let nodes_resp = storcon_client
.dispatch::<(), Vec<NodeDescribeResponse>>(
Method::GET,
"control/v1/node".to_string(),
None,
)
.await?;
for node in nodes_resp {
if matches!(node.availability, NodeAvailabilityWrapper::Active) {
node_to_shards.entry(node.id).or_default();
}
}
let max_shard_per_node = shard_count / node_to_shards.len();
loop {
let mut migrate_shard = None;
for shards in node_to_shards.values_mut() {
if shards.len() > max_shard_per_node {
// Pick the emptiest
migrate_shard = Some(shards.pop().unwrap());
}
}
let Some(migrate_shard) = migrate_shard else {
break;
};
// Pick the emptiest node to migrate to
let mut destinations = node_to_shards
.iter()
.map(|(k, v)| (k, v.len()))
.collect::<Vec<_>>();
destinations.sort_by_key(|i| i.1);
let (destination_node, destination_count) = *destinations.first().unwrap();
if destination_count + 1 > max_shard_per_node {
// Even the emptiest destination doesn't have space: we're done
break;
}
let destination_node = *destination_node;
node_to_shards
.get_mut(&destination_node)
.unwrap()
.push(migrate_shard);
println!("Migrate {} -> {} ...", migrate_shard, destination_node);
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{migrate_shard}/migrate"),
Some(TenantShardMigrateRequest {
tenant_shard_id: migrate_shard,
node_id: destination_node,
}),
)
.await?;
println!("Migrate {} -> {} OK", migrate_shard, destination_node);
}
// Spread the shards across the nodes
}
Command::TenantDescribe { tenant_id } => {
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(

View File

@@ -13,11 +13,7 @@ use std::{
use measured::{
label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
metric::{
group::{Encoding, MetricValue},
name::MetricNameEncoder,
Metric, MetricType, MetricVec,
},
metric::{gauge::write_gauge, name::MetricNameEncoder, Metric, MetricType, MetricVec},
text::TextEncoder,
LabelGroup,
};
@@ -182,12 +178,13 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
.into_iter()
.enumerate()
.try_for_each(|(hll_shard, val)| {
enc.write_metric_value(
write_gauge(
enc,
name.by_ref(),
labels.by_ref().compose_with(HllShardLabel {
hll_shard: hll_shard as i64,
}),
MetricValue::Int(val as i64),
val as i64,
)
})
}

View File

@@ -8,8 +8,8 @@ use measured::{
label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels},
metric::{
counter::CounterState,
gauge::GaugeState,
group::{Encoding, MetricValue},
gauge::{write_gauge, GaugeState},
group::Encoding,
name::{MetricName, MetricNameEncoder},
MetricEncoding, MetricFamilyEncoding,
},
@@ -165,15 +165,6 @@ pub struct LibMetrics {
serve_count: CollectionCounter,
}
fn write_gauge<Enc: Encoding>(
x: i64,
labels: impl LabelGroup,
name: impl MetricNameEncoder,
enc: &mut Enc,
) -> Result<(), Enc::Err> {
enc.write_metric_value(name, labels, MetricValue::Int(x))
}
#[derive(Default)]
struct Rusage;
@@ -199,12 +190,12 @@ where
"Bytes written and read from disk, grouped by the operation (read|write)",
)?;
GaugeState::write_type(DISK_IO, enc)?;
write_gauge(ru.ru_inblock * BYTES_IN_BLOCK, IoOp::Read, DISK_IO, enc)?;
write_gauge(ru.ru_oublock * BYTES_IN_BLOCK, IoOp::Write, DISK_IO, enc)?;
write_gauge(enc, DISK_IO, IoOp::Read, ru.ru_inblock * BYTES_IN_BLOCK)?;
write_gauge(enc, DISK_IO, IoOp::Write, ru.ru_oublock * BYTES_IN_BLOCK)?;
enc.write_help(MAXRSS, "Memory usage (Maximum Resident Set Size)")?;
GaugeState::write_type(MAXRSS, enc)?;
write_gauge(ru.ru_maxrss, IoOp::Read, MAXRSS, enc)?;
write_gauge(enc, MAXRSS, IoOp::Read, ru.ru_maxrss)?;
Ok(())
}
@@ -543,15 +534,6 @@ impl<T: Encoding> Encoding for Inc<T> {
fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
self.0.write_help(name, help)
}
fn write_metric_value(
&mut self,
name: impl MetricNameEncoder,
labels: impl LabelGroup,
value: MetricValue,
) -> Result<(), Self::Err> {
self.0.write_metric_value(name, labels, value)
}
}
impl<T: Encoding> MetricEncoding<Inc<T>> for MeasuredCounterPairState
@@ -578,15 +560,6 @@ impl<T: Encoding> Encoding for Dec<T> {
fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
self.0.write_help(name, help)
}
fn write_metric_value(
&mut self,
name: impl MetricNameEncoder,
labels: impl LabelGroup,
value: MetricValue,
) -> Result<(), Self::Err> {
self.0.write_metric_value(name, labels, value)
}
}
/// Write the dec counter to the encoder

View File

@@ -144,20 +144,7 @@ impl PgConnectionConfig {
// implement and this function is hardly a bottleneck. The function is only called around
// establishing a new connection.
#[allow(unstable_name_collisions)]
config.options(
&self
.options
.iter()
.map(|s| {
if s.contains(['\\', ' ']) {
Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ "))
} else {
Cow::Borrowed(s.as_str())
}
})
.intersperse(Cow::Borrowed(" ")) // TODO: use impl from std once it's stabilized
.collect::<String>(),
);
config.options(&encode_options(&self.options));
}
config
}
@@ -178,6 +165,21 @@ impl PgConnectionConfig {
}
}
#[allow(unstable_name_collisions)]
fn encode_options(options: &[String]) -> String {
options
.iter()
.map(|s| {
if s.contains(['\\', ' ']) {
Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ "))
} else {
Cow::Borrowed(s.as_str())
}
})
.intersperse(Cow::Borrowed(" ")) // TODO: use impl from std once it's stabilized
.collect::<String>()
}
impl fmt::Display for PgConnectionConfig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// The password is intentionally hidden and not part of this display string.
@@ -206,7 +208,7 @@ impl fmt::Debug for PgConnectionConfig {
#[cfg(test)]
mod tests_pg_connection_config {
use crate::PgConnectionConfig;
use crate::{encode_options, PgConnectionConfig};
use once_cell::sync::Lazy;
use url::Host;
@@ -255,18 +257,12 @@ mod tests_pg_connection_config {
#[test]
fn test_with_options() {
let cfg = PgConnectionConfig::new_host_port(STUB_HOST.clone(), 123).extend_options([
"hello",
"world",
"with space",
"and \\ backslashes",
let options = encode_options(&[
"hello".to_owned(),
"world".to_owned(),
"with space".to_owned(),
"and \\ backslashes".to_owned(),
]);
assert_eq!(cfg.host(), &*STUB_HOST);
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
cfg.to_tokio_postgres_config().get_options(),
Some("hello world with\\ space and\\ \\\\\\ backslashes")
);
assert_eq!(options, "hello world with\\ space and\\ \\\\\\ backslashes");
}
}

View File

@@ -34,7 +34,7 @@ use utils::backoff;
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::{
error::Cancelled, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing,
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing,
ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
};

View File

@@ -0,0 +1,277 @@
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};
use anyhow::bail;
use aws_sdk_s3::types::StorageClass;
use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
use crate::{
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct RemoteStorageConfig {
/// The storage connection configuration.
#[serde(flatten)]
pub storage: RemoteStorageKind,
/// A common timeout enforced for all requests after concurrency limiter permit has been
/// acquired.
#[serde(
with = "humantime_serde",
default = "default_timeout",
skip_serializing_if = "is_default_timeout"
)]
pub timeout: Duration,
}
fn default_timeout() -> Duration {
RemoteStorageConfig::DEFAULT_TIMEOUT
}
fn is_default_timeout(d: &Duration) -> bool {
*d == RemoteStorageConfig::DEFAULT_TIMEOUT
}
/// A kind of a remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(untagged)]
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs { local_path: Utf8PathBuf },
/// AWS S3 based storage, storing all files in the S3 bucket
/// specified by the config
AwsS3(S3Config),
/// Azure Blob based storage, storing all files in the container
/// specified by the config
AzureContainer(AzureConfig),
}
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct S3Config {
/// Name of the bucket to connect to.
pub bucket_name: String,
/// The region where the bucket is located at.
pub bucket_region: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
pub prefix_in_bucket: Option<String>,
/// A base URL to send S3 requests to.
/// By default, the endpoint is derived from a region name, assuming it's
/// an AWS S3 region name, erroring on wrong region name.
/// Endpoint provides a way to support other S3 flavors and their regions.
///
/// Example: `http://127.0.0.1:5000`
pub endpoint: Option<String>,
/// AWS S3 has various limits on its API calls, we need not to exceed those.
/// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
#[serde(
deserialize_with = "deserialize_storage_class",
serialize_with = "serialize_storage_class",
default
)]
pub upload_storage_class: Option<StorageClass>,
}
fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
.try_into()
.unwrap()
}
fn default_max_keys_per_list_response() -> Option<i32> {
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
}
impl Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AzureConfig {
/// Name of the container to connect to.
pub container_name: String,
/// Name of the storage account the container is inside of
pub storage_account: Option<String>,
/// The region where the bucket is located at.
pub container_region: String,
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
pub prefix_in_container: Option<String>,
/// Azure has various limits on its API calls, we need not to exceed those.
/// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_azure_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
}
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
}
impl Debug for AzureConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AzureConfig")
.field("bucket_name", &self.container_name)
.field("storage_account", &self.storage_account)
.field("bucket_region", &self.container_region)
.field("prefix_in_container", &self.prefix_in_container)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<Option<StorageClass>, D::Error> {
Option::<String>::deserialize(deserializer).and_then(|s| {
if let Some(s) = s {
use serde::de::Error;
let storage_class = StorageClass::from_str(&s).expect("infallible");
#[allow(deprecated)]
if matches!(storage_class, StorageClass::Unknown(_)) {
return Err(D::Error::custom(format!(
"Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
StorageClass::values()
)));
}
Ok(Some(storage_class))
} else {
Ok(None)
}
})
}
fn serialize_storage_class<S: serde::Serializer>(
val: &Option<StorageClass>,
serializer: S,
) -> Result<S::Ok, S::Error> {
let val = val.as_ref().map(StorageClass::as_str);
Option::<&str>::serialize(&val, serializer)
}
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let document: toml_edit::Document = match toml {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => bail!("toml not a table or inline table"),
};
if document.is_empty() {
return Ok(None);
}
Ok(Some(toml_edit::de::from_document(document)?))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}
#[test]
fn parse_localfs_config_with_timeout() {
let input = "local_path = '.'
timeout = '5s'";
let config = parse(input).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: Utf8PathBuf::from(".")
},
timeout: Duration::from_secs(5)
}
);
}
#[test]
fn test_s3_parsing() {
let toml = "\
bucket_name = 'foo-bar'
bucket_region = 'eu-central-1'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: "foo-bar".into(),
bucket_region: "eu-central-1".into(),
prefix_in_bucket: None,
endpoint: None,
concurrency_limit: default_remote_storage_s3_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
upload_storage_class: Some(StorageClass::IntelligentTiering),
}),
timeout: Duration::from_secs(7)
}
);
}
#[test]
fn test_azure_parsing() {
let toml = "\
container_name = 'foo-bar'
container_region = 'westeurope'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AzureContainer(AzureConfig {
container_name: "foo-bar".into(),
storage_account: None,
container_region: "westeurope".into(),
prefix_in_container: None,
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
}),
timeout: Duration::from_secs(7)
}
);
}
}

View File

@@ -10,6 +10,7 @@
#![deny(clippy::undocumented_unsafe_blocks)]
mod azure_blob;
mod config;
mod error;
mod local_fs;
mod metrics;
@@ -18,17 +19,10 @@ mod simulate_failures;
mod support;
use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
};
use anyhow::{bail, Context};
use aws_sdk_s3::types::StorageClass;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use bytes::Bytes;
@@ -44,6 +38,8 @@ pub use self::{
};
use s3_bucket::RequestKind;
pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
/// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here.
pub use azure_core::Etag;
@@ -525,168 +521,6 @@ impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
}
}
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
pub struct RemoteStorageConfig {
/// The storage connection configuration.
#[serde(flatten)]
pub storage: RemoteStorageKind,
/// A common timeout enforced for all requests after concurrency limiter permit has been
/// acquired.
#[serde(with = "humantime_serde", default = "default_timeout")]
pub timeout: Duration,
}
fn default_timeout() -> Duration {
RemoteStorageConfig::DEFAULT_TIMEOUT
}
/// A kind of a remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(untagged)]
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs { local_path: Utf8PathBuf },
/// AWS S3 based storage, storing all files in the S3 bucket
/// specified by the config
AwsS3(S3Config),
/// Azure Blob based storage, storing all files in the container
/// specified by the config
AzureContainer(AzureConfig),
}
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, serde::Deserialize)]
pub struct S3Config {
/// Name of the bucket to connect to.
pub bucket_name: String,
/// The region where the bucket is located at.
pub bucket_region: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
pub prefix_in_bucket: Option<String>,
/// A base URL to send S3 requests to.
/// By default, the endpoint is derived from a region name, assuming it's
/// an AWS S3 region name, erroring on wrong region name.
/// Endpoint provides a way to support other S3 flavors and their regions.
///
/// Example: `http://127.0.0.1:5000`
pub endpoint: Option<String>,
/// AWS S3 has various limits on its API calls, we need not to exceed those.
/// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
#[serde(deserialize_with = "deserialize_storage_class", default)]
pub upload_storage_class: Option<StorageClass>,
}
fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
.try_into()
.unwrap()
}
fn default_max_keys_per_list_response() -> Option<i32> {
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
}
impl Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AzureConfig {
/// Name of the container to connect to.
pub container_name: String,
/// Name of the storage account the container is inside of
pub storage_account: Option<String>,
/// The region where the bucket is located at.
pub container_region: String,
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
pub prefix_in_container: Option<String>,
/// Azure has various limits on its API calls, we need not to exceed those.
/// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_azure_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
}
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
}
impl Debug for AzureConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AzureConfig")
.field("bucket_name", &self.container_name)
.field("storage_account", &self.storage_account)
.field("bucket_region", &self.container_region)
.field("prefix_in_container", &self.prefix_in_container)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<Option<StorageClass>, D::Error> {
Option::<String>::deserialize(deserializer).and_then(|s| {
if let Some(s) = s {
use serde::de::Error;
let storage_class = StorageClass::from_str(&s).expect("infallible");
#[allow(deprecated)]
if matches!(storage_class, StorageClass::Unknown(_)) {
return Err(D::Error::custom(format!(
"Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
StorageClass::values()
)));
}
Ok(Some(storage_class))
} else {
Ok(None)
}
})
}
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let document: toml_edit::Document = match toml {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => bail!("toml not a table or inline table"),
};
if document.is_empty() {
return Ok(None);
}
Ok(Some(toml_edit::de::from_document(document)?))
}
}
struct ConcurrencyLimiter {
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
@@ -733,11 +567,6 @@ impl ConcurrencyLimiter {
mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}
#[test]
fn test_object_name() {
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
@@ -759,77 +588,4 @@ mod tests {
let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
assert_eq!(err.to_string(), "Path \"/\" is not relative");
}
#[test]
fn parse_localfs_config_with_timeout() {
let input = "local_path = '.'
timeout = '5s'";
let config = parse(input).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: Utf8PathBuf::from(".")
},
timeout: Duration::from_secs(5)
}
);
}
#[test]
fn test_s3_parsing() {
let toml = "\
bucket_name = 'foo-bar'
bucket_region = 'eu-central-1'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: "foo-bar".into(),
bucket_region: "eu-central-1".into(),
prefix_in_bucket: None,
endpoint: None,
concurrency_limit: default_remote_storage_s3_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
upload_storage_class: Some(StorageClass::IntelligentTiering),
}),
timeout: Duration::from_secs(7)
}
);
}
#[test]
fn test_azure_parsing() {
let toml = "\
container_name = 'foo-bar'
container_region = 'westeurope'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AzureContainer(AzureConfig {
container_name: "foo-bar".into(),
storage_account: None,
container_region: "westeurope".into(),
prefix_in_container: None,
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
}),
timeout: Duration::from_secs(7)
}
);
}
}

View File

@@ -46,12 +46,12 @@ use utils::backoff;
use super::StorageMetadata;
use crate::{
config::S3Config,
error::Cancelled,
metrics::{start_counting_cancelled_wait, start_measuring_requests},
support::PermitCarrying,
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
S3Config, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
REMOTE_STORAGE_PREFIX_SEPARATOR,
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
use crate::metrics::AttemptOutcome;

View File

@@ -83,10 +83,18 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
let keys: Vec<&str> = split[0].split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
// The current format of the layer file name: 000000067F0000000400000B150100000000-000000067F0000000400000D350100000000__00000000014B7AC8-v1-00000001
// Handle generation number `-00000001` part
if lsns.last().expect("should").len() == 8 {
lsns.pop();
}
// Handle version number `-v1` part
if lsns.last().expect("should").starts_with('v') {
lsns.pop();
}
if lsns.len() == 1 {
lsns.push(lsns[0]);
}

View File

@@ -33,9 +33,7 @@ use utils::{
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
use crate::{tenant::config::TenantConf, virtual_file};
use crate::{
@@ -855,14 +853,6 @@ impl PageServerConf {
)
}
pub(crate) fn tenant_deleted_mark_file_path(
&self,
tenant_shard_id: &TenantShardId,
) -> Utf8PathBuf {
self.tenant_path(tenant_shard_id)
.join(TENANT_DELETED_MARKER_FILE_NAME)
}
pub fn traces_path(&self) -> Utf8PathBuf {
self.workdir.join("traces")
}

View File

@@ -236,6 +236,13 @@ paths:
type: string
format: date-time
description: A timestamp to get the LSN
- name: with_lease
in: query
required: false
schema:
type: boolean
description: Whether to grant a lease to the corresponding LSN. Default to false.
responses:
"200":
description: OK
@@ -1029,6 +1036,10 @@ components:
kind:
type: string
enum: [past, present, future, nodata]
valid_until:
type: string
format: date-time
description: The expiration time of the granted lease.
LsnLease:
type: object

View File

@@ -21,6 +21,7 @@ use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::LsnLease;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigResponse;
@@ -329,14 +330,11 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
}
}
impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
use crate::tenant::delete::DeleteTenantError::*;
impl From<crate::tenant::mgr::DeleteTenantError> for ApiError {
fn from(value: crate::tenant::mgr::DeleteTenantError) -> Self {
use crate::tenant::mgr::DeleteTenantError::*;
match value {
Get(g) => ApiError::from(g),
Timeline(t) => ApiError::from(t),
SlotError(e) => e.into(),
SlotUpsertError(e) => e.into(),
Other(o) => ApiError::InternalServerError(o),
Cancelled => ApiError::ShuttingDown,
}
@@ -731,6 +729,8 @@ async fn get_lsn_by_timestamp_handler(
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let with_lease = parse_query_param(&request, "with_lease")?.unwrap_or(false);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline =
@@ -739,10 +739,15 @@ async fn get_lsn_by_timestamp_handler(
let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
.await?;
#[derive(serde::Serialize, Debug)]
struct Result {
lsn: Lsn,
kind: &'static str,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(flatten)]
lease: Option<LsnLease>,
}
let (lsn, kind) = match result {
LsnForTimestamp::Present(lsn) => (lsn, "present"),
@@ -750,11 +755,28 @@ async fn get_lsn_by_timestamp_handler(
LsnForTimestamp::Past(lsn) => (lsn, "past"),
LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
};
let result = Result { lsn, kind };
let lease = if with_lease {
timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
.inspect_err(|_| {
warn!("fail to grant a lease to {}", lsn);
})
.ok()
} else {
None
};
let result = Result { lsn, kind, lease };
let valid_until = result
.lease
.as_ref()
.map(|l| humantime::format_rfc3339_millis(l.valid_until).to_string());
tracing::info!(
lsn=?result.lsn,
kind=%result.kind,
timestamp=%timestamp_raw,
valid_until=?valid_until,
"lsn_by_timestamp finished"
);
json_response(StatusCode::OK, result)

View File

@@ -55,11 +55,9 @@ use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::mgr::TenantsMap;
use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineCreateGuard;
@@ -137,7 +135,6 @@ pub mod remote_timeline_client;
pub mod storage_layer;
pub mod config;
pub mod delete;
pub mod mgr;
pub mod secondary;
pub mod tasks;
@@ -161,8 +158,6 @@ pub const TENANTS_SEGMENT_NAME: &str = "tenants";
/// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
/// References to shared objects that are passed into each tenant, such
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
@@ -207,7 +202,6 @@ struct TimelinePreload {
}
pub(crate) struct TenantPreload {
deleting: bool,
timelines: HashMap<TimelineId, TimelinePreload>,
}
@@ -286,8 +280,6 @@ pub struct Tenant {
/// background warmup.
pub(crate) activate_now_sem: tokio::sync::Semaphore,
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
// Cancellation token fires when we have entered shutdown(). This is a parent of
// Timelines' cancellation token.
pub(crate) cancel: CancellationToken,
@@ -654,7 +646,6 @@ impl Tenant {
attached_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -828,52 +819,6 @@ impl Tenant {
// Remote preload is complete.
drop(remote_load_completion);
let pending_deletion = {
match DeleteTenantFlow::should_resume_deletion(
conf,
preload.as_ref().map(|p| p.deleting).unwrap_or(false),
&tenant_clone,
)
.await
{
Ok(should_resume_deletion) => should_resume_deletion,
Err(err) => {
make_broken(&tenant_clone, anyhow::anyhow!(err), BrokenVerbosity::Error);
return Ok(());
}
}
};
info!("pending_deletion {}", pending_deletion.is_some());
if let Some(deletion) = pending_deletion {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
let background_jobs_can_start =
init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
info!("waiting for backgound jobs barrier");
background.clone().wait().await;
info!("ready for backgound jobs barrier");
}
let deleted = DeleteTenantFlow::resume_from_attach(
deletion,
&tenant_clone,
preload,
tenants,
&ctx,
)
.await;
if let Err(e) = deleted {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
}
return Ok(());
}
// We will time the duration of the attach phase unless this is a creation (attach will do no work)
let attached = {
let _attach_timer = match mode {
@@ -931,21 +876,13 @@ impl Tenant {
)
.await?;
let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
info!(
"found {} timelines, deleting={}",
remote_timeline_ids.len(),
deleting
);
info!("found {} timelines", remote_timeline_ids.len(),);
for k in other_keys {
if k != TENANT_DELETED_MARKER_FILE_NAME {
warn!("Unexpected non timeline key {k}");
}
warn!("Unexpected non timeline key {k}");
}
Ok(TenantPreload {
deleting,
timelines: Self::load_timeline_metadata(
self,
remote_timeline_ids,
@@ -974,7 +911,6 @@ impl Tenant {
let preload = match (preload, mode) {
(Some(p), _) => p,
(None, SpawnMode::Create) => TenantPreload {
deleting: false,
timelines: HashMap::new(),
},
(None, _) => {
@@ -2215,6 +2151,7 @@ impl Tenant {
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
@@ -2222,12 +2159,14 @@ impl Tenant {
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
timeline.remote_client.shutdown().await;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
// we use here really is the remotely persistent one).
tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
let result = timeline.remote_client
.download_index_file(&self.cancel)
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
@@ -2240,6 +2179,7 @@ impl Tenant {
};
for child_shard in child_shards {
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
upload_index_part(
&self.remote_storage,
child_shard,
@@ -2628,7 +2568,6 @@ impl Tenant {
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
gate: Gate::default(),
timeline_get_throttle: Arc::new(throttle::Throttle::new(
@@ -4068,6 +4007,7 @@ mod tests {
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::GcInfo;
use utils::bin_ser::BeSer;
use utils::id::TenantId;
@@ -6745,49 +6685,48 @@ mod tests {
// img layer at 0x10
let img_layer = (0..10)
.map(|id| (get_key(id), test_img(&format!("value {id}@0x10"))))
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
.collect_vec();
let delta1 = vec![
// TODO: we should test a real delta record here, which requires us to add a variant of NeonWalRecord for testing purpose.
(
get_key(1),
Lsn(0x20),
Value::Image(test_img("value 1@0x20")),
Value::Image(Bytes::from("value 1@0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::Image(test_img("value 2@0x30")),
Value::Image(Bytes::from("value 2@0x30")),
),
(
get_key(3),
Lsn(0x40),
Value::Image(test_img("value 3@0x40")),
Value::Image(Bytes::from("value 3@0x40")),
),
];
let delta2 = vec![
(
get_key(5),
Lsn(0x20),
Value::Image(test_img("value 5@0x20")),
Value::Image(Bytes::from("value 5@0x20")),
),
(
get_key(6),
Lsn(0x20),
Value::Image(test_img("value 6@0x20")),
Value::Image(Bytes::from("value 6@0x20")),
),
];
let delta3 = vec![
(
get_key(8),
Lsn(0x40),
Value::Image(test_img("value 8@0x40")),
Value::Image(Bytes::from("value 8@0x40")),
),
(
get_key(9),
Lsn(0x40),
Value::Image(test_img("value 9@0x40")),
Value::Image(Bytes::from("value 9@0x40")),
),
];
@@ -6809,9 +6748,42 @@ mod tests {
guard.cutoffs.horizon = Lsn(0x30);
}
let expected_result = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x20"),
Bytes::from_static(b"value 2@0x30"),
Bytes::from_static(b"value 3@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x20"),
Bytes::from_static(b"value 6@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x40"),
Bytes::from_static(b"value 9@0x40"),
];
for (idx, expected) in expected_result.iter().enumerate() {
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x50), &ctx)
.await
.unwrap(),
expected
);
}
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
for (idx, expected) in expected_result.iter().enumerate() {
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x50), &ctx)
.await
.unwrap(),
expected
);
}
// Check if the image layer at the GC horizon contains exactly what we want
let image_at_gc_horizon = tline
.inspect_image_layers(Lsn(0x30), &ctx)
@@ -6822,14 +6794,22 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(image_at_gc_horizon.len(), 10);
let expected_lsn = [0x10, 0x20, 0x30, 0x10, 0x10, 0x20, 0x20, 0x10, 0x10, 0x10];
let expected_result = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x20"),
Bytes::from_static(b"value 2@0x30"),
Bytes::from_static(b"value 3@0x10"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x20"),
Bytes::from_static(b"value 6@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10"),
Bytes::from_static(b"value 9@0x10"),
];
for idx in 0..10 {
assert_eq!(
image_at_gc_horizon[idx],
(
get_key(idx as u32),
test_img(&format!("value {idx}@{:#x}", expected_lsn[idx]))
)
(get_key(idx as u32), expected_result[idx].clone())
);
}
@@ -6862,7 +6842,7 @@ mod tests {
},
// The delta layer that is cut in the middle
PersistentLayerKey {
key_range: Key::MIN..get_key(9),
key_range: get_key(3)..get_key(4),
lsn_range: Lsn(0x30)..Lsn(0x41),
is_delta: true
},
@@ -6947,6 +6927,9 @@ mod tests {
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
// Need to remove the limit of "Neon WAL redo requires base image".
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
@@ -7041,4 +7024,164 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_simple_bottom_most_compaction_deltas() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas")?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
//
// | D1 | | D3 |
// -| |-- gc horizon -----------------
// | | | D2 |
// --------- img layer ------------------
//
// What we should expact from this compaction is:
// | Part of D1 | | D3 |
// --------- img layer with D1+D2 at GC horizon------------------
// img layer at 0x10
let img_layer = (0..10)
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
.collect_vec();
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
),
];
let delta2 = vec![
(
get_key(5),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(6),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
];
let delta3 = vec![
(
get_key(8),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
),
(
get_key(9),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
),
];
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1, delta2, delta3], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
.await?;
{
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
pitr: Lsn(0x30),
horizon: Lsn(0x30),
},
leases: Default::default(),
};
}
let expected_result = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10@0x40"),
Bytes::from_static(b"value 9@0x10@0x40"),
];
let expected_result_at_gc_horizon = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10"),
Bytes::from_static(b"value 9@0x10"),
];
for idx in 0..10 {
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x50), &ctx)
.await
.unwrap(),
&expected_result[idx]
);
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x30), &ctx)
.await
.unwrap(),
&expected_result_at_gc_horizon[idx]
);
}
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
for idx in 0..10 {
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x50), &ctx)
.await
.unwrap(),
&expected_result[idx]
);
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x30), &ctx)
.await
.unwrap(),
&expected_result_at_gc_horizon[idx]
);
}
Ok(())
}
}

View File

@@ -1,426 +0,0 @@
use std::sync::Arc;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::{models::TenantState, shard::TenantShardId};
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, Instrument};
use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use crate::{
config::PageServerConf,
context::RequestContext,
task_mgr::{self},
tenant::{
mgr::{TenantSlot, TenantsMapRemoveResult},
remote_timeline_client::remote_heatmap_path,
},
};
use super::{
mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap},
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
timeline::delete::DeleteTimelineFlow,
tree_sort_timelines, DeleteTimelineError, Tenant, TenantPreload,
};
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
#[error("Tenant map slot error {0}")]
SlotError(#[from] TenantSlotError),
#[error("Tenant map slot upsert error {0}")]
SlotUpsertError(#[from] TenantSlotUpsertError),
#[error("Timeline {0}")]
Timeline(#[from] DeleteTimelineError),
#[error("Cancelled")]
Cancelled,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;
fn remote_tenant_delete_mark_path(
conf: &PageServerConf,
tenant_shard_id: &TenantShardId,
) -> anyhow::Result<RemotePath> {
let tenant_remote_path = conf
.tenant_path(tenant_shard_id)
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.context("tenant path")?;
Ok(tenant_remote_path.join(Utf8Path::new("timelines/deleted")))
}
async fn schedule_ordered_timeline_deletions(
tenant: &Arc<Tenant>,
) -> Result<Vec<(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>, TimelineId)>, DeleteTenantError> {
// Tenant is stopping at this point. We know it will be deleted.
// No new timelines should be created.
// Tree sort timelines to delete from leafs to the root.
// NOTE: by calling clone we release the mutex which creates a possibility for a race: pending deletion
// can complete and remove timeline from the map in between our call to clone
// and `DeleteTimelineFlow::run`, so `run` wont find timeline in `timelines` map.
// timelines.lock is currently synchronous so we cant hold it across await point.
// So just ignore NotFound error if we get it from `run`.
// Beware: in case it becomes async and we try to hold it here, `run` also locks it, which can create a deadlock.
let timelines = tenant.timelines.lock().unwrap().clone();
let sorted =
tree_sort_timelines(timelines, |t| t.get_ancestor_timeline_id()).context("tree sort")?;
let mut already_running_deletions = vec![];
for (timeline_id, _) in sorted.into_iter().rev() {
let span = tracing::info_span!("timeline_delete", %timeline_id);
let res = DeleteTimelineFlow::run(tenant, timeline_id, true)
.instrument(span)
.await;
if let Err(e) = res {
match e {
DeleteTimelineError::NotFound => {
// Timeline deletion finished after call to clone above but before call
// to `DeleteTimelineFlow::run` and removed timeline from the map.
continue;
}
DeleteTimelineError::AlreadyInProgress(guard) => {
already_running_deletions.push((guard, timeline_id));
continue;
}
e => return Err(DeleteTenantError::Timeline(e)),
}
}
}
Ok(already_running_deletions)
}
async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), DeleteTenantError> {
// Assert timelines dir is empty.
if !fs_ext::is_directory_empty(timelines_path).await? {
// Display first 10 items in directory
let list = fs_ext::list_dir(timelines_path).await.context("list_dir")?;
let list = &list.into_iter().take(10).collect::<Vec<_>>();
return Err(DeleteTenantError::Other(anyhow::anyhow!(
"Timelines directory is not empty after all timelines deletion: {list:?}"
)));
}
Ok(())
}
async fn remove_tenant_remote_delete_mark(
conf: &PageServerConf,
remote_storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
backoff::retry(
|| async { remote_storage.delete(&path, cancel).await },
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remove_tenant_remote_delete_mark")?;
Ok(())
}
// Cleanup fs traces: tenant config, timelines dir local delete mark, tenant dir
async fn cleanup_remaining_fs_traces(
conf: &PageServerConf,
tenant_shard_id: &TenantShardId,
) -> Result<(), DeleteTenantError> {
let rm = |p: Utf8PathBuf, is_dir: bool| async move {
if is_dir {
tokio::fs::remove_dir(&p).await
} else {
tokio::fs::remove_file(&p).await
}
.or_else(fs_ext::ignore_not_found)
.with_context(|| format!("failed to delete {p}"))
};
rm(conf.tenant_config_path(tenant_shard_id), false).await?;
rm(conf.tenant_location_config_path(tenant_shard_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-remove-timelines-dir"
))?
});
rm(conf.timelines_path(tenant_shard_id), true).await?;
fail::fail_point!("tenant-delete-before-remove-deleted-mark", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-remove-deleted-mark"
))?
});
// Make sure previous deletions are ordered before mark removal.
// Otherwise there is no guarantee that they reach the disk before mark deletion.
// So its possible for mark to reach disk first and for other deletions
// to be reordered later and thus missed if a crash occurs.
// Note that we dont need to sync after mark file is removed
// because we can tolerate the case when mark file reappears on startup.
let tenant_path = &conf.tenant_path(tenant_shard_id);
if tenant_path.exists() {
crashsafe::fsync_async(&conf.tenant_path(tenant_shard_id))
.await
.context("fsync_pre_mark_remove")?;
}
rm(conf.tenant_deleted_mark_file_path(tenant_shard_id), false).await?;
rm(conf.tenant_heatmap_path(tenant_shard_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-tenant-dir", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-remove-tenant-dir"
))?
});
rm(conf.tenant_path(tenant_shard_id), true).await?;
Ok(())
}
#[derive(Default)]
pub enum DeleteTenantFlow {
#[default]
NotStarted,
InProgress,
Finished,
}
impl DeleteTenantFlow {
pub(crate) async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_mark_exists: bool,
tenant: &Tenant,
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
let acquire = |t: &Tenant| {
Some(
Arc::clone(&t.delete_progress)
.try_lock_owned()
.expect("we're the only owner during init"),
)
};
if remote_mark_exists {
return Ok(acquire(tenant));
}
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
if conf
.tenant_deleted_mark_file_path(&tenant.tenant_shard_id)
.exists()
{
Ok(acquire(tenant))
} else {
Ok(None)
}
}
pub(crate) async fn resume_from_attach(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static std::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
tenant
.set_stopping(progress, false, true)
.await
.expect("cant be stopping or broken");
tenant
.attach(preload, super::SpawnMode::Eager, ctx)
.await
.context("attach")?;
Self::background(
guard,
tenant.conf,
tenant.remote_storage.clone(),
tenants,
tenant,
)
.await
}
async fn background(
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: GenericRemoteStorage,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: &Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
// Tree sort timelines, schedule delete for them. Mention retries from the console side.
// Note that if deletion fails we dont mark timelines as broken,
// the whole tenant will become broken as by `Self::schedule_background` logic
let already_running_timeline_deletions = schedule_ordered_timeline_deletions(tenant)
.await
.context("schedule_ordered_timeline_deletions")?;
fail::fail_point!("tenant-delete-before-polling-ongoing-deletions", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-polling-ongoing-deletions"
))?
});
// Wait for deletions that were already running at the moment when tenant deletion was requested.
// When we can lock deletion guard it means that corresponding timeline deletion finished.
for (guard, timeline_id) in already_running_timeline_deletions {
let flow = guard.lock().await;
if !flow.is_finished() {
return Err(DeleteTenantError::Other(anyhow::anyhow!(
"already running timeline deletion failed: {timeline_id}"
)));
}
}
// Remove top-level tenant objects that don't belong to a timeline, such as heatmap
let heatmap_path = remote_heatmap_path(&tenant.tenant_shard_id());
if let Some(Err(e)) = backoff::retry(
|| async {
remote_storage
.delete(&heatmap_path, &task_mgr::shutdown_token())
.await
},
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_remote_tenant_heatmap",
&task_mgr::shutdown_token(),
)
.await
{
tracing::warn!("Failed to delete heatmap at {heatmap_path}: {e}");
}
let timelines_path = conf.timelines_path(&tenant.tenant_shard_id);
// May not exist if we fail in cleanup_remaining_fs_traces after removing it
if timelines_path.exists() {
// sanity check to guard against layout changes
ensure_timelines_dir_empty(&timelines_path)
.await
.context("timelines dir not empty")?;
}
remove_tenant_remote_delete_mark(
conf,
&remote_storage,
&tenant.tenant_shard_id,
&task_mgr::shutdown_token(),
)
.await?;
pausable_failpoint!("tenant-delete-before-cleanup-remaining-fs-traces-pausable");
fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-cleanup-remaining-fs-traces"
))?
});
cleanup_remaining_fs_traces(conf, &tenant.tenant_shard_id)
.await
.context("cleanup_remaining_fs_traces")?;
{
// This block is simply removing the TenantSlot for this tenant. It requires a loop because
// we might conflict with a TenantSlot::InProgress marker and need to wait for it.
//
// This complexity will go away when we simplify how deletion works:
// https://github.com/neondatabase/neon/issues/5080
loop {
// Under the TenantMap lock, try to remove the tenant. We usually succeed, but if
// we encounter an InProgress marker, yield the barrier it contains and wait on it.
let barrier = {
let mut locked = tenants.write().unwrap();
let removed = locked.remove(tenant.tenant_shard_id);
// FIXME: we should not be modifying this from outside of mgr.rs.
// This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080)
// Update stats
match &removed {
TenantsMapRemoveResult::Occupied(slot) => {
crate::metrics::TENANT_MANAGER.slot_removed(slot);
}
TenantsMapRemoveResult::InProgress(barrier) => {
crate::metrics::TENANT_MANAGER
.slot_removed(&TenantSlot::InProgress(barrier.clone()));
}
TenantsMapRemoveResult::Vacant => {
// Nothing changed in map, no metric update
}
}
match removed {
TenantsMapRemoveResult::Occupied(TenantSlot::Attached(tenant)) => {
match tenant.current_state() {
TenantState::Stopping { .. } | TenantState::Broken { .. } => {
// Expected: we put the tenant into stopping state before we start deleting it
}
state => {
// Unexpected state
tracing::warn!(
"Tenant in unexpected state {state} after deletion"
);
}
}
break;
}
TenantsMapRemoveResult::Occupied(TenantSlot::Secondary(_)) => {
// This is unexpected: this secondary tenants should not have been created, and we
// are not in a position to shut it down from here.
tracing::warn!("Tenant transitioned to secondary mode while deleting!");
break;
}
TenantsMapRemoveResult::Occupied(TenantSlot::InProgress(_)) => {
unreachable!("TenantsMap::remove handles InProgress separately, should never return it here");
}
TenantsMapRemoveResult::Vacant => {
tracing::warn!(
"Tenant removed from TenantsMap before deletion completed"
);
break;
}
TenantsMapRemoveResult::InProgress(barrier) => {
// An InProgress entry was found, we must wait on its barrier
barrier
}
}
};
tracing::info!(
"Waiting for competing operation to complete before deleting state for tenant"
);
barrier.wait().await;
}
}
*guard = Self::Finished;
Ok(())
}
}

View File

@@ -51,7 +51,6 @@ use utils::fs_ext::PathExt;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError;
use super::remote_timeline_client::remote_tenant_path;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
@@ -109,12 +108,6 @@ pub(crate) enum TenantsMap {
ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}
pub(crate) enum TenantsMapRemoveResult {
Occupied(TenantSlot),
Vacant,
InProgress(utils::completion::Barrier),
}
/// When resolving a TenantId to a shard, we may be looking for the 0th
/// shard, or we might be looking for whichever shard holds a particular page.
#[derive(Copy, Clone)]
@@ -191,26 +184,6 @@ impl TenantsMap {
}
}
/// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
///
/// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
/// slot if the enclosed tenant is shutdown.
pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
use std::collections::btree_map::Entry;
match self {
TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
Entry::Occupied(entry) => match entry.get() {
TenantSlot::InProgress(barrier) => {
TenantsMapRemoveResult::InProgress(barrier.clone())
}
_ => TenantsMapRemoveResult::Occupied(entry.remove()),
},
Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
},
}
}
#[cfg(all(debug_assertions, not(test)))]
pub(crate) fn len(&self) -> usize {
match self {
@@ -460,6 +433,18 @@ async fn init_load_tenant_configs(
Ok(configs)
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
#[error("Tenant map slot error {0}")]
SlotError(#[from] TenantSlotError),
#[error("Cancelled")]
Cancelled,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
@@ -629,7 +614,6 @@ pub async fn init_tenant_mgr(
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
&TENANTS,
SpawnMode::Lazy,
&ctx,
) {
@@ -685,7 +669,6 @@ fn tenant_spawn(
location_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -712,7 +695,6 @@ fn tenant_spawn(
location_conf,
shard_identity,
init_order,
tenants,
mode,
ctx,
) {
@@ -1161,7 +1143,6 @@ impl TenantManager {
attached_conf,
shard_identity,
None,
self.tenants,
spawn_mode,
ctx,
)?;
@@ -1283,7 +1264,6 @@ impl TenantManager {
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
self.tenants,
SpawnMode::Eager,
ctx,
)?;
@@ -1634,7 +1614,7 @@ impl TenantManager {
for child_shard_id in &child_shards {
let child_shard_id = *child_shard_id;
let child_shard = {
let locked = TENANTS.read().unwrap();
let locked = self.tenants.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
peek_slot.and_then(|s| s.get_attached()).cloned()
@@ -1735,6 +1715,7 @@ impl TenantManager {
let timelines = parent_shard.timelines.lock().unwrap().clone();
let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
for timeline in timelines.values() {
tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
let timeline_layers = timeline
.layers
.read()
@@ -1774,7 +1755,12 @@ impl TenantManager {
// Since we will do a large number of small filesystem metadata operations, batch them into
// spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
let span = tracing::Span::current();
let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
// Run this synchronous code in the same log context as the outer function that spawned it.
let _span = span.enter();
tracing::info!("Creating {} directories", create_dirs.len());
for dir in &create_dirs {
if let Err(e) = std::fs::create_dir_all(dir) {
// Ignore AlreadyExists errors, drop out on all other errors
@@ -1788,6 +1774,11 @@ impl TenantManager {
}
for child_prefix in child_prefixes {
tracing::info!(
"Hard-linking {} parent layers into child path {}",
parent_layers.len(),
child_prefix
);
for relative_layer in &parent_layers {
let parent_path = parent_path.join(relative_layer);
let child_path = child_prefix.join(relative_layer);
@@ -1813,6 +1804,7 @@ impl TenantManager {
// Durability is not required for correctness, but if we crashed during split and
// then came restarted with empty timeline dirs, it would be very inefficient to
// re-populate from remote storage.
tracing::info!("fsyncing {} directories", create_dirs.len());
for dir in create_dirs {
if let Err(e) = crashsafe::fsync(&dir) {
// Something removed a newly created timeline dir out from underneath us? Extremely
@@ -1866,7 +1858,7 @@ impl TenantManager {
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = self
.detach_tenant0(conf, &TENANTS, tenant_shard_id, deletion_queue_client)
.detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
.await?;
spawn_background_purge(tmp_path);
@@ -1876,7 +1868,6 @@ impl TenantManager {
async fn detach_tenant0(
&self,
conf: &'static PageServerConf,
tenants: &std::sync::RwLock<TenantsMap>,
tenant_shard_id: TenantShardId,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
@@ -1890,7 +1881,7 @@ impl TenantManager {
};
let removal_result = remove_tenant_from_memory(
tenants,
self.tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
@@ -1906,7 +1897,7 @@ impl TenantManager {
pub(crate) fn list_tenants(
&self,
) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let tenants = self.tenants.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
@@ -2007,7 +1998,6 @@ impl TenantManager {
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
self.tenants,
SpawnMode::Eager,
ctx,
)?;

View File

@@ -965,6 +965,8 @@ impl Timeline {
_cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
use std::collections::BTreeSet;
use crate::tenant::storage_layer::ValueReconstructState;
// Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
// The layer selection has the following properties:
@@ -986,20 +988,30 @@ impl Timeline {
(selected_layers, gc_cutoff)
};
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, collect the layer information to decide when to split the new delta layers.
let mut all_key_values = Vec::new();
let mut delta_split_points = BTreeSet::new();
for layer in &layer_selection {
all_key_values.extend(layer.load_key_values(ctx).await?);
let desc = layer.layer_desc();
if desc.is_delta() {
// TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
// so that we can avoid having too many small delta layers.
let key_range = desc.get_key_range();
delta_split_points.insert(key_range.start);
delta_split_points.insert(key_range.end);
}
}
// Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and
// image layers, make image appear later than delta.
// image layers, make image appear before than delta.
struct ValueWrapper<'a>(&'a crate::repository::Value);
impl Ord for ValueWrapper<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use crate::repository::Value;
use std::cmp::Ordering;
match (self.0, other.0) {
(Value::Image(_), Value::WalRecord(_)) => Ordering::Greater,
(Value::WalRecord(_), Value::Image(_)) => Ordering::Less,
(Value::Image(_), Value::WalRecord(_)) => Ordering::Less,
(Value::WalRecord(_), Value::Image(_)) => Ordering::Greater,
_ => Ordering::Equal,
}
}
@@ -1018,13 +1030,6 @@ impl Timeline {
all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
(k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
});
let max_lsn = all_key_values
.iter()
.map(|(_, lsn, _)| lsn)
.max()
.copied()
.unwrap()
+ 1;
// Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
// Data of the same key.
let mut accumulated_values = Vec::new();
@@ -1043,7 +1048,19 @@ impl Timeline {
// We have a list of deltas/images. We want to create image layers while collect garbages.
for (key, lsn, val) in accumulated_values.iter().rev() {
if *lsn > horizon {
keys_above_horizon.push((*key, *lsn, val.clone())); // TODO: ensure one LSN corresponds to either delta or image instead of both
if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
if *prev_lsn == *lsn {
// The case that we have an LSN with both data from the delta layer and the image layer. As
// `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
// drop this delta and keep the image.
//
// For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
// keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
// dropped.
continue;
}
}
keys_above_horizon.push((*key, *lsn, val.clone()));
} else if *lsn <= horizon {
match val {
crate::repository::Value::Image(image) => {
@@ -1068,15 +1085,59 @@ impl Timeline {
Ok((keys_above_horizon, img))
}
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
all_key_values.first().unwrap().0,
gc_cutoff..max_lsn, // TODO: off by one?
ctx,
)
.await?;
async fn flush_deltas(
deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
last_key: Key,
delta_split_points: &[Key],
current_delta_split_point: &mut usize,
tline: &Arc<Timeline>,
gc_cutoff: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Option<ResidentLayer>> {
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
// overlapping layers.
//
// If we have a structure like this:
//
// | Delta 1 | | Delta 4 |
// |---------| Delta 2 |---------|
// | Delta 3 | | Delta 5 |
//
// And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
// A simple solution here is to split the delta layers using the original boundary, while this
// might produce a lot of small layers. This should be improved and fixed in the future.
let mut need_split = false;
while *current_delta_split_point < delta_split_points.len()
&& last_key >= delta_split_points[*current_delta_split_point]
{
*current_delta_split_point += 1;
need_split = true;
}
if !need_split {
return Ok(None);
}
let deltas = std::mem::take(deltas);
if deltas.is_empty() {
return Ok(None);
}
let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
let mut delta_layer_writer = DeltaLayerWriter::new(
tline.conf,
tline.timeline_id,
tline.tenant_shard_id,
deltas.first().unwrap().0,
gc_cutoff..end_lsn,
ctx,
)
.await?;
let key_end = deltas.last().unwrap().0.next();
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
Ok(Some(delta_layer))
}
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -1087,6 +1148,10 @@ impl Timeline {
)
.await?;
let mut delta_values = Vec::new();
let delta_split_points = delta_split_points.into_iter().collect_vec();
let mut current_delta_split_point = 0;
let mut delta_layers = Vec::new();
for item @ (key, _, _) in &all_key_values {
if &last_key == key {
accumulated_values.push(item);
@@ -1094,33 +1159,54 @@ impl Timeline {
let (deltas, image) =
flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
image_layer_writer.put_image(last_key, image, ctx).await?;
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
delta_values.extend(deltas);
delta_layers.extend(
flush_deltas(
&mut delta_values,
last_key,
&delta_split_points,
&mut current_delta_split_point,
self,
gc_cutoff,
ctx,
)
.await?,
);
accumulated_values.clear();
accumulated_values.push(item);
last_key = *key;
}
}
// TODO: move this part to the loop body
let (deltas, image) =
flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
image_layer_writer.put_image(last_key, image, ctx).await?;
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
accumulated_values.clear();
// TODO: split layers
let delta_layer = delta_layer_writer.finish(last_key, self, ctx).await?;
delta_values.extend(deltas);
delta_layers.extend(
flush_deltas(
&mut delta_values,
last_key,
&delta_split_points,
&mut current_delta_split_point,
self,
gc_cutoff,
ctx,
)
.await?,
);
let image_layer = image_layer_writer.finish(self, ctx).await?;
let mut compact_to = Vec::new();
compact_to.extend(delta_layers);
compact_to.push(image_layer);
// Step 3: Place back to the layer map.
{
let mut guard = self.layers.write().await;
guard.finish_gc_compaction(
&layer_selection,
&[delta_layer.clone(), image_layer.clone()],
&self.metrics,
)
guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};
Ok(())
}

View File

@@ -255,7 +255,6 @@ impl DeleteTimelineFlow {
}
/// Shortcut to create Timeline in stopping state and spawn deletion task.
/// See corresponding parts of [`crate::tenant::delete::DeleteTenantFlow`]
#[instrument(skip_all, fields(%timeline_id))]
pub async fn resume_deletion(
tenant: Arc<Tenant>,
@@ -420,10 +419,6 @@ impl DeleteTimelineFlow {
Ok(())
}
pub(crate) fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
pub(crate) fn is_not_started(&self) -> bool {
matches!(self, Self::NotStarted)
}

View File

@@ -103,12 +103,8 @@ impl ConnCfg {
/// Reuse password or auth keys from the other config.
pub fn reuse_password(&mut self, other: Self) {
if let Some(password) = other.get_password() {
self.password(password);
}
if let Some(keys) = other.get_auth_keys() {
self.auth_keys(keys);
if let Some(password) = other.get_auth() {
self.auth(password);
}
}
@@ -124,48 +120,64 @@ impl ConnCfg {
/// Apply startup message params to the connection config.
pub fn set_startup_params(&mut self, params: &StartupMessageParams) {
// Only set `user` if it's not present in the config.
// Link auth flow takes username from the console's response.
if let (None, Some(user)) = (self.get_user(), params.get("user")) {
self.user(user);
}
// Only set `dbname` if it's not present in the config.
// Link auth flow takes dbname from the console's response.
if let (None, Some(dbname)) = (self.get_dbname(), params.get("database")) {
self.dbname(dbname);
}
// Don't add `options` if they were only used for specifying a project.
// Connection pools don't support `options`, because they affect backend startup.
if let Some(options) = filtered_options(params) {
self.options(&options);
}
if let Some(app_name) = params.get("application_name") {
self.application_name(app_name);
}
// TODO: This is especially ugly...
if let Some(replication) = params.get("replication") {
use tokio_postgres::config::ReplicationMode;
match replication {
"true" | "on" | "yes" | "1" => {
self.replication_mode(ReplicationMode::Physical);
let mut client_encoding = false;
for (k, v) in params.iter() {
match k {
"user" => {
// Only set `user` if it's not present in the config.
// Link auth flow takes username from the console's response.
if self.get_user().is_none() {
self.user(v);
}
}
"database" => {
self.replication_mode(ReplicationMode::Logical);
// Only set `dbname` if it's not present in the config.
// Link auth flow takes dbname from the console's response.
if self.get_dbname().is_none() {
self.dbname(v);
}
}
"options" => {
// Don't add `options` if they were only used for specifying a project.
// Connection pools don't support `options`, because they affect backend startup.
if let Some(options) = filtered_options(v) {
self.options(&options);
}
}
// the special ones in tokio-postgres that we don't want being set by the user
"dbname" => {}
"password" => {}
"sslmode" => {}
"host" => {}
"port" => {}
"connect_timeout" => {}
"keepalives" => {}
"keepalives_idle" => {}
"keepalives_interval" => {}
"keepalives_retries" => {}
"target_session_attrs" => {}
"channel_binding" => {}
"max_backend_message_size" => {}
"client_encoding" => {
client_encoding = true;
// only error should be from bad null bytes,
// but we've already checked for those.
_ = self.param("client_encoding", v);
}
_ => {
// only error should be from bad null bytes,
// but we've already checked for those.
_ = self.param(k, v);
}
_other => {}
}
}
// TODO: extend the list of the forwarded startup parameters.
// Currently, tokio-postgres doesn't allow us to pass
// arbitrary parameters, but the ones above are a good start.
//
// This and the reverse params problem can be better addressed
// in a bespoke connection machinery (a new library for that sake).
if !client_encoding {
// for compatibility since we removed it from tokio-postgres
self.param("client_encoding", "UTF8").unwrap();
}
}
}
@@ -338,10 +350,9 @@ impl ConnCfg {
}
/// Retrieve `options` from a startup message, dropping all proxy-secific flags.
fn filtered_options(params: &StartupMessageParams) -> Option<String> {
fn filtered_options(options: &str) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
let options: String = StartupMessageParams::parse_options_raw(options)
.filter(|opt| parse_endpoint_param(opt).is_none() && neon_option(opt).is_none())
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
@@ -413,27 +424,23 @@ mod tests {
#[test]
fn test_filtered_options() {
// Empty options is unlikely to be useful anyway.
let params = StartupMessageParams::new([("options", "")]);
assert_eq!(filtered_options(&params), None);
assert_eq!(filtered_options(""), None);
// It's likely that clients will only use options to specify endpoint/project.
let params = StartupMessageParams::new([("options", "project=foo")]);
assert_eq!(filtered_options(&params), None);
let params = "project=foo";
assert_eq!(filtered_options(params), None);
// Same, because unescaped whitespaces are no-op.
let params = StartupMessageParams::new([("options", " project=foo ")]);
assert_eq!(filtered_options(&params).as_deref(), None);
let params = " project=foo ";
assert_eq!(filtered_options(params), None);
let params = StartupMessageParams::new([("options", r"\ project=foo \ ")]);
assert_eq!(filtered_options(&params).as_deref(), Some(r"\ \ "));
let params = r"\ project=foo \ ";
assert_eq!(filtered_options(params).as_deref(), Some(r"\ \ "));
let params = StartupMessageParams::new([("options", "project = foo")]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
let params = "project = foo";
assert_eq!(filtered_options(params).as_deref(), Some("project = foo"));
let params = StartupMessageParams::new([(
"options",
"project = foo neon_endpoint_type:read_write neon_lsn:0/2",
)]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
let params = "project = foo neon_endpoint_type:read_write neon_lsn:0/2";
assert_eq!(filtered_options(params).as_deref(), Some("project = foo"));
}
}

View File

@@ -231,6 +231,10 @@ impl ConnectMechanism for TokioMechanism {
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
config
.param("client_encoding", "UTF8")
.expect("client encoding UTF8 is always valid");
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);

View File

@@ -202,6 +202,7 @@ fn get_conn_info(
options = Some(NeonOptions::parse_options_raw(&value));
}
}
ctx.set_db_options(params.freeze());
let user_info = ComputeUserInfo {
endpoint,

View File

@@ -231,11 +231,7 @@ impl PhysicalStorage {
// half initialized segment, first bake it under tmp filename and
// then rename.
let tmp_path = self.timeline_dir.join("waltmp");
#[allow(clippy::suspicious_open_options)]
let mut file = OpenOptions::new()
.create(true)
.write(true)
.open(&tmp_path)
let mut file = File::create(&tmp_path)
.await
.with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;

View File

@@ -619,13 +619,15 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
timestamp: datetime,
with_lease: bool = False,
**kwargs,
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}, {with_lease=}"
)
with_lease_query = f"{with_lease=}".lower()
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z&{with_lease_query}",
**kwargs,
)
self.verbose_error(res)

View File

@@ -48,7 +48,16 @@ def test_storage_controller_many_tenants(
# We will intentionally stress reconciler concurrrency, which triggers a warning when lots
# of shards are hitting the delayed path.
env.storage_controller.allowed_errors.append(".*Many shards are waiting to reconcile")
env.storage_controller.allowed_errors.extend(
[
# We will intentionally stress reconciler concurrrency, which triggers a warning when lots
# of shards are hitting the delayed path.
".*Many shards are waiting to reconcile",
# We will create many timelines concurrently, so they might get slow enough to trip the warning
# that timeline creation is holding a lock too long.
".*Shared lock by TimelineCreate.*was held.*",
]
)
for ps in env.pageservers:
# This can happen because when we do a loop over all pageservers and mark them offline/active,

View File

@@ -12,10 +12,24 @@ from fixtures.utils import query_scalar, wait_until
from requests.exceptions import ReadTimeout
#
# Test pageserver get_lsn_by_timestamp API
#
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
def assert_lsn_lease_granted(result, with_lease: bool):
"""
Asserts an LSN lease is granted when `with_lease` flag is turned on.
Always asserts no LSN lease is granted when `with_lease` flag is off.
"""
if with_lease:
assert result.get("valid_until")
else:
assert result.get("valid_until") is None
@pytest.mark.parametrize("with_lease", [True, False])
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder, with_lease: bool):
"""
Test pageserver get_lsn_by_timestamp API.
:param with_lease: Whether to get a lease associated with returned LSN.
"""
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant(
@@ -67,23 +81,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Check edge cases
# Timestamp is in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, probe_timestamp, with_lease=with_lease
)
assert result["kind"] == "future"
assert_lsn_lease_granted(result, with_lease)
# make sure that we return a well advanced lsn here
assert Lsn(result["lsn"]) > start_lsn
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, probe_timestamp, with_lease=with_lease
)
assert result["kind"] == "past"
assert_lsn_lease_granted(result, with_lease)
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) < start_lsn
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, probe_timestamp, with_lease=with_lease
)
assert result["kind"] not in ["past", "nodata"]
assert_lsn_lease_granted(result, with_lease)
lsn = result["lsn"]
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
@@ -105,8 +129,11 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id_child, probe_timestamp, with_lease=with_lease
)
assert result["kind"] == "past"
assert_lsn_lease_granted(result, with_lease)
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) >= last_flush_lsn

View File

@@ -53,6 +53,25 @@ def test_proxy_select_1(static_proxy: NeonProxy):
assert out[0][0] == 42
def test_proxy_server_params(static_proxy: NeonProxy):
"""
Test that server params are passing through to postgres
"""
out = static_proxy.safe_psql(
"select to_json('0 seconds'::interval)", options="-c intervalstyle=iso_8601"
)
assert out[0][0] == "PT0S"
out = static_proxy.safe_psql(
"select to_json('0 seconds'::interval)", options="-c intervalstyle=sql_standard"
)
assert out[0][0] == "0"
out = static_proxy.safe_psql(
"select to_json('0 seconds'::interval)", options="-c intervalstyle=postgres"
)
assert out[0][0] == "00:00:00"
def test_password_hack(static_proxy: NeonProxy):
"""
Check the PasswordHack auth flow: an alternative to SCRAM auth for

View File

@@ -190,19 +190,20 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
"""
Test that after a split, we clean up parent layer data in the child shards via compaction.
"""
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
"checkpoint_distance": 128 * 1024,
"compaction_threshold": 1,
"compaction_target_size": 128 * 1024,
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "3600s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"image_creation_threshold": 1,
"image_layer_creation_check_threshold": 0,
}
neon_env_builder.storage_controller_config = {
@@ -261,7 +262,9 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
env.pageserver.start()
# Cleanup part 2: once layers are outside the PITR window, they will be rewritten if they are partially redundant
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, {"pitr_interval": "0s"})
updated_conf = TENANT_CONF.copy()
updated_conf["pitr_interval"] = "0s"
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, updated_conf)
env.storage_controller.reconcile_until_idle()
for shard in shards:

View File

@@ -31,8 +31,12 @@ def error_tolerant_delete(ps_http, tenant_id):
if e.status_code == 500:
# This test uses failure injection, which can produce 500s as the pageserver expects
# the object store to always be available, and the ListObjects during deletion is generally
# an infallible operation
assert "simulated failure of remote operation" in e.message
# an infallible operation. This can show up as a clear simulated error, or as a general
# error during delete_objects()
assert (
"simulated failure of remote operation" in e.message
or "failed to delete" in e.message
)
else:
raise
else: