Compare commits


17 Commits

Author SHA1 Message Date
Bojan Serafimov
aa9baddd3d Fix unrelated test 2023-06-09 10:27:54 -04:00
Bojan Serafimov
4756dcd0cc fmt 2023-06-09 09:57:06 -04:00
Bojan Serafimov
ab23e28768 revert test changes 2023-06-09 09:50:05 -04:00
Bojan Serafimov
341563261a Store compute spec id inside basebackup 2023-06-09 00:39:06 -04:00
Bojan Serafimov
b836013721 Cleanup pg_stat_statements 2023-06-08 22:52:52 -04:00
Bojan Serafimov
44ad006eb3 Merge branch 'main' into startup-no-config 2023-06-08 18:12:15 -04:00
bojanserafimov
6bac770811 Add cold start test (#4436) 2023-06-08 18:11:33 -04:00
Bojan Serafimov
aff94b54c8 more roles, dbs 2023-06-08 12:47:59 -04:00
Bojan Serafimov
1adb38bb82 Merge branch 'new-startup-test' into startup-no-config 2023-06-08 12:41:14 -04:00
Bojan Serafimov
1baecdc27a comments 2023-06-08 12:33:19 -04:00
Bojan Serafimov
eceda63379 Do two iterations 2023-06-08 12:30:47 -04:00
Bojan Serafimov
881bfc4da8 WIP 2023-06-08 10:21:07 -04:00
Stas Kelvich
c82d19d8d6 Fix NULLs handling in proxy json endpoint
There were a few problems with null handling:

* query_raw_txt() accepted a vector of strings, so it always (erroneously)
treated "null" as a string instead of null. Change the rust pg client
to accept a vector of Option<String> instead of plain Strings, and adapt
the code here to pass nulls as None.

* pg_text_to_json() had a check that always interpreted the "NULL" string
as null. That is wrong, since nulls were already handled by matching None.
This bug showed up as a failed attempt to parse arrays containing NULL
elements. Fix the code by checking for the presence of quotes while parsing
an array (no quotes -> null, quoted -> "null" string).

The array parser fix also slightly changes behavior by always clearing the
current entry when pushing to the resulting vector. This seems to be
an omission in the previous code, but it looks harmless, as the entry
was left uncleared only at the end of a nested or top-level
array.
2023-06-08 16:00:18 +03:00
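
A minimal sketch of the quoting rule described in the commit above, assuming the serde_json crate; the function name and signature are illustrative, not the proxy's actual parser:

// Hypothetical helper: while parsing a Postgres text array, an unquoted NULL
// element maps to JSON null, while a quoted "NULL" element stays a string.
fn array_entry_to_json(entry: &str, was_quoted: bool) -> serde_json::Value {
    if !was_quoted && entry == "NULL" {
        serde_json::Value::Null
    } else {
        serde_json::Value::String(entry.to_string())
    }
}

fn main() {
    assert_eq!(array_entry_to_json("NULL", false), serde_json::Value::Null);
    assert_eq!(
        array_entry_to_json("NULL", true),
        serde_json::Value::String("NULL".to_string())
    );
}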
Stas Kelvich
d73639646e Add more output options to proxy json endpoint
With this commit the client can pass the following optional headers:

`Neon-Raw-Text-Output: true`. Return postgres values as text, without parsing them. So numbers, objects, booleans, nulls and arrays will be returned as text. That can be useful when the client code wants to implement its own parsing or reuse parsing libraries from e.g. node-postgres.

`Neon-Array-Mode: true`. Return postgres rows as arrays instead of objects. That is a more compact representation and also helps in some edge
cases where rows represented as objects are hard to use (e.g. when several fields have the same name).
2023-06-08 16:00:18 +03:00
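
An illustrative sketch of how such optional headers could be read from an incoming request, assuming the http crate's HeaderMap; this is not the proxy's actual handler code, and the helper name is made up:

// Header names are stored lowercase and matched case-insensitively;
// missing headers fall back to the old behavior (parsed values, rows as objects).
fn read_output_options(headers: &http::HeaderMap) -> (bool, bool) {
    let flag = |name: &str| {
        headers
            .get(name)
            .and_then(|value| value.to_str().ok())
            .map(|value| value.eq_ignore_ascii_case("true"))
            .unwrap_or(false)
    };
    let raw_text_output = flag("neon-raw-text-output");
    let array_mode = flag("neon-array-mode");
    (raw_text_output, array_mode)
}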
Dmitry Rodionov
d53f9ab3eb delete timelines from s3 (#4384)
Delete data from s3 when timeline deletion is requested

## Summary of changes

UploadQueue is altered to support scheduling of delete operations in the
stopped state. This looks weird, and I'm considering whether there are
better options/refactorings for the upload client to make it cleaner.

Probably can be part of https://github.com/neondatabase/neon/issues/4378

Deletion is implemented directly in the existing endpoint because the changes are not
that significant. If we want more safety we can separate those or add a
feature flag for the new behavior.

resolves [#4193](https://github.com/neondatabase/neon/issues/4193)

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-06-08 15:01:22 +03:00
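
A simplified, illustrative sketch of the UploadQueue change described above (field types reduced to stand-ins; the real definitions appear in the diff further down): stopping the queue preserves a working copy that the deletion path can keep scheduling operations on.

#[allow(dead_code)]
enum UploadQueue {
    Uninitialized,
    Initialized(UploadQueueInitialized),
    Stopped(UploadQueueStopped),
}

#[allow(dead_code)]
struct UploadQueueInitialized {
    // stand-in for the queued UploadOp values
    queued_operations: std::collections::VecDeque<String>,
}

#[allow(dead_code)]
struct UploadQueueStopped {
    // preserved so delete_all() can keep scheduling and launching deletions
    // after the regular queue has been stopped
    upload_queue_for_deletion: UploadQueueInitialized,
}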
Dmitry Rodionov
8560a98d68 fix openapi spec to pass swagger editor validation (#4445)
There shouldn't be a dash before `type: object`. Also added a description.
2023-06-08 13:25:30 +03:00
Bojan Serafimov
eda4f86588 Add startup test 2023-06-06 16:45:16 -04:00
31 changed files with 1302 additions and 434 deletions

View File

@@ -264,7 +264,7 @@ jobs:
export REMOTE_STORAGE_S3_BUCKET=neon-github-public-dev
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo test $CARGO_FLAGS --package remote_storage --test pagination_tests -- s3_pagination_should_work --exact
${cov_prefix} cargo test $CARGO_FLAGS --package remote_storage --test test_real_s3
- name: Install rust binaries
run: |

Cargo.lock generated
View File

@@ -2770,7 +2770,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"bytes",
"fallible-iterator",
@@ -2783,7 +2783,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"native-tls",
"tokio",
@@ -2794,7 +2794,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -2812,7 +2812,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4272,7 +4272,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -140,11 +140,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
@@ -180,7 +180,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" }
# Changes the MAX_THREADS limit from 4096 to 32768.
# This is a temporary workaround for using tracing from many threads in safekeepers code,

View File

@@ -442,8 +442,42 @@ impl ComputeNode {
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
// Maybe apply the spec
if spec.spec.mode == ComputeMode::Primary {
self.apply_config(&compute_state)?;
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
// Get spec_id or make it up by hashing
//
// TODO Make spec_id required so there would be no need to hash.
let spec_id = spec.operation_uuid.clone().unwrap_or_else(|| {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
// HACK Exclude postgresql.conf because it doesn't need
// to be applied like the other fields in the spec
let mut spec_no_conf = spec.clone();
spec_no_conf.cluster.postgresql_conf = None;
let json = serde_json::to_vec(&spec_no_conf).unwrap();
let mut hasher = DefaultHasher::new();
json.hash(&mut hasher);
let hash = hasher.finish();
format!("{:x}", hash)
});
// Get current spec_id
let path = Path::new(&self.pgdata).join("neon_compute_spec_id.txt");
let current_spec_id = std::fs::read_to_string(path).ok();
// Respec if needed
if current_spec_id == Some(spec_id.clone()) {
info!("no need to respec");
} else {
info!("respeccing {:?} {:?}", current_spec_id, &spec_id);
self.apply_config(&compute_state)?;
self.cache_spec_id(&compute_state, spec_id)?;
}
}
let startup_end_time = Utc::now();
@@ -465,6 +499,28 @@ impl ComputeNode {
Ok(pg)
}
fn cache_spec_id(&self, compute_state: &ComputeState, spec_id: String) -> anyhow::Result<()> {
let spec = &compute_state.pspec.as_ref().expect("spec must be set");
let cmd = format!(
"set_compute_spec_id {} {} {}",
spec.tenant_id, spec.timeline_id, spec_id,
);
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token);
} else {
info!("Storage auth token not set");
}
let mut client = config.connect(NoTls)?;
client.simple_query(&cmd)?;
Ok(())
}
// Look for core dumps and collect backtraces.
//
// EKS worker nodes have following core dump settings:

View File

@@ -152,7 +152,7 @@ pub enum ActivatingFrom {
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
/// The timeline is recognized by the pageserver but is not yet operational.
/// In particular, the walreceiver connection loop is not running for this timeline.
@@ -165,7 +165,7 @@ pub enum TimelineState {
/// It cannot transition back into any other state.
Stopping,
/// The timeline is broken and not operational (previous states: Loading or Active).
Broken,
Broken { reason: String, backtrace: String },
}
#[serde_as]

View File

@@ -17,7 +17,7 @@ use tokio::{
io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
use crate::{Download, DownloadError, RemotePath};
@@ -101,19 +101,35 @@ impl RemoteStorage for LocalFs {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
Ok(get_all_files(path.as_ref(), false)
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?
.into_iter()
.map(|path| {
path.strip_prefix(&self.storage_root)
.context("Failed to strip preifix")
.map_err(DownloadError::Other)?;
let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
prefixes.push(
prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
)
})
.collect())
),
)
}
Ok(prefixes)
}
async fn upload(
@@ -291,11 +307,18 @@ impl RemoteStorage for LocalFs {
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
let file_path = path.with_base(&self.storage_root);
if file_path.exists() && file_path.is_file() {
Ok(fs::remove_file(file_path).await?)
} else {
bail!("File {file_path:?} either does not exist or is not a file")
if !file_path.exists() {
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
// > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
return Ok(());
}
if !file_path.is_file() {
anyhow::bail!("{file_path:?} is not a file");
}
Ok(fs::remove_file(file_path)
.await
.map_err(|e| anyhow::anyhow!(e))?)
}
}
@@ -320,7 +343,7 @@ where
let file_type = dir_entry.file_type().await?;
let entry_path = dir_entry.path();
if file_type.is_symlink() {
debug!("{entry_path:?} us a symlink, skipping")
debug!("{entry_path:?} is a symlink, skipping")
} else if file_type.is_dir() {
if recursive {
paths.extend(get_all_files(&entry_path, true).await?.into_iter())
@@ -595,15 +618,11 @@ mod fs_tests {
storage.delete(&upload_target).await?;
assert!(storage.list().await?.is_empty());
match storage.delete(&upload_target).await {
Ok(()) => panic!("Should not allow deleting non-existing storage files"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
let expected_path = upload_target.with_base(&storage.storage_root);
assert!(error_string.contains(expected_path.to_str().unwrap()));
}
}
storage
.delete(&upload_target)
.await
.expect("Should allow deleting non-existing storage files");
Ok(())
}

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use std::time::UNIX_EPOCH;
use anyhow::Context;
use once_cell::sync::OnceCell;
use remote_storage::{
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
};
@@ -14,8 +15,12 @@ use test_context::{test_context, AsyncTestContext};
use tokio::task::JoinSet;
use tracing::{debug, error, info};
static LOGGING_DONE: OnceCell<()> = OnceCell::new();
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
const BASE_PREFIX: &str = "test/";
/// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
/// See the client creation in [`create_s3_client`] for details on the required env vars.
@@ -38,20 +43,20 @@ const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_
///
/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
/// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished.
#[test_context(MaybeEnabledS3)]
#[test_context(MaybeEnabledS3WithTestBlobs)]
#[tokio::test]
async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> anyhow::Result<()> {
let ctx = match ctx {
MaybeEnabledS3::Enabled(ctx) => ctx,
MaybeEnabledS3::Disabled => return Ok(()),
MaybeEnabledS3::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
MaybeEnabledS3WithTestBlobs::Enabled(ctx) => ctx,
MaybeEnabledS3WithTestBlobs::Disabled => return Ok(()),
MaybeEnabledS3WithTestBlobs::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
};
let test_client = Arc::clone(&ctx.client_with_excessive_pagination);
let test_client = Arc::clone(&ctx.enabled.client);
let expected_remote_prefixes = ctx.remote_prefixes.clone();
let base_prefix =
RemotePath::new(Path::new(ctx.base_prefix_str)).context("common_prefix construction")?;
let base_prefix = RemotePath::new(Path::new(ctx.enabled.base_prefix))
.context("common_prefix construction")?;
let root_remote_prefixes = test_client
.list_prefixes(None)
.await
@@ -83,27 +88,91 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3) -> anyhow::Result<(
Ok(())
}
#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
let ctx = match ctx {
MaybeEnabledS3::Enabled(ctx) => ctx,
MaybeEnabledS3::Disabled => return Ok(()),
};
let path = RemotePath::new(&PathBuf::from(format!(
"{}/for_sure_there_is_nothing_there_really",
ctx.base_prefix,
)))
.with_context(|| "RemotePath conversion")?;
ctx.client.delete(&path).await.expect("should succeed");
Ok(())
}
fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
)
.expect("logging init failed");
});
}
struct EnabledS3 {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,
}
impl EnabledS3 {
async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
let client = create_s3_client(max_keys_in_list_response)
.context("S3 client creation")
.expect("S3 client creation failed");
EnabledS3 {
client,
base_prefix: BASE_PREFIX,
}
}
}
enum MaybeEnabledS3 {
Enabled(EnabledS3),
Disabled,
}
#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
async fn setup() -> Self {
ensure_logging_ready();
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
info!(
"`{}` env variable is not set, skipping the test",
ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
);
return Self::Disabled;
}
Self::Enabled(EnabledS3::setup(None).await)
}
}
enum MaybeEnabledS3WithTestBlobs {
Enabled(S3WithTestBlobs),
Disabled,
UploadsFailed(anyhow::Error, S3WithTestBlobs),
}
struct S3WithTestBlobs {
client_with_excessive_pagination: Arc<GenericRemoteStorage>,
base_prefix_str: &'static str,
enabled: EnabledS3,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemotePath>,
}
#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
async fn setup() -> Self {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
)
.expect("logging init failed");
ensure_logging_ready();
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
info!(
"`{}` env variable is not set, skipping the test",
@@ -115,23 +184,14 @@ impl AsyncTestContext for MaybeEnabledS3 {
let max_keys_in_list_response = 10;
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
let client_with_excessive_pagination = create_s3_client(max_keys_in_list_response)
.context("S3 client creation")
.expect("S3 client creation failed");
let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
let base_prefix_str = "test/";
match upload_s3_data(
&client_with_excessive_pagination,
base_prefix_str,
upload_tasks_count,
)
.await
{
match upload_s3_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
ControlFlow::Continue(uploads) => {
info!("Remote objects created successfully");
Self::Enabled(S3WithTestBlobs {
client_with_excessive_pagination,
base_prefix_str,
enabled,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
})
@@ -139,8 +199,7 @@ impl AsyncTestContext for MaybeEnabledS3 {
ControlFlow::Break(uploads) => Self::UploadsFailed(
anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
S3WithTestBlobs {
client_with_excessive_pagination,
base_prefix_str,
enabled,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
},
@@ -152,13 +211,15 @@ impl AsyncTestContext for MaybeEnabledS3 {
match self {
Self::Disabled => {}
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
cleanup(&ctx.client_with_excessive_pagination, ctx.remote_blobs).await;
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
}
}
}
}
fn create_s3_client(max_keys_per_list_response: i32) -> anyhow::Result<Arc<GenericRemoteStorage>> {
fn create_s3_client(
max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
.context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
@@ -176,7 +237,7 @@ fn create_s3_client(max_keys_per_list_response: i32) -> anyhow::Result<Arc<Gener
prefix_in_bucket: Some(format!("pagination_should_work_test_{random_prefix_part}/")),
endpoint: None,
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: Some(max_keys_per_list_response),
max_keys_per_list_response,
}),
};
Ok(Arc::new(

View File

@@ -1,6 +1,8 @@
/// Extensions to `std::fs` types.
use std::{fs, io, path::Path};
use anyhow::Context;
pub trait PathExt {
/// Returns an error if `self` is not a directory.
fn is_empty_dir(&self) -> io::Result<bool>;
@@ -15,10 +17,19 @@ where
}
}
pub async fn is_directory_empty(path: impl AsRef<Path>) -> anyhow::Result<bool> {
let mut dir = tokio::fs::read_dir(&path)
.await
.context(format!("read_dir({})", path.as_ref().display()))?;
Ok(dir.next_entry().await?.is_none())
}
#[cfg(test)]
mod test {
use std::path::PathBuf;
use crate::fs_ext::is_directory_empty;
#[test]
fn is_empty_dir() {
use super::PathExt;
@@ -42,4 +53,26 @@ mod test {
std::fs::remove_file(&file_path).unwrap();
assert!(file_path.is_empty_dir().is_err());
}
#[tokio::test]
async fn is_empty_dir_async() {
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path();
// test positive case
assert!(
is_directory_empty(dir_path).await.expect("test failure"),
"new tempdir should be empty"
);
// invoke on a file to ensure it returns an error
let file_path: PathBuf = dir_path.join("testfile");
let f = std::fs::File::create(&file_path).unwrap();
drop(f);
assert!(is_directory_empty(&file_path).await.is_err());
// do it again on a path, we know to be nonexistent
std::fs::remove_file(&file_path).unwrap();
assert!(is_directory_empty(file_path).await.is_err());
}
}

View File

@@ -21,7 +21,7 @@ pub enum ApiError {
Conflict(String),
#[error("Precondition failed: {0}")]
PreconditionFailed(&'static str),
PreconditionFailed(Box<str>),
#[error(transparent)]
InternalServerError(anyhow::Error),

View File

@@ -417,6 +417,16 @@ where
// Also send zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
// Add neon_compute_spec_id.txt
if let Some(spec_id) = &self.timeline.compute_spec_id.lock().await.clone() {
self.ar
.append(
&new_tar_header("neon_compute_spec_id.txt", spec_id.len() as u64)?,
spec_id.as_bytes(),
)
.await?;
}
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {

View File

@@ -215,7 +215,7 @@ paths:
schema:
$ref: "#/components/schemas/NotFoundError"
"412":
description: Tenant is missing
description: Tenant is missing, or timeline has children
content:
application/json:
schema:
@@ -386,6 +386,7 @@ paths:
"202":
description: Tenant attaching scheduled
"400":
description: Bad Request
content:
application/json:
schema:
@@ -945,7 +946,7 @@ components:
type: string
enum: [ "maybe", "attached", "failed" ]
data:
- type: object
type: object
properties:
reason:
type: string

View File

@@ -183,9 +183,10 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
use crate::tenant::DeleteTimelineError::*;
match value {
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found")),
HasChildren => ApiError::BadRequest(anyhow::anyhow!(
"Cannot delete timeline which has child timelines"
)),
HasChildren(children) => ApiError::PreconditionFailed(
format!("Cannot delete timeline which has child timelines: {children:?}")
.into_boxed_str(),
),
Other(e) => ApiError::InternalServerError(e),
}
}
@@ -197,9 +198,9 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
match value {
// Report Precondition failed so client can distinguish between
// "tenant is missing" case from "timeline is missing"
Tenant(GetTenantError::NotFound(..)) => {
ApiError::PreconditionFailed("Requested tenant is missing")
}
Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
"Requested tenant is missing".to_owned().into_boxed_str(),
),
Tenant(t) => ApiError::from(t),
Timeline(t) => ApiError::from(t),
}
@@ -494,7 +495,8 @@ async fn timeline_delete_handler(
.instrument(info_span!("timeline_delete", tenant = %tenant_id, timeline = %timeline_id))
.await?;
json_response(StatusCode::OK, ())
// FIXME: needs to be an error for console to retry it. Ideally Accepted should be used and retried until 404.
json_response(StatusCode::ACCEPTED, ())
}
async fn tenant_detach_handler(

View File

@@ -915,6 +915,27 @@ where
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("set_compute_spec_id ") {
let (_, params_raw) = query_string.split_at("set_compute_spec_id ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
if params.len() != 3 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for set_compute_spec_id command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
let spec_id = params[2].to_string();
self.check_permission(Some(tenant_id))?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
*timeline.compute_spec_id.lock().await = Some(spec_id);
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// return pair of prev_lsn and last_lsn
else if query_string.starts_with("get_last_record_rlsn ") {

View File

@@ -257,6 +257,9 @@ pub enum TaskKind {
// task that handles attaching a tenant
Attach,
// Used mostly for background deletion from s3
TimelineDeletionWorker,
// task that handles metrics collection
MetricsCollection,

View File

@@ -18,6 +18,7 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::sync::OwnedMutexGuard;
use tokio::task::JoinSet;
use tracing::*;
use utils::completion;
@@ -444,7 +445,7 @@ pub enum DeleteTimelineError {
#[error("NotFound")]
NotFound,
#[error("HasChildren")]
HasChildren,
HasChildren(Vec<TimelineId>),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -568,7 +569,7 @@ impl Tenant {
.with_context(|| {
format!("creating broken timeline data for {tenant_id}/{timeline_id}")
})?;
broken_timeline.set_state(TimelineState::Broken);
broken_timeline.set_broken(e.to_string());
timelines_accessor.insert(timeline_id, broken_timeline);
return Err(e);
}
@@ -763,7 +764,7 @@ impl Tenant {
);
remote_index_and_client.insert(timeline_id, (index_part, client));
}
MaybeDeletedIndexPart::Deleted => {
MaybeDeletedIndexPart::Deleted(_) => {
info!("timeline {} is deleted, skipping", timeline_id);
continue;
}
@@ -1113,9 +1114,9 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip_all, fields(timeline_id))]
#[instrument(skip(self, local_metadata, init_order, ctx))]
async fn load_local_timeline(
&self,
self: &Arc<Self>,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
init_order: Option<&InitializationOrder>,
@@ -1132,12 +1133,20 @@ impl Tenant {
)
});
let remote_startup_data = match &remote_client {
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
Some(ancestor_timeline)
} else {
None
};
let (remote_startup_data, remote_client) = match remote_client {
Some(remote_client) => match remote_client.download_index_file().await {
Ok(index_part) => {
let index_part = match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted => {
MaybeDeletedIndexPart::Deleted(index_part) => {
// TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
// Example:
// start deletion operation
@@ -1148,37 +1157,59 @@ impl Tenant {
//
// We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
// Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
info!("is_deleted is set on remote, resuming removal of local data originally done by timeline deletion handler");
std::fs::remove_dir_all(
self.conf.timeline_path(&timeline_id, &self.tenant_id),
)
.context("remove_dir_all")?;
info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
remote_client
.init_upload_queue_stopped_to_continue_deletion(&index_part)?;
let timeline = self
.create_timeline_data(
timeline_id,
&local_metadata,
ancestor,
Some(remote_client),
init_order,
)
.context("create_timeline_data")?;
let guard = Arc::clone(&timeline.delete_lock).lock_owned().await;
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
timeline.set_state(TimelineState::Stopping);
// We need to do this because when the console retries the delete request we shouldn't answer with 404
// because 404 means successful deletion.
// FIXME consider TimelineState::Deleting.
let mut locked = self.timelines.lock().unwrap();
locked.insert(timeline_id, Arc::clone(&timeline));
Tenant::schedule_delete_timeline(
Arc::clone(self),
timeline_id,
timeline,
guard,
);
return Ok(());
}
};
let remote_metadata = index_part.parse_metadata().context("parse_metadata")?;
Some(RemoteStartupData {
index_part,
remote_metadata,
})
(
Some(RemoteStartupData {
index_part,
remote_metadata,
}),
Some(remote_client),
)
}
Err(DownloadError::NotFound) => {
info!("no index file was found on the remote");
None
(None, Some(remote_client))
}
Err(e) => return Err(anyhow::anyhow!(e)),
},
None => None,
};
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
Some(ancestor_timeline)
} else {
None
None => (None, remote_client),
};
self.timeline_init_and_sync(
@@ -1511,13 +1542,118 @@ impl Tenant {
}
/// Shuts down a timeline's tasks, removes its in-memory structures, and deletes its
/// data from disk.
///
/// This doesn't currently delete all data from S3, but sets a flag in its
/// index_part.json file to mark it as deleted.
pub async fn delete_timeline(
/// data from both disk and s3.
async fn delete_timeline(
&self,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
) -> anyhow::Result<()> {
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents GC or compaction from running at the same time.
// The GC task doesn't register itself with the timeline it's operating on,
// so it might still be running even though we called `shutdown_tasks`.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
let local_timeline_directory = self
.conf
.timeline_path(&timeline.timeline_id, &self.tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = self.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
// continue with the rest of the deletion
}
res => res.with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?,
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
drop(layer_removal_guard);
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
{
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
timelines.remove(&timeline_id).expect(
"timeline that we were deleting was concurrently removed from 'timelines' map",
);
drop(timelines);
}
let remote_client = match &timeline.remote_client {
Some(remote_client) => remote_client,
None => return Ok(()),
};
remote_client.delete_all().await?;
Ok(())
}
/// Removes timeline-related in-memory data and schedules removal from remote storage.
#[instrument(skip(self, _ctx))]
pub async fn prepare_and_schedule_delete_timeline(
self: Arc<Self>,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
timeline::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -1527,18 +1663,25 @@ impl Tenant {
//
// Also grab the Timeline's delete_lock to prevent another deletion from starting.
let timeline;
let mut delete_lock_guard;
let delete_lock_guard;
{
let mut timelines = self.timelines.lock().unwrap();
// Ensure that there are no child timelines **attached to that pageserver**,
// because detach removes files, which will break child branches
let children_exist = timelines
let children: Vec<TimelineId> = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
.filter_map(|(id, entry)| {
if entry.get_ancestor_timeline_id() == Some(timeline_id) {
Some(*id)
} else {
None
}
})
.collect();
if children_exist {
return Err(DeleteTimelineError::HasChildren);
if !children.is_empty() {
return Err(DeleteTimelineError::HasChildren(children));
}
let timeline_entry = match timelines.entry(timeline_id) {
@@ -1553,11 +1696,15 @@ impl Tenant {
// XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller
// needs to poll until the operation has finished. But for now, we return an
// error, because the control plane knows to retry errors.
delete_lock_guard = timeline.delete_lock.try_lock().map_err(|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
})?;
delete_lock_guard =
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.map_err(|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
})?;
// If another task finished the deletion just before we acquired the lock,
// return success.
@@ -1626,102 +1773,43 @@ impl Tenant {
}
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents prevents GC or compaction from running at the same time.
// The GC task doesn't register itself with the timeline it's operating on,
// so it might still be running even though we called `shutdown_tasks`.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = self.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
// continue with the rest of the deletion
}
res => res.with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?,
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
drop(layer_removal_guard);
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
// Remove the timeline from the map.
{
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
timelines.remove(&timeline_id).expect(
"timeline that we were deleting was concurrently removed from 'timelines' map",
);
}
// All done! Mark the deletion as completed and release the delete_lock
*delete_lock_guard = true;
drop(delete_lock_guard);
self.schedule_delete_timeline(timeline_id, timeline, delete_lock_guard);
Ok(())
}
fn schedule_delete_timeline(
self: Arc<Self>,
timeline_id: TimelineId,
timeline: Arc<Timeline>,
_guard: OwnedMutexGuard<bool>,
) {
let tenant_id = self.tenant_id;
let timeline_clone = Arc::clone(&timeline);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(self.tenant_id),
Some(timeline_id),
"timeline_delete",
false,
async move {
if let Err(err) = self.delete_timeline(timeline_id, timeline).await {
error!("Error: {err:#}");
timeline_clone.set_broken(err.to_string())
};
Ok(())
}
.instrument({
let span =
tracing::info_span!(parent: None, "delete_timeline", tenant_id=%tenant_id, timeline_id=%timeline_id);
span.follows_from(Span::current());
span
}),
);
}
pub fn current_state(&self) -> TenantState {
self.state.borrow().clone()
}
@@ -1764,9 +1852,9 @@ impl Tenant {
if activating {
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
let timelines_to_activate = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
@@ -1774,7 +1862,7 @@ impl Tenant {
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
for timeline in timelines_to_activate {
timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
activated_timelines += 1;
}
@@ -1925,7 +2013,7 @@ impl Tenant {
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
.filter(|timeline| !timeline.is_broken());
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
@@ -3758,7 +3846,7 @@ mod tests {
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
tline.set_state(TimelineState::Broken);
tline.set_broken("test".to_owned());
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)

View File

@@ -396,7 +396,9 @@ pub async fn delete_timeline(
ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = get_tenant(tenant_id, true).await?;
tenant.delete_timeline(timeline_id, ctx).await?;
tenant
.prepare_and_schedule_delete_timeline(timeline_id, ctx)
.await?;
Ok(())
}

View File

@@ -210,13 +210,15 @@ use chrono::{NaiveDateTime, Utc};
pub use download::{is_temp_download_file, list_remote_timelines};
use scopeguard::ScopeGuard;
use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use remote_storage::{DownloadError, GenericRemoteStorage};
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use std::ops::DerefMut;
use tokio::runtime::Runtime;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, instrument, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;
@@ -225,7 +227,9 @@ use crate::metrics::{
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
};
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::upload_queue::Delete;
use crate::{
config::PageServerConf,
task_mgr,
@@ -259,7 +263,7 @@ const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub enum MaybeDeletedIndexPart {
IndexPart(IndexPart),
Deleted,
Deleted(IndexPart),
}
/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
@@ -361,11 +365,42 @@ impl RemoteTimelineClient {
Ok(())
}
/// Initialize the queue in stopped state. Used in startup path
/// to continue deletion operation interrupted by pageserver crash or restart.
pub fn init_upload_queue_stopped_to_continue_deletion(
&self,
index_part: &IndexPart,
) -> anyhow::Result<()> {
// FIXME: consider newtype for DeletedIndexPart.
let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
"bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
))?;
{
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
self.update_remote_physical_size_gauge(Some(index_part));
}
// also locks upload queue, without dropping the guard above it will be a deadlock
self.stop().expect("initialized line above");
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue
.stopped_mut()
.expect("stopped above")
.deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
Ok(())
}
pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
match &*self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn),
UploadQueue::Stopped(q) => {
Some(q.upload_queue_for_deletion.last_uploaded_consistent_lsn)
}
}
}
@@ -420,7 +455,7 @@ impl RemoteTimelineClient {
.await?;
if index_part.deleted_at.is_some() {
Ok(MaybeDeletedIndexPart::Deleted)
Ok(MaybeDeletedIndexPart::Deleted(index_part))
} else {
Ok(MaybeDeletedIndexPart::IndexPart(index_part))
}
@@ -622,7 +657,11 @@ impl RemoteTimelineClient {
// schedule the actual deletions
for name in names {
let op = UploadOp::Delete(RemoteOpFileKind::Layer, name.clone());
let op = UploadOp::Delete(Delete {
file_kind: RemoteOpFileKind::Layer,
layer_file_name: name.clone(),
scheduled_from_timeline_delete: false,
});
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file deletion {}", name.file_name());
@@ -639,18 +678,11 @@ impl RemoteTimelineClient {
/// Wait for all previously scheduled uploads/deletions to complete
///
pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
let (sender, mut receiver) = tokio::sync::watch::channel(());
let barrier_op = UploadOp::Barrier(sender);
{
let mut receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.queued_operations.push_back(barrier_op);
// Don't count this kind of operation!
// Launch the task immediately, if possible
self.launch_queued_tasks(upload_queue);
}
self.schedule_barrier(upload_queue)
};
if receiver.changed().await.is_err() {
anyhow::bail!("wait_completion aborted because upload queue was stopped");
@@ -658,6 +690,22 @@ impl RemoteTimelineClient {
Ok(())
}
fn schedule_barrier(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
let (sender, receiver) = tokio::sync::watch::channel(());
let barrier_op = UploadOp::Barrier(sender);
upload_queue.queued_operations.push_back(barrier_op);
// Don't count this kind of operation!
// Launch the task immediately, if possible
self.launch_queued_tasks(upload_queue);
receiver
}
/// Set the deleted_at field in the remote index file.
///
/// This fails if the upload queue has not been `stop()`ed.
@@ -665,6 +713,7 @@ impl RemoteTimelineClient {
/// The caller is responsible for calling `stop()` AND for waiting
/// for any ongoing upload tasks to finish after `stop()` has succeeded.
/// Check method [`RemoteTimelineClient::stop`] for details.
#[instrument(skip_all)]
pub(crate) async fn persist_index_part_with_deleted_flag(
self: &Arc<Self>,
) -> Result<(), PersistIndexPartWithDeletedFlagError> {
@@ -674,15 +723,7 @@ impl RemoteTimelineClient {
// We must be in stopped state because otherwise
// we can have inprogress index part upload that can overwrite the file
// with missing is_deleted flag that we going to set below
let stopped = match &mut *locked {
UploadQueue::Uninitialized => {
return Err(anyhow::anyhow!("is not Stopped but Uninitialized").into())
}
UploadQueue::Initialized(_) => {
return Err(anyhow::anyhow!("is not Stopped but Initialized").into())
}
UploadQueue::Stopped(stopped) => stopped,
};
let stopped = locked.stopped_mut()?;
match stopped.deleted_at {
SetDeletedFlagProgress::NotRunning => (), // proceed
@@ -696,27 +737,17 @@ impl RemoteTimelineClient {
let deleted_at = Utc::now().naive_utc();
stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
let mut index_part = IndexPart::new(
stopped.latest_files.clone(),
stopped.last_uploaded_consistent_lsn,
stopped
.latest_metadata
.to_bytes()
.context("serialize metadata")?,
);
let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
.context("IndexPart serialize")?;
index_part.deleted_at = Some(deleted_at);
index_part
};
let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
let mut locked = self_clone.upload_queue.lock().unwrap();
let stopped = match &mut *locked {
UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!(
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
locked.as_str(),
),
UploadQueue::Stopped(stopped) => stopped,
};
let stopped = locked
.stopped_mut()
.expect("there's no way out of Stopping, and we checked it's Stopping above");
stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
});
@@ -751,13 +782,10 @@ impl RemoteTimelineClient {
ScopeGuard::into_inner(undo_deleted_at);
{
let mut locked = self.upload_queue.lock().unwrap();
let stopped = match &mut *locked {
UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!(
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
locked.as_str(),
),
UploadQueue::Stopped(stopped) => stopped,
};
let stopped = locked
.stopped_mut()
.expect("there's no way out of Stopping, and we checked it's Stopping above");
stopped.deleted_at = SetDeletedFlagProgress::Successful(
index_part_with_deleted_at
.deleted_at
@@ -768,6 +796,92 @@ impl RemoteTimelineClient {
Ok(())
}
/// Prerequisites: the UploadQueue should be in the stopped state and deleted_at should be successfully set.
/// The function deletes layer files one by one, then lists the prefix to see if we leaked anything,
/// deletes any leaked files, and finally deletes the index file.
pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
let (mut receiver, deletions_queued) = {
let mut deletions_queued = 0;
let mut locked = self.upload_queue.lock().unwrap();
let stopped = locked.stopped_mut()?;
if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
anyhow::bail!("deleted_at is not set")
}
debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
stopped
.upload_queue_for_deletion
.queued_operations
.reserve(stopped.upload_queue_for_deletion.latest_files.len());
// schedule the actual deletions
for name in stopped.upload_queue_for_deletion.latest_files.keys() {
let op = UploadOp::Delete(Delete {
file_kind: RemoteOpFileKind::Layer,
layer_file_name: name.clone(),
scheduled_from_timeline_delete: true,
});
self.calls_unfinished_metric_begin(&op);
stopped
.upload_queue_for_deletion
.queued_operations
.push_back(op);
info!("scheduled layer file deletion {}", name.file_name());
deletions_queued += 1;
}
self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion);
(
self.schedule_barrier(&mut stopped.upload_queue_for_deletion),
deletions_queued,
)
};
receiver.changed().await?;
// Do not delete the index part yet, it is needed for a possible retry. If we removed it first
// and the retry arrived at a different pageserver, there would be no trace of it on remote storage.
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
let timeline_storage_path = self.conf.remote_path(&timeline_path)?;
let remaining = self
.storage_impl
.list_prefixes(Some(&timeline_storage_path))
.await?;
let remaining: Vec<RemotePath> = remaining
.into_iter()
.filter(|p| p.object_name() != Some(IndexPart::FILE_NAME))
.collect();
if !remaining.is_empty() {
warn!(
"Found {} files not bound to index_file.json, proceeding with their deletion",
remaining.len()
);
for file in remaining {
warn!("Removing {}", file.object_name().unwrap_or_default());
self.storage_impl.delete(&file).await?;
}
}
let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
debug!("deleting index part");
self.storage_impl.delete(&index_file_path).await?;
info!(deletions_queued, "done deleting, including index_part.json");
Ok(())
}
///
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
@@ -786,7 +900,7 @@ impl RemoteTimelineClient {
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(_, _) => {
UploadOp::Delete(_) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
@@ -817,7 +931,7 @@ impl RemoteTimelineClient {
UploadOp::UploadMetadata(_, _) => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::Delete(_, _) => {
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
@@ -891,7 +1005,6 @@ impl RemoteTimelineClient {
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
}
}
self.calls_unfinished_metric_end(&task.op);
return;
}
@@ -937,16 +1050,16 @@ impl RemoteTimelineClient {
}
res
}
UploadOp::Delete(metric_file_kind, ref layer_file_name) => {
UploadOp::Delete(delete) => {
let path = &self
.conf
.timeline_path(&self.timeline_id, &self.tenant_id)
.join(layer_file_name.file_name());
.join(delete.layer_file_name.file_name());
delete::delete_layer(self.conf, &self.storage_impl, path)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
*metric_file_kind,
delete.file_kind,
RemoteOpKind::Delete,
Arc::clone(&self.metrics),
)
@@ -1012,11 +1125,24 @@ impl RemoteTimelineClient {
let mut upload_queue_guard = self.upload_queue.lock().unwrap();
let upload_queue = match upload_queue_guard.deref_mut() {
UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
UploadQueue::Stopped(_) => {
UploadQueue::Stopped(stopped) => {
// Special care is needed for deletions: if it was an earlier deletion (not scheduled from timeline deletion)
// then stop() took care of it, so we just return.
// For deletions that come from delete_all we still want to maintain metrics, launch follow-up tasks, etc.
match &task.op {
UploadOp::Delete(delete) if delete.scheduled_from_timeline_delete => Some(&mut stopped.upload_queue_for_deletion),
_ => None
}
},
UploadQueue::Initialized(qi) => { Some(qi) }
};
let upload_queue = match upload_queue {
Some(upload_queue) => upload_queue,
None => {
info!("another concurrent task already stopped the queue");
return;
}, // nothing to do
UploadQueue::Initialized(qi) => { qi }
}
};
upload_queue.inprogress_tasks.remove(&task.task_id);
@@ -1029,7 +1155,7 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_metadata_uploads -= 1;
upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check?
}
UploadOp::Delete(_, _) => {
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions -= 1;
}
UploadOp::Barrier(_) => unreachable!(),
@@ -1063,8 +1189,8 @@ impl RemoteTimelineClient {
reason: "metadata uploads are tiny",
},
),
UploadOp::Delete(file_kind, _) => (
*file_kind,
UploadOp::Delete(delete) => (
delete.file_kind,
RemoteOpKind::Delete,
DontTrackSize {
reason: "should we track deletes? positive or negative sign?",
@@ -1111,32 +1237,36 @@ impl RemoteTimelineClient {
info!("another concurrent task already shut down the queue");
Ok(())
}
UploadQueue::Initialized(UploadQueueInitialized {
latest_files,
latest_metadata,
last_uploaded_consistent_lsn,
..
}) => {
UploadQueue::Initialized(initialized) => {
info!("shutting down upload queue");
// Replace the queue with the Stopped state, taking ownership of the old
// Initialized queue. We will do some checks on it, and then drop it.
let qi = {
// take or clone what we need
let latest_files = std::mem::take(latest_files);
let last_uploaded_consistent_lsn = *last_uploaded_consistent_lsn;
// this could be Copy
let latest_metadata = latest_metadata.clone();
let stopped = UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: SetDeletedFlagProgress::NotRunning,
// Here we preserve a working version of the upload queue for possible use during deletions.
// An in-place replace of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
// but for this use case it doesn't really make sense to bring in unsafe code for this single usage point.
// Deletion is not really perf sensitive, so there shouldn't be any problems with cloning a fraction of it.
let upload_queue_for_deletion = UploadQueueInitialized {
task_counter: 0,
latest_files: initialized.latest_files.clone(),
latest_files_changes_since_metadata_upload_scheduled: 0,
latest_metadata: initialized.latest_metadata.clone(),
last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn,
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
};
let upload_queue =
std::mem::replace(&mut *guard, UploadQueue::Stopped(stopped));
let upload_queue = std::mem::replace(
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
upload_queue_for_deletion,
deleted_at: SetDeletedFlagProgress::NotRunning,
}),
);
if let UploadQueue::Initialized(qi) = upload_queue {
qi
} else {
@@ -1144,8 +1274,6 @@ impl RemoteTimelineClient {
}
};
assert!(qi.latest_files.is_empty(), "do not use this anymore");
// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
@@ -1408,7 +1536,7 @@ mod tests {
// Download back the index.json, and check that the list of files is correct
let index_part = match runtime.block_on(client.download_index_file())? {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted => panic!("unexpectedly got deleted index part"),
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
};
assert_file_list(

View File

@@ -7,9 +7,11 @@ use std::collections::{HashMap, HashSet};
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::upload_queue::UploadQueueInitialized;
use utils::lsn::Lsn;
@@ -115,6 +117,21 @@ impl IndexPart {
}
}
impl TryFrom<&UploadQueueInitialized> for IndexPart {
type Error = SerializeError;
fn try_from(upload_queue: &UploadQueueInitialized) -> Result<Self, Self::Error> {
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
Ok(Self::new(
upload_queue.latest_files.clone(),
disk_consistent_lsn,
metadata_bytes,
))
}
}
/// Serialized form of [`LayerFileMetadata`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct IndexLayerMetadata {

View File

@@ -215,6 +215,10 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,
// Compute nodes can set this field after successful application
// of a new spec, in order to avoid reapplying it on next restart.
pub compute_spec_id: tokio::sync::Mutex<Option<String>>,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
@@ -239,7 +243,7 @@ pub struct Timeline {
/// Prevent two tasks from deleting the timeline at the same time. If held, the
/// timeline is being deleted. If 'true', the timeline has already been deleted.
pub delete_lock: tokio::sync::Mutex<bool>,
pub delete_lock: Arc<tokio::sync::Mutex<bool>>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
@@ -815,8 +819,7 @@ impl Timeline {
// above. Rewrite it.
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
if self.is_stopping() {
return Err(anyhow::anyhow!("timeline is Stopping").into());
}
@@ -955,14 +958,17 @@ impl Timeline {
(st, TimelineState::Loading) => {
error!("ignoring transition from {st:?} into Loading state");
}
(TimelineState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
(TimelineState::Broken { .. }, new_state) => {
error!("Ignoring state update {new_state:?} for broken timeline");
}
(TimelineState::Stopping, TimelineState::Active) => {
error!("Not activating a Stopping timeline");
}
(_, new_state) => {
if matches!(new_state, TimelineState::Stopping | TimelineState::Broken) {
if matches!(
new_state,
TimelineState::Stopping | TimelineState::Broken { .. }
) {
// drop the completion guard, if any; it might be holding off the completion
// forever needlessly
self.initial_logical_size_attempt
@@ -975,14 +981,31 @@ impl Timeline {
}
}
pub fn set_broken(&self, reason: String) {
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
let broken_state = TimelineState::Broken {
reason,
backtrace: backtrace_str,
};
self.set_state(broken_state)
}
pub fn current_state(&self) -> TimelineState {
*self.state.borrow()
self.state.borrow().clone()
}
pub fn is_broken(&self) -> bool {
matches!(&*self.state.borrow(), TimelineState::Broken { .. })
}
pub fn is_active(&self) -> bool {
self.current_state() == TimelineState::Active
}
pub fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
self.state.subscribe()
}
@@ -993,7 +1016,7 @@ impl Timeline {
) -> Result<(), TimelineState> {
let mut receiver = self.state.subscribe();
loop {
let current_state = *receiver.borrow_and_update();
let current_state = receiver.borrow().clone();
match current_state {
TimelineState::Loading => {
receiver
@@ -1437,6 +1460,7 @@ impl Timeline {
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
compute_spec_id: tokio::sync::Mutex::new(None),
current_logical_size: if disk_consistent_lsn.is_valid() {
// we're creating timeline data with some layer files existing locally,
@@ -1460,7 +1484,7 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
delete_lock: Arc::new(tokio::sync::Mutex::new(false)),
initial_logical_size_can_start,
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
@@ -2101,11 +2125,11 @@ impl Timeline {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
let new_state = timeline_state_updates.borrow().clone();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken
TimelineState::Broken { .. }
| TimelineState::Stopping
| TimelineState::Loading => {
break format!("aborted because timeline became inactive (new state: {new_state:?})")
@@ -3792,9 +3816,7 @@ impl Timeline {
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
// there's a global allowed_error for this
if self.is_stopping() {
anyhow::bail!("timeline is Stopping");
}

View File

@@ -153,7 +153,7 @@ pub(super) async fn connection_manager_loop_step(
match new_state {
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Stopping => {
TimelineState::Broken { .. } | TimelineState::Stopping => {
debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
return ControlFlow::Break(());
}

View File

@@ -76,6 +76,12 @@ pub(crate) struct UploadQueueInitialized {
pub(crate) queued_operations: VecDeque<UploadOp>,
}
impl UploadQueueInitialized {
pub(super) fn no_pending_work(&self) -> bool {
self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
}
}
#[derive(Clone, Copy)]
pub(super) enum SetDeletedFlagProgress {
NotRunning,
@@ -84,9 +90,7 @@ pub(super) enum SetDeletedFlagProgress {
}
pub(super) struct UploadQueueStopped {
pub(super) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
pub(super) last_uploaded_consistent_lsn: Lsn,
pub(super) latest_metadata: TimelineMetadata,
pub(super) upload_queue_for_deletion: UploadQueueInitialized,
pub(super) deleted_at: SetDeletedFlagProgress,
}
@@ -187,6 +191,15 @@ impl UploadQueue {
UploadQueue::Initialized(x) => Ok(x),
}
}
pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
match self {
UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Stopped(stopped) => Ok(stopped),
}
}
}
/// An in-progress upload or delete task.
@@ -199,6 +212,13 @@ pub(crate) struct UploadTask {
pub(crate) op: UploadOp,
}
#[derive(Debug)]
pub(crate) struct Delete {
pub(crate) file_kind: RemoteOpFileKind,
pub(crate) layer_file_name: LayerFileName,
pub(crate) scheduled_from_timeline_delete: bool,
}
#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file
@@ -207,8 +227,8 @@ pub(crate) enum UploadOp {
/// Upload the metadata file
UploadMetadata(IndexPart, Lsn),
/// Delete a file.
Delete(RemoteOpFileKind, LayerFileName),
/// Delete a layer file
Delete(Delete),
/// Barrier. When the barrier operation is reached,
Barrier(tokio::sync::watch::Sender<()>),
@@ -226,7 +246,12 @@ impl std::fmt::Display for UploadOp {
)
}
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
UploadOp::Delete(_, path) => write!(f, "Delete({})", path.file_name()),
UploadOp::Delete(delete) => write!(
f,
"Delete(path: {}, scheduled_from_timeline_delete: {})",
delete.layer_file_name.file_name(),
delete.scheduled_from_timeline_delete
),
UploadOp::Barrier(_) => write!(f, "Barrier"),
}
}

View File

@@ -93,6 +93,15 @@ With the current approach we made the following design decisions:
and column oids. Command tag capturing was added to the rust-postgres
functionality as part of this change.
### Output options
The user can pass several optional headers that affect the resulting JSON.
1. `Neon-Raw-Text-Output: true`. Return postgres values as text, without parsing them, so numbers, objects, booleans, nulls and arrays are returned as text. This can be useful when client code wants to implement its own parsing or reuse parsing libraries from e.g. node-postgres.
2. `Neon-Array-Mode: true`. Return postgres rows as arrays instead of objects. This is a more compact representation and also helps in some edge cases where it is hard to use rows represented as objects (e.g. when several fields have the same name).
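A minimal usage sketch with both headers enabled (hostname, port, credentials, and certificate path are placeholders; the `/sql` path and the `Neon-Connection-String` header follow the proxy tests):

import json
import requests

# Placeholder connection details; substitute your own proxy host, port, and credentials.
connstr = "postgresql://user:password@proxy.localtest.me:4432/postgres"
response = requests.post(
    "https://proxy.localtest.me:4444/sql",
    data=json.dumps({"query": "select 1 as n, '{1,2,3}'::int4[] as arr", "params": []}),
    headers={
        "Content-Type": "application/sql",
        "Neon-Connection-String": connstr,
        "Neon-Raw-Text-Output": "true",  # values come back as unparsed text
        "Neon-Array-Mode": "true",       # rows come back as arrays instead of objects
    },
    verify="proxy.crt",  # path to the proxy's TLS certificate
)
print(response.json()["rows"])  # e.g. [["1", "{1,2,3}"]]

With raw text output the integer is returned as the string "1" and the array as "{1,2,3}"; with array mode each row is a JSON array rather than an object.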
## Using SNI-based routing on localhost
Now proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me` which resolves to `127.0.0.1`. Now we can create self-signed certificate and play with proxy:

View File

@@ -1,6 +1,8 @@
use futures::pin_mut;
use futures::StreamExt;
use hyper::body::HttpBody;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::{Body, HeaderMap, Request};
use pq_proto::StartupMessageParams;
use serde_json::json;
@@ -23,21 +25,28 @@ const APP_NAME: &str = "sql_over_http";
const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
//
// Convert json non-string types to strings, so that they can be passed to Postgres
// as parameters.
//
fn json_to_pg_text(json: Vec<Value>) -> Result<Vec<String>, serde_json::Error> {
fn json_to_pg_text(json: Vec<Value>) -> Result<Vec<Option<String>>, serde_json::Error> {
json.iter()
.map(|value| {
match value {
Value::Null => serde_json::to_string(value),
Value::Bool(_) => serde_json::to_string(value),
Value::Number(_) => serde_json::to_string(value),
Value::Object(_) => serde_json::to_string(value),
// special care for nulls
Value::Null => Ok(None),
// no need to escape
Value::String(s) => Ok(s.to_string()),
// convert to text with escaping
Value::Bool(_) => serde_json::to_string(value).map(Some),
Value::Number(_) => serde_json::to_string(value).map(Some),
Value::Object(_) => serde_json::to_string(value).map(Some),
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Ok(Some(s.to_string())),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),
@@ -54,25 +63,29 @@ fn json_to_pg_text(json: Vec<Value>) -> Result<Vec<String>, serde_json::Error> {
//
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
//
fn json_array_to_pg_array(value: &Value) -> Result<String, serde_json::Error> {
fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::Error> {
match value {
// same
Value::Null => serde_json::to_string(value),
Value::Bool(_) => serde_json::to_string(value),
Value::Number(_) => serde_json::to_string(value),
Value::Object(_) => serde_json::to_string(value),
// special care for nulls
Value::Null => Ok(None),
// now needs to be escaped, as it is part of the array
Value::String(_) => serde_json::to_string(value),
// convert to text with escaping
Value::Bool(_) => serde_json::to_string(value).map(Some),
Value::Number(_) => serde_json::to_string(value).map(Some),
Value::Object(_) => serde_json::to_string(value).map(Some),
// here string needs to be escaped, as it is part of the array
Value::String(_) => serde_json::to_string(value).map(Some),
// recurse into array
Value::Array(arr) => {
let vals = arr
.iter()
.map(json_array_to_pg_array)
.map(|r| r.map(|v| v.unwrap_or_else(|| "NULL".to_string())))
.collect::<Result<Vec<_>, _>>()?
.join(",");
Ok(format!("{{{}}}", vals))
Ok(Some(format!("{{{}}}", vals)))
}
}
}
@@ -158,6 +171,11 @@ pub async fn handle(
("application_name", APP_NAME),
]);
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' is assumed to be false.
let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
let array_mode = headers.get(&ARRAY_MODE) == Some(&HEADER_VALUE_TRUE);
//
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
@@ -272,7 +290,7 @@ pub async fn handle(
// convert rows to JSON
let rows = rows
.iter()
.map(pg_text_row_to_json)
.map(|row| pg_text_row_to_json(row, raw_output, array_mode))
.collect::<Result<Vec<_>, _>>()?;
// resulting JSON format is based on the format of node-postgres result
@@ -281,26 +299,42 @@ pub async fn handle(
"rowCount": command_tag_count,
"rows": rows,
"fields": fields,
"rowAsArray": array_mode,
}))
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub fn pg_text_row_to_json(row: &Row) -> Result<Value, anyhow::Error> {
let res = row
.columns()
.iter()
.enumerate()
.map(|(i, column)| {
let name = column.name();
let pg_value = row.as_text(i)?;
let json_value = pg_text_to_json(pg_value, column.type_())?;
Ok((name.to_string(), json_value))
})
.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
pub fn pg_text_row_to_json(
row: &Row,
raw_output: bool,
array_mode: bool,
) -> Result<Value, anyhow::Error> {
let iter = row.columns().iter().enumerate().map(|(i, column)| {
let name = column.name();
let pg_value = row.as_text(i)?;
let json_value = if raw_output {
match pg_value {
Some(v) => Value::String(v.to_string()),
None => Value::Null,
}
} else {
pg_text_to_json(pg_value, column.type_())?
};
Ok((name.to_string(), json_value))
});
Ok(Value::Object(res))
if array_mode {
// drop keys and aggregate into array
let arr = iter
.map(|r| r.map(|(_key, val)| val))
.collect::<Result<Vec<Value>, anyhow::Error>>()?;
Ok(Value::Array(arr))
} else {
let obj = iter.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
Ok(Value::Object(obj))
}
}
//
@@ -308,10 +342,6 @@ pub fn pg_text_row_to_json(row: &Row) -> Result<Value, anyhow::Error> {
//
pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
if let Some(val) = pg_value {
if val == "NULL" {
return Ok(Value::Null);
}
if let Kind::Array(elem_type) = pg_type.kind() {
return pg_array_parse(val, elem_type);
}
@@ -373,6 +403,27 @@ fn _pg_array_parse(
}
}
fn push_checked(
entry: &mut String,
entries: &mut Vec<Value>,
elem_type: &Type,
) -> Result<(), anyhow::Error> {
if !entry.is_empty() {
// While in the usual postgres response we get nulls as None and everything else
// as Some(&str), in arrays NULL arrives as the unquoted string 'NULL' (whereas a
// string with the value 'NULL' is represented as '"NULL"'). So catch NULLs here,
// while we still have the quotation info, and convert them to None.
if entry == "NULL" {
entries.push(pg_text_to_json(None, elem_type)?);
} else {
entries.push(pg_text_to_json(Some(entry), elem_type)?);
}
entry.clear();
}
Ok(())
}
while let Some((mut i, mut c)) = pg_array_chr.next() {
let mut escaped = false;
@@ -395,9 +446,7 @@ fn _pg_array_parse(
'}' => {
level -= 1;
if level == 0 {
if !entry.is_empty() {
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
}
push_checked(&mut entry, &mut entries, elem_type)?;
if nested {
return Ok((Value::Array(entries), i));
}
@@ -405,17 +454,15 @@ fn _pg_array_parse(
}
'"' if !escaped => {
if quote {
// push even if empty
// end of quoted string, so push it manually without any checks
// for emptiness or nulls
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry = String::new();
entry.clear();
}
quote = !quote;
}
',' if !quote => {
if !entry.is_empty() {
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry = String::new();
}
push_checked(&mut entry, &mut entries, elem_type)?;
}
_ => {
entry.push(c);
@@ -439,30 +486,35 @@ mod tests {
fn test_atomic_types_to_pg_params() {
let json = vec![Value::Bool(true), Value::Bool(false)];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["true", "false"]);
assert_eq!(
pg_params,
vec![Some("true".to_owned()), Some("false".to_owned())]
);
let json = vec![Value::Number(serde_json::Number::from(42))];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["42"]);
assert_eq!(pg_params, vec![Some("42".to_owned())]);
let json = vec![Value::String("foo\"".to_string())];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["foo\""]);
assert_eq!(pg_params, vec![Some("foo\"".to_owned())]);
let json = vec![Value::Null];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["null"]);
assert_eq!(pg_params, vec![None]);
}
#[test]
fn test_json_array_to_pg_array() {
// atoms and escaping
let json = "[true, false, null, 42, \"foo\", \"bar\\\"-\\\\\"]";
let json = "[true, false, null, \"NULL\", 42, \"foo\", \"bar\\\"-\\\\\"]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec!["{true,false,null,42,\"foo\",\"bar\\\"-\\\\\"}"]
vec![Some(
"{true,false,NULL,\"NULL\",42,\"foo\",\"bar\\\"-\\\\\"}".to_owned()
)]
);
// nested arrays
@@ -471,7 +523,9 @@ mod tests {
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec!["{{true,false},{null,42},{\"foo\",\"bar\\\"-\\\\\"}}"]
vec![Some(
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
)]
);
}

View File

@@ -146,7 +146,15 @@ impl PhysicalStorage {
// If not, maybe it's better to call fsync() here to be sure?
let flush_lsn = write_lsn;
let mut storage = PhysicalStorage {
debug!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
}
Ok(PhysicalStorage {
metrics: WalStorageMetrics::default(),
timeline_dir,
conf: conf.clone(),
@@ -156,39 +164,7 @@ impl PhysicalStorage {
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
file: None,
};
if write_lsn != Lsn::INVALID {
// fsync last segment as we're setting flush_lsn to write_lsn.
// Previous segments must have been fsynced before starting new one.
let mut last_segno = storage.write_lsn.segment_number(storage.wal_seg_size);
// We could have stopped on the segment boundary; fsync the previous
// segment then, we might have crashed before that.
if storage.write_lsn.segment_offset(storage.wal_seg_size) == 0 {
last_segno -= 1;
}
// 1 is the first segment
if last_segno >= 1 {
// We can create timeline without underlying WAL (and do that in
// s3_wal_reply test) if e.g. it is in s3, so ignore missing
// file.
if let Ok((mut last_file, _)) = storage.open_or_create(last_segno) {
storage
.fsync_file(&mut last_file)
.with_context(|| format!("fsync last segment segno={last_segno}"))?;
}
}
}
debug!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
}
Ok(storage)
})
}
/// Get all known state of the storage.
@@ -406,15 +382,6 @@ impl Storage for PhysicalStorage {
// Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
// disk (this happens on each connect).
if end_pos == self.write_lsn {
// ... we still need to rename last to partial to confirm asserts.
// It's likely better to remove .partial altogether.
let segno = end_pos.segment_number(self.wal_seg_size);
let (_, is_partial) = self.open_or_create(segno)?;
if !is_partial {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_path, wal_file_partial_path)?;
}
return Ok(());
}

View File

@@ -663,6 +663,8 @@ class NeonEnvBuilder:
else:
raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
self.remote_storage_kind = remote_storage_kind
def enable_local_fs_remote_storage(self, force_enable: bool = True):
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
@@ -2910,6 +2912,7 @@ SKIP_FILES = frozenset(
"pg_internal.init",
"pg.log",
"zenith.signal",
"neon_compute_spec_id.txt",
"pg_hba.conf",
"postgresql.conf",
"postmaster.opts",

View File

@@ -2,7 +2,7 @@ import time
from typing import Any, Dict, Optional
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId
@@ -92,6 +92,41 @@ def wait_until_tenant_state(
)
def wait_until_timeline_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
expected_state: str,
iterations: int,
period: float = 1.0,
) -> Dict[str, Any]:
"""
Does not use `wait_until` for debugging purposes
"""
for i in range(iterations):
try:
timeline = pageserver_http.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
log.debug(f"Timeline {tenant_id}/{timeline_id} data: {timeline}")
if isinstance(timeline["state"], str):
if timeline["state"] == expected_state:
return timeline
elif isinstance(timeline["state"], Dict):
if timeline["state"].get(expected_state):
return timeline
except Exception as e:
log.debug(f"Timeline {tenant_id}/{timeline_id} state retrieval failure: {e}")
if i == iterations - 1:
# do not sleep last time, we already know that we failed
break
time.sleep(period)
raise Exception(
f"Timeline {tenant_id}/{timeline_id} did not become {expected_state} within {iterations * period} seconds"
)
def wait_until_tenant_active(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
@@ -156,3 +191,21 @@ def wait_for_upload_queue_empty(
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)
def assert_timeline_detail_404(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
):
"""Asserts that timeline_detail returns 404, or dumps the detail."""
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.error(e)
if e.status_code == 404:
return
else:
raise
raise Exception("detail succeeded (it should return 404)")

View File

@@ -1,10 +1,63 @@
from contextlib import closing
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker
import requests
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder
# Just start and measure duration.
#
# This test runs pretty quickly and can be informative when used in combination
# with emulated network delay. Some useful delay commands:
#
# 1. Add 2msec delay to all localhost traffic
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
#
# 2. Test that it works (you should see 4ms ping)
# `ping localhost`
#
# 3. Revert back to normal
# `sudo tc qdisc del dev lo root netem`
#
# NOTE this test might not represent the real startup time because the basebackup
# for a large database might be larger if there's a lot of transaction metadata,
# or safekeepers might need more syncing, or there might be more operations to
# apply during config step, like more users, databases, or extensions. By default
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
# test we only load neon.
def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_startup")
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{i}_start_and_select"):
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
durations = {
"wait_for_spec_ms": f"{i}_wait_for_spec",
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
"basebackup_ms": f"{i}_basebackup",
"config_ms": f"{i}_config",
"total_startup_ms": f"{i}_total_startup",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
# Stop so we can restart
endpoint.stop()
# This test sometimes runs for longer than the global 5 minute timeout.
@pytest.mark.timeout(600)
def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):

View File

@@ -79,6 +79,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
# Set up pageserver for import
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
client.tenant_create(tenant)
@@ -145,6 +146,11 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
)
# NOTE: delete can easily come before upload operations are completed
# https://github.com/neondatabase/neon/issues/4326
env.pageserver.allowed_errors.append(
".*files not bound to index_file.json, proceeding with their deletion.*"
)
client.timeline_delete(tenant, timeline)
# Importing correct backup works

View File

@@ -225,3 +225,37 @@ def test_sql_over_http(static_proxy: NeonProxy):
res = q("drop table t")
assert res["command"] == "DROP"
assert res["rowCount"] is None
def test_sql_over_http_output_options(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http2 with login password 'http2' superuser")
def q(sql: str, raw_text: bool, array_mode: bool, params: List[Any] = []) -> Any:
connstr = (
f"postgresql://http2:http2@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
)
response = requests.post(
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": sql, "params": params}),
headers={
"Content-Type": "application/sql",
"Neon-Connection-String": connstr,
"Neon-Raw-Text-Output": "true" if raw_text else "false",
"Neon-Array-Mode": "true" if array_mode else "false",
},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200
return response.json()
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", False, False)["rows"]
assert rows == [{"arr": [1, 2, 3], "n": 1, "s": "a"}]
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", False, True)["rows"]
assert rows == [[1, "a", [1, 2, 3]]]
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", True, False)["rows"]
assert rows == [{"arr": "{1,2,3}", "n": "1", "s": "a"}]
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", True, True)["rows"]
assert rows == [["1", "a", "{1,2,3}"]]

View File

@@ -20,6 +20,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
wait_for_last_record_lsn,
wait_for_upload,
wait_until_tenant_active,
@@ -182,7 +183,7 @@ def test_remote_storage_backup_and_restore(
wait_until_tenant_active(
pageserver_http=client,
tenant_id=tenant_id,
iterations=5,
iterations=10, # make it longer for real_s3 tests when unreliable wrapper is involved
)
detail = client.timeline_detail(tenant_id, timeline_id)
@@ -598,8 +599,23 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
)
client.timeline_delete(tenant_id, timeline_id)
env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*")
env.pageserver.allowed_errors.append(
".*files not bound to index_file.json, proceeding with their deletion.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id))
assert not timeline_path.exists()
# to please mypy
assert isinstance(env.remote_storage, LocalFsStorage)
remote_timeline_path = (
env.remote_storage.root / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
)
assert not list(remote_timeline_path.iterdir())
# timeline deletion should kill ongoing uploads, so, the metric will be gone
assert get_queued_count(file_kind="index", op_kind="upload") is None

View File

@@ -3,6 +3,7 @@ import queue
import shutil
import threading
from pathlib import Path
from typing import Optional
import pytest
import requests
@@ -11,13 +12,16 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
S3Storage,
available_remote_storages,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
wait_for_last_record_lsn,
wait_for_upload,
wait_until_tenant_active,
wait_until_timeline_state,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
@@ -68,7 +72,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)
assert exc.value.status_code == 400
assert exc.value.status_code == 412
timeline_path = (
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
@@ -130,13 +134,25 @@ def test_delete_timeline_post_rm_failure(
env = neon_env_builder.init_start()
assert env.initial_timeline
env.pageserver.allowed_errors.append(".*Error: failpoint: timeline-delete-after-rm")
env.pageserver.allowed_errors.append(".*Ignoring state update Stopping for broken timeline")
ps_http = env.pageserver.http_client()
failpoint_name = "timeline-delete-after-rm"
ps_http.configure_failpoints((failpoint_name, "return"))
with pytest.raises(PageserverApiException, match=f"failpoint: {failpoint_name}"):
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
timeline_id=env.initial_timeline,
expected_state="Broken",
iterations=2, # effectively try immediately and retry once in one second
)
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
at_failpoint_log_message = f".*{env.initial_timeline}.*at failpoint {failpoint_name}.*"
env.pageserver.allowed_errors.append(at_failpoint_log_message)
@@ -148,11 +164,14 @@ def test_delete_timeline_post_rm_failure(
ps_http.configure_failpoints((failpoint_name, "off"))
# this should succeed
# this also checks that delete can be retried even when timeline is in Broken state
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2)
# the second call will try to transition the timeline into Stopping state, but it's already in that state
env.pageserver.allowed_errors.append(
f".*{env.initial_timeline}.*Ignoring new state, equal to the existing one: Stopping"
)
with pytest.raises(PageserverApiException) as e:
ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert e.value.status_code == 404
env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*")
env.pageserver.allowed_errors.append(
f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*"
)
@@ -230,6 +249,12 @@ def test_timeline_resurrection_on_attach(
# delete new timeline
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id))
##### Stop the pageserver instance, erase all its data
env.endpoints.stop_all()
env.pageserver.stop()
@@ -252,12 +277,31 @@ def test_timeline_resurrection_on_attach(
assert all([tl["state"] == "Active" for tl in timelines])
def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
# For local_fs we would need to properly handle empty directories, which we currently don't, so for simplicity stick to the S3 API.
assert neon_env_builder.remote_storage_kind in (
RemoteStorageKind.MOCK_S3,
RemoteStorageKind.REAL_S3,
)
# For mypy
assert isinstance(neon_env_builder.remote_storage, S3Storage)
# Note that this doesn't use pagination, so the list is not guaranteed to be exhaustive.
response = neon_env_builder.remote_storage_client.list_objects_v2(
Bucket=neon_env_builder.remote_storage.bucket_name,
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
)
objects = response.get("Contents")
assert (
response["KeyCount"] == 0
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
"""
When deleting a timeline, if we succeed in setting the deleted flag remotely
but fail to delete the local state, restarting the pageserver should resume
the deletion of the local state.
(Deletion of the state in S3 is not implemented yet.)
"""
neon_env_builder.enable_remote_storage(
@@ -293,11 +337,17 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
)
with pytest.raises(
PageserverApiException,
match="failpoint: timeline-delete-before-rm",
):
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
timeline_id=leaf_timeline_id,
expected_state="Broken",
iterations=2, # effectively try immediately and retry once in one second
)
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
assert leaf_timeline_path.exists(), "the failpoint didn't work"
@@ -305,7 +355,14 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
env.pageserver.start()
# Wait for tenant to finish loading.
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=0.5)
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id)
)
assert (
not leaf_timeline_path.exists()
@@ -317,6 +374,50 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
}, "other timelines should not have been affected"
assert all([tl["state"] == "Active" for tl in timelines])
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(leaf_timeline_id),
)
),
)
assert env.initial_timeline is not None
for timeline_id in (intermediate_timeline_id, env.initial_timeline):
ps_http.timeline_delete(env.initial_tenant, timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id)
)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
# For some reason the check above doesn't immediately take effect for the check below.
# Assume it is mock server inconsistency and check twice.
wait_until(
2,
0.5,
lambda: assert_prefix_empty(neon_env_builder),
)
def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
neon_env_builder: NeonEnvBuilder,
@@ -457,3 +558,87 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert exc.value.status_code == 404
@pytest.mark.parametrize(
"remote_storage_kind",
list(
filter(
lambda s: s in (RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3),
available_remote_storages(),
)
),
)
def test_timeline_delete_works_for_remote_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_timeline_delete_works_for_remote_smoke",
)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
pg = env.endpoints.create_start("main")
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
assert tenant_id == env.initial_tenant
assert main_timeline_id == env.initial_timeline
timeline_ids = [env.initial_timeline]
for i in range(2):
branch_timeline_id = env.neon_cli.create_branch(f"new{i}", "main")
pg = env.endpoints.create_start(f"new{i}")
with pg.cursor() as cur:
cur.execute("CREATE TABLE f (i integer);")
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(ps_http, tenant_id, branch_timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
ps_http.timeline_checkpoint(tenant_id, branch_timeline_id)
# wait until pageserver successfully uploaded a checkpoint to remote storage
log.info("waiting for checkpoint upload")
wait_for_upload(ps_http, tenant_id, branch_timeline_id, current_lsn)
log.info("upload of checkpoint is done")
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
timeline_ids.append(timeline_id)
for timeline_id in reversed(timeline_ids):
# note that we need to finish the previous deletion before scheduling the next one,
# otherwise we can get a "HasChildren" error if deletion is not fast enough (real_s3)
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
# For some reason the check above doesn't immediately take effect for the check below.
# Assume it is mock server inconsistency and check twice.
wait_until(
2,
0.5,
lambda: assert_prefix_empty(neon_env_builder),
)