Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/cli-parse-reject

This commit is contained in:
Alex Chi
2023-06-01 13:24:15 -04:00
36 changed files with 1124 additions and 515 deletions

View File

@@ -407,9 +407,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
- uses: actions/github-script@v6
if: >
!cancelled() &&
github.event_name == 'pull_request'
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
@@ -419,7 +417,7 @@ jobs:
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const script = require("./scripts/pr-comment-test-report.js")
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
@@ -494,19 +492,24 @@ jobs:
env:
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
run: |
scripts/coverage \
--dir=/tmp/coverage report \
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=${COMMIT_URL} \
--format=github
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=lcov
- name: Upload coverage report
id: upload-coverage-report
env:
BUCKET: neon-github-public-dev
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
@@ -799,7 +802,7 @@ jobs:
- name: Build vm image
run: |
./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
./vm-builder -enable-file-cache -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Pushing vm-compute-node image
run: |

12
Cargo.lock generated
View File

@@ -2040,17 +2040,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "leaky-bucket"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7"
dependencies = [
"parking_lot",
"tokio",
"tracing",
]
[[package]]
name = "libc"
version = "0.2.144"
@@ -3234,7 +3223,6 @@ dependencies = [
"aws-smithy-http",
"aws-types",
"hyper",
"leaky-bucket",
"metrics",
"once_cell",
"pin-project-lite",

View File

@@ -33,5 +33,7 @@ pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
.init();
tracing::info!("logging and tracing started");
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
Ok(())
}

View File

@@ -19,7 +19,29 @@ use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
/// The state of a tenant in this pageserver.
///
/// ```mermaid
/// stateDiagram-v2
///
/// [*] --> Loading: spawn_load()
/// [*] --> Attaching: spawn_attach()
///
/// Loading --> Activating: activate()
/// Attaching --> Activating: activate()
/// Activating --> Active: infallible
///
/// Loading --> Broken: load() failure
/// Attaching --> Broken: attach() failure
///
/// Active --> Stopping: set_stopping(), part of shutdown & detach
/// Stopping --> Broken: late error in remove_tenant_from_memory
///
/// Broken --> [*]: ignore / detach / shutdown
/// Stopping --> [*]: remove_from_memory complete
///
/// Active --> Broken: cfg(testing)-only tenant break point
/// ```
#[derive(
Clone,
PartialEq,
@@ -27,40 +49,63 @@ use bytes::{BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Loading,
/// This tenant is being downloaded from cloud storage.
/// This tenant is being attached to the pageserver.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Attaching,
/// Tenant is fully operational
/// The tenant is transitioning from Loading/Attaching to Active.
///
/// While in this state, the individual timelines are being activated.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Activating(ActivatingFrom),
/// The tenant has finished activating and is open for business.
///
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
Active,
/// A tenant is recognized by pageserver, but it is being detached or the
/// The tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping,
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
///
/// If the tenant fails to load or attach, it will transition to this state
/// and it is guaranteed that no background tasks are running in its name.
///
/// The other way to transition into this state is from `Stopping` state
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
Broken { reason: String, backtrace: String },
}
impl TenantState {
pub fn attachment_status(&self) -> TenantAttachmentStatus {
use TenantAttachmentStatus::*;
// Below TenantState::Activating is used as "transient" or "transparent" state for
// attachment_status determining.
match self {
// The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
// So, technically, we can return Attached here.
// However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
// But, our attach task might still be fetching the remote timelines, etc.
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching => Maybe,
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading => Attached,
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
// We only reach Active after successful load / attach.
// So, call atttachment status Attached.
Self::Active => Attached,
@@ -99,6 +144,15 @@ impl std::fmt::Debug for TenantState {
}
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
/// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
Loading,
/// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
Attaching,
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
@@ -187,6 +241,7 @@ pub struct TenantConfig {
pub min_resident_size_override: Option<u64>,
#[clap(long)]
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
}
impl TenantConfig {
@@ -264,6 +319,7 @@ impl TenantConfigRequest {
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
gc_feedback: None,
};
TenantConfigRequest { tenant_id, config }
}
@@ -871,4 +927,55 @@ mod tests {
err
);
}
#[test]
fn tenantstatus_activating_serde() {
let states = [
TenantState::Activating(ActivatingFrom::Loading),
TenantState::Activating(ActivatingFrom::Attaching),
];
let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
let actual = serde_json::to_string(&states).unwrap();
assert_eq!(actual, expected);
let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
assert_eq!(states.as_slice(), &parsed);
}
#[test]
fn tenantstatus_activating_strum() {
// tests added, because we use these for metrics
let examples = [
(line!(), TenantState::Loading, "Loading"),
(line!(), TenantState::Attaching, "Attaching"),
(
line!(),
TenantState::Activating(ActivatingFrom::Loading),
"Activating",
),
(
line!(),
TenantState::Activating(ActivatingFrom::Attaching),
"Activating",
),
(line!(), TenantState::Active, "Active"),
(line!(), TenantState::Stopping, "Stopping"),
(
line!(),
TenantState::Broken {
reason: "Example".into(),
backtrace: "Looooong backtrace".into(),
},
"Broken",
),
];
for (line, rendered, expected) in examples {
let actual: &'static str = rendered.into();
assert_eq!(actual, expected, "example on {line}");
}
}
}

View File

@@ -25,8 +25,6 @@ utils.workspace = true
pin-project-lite.workspace = true
workspace_hack.workspace = true
leaky-bucket = "1.0"
[dev-dependencies]
tempfile.workspace = true
test-context.workspace = true

View File

@@ -37,8 +37,6 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
///
/// IAM ratelimit should never be observed with caching credentials provider.
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currenltly means 1000 for AWS S3.
/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax

View File

@@ -21,7 +21,10 @@ use aws_sdk_s3::{
};
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use tokio::io;
use tokio::{
io::{self, AsyncRead},
sync::Semaphore,
};
use tokio_util::io::ReaderStream;
use tracing::debug;
@@ -102,8 +105,9 @@ pub struct S3Bucket {
prefix_in_bucket: Option<String>,
max_keys_per_list_response: Option<i32>,
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
// The helps to ensure we don't exceed the thresholds.
concurrency_limiter: Arc<leaky_bucket::RateLimiter>,
concurrency_limiter: Arc<Semaphore>,
}
#[derive(Default)]
@@ -154,24 +158,12 @@ impl S3Bucket {
}
prefix
});
let rps = aws_config.concurrency_limit.get();
let concurrency_limiter = leaky_bucket::RateLimiter::builder()
.max(rps)
.initial(0)
// refill it by rps every second. this means the (rps+1)th request will have to wait for
// 1 second from earliest.
.refill(rps)
.interval(std::time::Duration::from_secs(1))
.fair(true)
.build();
Ok(Self {
client,
bucket_name: aws_config.bucket_name.clone(),
max_keys_per_list_response: aws_config.max_keys_per_list_response,
prefix_in_bucket,
concurrency_limiter: Arc::new(concurrency_limiter),
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
})
}
@@ -203,10 +195,13 @@ impl S3Bucket {
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
// while the download could take a long time with `leaky_bucket` we have nothing to release
// once the download is done. this is because with "requests per second" rate limiting on
// s3, there should be no meaning for the long requests.
self.concurrency_limiter.clone().acquire_owned(1).await;
let permit = self
.concurrency_limiter
.clone()
.acquire_owned()
.await
.context("Concurrency limiter semaphore got closed during S3 download")
.map_err(DownloadError::Other)?;
metrics::inc_get_object();
@@ -224,9 +219,10 @@ impl S3Bucket {
let metadata = object_output.metadata().cloned().map(StorageMetadata);
Ok(Download {
metadata,
download_stream: Box::pin(io::BufReader::new(
download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
permit,
object_output.body.into_async_read(),
)),
))),
})
}
Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
@@ -242,6 +238,32 @@ impl S3Bucket {
}
}
pin_project_lite::pin_project! {
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
struct RatelimitedAsyncRead<S> {
permit: tokio::sync::OwnedSemaphorePermit,
#[pin]
inner: S,
}
}
impl<S: AsyncRead> RatelimitedAsyncRead<S> {
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
RatelimitedAsyncRead { permit, inner }
}
}
impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.project();
this.inner.poll_read(cx, buf)
}
}
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_prefixes`
@@ -267,7 +289,12 @@ impl RemoteStorage for S3Bucket {
let mut continuation_token = None;
loop {
self.concurrency_limiter.acquire_one().await;
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 list")
.map_err(DownloadError::Other)?;
metrics::inc_list_objects();
@@ -312,9 +339,11 @@ impl RemoteStorage for S3Bucket {
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
// similarly to downloads, the permit does not have live through the upload, but instead we
// are rate limiting requests per second.
self.concurrency_limiter.acquire_one().await;
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 upload")?;
metrics::inc_put_object();
@@ -369,7 +398,11 @@ impl RemoteStorage for S3Bucket {
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
self.concurrency_limiter.acquire_one().await;
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 delete")?;
metrics::inc_delete_object();

View File

@@ -0,0 +1,33 @@
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
impl Barrier {
pub async fn wait(self) {
self.0.lock().await.recv().await;
}
pub async fn maybe_wait(barrier: Option<Barrier>) {
if let Some(b) = barrier {
b.wait().await
}
}
}
/// Create new Guard and Barrier pair.
pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
}

View File

@@ -1,5 +1,5 @@
use crate::auth::{Claims, JwtAuth};
use crate::http::error;
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use anyhow::{anyhow, Context};
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
@@ -16,8 +16,6 @@ use std::future::Future;
use std::net::TcpListener;
use std::str::FromStr;
use super::error::ApiError;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
@@ -35,8 +33,12 @@ struct RequestId(String);
/// Adds a tracing info_span! instrumentation around the handler events,
/// logs the request start and end events for non-GET requests and non-200 responses.
///
/// Usage: Replace `my_handler` with `|r| request_span(r, my_handler)`
///
/// Use this to distinguish between logs of different HTTP requests: every request handler wrapped
/// in this type will get request info logged in the wrapping span, including the unique request ID.
/// with this will get request info logged in the wrapping span, including the unique request ID.
///
/// This also handles errors, logging them and converting them to an HTTP error response.
///
/// There could be other ways to implement similar functionality:
///
@@ -54,60 +56,56 @@ struct RequestId(String);
/// tries to achive with its `.instrument` used in the current approach.
///
/// If needed, a declarative macro to substitute the |r| ... closure boilerplate could be introduced.
pub struct RequestSpan<E, R, H>(pub H)
pub async fn request_span<R, H>(request: Request<Body>, handler: H) -> R::Output
where
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
H: Fn(Request<Body>) -> R + Send + Sync + 'static;
impl<E, R, H> RequestSpan<E, R, H>
where
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
H: Fn(Request<Body>) -> R + Send + Sync + 'static,
R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
{
/// Creates a tracing span around inner request handler and executes the request handler in the contex of that span.
/// Use as `|r| RequestSpan(my_handler).handle(r)` instead of `my_handler` as the request handler to get the span enabled.
pub async fn handle(self, request: Request<Body>) -> Result<Response<Body>, E> {
let request_id = request.context::<RequestId>().unwrap_or_default().0;
let method = request.method();
let path = request.uri().path();
let request_span = info_span!("request", %method, %path, %request_id);
let request_id = request.context::<RequestId>().unwrap_or_default().0;
let method = request.method();
let path = request.uri().path();
let request_span = info_span!("request", %method, %path, %request_id);
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
info!("Handling request");
}
// Note that we reuse `error::handler` here and not returning and error at all,
// yet cannot use `!` directly in the method signature due to `routerify::RouterBuilder` limitation.
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
//
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
let res = (self.0)(request).await;
cancellation_guard.disarm();
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
debug!("Request handled, status: {response_status}");
} else {
info!("Request handled, status: {response_status}");
}
Ok(response)
}
Err(e) => Ok(error::handler(e.into()).await),
}
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
info!("Handling request");
}
// No special handling for panics here. There's a `tracing_panic_hook` from another
// module to do that globally.
let res = handler(request).await;
cancellation_guard.disarm();
// Log the result if needed.
//
// We also convert any errors into an Ok response with HTTP error code here.
// `make_router` sets a last-resort error handler that would do the same, but
// we prefer to do it here, before we exit the request span, so that the error
// is still logged with the span.
//
// (Because we convert errors to Ok response, we never actually return an error,
// and we could declare the function to return the never type (`!`). However,
// using `routerify::RouterBuilder` requires a proper error type.)
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
debug!("Request handled, status: {response_status}");
} else {
info!("Request handled, status: {response_status}");
}
Ok(response)
}
Err(err) => Ok(api_error_handler(err)),
}
.instrument(request_span)
.await
}
.instrument(request_span)
.await
}
/// Drop guard to WARN in case the request was dropped before completion.
@@ -207,10 +205,8 @@ pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
.middleware(Middleware::post_with_info(
add_request_id_header_to_response,
))
.get("/metrics", |r| {
RequestSpan(prometheus_metrics_handler).handle(r)
})
.err_handler(error::handler)
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.err_handler(route_error_handler)
}
pub fn attach_openapi_ui(
@@ -220,12 +216,14 @@ pub fn attach_openapi_ui(
ui_mount_path: &'static str,
) -> RouterBuilder<hyper::Body, ApiError> {
router_builder
.get(spec_mount_path, move |r| {
RequestSpan(move |_| async move { Ok(Response::builder().body(Body::from(spec)).unwrap()) })
.handle(r)
})
.get(ui_mount_path, move |r| RequestSpan( move |_| async move {
Ok(Response::builder().body(Body::from(format!(r#"
.get(spec_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(spec)).unwrap())
})
)
.get(ui_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(format!(r#"
<!DOCTYPE html>
<html lang="en">
<head>
@@ -255,7 +253,8 @@ pub fn attach_openapi_ui(
</body>
</html>
"#, spec_mount_path))).unwrap())
}).handle(r))
})
)
}
fn parse_token(header_value: &str) -> Result<&str, ApiError> {

View File

@@ -83,13 +83,24 @@ impl HttpErrorBody {
}
}
pub async fn handler(err: routerify::RouteError) -> Response<Body> {
let api_error = err
.downcast::<ApiError>()
.expect("handler should always return api error");
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
match err.downcast::<ApiError>() {
Ok(api_error) => api_error_handler(*api_error),
Err(other_error) => {
// We expect all the request handlers to return an ApiError, so this should
// not be reached. But just in case.
error!("Error processing HTTP request: {other_error:?}");
HttpErrorBody::response_from_msg_and_status(
other_error.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
// Print a stack trace for Internal Server errors
if let ApiError::InternalServerError(_) = api_error.as_ref() {
if let ApiError::InternalServerError(_) = api_error {
error!("Error processing HTTP request: {api_error:?}");
} else {
error!("Error processing HTTP request: {api_error:#}");

View File

@@ -60,6 +60,9 @@ pub mod tracing_span_assert;
pub mod rate_limit;
/// Simple once-barrier and a guard which keeps barrier awaiting.
pub mod completion;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")

View File

@@ -335,13 +335,34 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// All tenant load operations carry this while they are ongoing; it will be dropped once those
// operations finish either successfully or in some other manner. However, the initial load
// will be then done, and we can start the global background tasks.
let (init_done_tx, init_done_rx) = utils::completion::channel();
// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
))?;
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
init_done_rx.wait().await;
let elapsed = init_started_at.elapsed();
tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed."
);
}
});
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -353,6 +374,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
init_done_rx.clone(),
)?;
}
@@ -390,6 +412,7 @@ fn start_pageserver(
);
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let init_done_rx = init_done_rx;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
@@ -405,6 +428,13 @@ fn start_pageserver(
"consumption metrics collection",
true,
async move {
// first wait for initial load to complete before first iteration.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
init_done_rx.wait().await;
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,

View File

@@ -108,7 +108,7 @@ pub mod defaults {
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false
# [remote_storage]
"###
@@ -828,6 +828,14 @@ impl PageServerConf {
)?);
}
if let Some(gc_feedback) = item.get("gc_feedback") {
t_conf.gc_feedback = Some(
gc_feedback
.as_bool()
.with_context(|| "configure option gc_feedback is not a bool".to_string())?,
);
}
Ok(t_conf)
}

View File

@@ -88,6 +88,7 @@
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
@@ -95,7 +96,7 @@ pub struct RequestContext {
/// Desired behavior if the operation requires an on-demand download
/// to proceed.
#[derive(Clone, Copy, PartialEq, Eq)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum DownloadBehavior {
/// Download the layer file. It can take a while.
Download,

View File

@@ -54,6 +54,7 @@ use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use crate::{
@@ -82,6 +83,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
init_done: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -98,6 +100,9 @@ pub fn launch_disk_usage_global_eviction_task(
"disk usage based eviction",
false,
async move {
// wait until initial load is complete, because we cannot evict from loading tenants.
init_done.wait().await;
disk_usage_eviction_task(
&state,
task_config,

View File

@@ -11,7 +11,7 @@ use storage_broker::BrokerClientChannel;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::endpoint::RequestSpan;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -859,7 +859,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned());
tenant.set_broken("broken from test".to_owned()).await;
json_response(StatusCode::OK, ())
}
@@ -1179,7 +1179,7 @@ pub fn make_router(
#[cfg(not(feature = "testing"))]
let handler = cfg_disabled;
move |r| RequestSpan(handler).handle(r)
move |r| request_span(r, handler)
}};
}
@@ -1194,54 +1194,50 @@ pub fn make_router(
)
.context("Failed to initialize router state")?,
))
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.get("/v1/status", |r| request_span(r, status_handler))
.put(
"/v1/failpoints",
testing_api!("manage failpoints", failpoints_handler),
)
.get("/v1/tenant", |r| RequestSpan(tenant_list_handler).handle(r))
.post("/v1/tenant", |r| {
RequestSpan(tenant_create_handler).handle(r)
})
.get("/v1/tenant/:tenant_id", |r| {
RequestSpan(tenant_status).handle(r)
})
.get("/v1/tenant", |r| request_span(r, tenant_list_handler))
.post("/v1/tenant", |r| request_span(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| request_span(r, tenant_status))
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
RequestSpan(tenant_size_handler).handle(r)
request_span(r, tenant_size_handler)
})
.put("/v1/tenant/config", |r| {
RequestSpan(update_tenant_config_handler).handle(r)
request_span(r, update_tenant_config_handler)
})
.get("/v1/tenant/:tenant_id/config", |r| {
RequestSpan(get_tenant_config_handler).handle(r)
request_span(r, get_tenant_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
RequestSpan(timeline_list_handler).handle(r)
request_span(r, timeline_list_handler)
})
.post("/v1/tenant/:tenant_id/timeline", |r| {
RequestSpan(timeline_create_handler).handle(r)
request_span(r, timeline_create_handler)
})
.post("/v1/tenant/:tenant_id/attach", |r| {
RequestSpan(tenant_attach_handler).handle(r)
request_span(r, tenant_attach_handler)
})
.post("/v1/tenant/:tenant_id/detach", |r| {
RequestSpan(tenant_detach_handler).handle(r)
request_span(r, tenant_detach_handler)
})
.post("/v1/tenant/:tenant_id/load", |r| {
RequestSpan(tenant_load_handler).handle(r)
request_span(r, tenant_load_handler)
})
.post("/v1/tenant/:tenant_id/ignore", |r| {
RequestSpan(tenant_ignore_handler).handle(r)
request_span(r, tenant_ignore_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
RequestSpan(timeline_detail_handler).handle(r)
request_span(r, timeline_detail_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
|r| RequestSpan(get_lsn_by_timestamp_handler).handle(r),
|r| request_span(r, get_lsn_by_timestamp_handler),
)
.put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
RequestSpan(timeline_gc_handler).handle(r)
request_span(r, timeline_gc_handler)
})
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/compact",
@@ -1253,34 +1249,34 @@ pub fn make_router(
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| RequestSpan(timeline_download_remote_layers_handler_post).handle(r),
|r| request_span(r, timeline_download_remote_layers_handler_post),
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| RequestSpan(timeline_download_remote_layers_handler_get).handle(r),
|r| request_span(r, timeline_download_remote_layers_handler_get),
)
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
RequestSpan(timeline_delete_handler).handle(r)
request_span(r, timeline_delete_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {
RequestSpan(layer_map_info_handler).handle(r)
request_span(r, layer_map_info_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|r| RequestSpan(layer_download_handler).handle(r),
|r| request_span(r, layer_download_handler),
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|r| RequestSpan(evict_timeline_layer_handler).handle(r),
|r| request_span(r, evict_timeline_layer_handler),
)
.put("/v1/disk_usage_eviction/run", |r| {
RequestSpan(disk_usage_eviction_run).handle(r)
request_span(r, disk_usage_eviction_run)
})
.put(
"/v1/tenant/:tenant_id/break",
testing_api!("set tenant state to broken", handle_tenant_break),
)
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
.get("/v1/panic", |r| request_span(r, always_panic_handler))
.post(
"/v1/tracing/event",
testing_api!("emit a tracing event", post_tracing_event_handler),

View File

@@ -45,6 +45,7 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument]
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -20,6 +20,7 @@ use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use std::cmp::min;
@@ -447,6 +448,11 @@ pub enum DeleteTimelineError {
Other(#[from] anyhow::Error),
}
pub enum SetStoppingError {
AlreadyStopping,
Broken,
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -645,16 +651,17 @@ impl Tenant {
"attach tenant",
false,
async move {
let doit = async {
tenant_clone.attach(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
Ok(_) => {}
match tenant_clone.attach(&ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
}
Err(e) => {
tenant_clone.set_broken(e.to_string());
error!("error attaching tenant: {:?}", e);
error!("attach failed, setting tenant state to Broken: {:?}", e);
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(e.to_string());
});
}
}
Ok(())
@@ -671,6 +678,8 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
@@ -881,14 +890,17 @@ impl Tenant {
/// If the loading fails for some reason, the Tenant will go into Broken
/// state.
///
#[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))]
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
ctx: &RequestContext,
) -> Arc<Tenant> {
debug_assert_current_span_has_tenant_id();
let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
Ok(conf) => conf,
Err(e) => {
@@ -920,20 +932,27 @@ impl Tenant {
"initial tenant load",
false,
async move {
let doit = async {
tenant_clone.load(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
// keep the sender alive as long as we have the initial load ongoing; it will be
// None for loads spawned after init_tenant_mgr.
let (_tx, rx) = if let Some((tx, rx)) = init_done {
(Some(tx), Some(rx))
} else {
(None, None)
};
match doit.await {
Ok(()) => {}
match tenant_clone.load(&ctx).await {
Ok(()) => {
debug!("load finished, activating");
tenant_clone.activate(broker_client, rx.as_ref(), &ctx);
}
Err(err) => {
tenant_clone.set_broken(err.to_string());
error!("could not load tenant {tenant_id}: {err:?}");
error!("load failed, setting tenant state to Broken: {err:?}");
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(err.to_string());
});
}
}
info!("initial load for tenant {tenant_id} finished!");
Ok(())
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
@@ -942,8 +961,6 @@ impl Tenant {
}),
);
info!("spawned load into background");
tenant
}
@@ -951,10 +968,11 @@ impl Tenant {
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
info!("loading tenant task");
debug!("loading tenant task");
utils::failpoint_sleep_millis_async!("before-loading-tenant");
@@ -964,102 +982,109 @@ impl Tenant {
//
// Scan the directory, peek into the metadata file of each timeline, and
// collect a list of timelines and their ancestors.
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
for entry in std::fs::read_dir(&timelines_dir).with_context(|| {
format!(
"Failed to list timelines directory for tenant {}",
self.tenant_id
)
})? {
let entry = entry.with_context(|| {
format!("cannot read timeline dir entry for {}", self.tenant_id)
})?;
let timeline_dir = entry.path();
let tenant_id = self.tenant_id;
let conf = self.conf;
let span = info_span!("blocking");
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || {
let _g = span.entered();
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
let timelines_dir = conf.timelines_path(&tenant_id);
for entry in
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
{
let entry = entry.context("read timeline dir entry")?;
let timeline_dir = entry.path();
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
}
} else if is_uninit_mark(&timeline_dir) {
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
}
} else if is_uninit_mark(&timeline_dir) {
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {}",
timeline_uninit_mark_file.display()
)
})?;
let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else {
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file = self
.conf
.timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
"Found an uninit mark file for timeline {}/{}, removing the timeline and its uninit mark",
self.tenant_id, timeline_id
);
})?;
let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
continue;
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file =
conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
%timeline_id,
"Found an uninit mark file, removing the timeline and its uninit mark",
);
if let Err(e) = remove_timeline_and_uninit_mark(
&timeline_dir,
&timeline_uninit_mark_file,
) {
error!("Failed to clean up uninit marked timeline: {e:?}");
}
continue;
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(conf, timeline_id, tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
}
}
}
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
let sorted_timelines = tree_sort_timelines(timelines_to_load)?;
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load)
})
.await
.context("load spawn_blocking")
.and_then(|res| res)?;
// FIXME original collect_timeline_files contained one more check:
// 1. "Timeline has no ancestor and no layer files"
@@ -1069,7 +1094,7 @@ impl Tenant {
.with_context(|| format!("load local timeline {timeline_id}"))?;
}
info!("Done");
trace!("Done");
Ok(())
}
@@ -1436,7 +1461,11 @@ impl Tenant {
Ok(())
}
/// Removes timeline-related in-memory data
/// Shuts down a timeline's tasks, removes its in-memory structures, and deletes its
/// data from disk.
///
/// This doesn't currently delete all data from S3, but sets a flag in its
/// index_part.json file to mark it as deleted.
pub async fn delete_timeline(
&self,
timeline_id: TimelineId,
@@ -1446,7 +1475,11 @@ impl Tenant {
// Transition the timeline into TimelineState::Stopping.
// This should prevent new operations from starting.
let timeline = {
//
// Also grab the Timeline's delete_lock to prevent another deletion from starting.
let timeline;
let mut delete_lock_guard;
{
let mut timelines = self.timelines.lock().unwrap();
// Ensure that there are no child timelines **attached to that pageserver**,
@@ -1464,20 +1497,36 @@ impl Tenant {
Entry::Vacant(_) => return Err(DeleteTimelineError::NotFound),
};
let timeline = Arc::clone(timeline_entry.get());
timeline = Arc::clone(timeline_entry.get());
// Prevent two tasks from trying to delete the timeline at the same time.
//
// XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller
// needs to poll until the operation has finished. But for now, we return an
// error, because the control plane knows to retry errors.
delete_lock_guard = timeline.delete_lock.try_lock().map_err(|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
})?;
// If another task finished the deletion just before we acquired the lock,
// return success.
if *delete_lock_guard {
return Ok(());
}
timeline.set_state(TimelineState::Stopping);
drop(timelines);
timeline
};
}
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
// NB: If you call delete_timeline multiple times concurrently, they will
// all go through the motions here. Make sure the code here is idempotent,
// and don't error out if some of the shutdown tasks have already been
// completed!
// NB: If this fails half-way through, and is retried, the retry will go through
// all the same steps again. Make sure the code here is idempotent, and don't
// error out if some of the shutdown tasks have already been completed!
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
@@ -1518,6 +1567,10 @@ impl Tenant {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
//
// AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
// two tasks from performing the deletion at the same time. The first task
// that starts deletion should run it to completion.
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
@@ -1528,14 +1581,12 @@ impl Tenant {
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
// This lock prevents prevents GC or compaction from running at the same time.
// The GC task doesn't register itself with the timeline it's operating on,
// so it might still be running even though we called `shutdown_tasks`.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// GC, compaction and timeline deletion. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
@@ -1597,37 +1648,27 @@ impl Tenant {
});
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
{
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
timelines.remove(&timeline_id).expect(
"timeline that we were deleting was concurrently removed from 'timelines' map",
);
}
let removed_timeline = timelines.remove(&timeline_id);
if removed_timeline.is_none() {
// This can legitimately happen if there's a concurrent call to this function.
// T1 T2
// lock
// unlock
// lock
// unlock
// remove files
// lock
// remove from map
// unlock
// return
// remove files
// lock
// remove from map observes empty map
// unlock
// return
debug!("concurrent call to this function won the race");
}
drop(timelines);
// All done! Mark the deletion as completed and release the delete_lock
*delete_lock_guard = true;
drop(delete_lock_guard);
Ok(())
}
@@ -1644,127 +1685,193 @@ impl Tenant {
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
init_done: Option<&completion::Barrier>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) {
debug_assert_current_span_has_tenant_id();
let mut result = Ok(());
let mut activating = false;
self.state.send_modify(|current_state| {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
TenantState::Active => {
// activate() was called on an already Active tenant. Shouldn't happen.
result = Err(anyhow::anyhow!("Tenant is already active"));
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
}
TenantState::Broken { reason, .. } => {
// This shouldn't happen either
result = Err(anyhow::anyhow!(
"Could not activate tenant because it is in broken state due to: {reason}",
));
TenantState::Loading => {
*current_state = TenantState::Activating(ActivatingFrom::Loading);
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state, skipping activation");
}
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), ctx);
activated_timelines += 1;
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
TenantState::Attaching => {
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
}
}
debug!(tenant_id = %self.tenant_id, "Activating tenant");
activating = true;
// Continue outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
});
result
if activating {
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, init_done);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), ctx);
activated_timelines += 1;
}
self.state.send_modify(move |current_state| {
assert!(
matches!(current_state, TenantState::Activating(_)),
"set_stopping and set_broken wait for us to leave Activating state",
);
*current_state = TenantState::Active;
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
});
}
}
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
match current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
/// Change tenant status to Stopping, to mark that it is being shut down.
///
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
pub async fn set_stopping(&self) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
}
TenantState::Broken { reason, .. } => {
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state");
}
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
*current_state = TenantState::Stopping;
// Continue stopping outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
true
}
TenantState::Broken { reason, .. } => {
info!(
"Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
);
err = Some(SetStoppingError::Broken);
false
}
TenantState::Stopping => {
info!("Tenant is already in Stopping state");
err = Some(SetStoppingError::AlreadyStopping);
false
}
});
match (stopping, err) {
(true, None) => {} // continue
(false, Some(err)) => return Err(err),
(true, Some(_)) => unreachable!(
"send_if_modified closure must error out if not transitioning to Stopping"
),
(false, None) => unreachable!(
"send_if_modified closure must return true if transitioning to Stopping"
),
}
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
Ok(())
}
pub fn set_broken(&self, reason: String) {
/// Method for tenant::mgr to transition us into Broken state in case of a late failure in
/// `remove_tenant_from_memory`
///
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// In tests, we also use this to set tenants to Broken state on purpose.
pub(crate) async fn set_broken(&self, reason: String) {
let mut rx = self.state.subscribe();
// The load & attach routines own the tenant state until it has reached `Active`.
// So, wait until it's done.
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
// Broken tenants can currently only used for fatal errors that happen
// while loading or attaching a tenant. A tenant that has already been
// activated should never be marked as broken. We cope with it the best
// we can, but it shouldn't happen.
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
if cfg!(feature = "testing") {
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
} else {
unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
}
}
TenantState::Broken { .. } => {
// This shouldn't happen either
warn!("Tenant is already in Broken state");
}
// This is the only "expected" path, any other path is a bug.
TenantState::Stopping => {
// This shouldn't happen either
warn!(
"Marking Stopping tenant as Broken state, reason: {}",
reason
);
*current_state = TenantState::broken_from_reason(reason);
}
TenantState::Loading | TenantState::Attaching => {
info!("Setting tenant as Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
}
}
}
});
}
@@ -1777,7 +1884,7 @@ impl Tenant {
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching => {
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {
@@ -3102,6 +3209,7 @@ pub mod harness {
evictions_low_residence_duration_metric_threshold: Some(
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
gc_feedback: Some(tenant_conf.gc_feedback),
}
}
}

View File

@@ -99,6 +99,7 @@ pub struct TenantConf {
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
pub gc_feedback: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -175,6 +176,10 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
#[serde(default)]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_feedback: Option<bool>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -242,6 +247,7 @@ impl TenantConfOpt {
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
}
}
}
@@ -278,6 +284,7 @@ impl Default for TenantConf {
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false,
}
}
}
@@ -372,6 +379,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
))?,
);
}
tenant_conf.gc_feedback = request_data.gc_feedback;
Ok(tenant_conf)
}

View File

@@ -204,6 +204,35 @@ fn test_off_by_one() {
assert_eq!(version.image_coverage.query(5), None);
}
/// White-box regression test, checking for incorrect removal of node at key.end
#[test]
fn test_regression() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(
LayerKey {
key: 0..5,
lsn: 0..5,
is_image: false,
},
"Layer 1".to_string(),
);
map.insert(
LayerKey {
key: 0..5,
lsn: 1..2,
is_image: false,
},
"Layer 2".to_string(),
);
// If an insertion operation improperly deletes the endpoint of a previous layer
// (which is more likely to happen with layers that collide on key.end), we will
// end up with an infinite layer, covering the entire keyspace. Here we assert
// that there's no layer at key 100 because we didn't insert any layer there.
let version = map.get_version(100).unwrap();
assert_eq!(version.delta_coverage.query(100), None);
}
/// Cover edge cases where layers begin or end on the same key
#[test]
fn test_key_collision() {

View File

@@ -1,8 +1,8 @@
use std::ops::Range;
// TODO the `im` crate has 20x more downloads and also has
// persistent/immutable BTree. It also runs a bit faster but
// results are not the same on some tests.
// NOTE the `im` crate has 20x more downloads and also has
// persistent/immutable BTree. But it's bugged so rpds is a
// better choice https://github.com/neondatabase/neon/issues/3395
use rpds::RedBlackTreeMapSync;
/// Data structure that can efficiently:
@@ -10,19 +10,22 @@ use rpds::RedBlackTreeMapSync;
/// - iterate the latest layers in a key range
/// - insert layers in non-decreasing lsn.start order
///
/// The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
/// For a detailed explanation and justification of this approach, see:
/// https://neon.tech/blog/persistent-structures-in-neons-wal-indexing
///
/// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
///
/// We use an immutable/persistent tree so that we can keep historic
/// versions of this coverage without cloning the whole thing and
/// incurring quadratic memory cost. See HistoricLayerCoverage.
/// NOTE We use an immutable/persistent tree so that we can keep historic
/// versions of this coverage without cloning the whole thing and
/// incurring quadratic memory cost. See HistoricLayerCoverage.
///
/// We use the Sync version of the map because we want Self to
/// be Sync. Using nonsync might be faster, if we can work with
/// that.
/// NOTE We use the Sync version of the map because we want Self to
/// be Sync. Using nonsync might be faster, if we can work with
/// that.
nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
}
@@ -41,6 +44,13 @@ impl<Value: Clone> LayerCoverage<Value> {
/// Helper function to subdivide the key range without changing any values
///
/// This operation has no semantic effect by itself. It only helps us pin in
/// place the part of the coverage we don't want to change when inserting.
///
/// As an analogy, think of a polygon. If you add a vertex along one of the
/// segments, the polygon is still the same, but it behaves differently when
/// we move or delete one of the other points.
///
/// Complexity: O(log N)
fn add_node(&mut self, key: i128) {
let value = match self.nodes.range(..=key).last() {
@@ -74,7 +84,7 @@ impl<Value: Clone> LayerCoverage<Value> {
let mut to_update = Vec::new();
let mut to_remove = Vec::new();
let mut prev_covered = false;
for (k, node) in self.nodes.range(key.clone()) {
for (k, node) in self.nodes.range(key) {
let needs_cover = match node {
None => true,
Some((h, _)) => h < &lsn.end,
@@ -87,9 +97,8 @@ impl<Value: Clone> LayerCoverage<Value> {
}
prev_covered = needs_cover;
}
if !prev_covered {
to_remove.push(key.end);
}
// TODO check if the nodes inserted at key.start and key.end are safe
// to remove. It's fine to keep them but they could be redundant.
for k in to_update {
self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
}

View File

@@ -10,6 +10,7 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -19,9 +20,12 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
@@ -63,6 +67,7 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: (completion::Completion, completion::Barrier),
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -119,6 +124,7 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done.clone()),
&ctx,
) {
Ok(tenant) => {
@@ -154,6 +160,7 @@ pub fn schedule_local_tenant_processing(
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -207,7 +214,14 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
Tenant::spawn_load(
conf,
tenant_id,
broker_client,
remote_storage,
init_done,
ctx,
)
};
Ok(tenant)
}
@@ -222,6 +236,7 @@ pub fn schedule_local_tenant_processing(
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument]
pub async fn shutdown_all_tenants() {
// Prevent new tenants from being created.
let tenants_to_shut_down = {
@@ -244,15 +259,65 @@ pub async fn shutdown_all_tenants() {
}
};
// Set tenant (and its timlines) to Stoppping state.
//
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for other tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's mesed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (_, tenant) in tenants_to_shut_down {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_stopping();
tenants_to_freeze_and_flush.push(tenant);
for (tenant_id, tenant) in tenants_to_shut_down {
join_set.spawn(
async move {
match tenant.set_stopping().await {
Ok(()) => debug!("tenant successfully stopped"),
Err(SetStoppingError::Broken) => {
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well");
},
Err(SetStoppingError::AlreadyStopping) => {
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
}
}
tenant
}
.instrument(info_span!("set_stopping", %tenant_id)),
);
}
let mut panicked = 0;
while let Some(res) = join_set.join_next().await {
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
Ok(tenant) => tenants_to_freeze_and_flush.push(tenant),
}
}
if panicked > 0 {
warn!(panicked, "observed panicks while stopping tenants");
}
// Shut down all existing walreceiver connections and stop accepting the new ones.
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
@@ -264,12 +329,30 @@ pub async fn shutdown_all_tenants() {
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.
let mut join_set = tokio::task::JoinSet::new();
for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
join_set.spawn(
async move {
if let Err(err) = tenant.freeze_and_flush().await {
warn!("Could not checkpoint tenant during shutdown: {err:?}");
}
}
.instrument(info_span!("freeze_and_flush", %tenant_id)),
);
}
while let Some(next) = join_set.join_next().await {
match next {
Ok(()) => {}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("no cancelling")
}
Err(join_error) if join_error.is_panic() => { /* reported already */ }
Err(join_error) => warn!("unknown kind of JoinError: {join_error}"),
}
}
}
@@ -291,7 +374,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -437,7 +520,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -510,7 +593,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -589,13 +672,23 @@ where
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping(),
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure
}
Err(SetStoppingError::Broken) => {
// continue the procedure, let's hope the closure can deal with broken tenants
}
Err(SetStoppingError::AlreadyStopping) => {
// the tenant is already stopping or broken, don't do anything
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
None => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -620,7 +713,7 @@ where
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(e.to_string());
tenant.set_broken(e.to_string()).await;
}
None => {
warn!("Tenant {tenant_id} got removed from memory");

View File

@@ -19,14 +19,8 @@ fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result
Ok(())
}
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
const PARALLEL_PATH_THRESHOLD: usize = 1;
if paths.len() <= PARALLEL_PATH_THRESHOLD {
for path in paths {
fsync_path(path)?;
}
return Ok(());
}
fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
/// Use at most this number of threads.
/// Increasing this limit will
@@ -36,11 +30,11 @@ pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
let num_threads = paths.len().min(MAX_NUM_THREADS);
let next_path_idx = AtomicUsize::new(0);
crossbeam_utils::thread::scope(|s| -> io::Result<()> {
std::thread::scope(|s| -> io::Result<()> {
let mut handles = vec![];
// Spawn `num_threads - 1`, as the current thread is also a worker.
for _ in 1..num_threads {
handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx)));
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
}
parallel_worker(paths, &next_path_idx)?;
@@ -51,5 +45,41 @@ pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
Ok(())
})
.unwrap()
}
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
if paths.len() == 1 {
fsync_path(&paths[0])?;
return Ok(());
}
fsync_in_thread_pool(paths)
}
/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
/// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
const MAX_CONCURRENT_FSYNC: usize = 64;
let mut next = paths.iter().peekable();
let mut js = tokio::task::JoinSet::new();
loop {
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
let next = next.next().expect("just peeked");
let next = next.to_owned();
js.spawn_blocking(move || fsync_path(&next));
}
// now the joinset has been filled up, wait for next to complete
if let Some(res) = js.join_next().await {
res??;
} else {
// last item had already completed
assert!(
next.peek().is_none(),
"joinset emptied, we shouldn't have more work"
);
return Ok(());
}
}
}

View File

@@ -12,8 +12,9 @@ use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
pub fn start_background_loops(tenant: &Arc<Tenant>) {
pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -24,7 +25,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
compaction_loop(tenant)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
@@ -41,7 +44,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
gc_loop(tenant)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;

View File

@@ -195,8 +195,9 @@ pub struct Timeline {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`].
pub(super) layer_removal_cs: tokio::sync::Mutex<()>,
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -235,6 +236,10 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
/// Prevent two tasks from deleting the timeline at the same time. If held, the
/// timeline is being deleted. If 'true', the timeline has already been deleted.
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
}
@@ -669,7 +674,7 @@ impl Timeline {
}
/// Outermost timeline compaction operation; downloads needed layers.
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
let last_record_lsn = self.get_last_record_lsn();
@@ -758,7 +763,7 @@ impl Timeline {
}
/// Compaction which might need to be retried after downloading remote layers.
async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> {
async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
//
// High level strategy for compaction / image creation:
//
@@ -793,7 +798,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let layer_removal_cs = self.layer_removal_cs.lock().await;
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -827,7 +832,7 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(&layer_removal_cs, target_file_size, ctx)
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
.await?;
timer.stop_and_record();
}
@@ -1286,6 +1291,13 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
fn get_gc_feedback(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.gc_feedback
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
@@ -1413,6 +1425,7 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1904,6 +1917,7 @@ impl Timeline {
// no cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
@@ -2168,7 +2182,7 @@ impl Timeline {
fn delete_historic_layer(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
@@ -2632,7 +2646,7 @@ impl Timeline {
/// Layer flusher task's main loop.
async fn flush_loop(
&self,
self: &Arc<Self>,
mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
ctx: &RequestContext,
) {
@@ -2721,9 +2735,9 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
#[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
async fn flush_frozen_layer(
&self,
self: &Arc<Self>,
frozen_layer: Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -2743,7 +2757,16 @@ impl Timeline {
.await?
} else {
// normal case, write out a L0 delta layer file.
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
let this = self.clone();
let frozen_layer = frozen_layer.clone();
let span = tracing::info_span!("blocking");
let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.create_delta_layer(&frozen_layer)
})
.await
.context("create_delta_layer spawn_blocking")
.and_then(|res| res)?;
HashMap::from([(delta_path, metadata)])
};
@@ -2847,7 +2870,7 @@ impl Timeline {
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
&self,
self: &Arc<Self>,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
// Write it out
@@ -2863,10 +2886,13 @@ impl Timeline {
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
par_fsync::par_fsync(&[
new_delta_path.clone(),
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
])?;
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
// this with a single fsync in future refactors.
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
// Then sync the parent directory.
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
// Add it to the layer map
let l = Arc::new(new_delta);
@@ -3090,17 +3116,22 @@ impl Timeline {
let all_paths = image_layers
.iter()
.map(|layer| layer.path())
.chain(std::iter::once(
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
))
.collect::<Vec<_>>();
par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?;
par_fsync::par_fsync_async(&all_paths)
.await
.context("fsync of newly created layer files")?;
par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.await
.context("fsync of timeline dir")?;
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
let path = l.filename();
let metadata = timeline_path
@@ -3159,9 +3190,9 @@ impl Timeline {
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
/// start of level0 files compaction, the on-demand download should be revisited as well.
async fn compact_level0_phase1(
fn compact_level0_phase1(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
@@ -3474,13 +3505,13 @@ impl Timeline {
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
layer_paths.pop().unwrap();
}
@@ -3497,17 +3528,26 @@ impl Timeline {
/// as Level 1 files.
///
async fn compact_level0(
&self,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
self: &Arc<Self>,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
let this = self.clone();
let ctx_inner = ctx.clone();
let layer_removal_cs_inner = layer_removal_cs.clone();
let span = tracing::info_span!("blocking");
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = self
.compact_level0_phase1(layer_removal_cs, target_file_size, ctx)
.await?;
} = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
})
.await
.context("compact_level0_phase1 spawn_blocking")
.map_err(CompactionError::Other)
.and_then(|res| res)?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
@@ -3565,7 +3605,7 @@ impl Timeline {
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs, l, &mut updates)?;
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
}
updates.flush();
drop(layers);
@@ -3685,7 +3725,7 @@ impl Timeline {
fail_point!("before-timeline-gc");
let layer_removal_cs = self.layer_removal_cs.lock().await;
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -3705,7 +3745,7 @@ impl Timeline {
let res = self
.gc_timeline(
&layer_removal_cs,
layer_removal_cs.clone(),
horizon_cutoff,
pitr_cutoff,
retain_lsns,
@@ -3724,7 +3764,7 @@ impl Timeline {
async fn gc_timeline(
&self,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
@@ -3864,7 +3904,7 @@ impl Timeline {
// delta layers. Image layers can form "stairs" preventing old image from been deleted.
// But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new one after each GC iteration.
if l.is_incremental() && !LayerMap::is_l0(&*l) {
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) {
wanted_image_layers.add_range(l.get_key_range());
}
result.layers_not_updated += 1;
@@ -3897,7 +3937,11 @@ impl Timeline {
{
for doomed_layer in layers_to_remove {
layer_names_to_delete.push(doomed_layer.filename());
self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning?
self.delete_historic_layer(
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
)?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1;
}
}

View File

@@ -19,8 +19,10 @@ use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use crate::metrics::BROKER_ITERATION_TIMELINES;
use crate::metrics::BROKER_PULLED_UPDATES;
use crate::metrics::BROKER_PUSHED_UPDATES;
use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -61,8 +63,14 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
BROKER_PUSHED_UPDATES.inc();
}
let elapsed = now.elapsed();
// Log duration every second. Should be about 10MB of logs per day.
info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64);
if elapsed > push_interval / 2 {
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
}
sleep(push_interval).await;
}
};

View File

@@ -125,6 +125,25 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_backup_errors_total counter")
});
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_push_update_seconds",
"Seconds to push all timeline updates to the broker",
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
});
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_iteration_timelines",
"Count of timelines pushed to the broker in a single iteration",
TIMELINES_COUNT_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
});
pub const LABEL_UNKNOWN: &str = "unknown";

View File

@@ -1,5 +1,5 @@
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR.
// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
//
// The comment is updated on each run with the latest results.
//
@@ -7,7 +7,7 @@
// - uses: actions/github-script@v6
// with:
// script: |
// const script = require("./scripts/pr-comment-test-report.js")
// const script = require("./scripts/comment-test-report.js")
// await script({
// github,
// context,
@@ -35,8 +35,12 @@ class DefaultMap extends Map {
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// If we run the script in the PR or in the branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:</sub></div>`
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// Commend body itself
@@ -166,22 +170,39 @@ module.exports = async ({ github, context, fetch, report }) => {
commentBody += autoupdateNotice
const { data: comments } = await github.rest.issues.listComments({
issue_number: context.payload.number,
let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha
if (isPullRequest) {
createCommentFn = github.rest.issues.createComment
listCommentsFn = github.rest.issues.listComments
updateCommentFn = github.rest.issues.updateComment
issueNumberOrSha = {
issue_number: context.payload.number,
}
} else {
updateCommentFn = github.rest.repos.updateCommitComment
listCommentsFn = github.rest.repos.listCommentsForCommit
createCommentFn = github.rest.repos.createCommitComment
issueNumberOrSha = {
commit_sha: commitSha,
}
}
const { data: comments } = await listCommentsFn({
...issueNumberOrSha,
...ownerRepoParams,
})
const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker))
if (comment) {
await github.rest.issues.updateComment({
await updateCommentFn({
comment_id: comment.id,
body: commentBody,
...ownerRepoParams,
})
} else {
await github.rest.issues.createComment({
issue_number: context.payload.number,
await createCommentFn({
body: commentBody,
...issueNumberOrSha,
...ownerRepoParams,
})
}

View File

@@ -156,7 +156,9 @@ class LLVM:
profdata: Path,
objects: List[str],
sources: List[str],
demangler: Optional[Path] = None) -> None:
demangler: Optional[Path] = None,
output_file: Optional[Path] = None,
) -> None:
cwd = self.cargo.cwd
objects = list(intersperse('-object', objects))
@@ -180,14 +182,18 @@ class LLVM:
*objects,
*sources,
]
subprocess.check_call(cmd, cwd=cwd)
if output_file is not None:
with output_file.open('w') as outfile:
subprocess.check_call(cmd, cwd=cwd, stdout=outfile)
else:
subprocess.check_call(cmd, cwd=cwd)
def cov_report(self, **kwargs) -> None:
self._cov(subcommand='report', **kwargs)
def cov_export(self, *, kind: str, **kwargs) -> None:
def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None:
extras = (f'-format={kind}', )
self._cov(subcommand='export', *extras, **kwargs)
self._cov(subcommand='export', *extras, output_file=output_file, **kwargs)
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
extras = [f'-format={kind}']
@@ -283,9 +289,12 @@ class TextReport(Report):
self.llvm.cov_show(kind='text', **self._common_kwargs())
@dataclass
class LcovReport(Report):
output_file: Path
def generate(self) -> None:
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs())
@dataclass
@@ -475,7 +484,7 @@ class State:
'text':
lambda: TextReport(**params),
'lcov':
lambda: LcovReport(**params),
lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'),
'summary':
lambda: SummaryReport(**params),
'github':

View File

@@ -1621,6 +1621,8 @@ class NeonPageserver(PgProtocol):
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
]
def start(

View File

@@ -158,6 +158,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"threshold": "23h",
},
"evictions_low_residence_duration_metric_threshold": "2days",
"gc_feedback": True,
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*could not load tenant.*load local timeline.*",
".*load failed.*load local timeline.*",
]
)

View File

@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
# This is before the failures injected by test_remote_failures, so it's a permanent error.
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
env.pageserver.allowed_errors.append(
".*error attaching tenant: storage-sync-list-remote-timelines",
".*attach failed.*: storage-sync-list-remote-timelines",
)
# Attach it. This HTTP request will succeed and launch a
# background task to load the tenant. In that background task,

View File

@@ -647,7 +647,9 @@ def test_ignored_tenant_stays_broken_without_metadata(
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)

View File

@@ -22,6 +22,7 @@ from fixtures.neon_fixtures import (
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -308,9 +309,7 @@ def test_pageserver_with_empty_tenants(
env.pageserver.allowed_errors.append(
".*marking .* as locally complete, while it doesnt exist in remote index.*"
)
env.pageserver.allowed_errors.append(
".*could not load tenant.*Failed to list timelines directory.*"
)
env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
client = env.pageserver.http_client()
@@ -341,9 +340,15 @@ def test_pageserver_with_empty_tenants(
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert len(tenants) == 2
def not_loading():
tenants = client.tenant_list()
assert len(tenants) == 2
assert all(t["state"]["slug"] != "Loading" for t in tenants)
wait_until(10, 0.2, not_loading)
tenants = client.tenant_list()
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
@@ -355,7 +360,7 @@ def test_pageserver_with_empty_tenants(
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert (

View File

@@ -371,7 +371,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "another task is already setting the deleted_flag, started at"
error_msg_re = "timeline deletion is already in progress"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500