mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
Compare commits
8 Commits
less_linki
...
jcsp/delet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8df507ccea | ||
|
|
cc2c1a8bf4 | ||
|
|
218b514498 | ||
|
|
86dd28d4fb | ||
|
|
fd20bbc6cb | ||
|
|
6a1903987a | ||
|
|
1881373ec4 | ||
|
|
ca3ca2bb9c |
@@ -5,7 +5,7 @@
|
||||
/libs/remote_storage/ @neondatabase/storage
|
||||
/libs/safekeeper_api/ @neondatabase/safekeepers
|
||||
/libs/vm_monitor/ @neondatabase/autoscaling @neondatabase/compute
|
||||
/pageserver/ @neondatabase/compute @neondatabase/storage
|
||||
/pageserver/ @neondatabase/storage
|
||||
/pgxn/ @neondatabase/compute
|
||||
/proxy/ @neondatabase/proxy
|
||||
/safekeeper/ @neondatabase/safekeepers
|
||||
|
||||
23
Cargo.lock
generated
23
Cargo.lock
generated
@@ -1780,18 +1780,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.2.6"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
|
||||
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
|
||||
|
||||
[[package]]
|
||||
name = "hex"
|
||||
@@ -2053,7 +2044,7 @@ version = "1.0.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
|
||||
dependencies = [
|
||||
"hermit-abi 0.3.1",
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
@@ -2070,7 +2061,7 @@ version = "0.4.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
|
||||
dependencies = [
|
||||
"hermit-abi 0.3.1",
|
||||
"hermit-abi",
|
||||
"io-lifetimes",
|
||||
"rustix 0.37.19",
|
||||
"windows-sys 0.48.0",
|
||||
@@ -2444,11 +2435,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.15.0"
|
||||
version = "1.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
|
||||
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
|
||||
dependencies = [
|
||||
"hermit-abi 0.2.6",
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
]
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ vulnerability = "deny"
|
||||
unmaintained = "warn"
|
||||
yanked = "warn"
|
||||
notice = "warn"
|
||||
ignore = ["RUSTSEC-2023-0052"]
|
||||
ignore = []
|
||||
|
||||
# This section is considered when running `cargo deny check licenses`
|
||||
# More documentation for the licenses section can be found here:
|
||||
|
||||
@@ -34,6 +34,8 @@ use crate::deletion_queue::TEMP_SUFFIX;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::virtual_file;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
|
||||
// The number of keys in a DeletionList before we will proactively persist it
|
||||
// (without reaching a flush deadline). This aims to deliver objects of the order
|
||||
@@ -199,7 +201,8 @@ impl ListWriter {
|
||||
);
|
||||
Ok(None)
|
||||
} else {
|
||||
Err(anyhow::anyhow!(e))
|
||||
on_fatal_io_error(&e);
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -228,8 +231,10 @@ impl ListWriter {
|
||||
deletion_directory.display(),
|
||||
);
|
||||
|
||||
// Give up: if we can't read the deletion list directory, we probably can't
|
||||
// write lists into it later, so the queue won't work.
|
||||
// This is fatal: any failure to read this local directory indicates a
|
||||
// storage problem or configuration problem of the node.
|
||||
virtual_file::on_fatal_io_error(&e);
|
||||
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
@@ -243,26 +248,27 @@ impl ListWriter {
|
||||
let file_name = dentry.file_name();
|
||||
let dentry_str = file_name.to_string_lossy();
|
||||
|
||||
if Some(file_name.as_os_str()) == header_path.file_name() {
|
||||
// Don't try and parse the header's name like a list
|
||||
continue;
|
||||
}
|
||||
|
||||
// Temporary files might be left behind from `crashsafe_overwrite`
|
||||
if dentry_str.ends_with(TEMP_SUFFIX) {
|
||||
info!("Cleaning up temporary file {dentry_str}");
|
||||
let absolute_path = deletion_directory.join(dentry.file_name());
|
||||
if let Err(e) = tokio::fs::remove_file(&absolute_path).await {
|
||||
// Non-fatal error: we will just leave the file behind but not
|
||||
// try and load it.
|
||||
warn!(
|
||||
"Failed to clean up temporary file {}: {e:#}",
|
||||
absolute_path.display()
|
||||
);
|
||||
|
||||
virtual_file::on_fatal_io_error(&e);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if Some(file_name.as_os_str()) == header_path.file_name() {
|
||||
// Don't try and parse the header's name like a list
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = dentry.file_name().to_owned();
|
||||
let basename = file_name.to_string_lossy();
|
||||
let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
|
||||
@@ -270,7 +276,7 @@ impl ListWriter {
|
||||
.expect("Non optional group should be present")
|
||||
.as_str()
|
||||
} else {
|
||||
warn!("Unexpected key in deletion queue: {basename}");
|
||||
warn!("Unexpected filename in deletion queue: {basename}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
continue;
|
||||
};
|
||||
@@ -298,7 +304,13 @@ impl ListWriter {
|
||||
for s in seqs {
|
||||
let list_path = self.conf.deletion_list_path(s);
|
||||
|
||||
let list_bytes = tokio::fs::read(&list_path).await?;
|
||||
let list_bytes = match tokio::fs::read(&list_path).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
virtual_file::on_fatal_io_error(&e);
|
||||
unreachable!();
|
||||
}
|
||||
};
|
||||
|
||||
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
|
||||
Ok(l) => l,
|
||||
|
||||
@@ -28,6 +28,7 @@ use crate::config::PageServerConf;
|
||||
use crate::control_plane_client::ControlPlaneGenerationsApi;
|
||||
use crate::control_plane_client::RetryForeverError;
|
||||
use crate::metrics;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
|
||||
use super::deleter::DeleterMessage;
|
||||
use super::DeletionHeader;
|
||||
@@ -116,6 +117,11 @@ where
|
||||
/// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
|
||||
/// go into the queue of ready-to-execute lists.
|
||||
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
|
||||
// Figure out for each tenant which generation number to validate.
|
||||
//
|
||||
// It is sufficient to validate the max generation number of each tenant because only the
|
||||
// highest generation number can possibly be valid. Hence this map will collect the
|
||||
// highest generation pending validation for each tenant.
|
||||
let mut tenant_generations = HashMap::new();
|
||||
for list in &self.pending_lists {
|
||||
for (tenant_id, tenant_list) in &list.tenants {
|
||||
@@ -246,6 +252,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Assert monotonicity of the list sequence numbers we are processing
|
||||
if let Some(validated) = validated_sequence {
|
||||
assert!(list.sequence >= validated)
|
||||
}
|
||||
|
||||
validated_sequence = Some(list.sequence);
|
||||
}
|
||||
|
||||
@@ -293,6 +304,9 @@ where
|
||||
// issue (probably permissions) has been fixed by then.
|
||||
tracing::error!("Failed to delete {}: {e:#}", list_path.display());
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
|
||||
on_fatal_io_error(&e);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,10 +130,15 @@ pub async fn init_tenant_mgr(
|
||||
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
|
||||
// the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
|
||||
// are processed, even though we don't block on recovery completing here.
|
||||
resources
|
||||
.deletion_queue_client
|
||||
.recover(result.clone())
|
||||
.await?;
|
||||
//
|
||||
// Must only do this if remote storage is enabled, otherwise deletion queue
|
||||
// is not running and channel push will fail.
|
||||
if resources.remote_storage.is_some() {
|
||||
resources
|
||||
.deletion_queue_client
|
||||
.recover(result.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Some(result)
|
||||
} else {
|
||||
|
||||
@@ -208,6 +208,15 @@ impl CrashsafeOverwriteError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Call this when the local filesystem gives us an error with an external
|
||||
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
|
||||
/// bad storage or bad configuration, and we can't fix that from inside
|
||||
/// a running process.
|
||||
pub(crate) fn on_fatal_io_error(e: &std::io::Error) {
|
||||
tracing::error!("Fatal I/O error: {}", &e);
|
||||
std::process::abort();
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
|
||||
|
||||
@@ -160,6 +160,19 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
Test(_) => Some("test".to_owned()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get username from the credentials.
|
||||
pub fn get_user(&self) -> &str {
|
||||
use BackendType::*;
|
||||
|
||||
match self {
|
||||
Console(_, creds) => creds.user,
|
||||
Postgres(_, creds) => creds.user,
|
||||
Link(_) => "link",
|
||||
Test(_) => "test",
|
||||
}
|
||||
}
|
||||
|
||||
/// Authenticate the client via the requested backend, possibly using credentials.
|
||||
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
|
||||
pub async fn authenticate(
|
||||
|
||||
@@ -697,7 +697,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
.await
|
||||
{
|
||||
Ok(auth_result) => auth_result,
|
||||
Err(e) => return stream.throw_error(e).await,
|
||||
Err(e) => {
|
||||
let user = creds.get_user();
|
||||
let db = params.get("database");
|
||||
let app = params.get("application_name");
|
||||
let params_span = tracing::info_span!("", ?user, ?db, ?app);
|
||||
|
||||
return stream.throw_error(e).instrument(params_span).await;
|
||||
}
|
||||
};
|
||||
|
||||
let AuthSuccess {
|
||||
|
||||
@@ -15,7 +15,7 @@ from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
wait_until_tenant_active,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion, xfail_on_postgres
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
@@ -532,7 +532,24 @@ def test_single_branch_get_tenant_size_grows(
|
||||
assert size_after == prev, "size after restarting pageserver should not have changed"
|
||||
|
||||
|
||||
@xfail_on_postgres(PgVersion.V15, reason="Test significantly more flaky on Postgres 15")
|
||||
def assert_size_approx_equal(size_a, size_b):
|
||||
"""
|
||||
Tests that evaluate sizes are checking the pageserver space consumption
|
||||
that sits many layers below the user input. The exact space needed
|
||||
varies slightly depending on postgres behavior.
|
||||
|
||||
Rather than expecting postgres to be determinstic and occasionally
|
||||
failing the test, we permit sizes for the same data to vary by a few pages.
|
||||
"""
|
||||
|
||||
# Determined empirically from examples of equality failures: they differ
|
||||
# by page multiples of 8272, and usually by 1-3 pages. Tolerate 4 to avoid
|
||||
# failing on outliers from that observed range.
|
||||
threshold = 4 * 8272
|
||||
|
||||
assert size_a == pytest.approx(size_b, abs=threshold)
|
||||
|
||||
|
||||
def test_get_tenant_size_with_multiple_branches(
|
||||
neon_env_builder: NeonEnvBuilder, test_output_dir: Path
|
||||
):
|
||||
@@ -573,7 +590,7 @@ def test_get_tenant_size_with_multiple_branches(
|
||||
)
|
||||
|
||||
size_after_first_branch = http_client.tenant_size(tenant_id)
|
||||
assert size_after_first_branch == size_at_branch
|
||||
assert_size_approx_equal(size_after_first_branch, size_at_branch)
|
||||
|
||||
first_branch_endpoint = env.endpoints.create_start("first-branch", tenant_id=tenant_id)
|
||||
|
||||
@@ -599,7 +616,7 @@ def test_get_tenant_size_with_multiple_branches(
|
||||
"second-branch", main_branch_name, tenant_id
|
||||
)
|
||||
size_after_second_branch = http_client.tenant_size(tenant_id)
|
||||
assert size_after_second_branch == size_after_continuing_on_main
|
||||
assert_size_approx_equal(size_after_second_branch, size_after_continuing_on_main)
|
||||
|
||||
second_branch_endpoint = env.endpoints.create_start("second-branch", tenant_id=tenant_id)
|
||||
|
||||
@@ -635,7 +652,7 @@ def test_get_tenant_size_with_multiple_branches(
|
||||
# tenant_size but so far this has been reliable, even though at least gc
|
||||
# and tenant_size race for the same locks
|
||||
size_after = http_client.tenant_size(tenant_id)
|
||||
assert size_after == size_after_thinning_branch
|
||||
assert_size_approx_equal(size_after, size_after_thinning_branch)
|
||||
|
||||
size_debug_file_before = open(test_output_dir / "size_debug_before.html", "w")
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
|
||||
Reference in New Issue
Block a user