Compare commits

..

1 Commit

Author: Sasha Krassovsky
SHA1: e6ed2b8368
Message: Add RFC
Date: 2023-05-25 15:07:37 -07:00
41 changed files with 725 additions and 1225 deletions

View File

@@ -407,7 +407,9 @@ jobs:
uses: ./.github/actions/allure-report-generate
- uses: actions/github-script@v6
if: ${{ !cancelled() }}
if: >
!cancelled() &&
github.event_name == 'pull_request'
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
@@ -417,7 +419,7 @@ jobs:
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const script = require("./scripts/comment-test-report.js")
const script = require("./scripts/pr-comment-test-report.js")
await script({
github,
context,
@@ -492,24 +494,19 @@ jobs:
env:
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
run: |
scripts/coverage --dir=/tmp/coverage \
report \
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=${COMMIT_URL} \
--format=github
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=lcov
- name: Upload coverage report
id: upload-coverage-report
env:
BUCKET: neon-github-public-dev
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
@@ -802,7 +799,7 @@ jobs:
- name: Build vm image
run: |
./vm-builder -enable-file-cache -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Pushing vm-compute-node image
run: |

View File

@@ -370,10 +370,6 @@ impl PageServerNode {
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
// If tenant ID was not specified, generate one
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
let request = models::TenantCreateRequest {
new_tenant_id,
config,
@@ -499,9 +495,6 @@ impl PageServerNode {
ancestor_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
) -> anyhow::Result<TimelineInfo> {
// If timeline ID was not specified, generate one
let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
self.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),

View File

@@ -1,14 +1,6 @@
#!/bin/bash
set -eux
# Generate a random tenant or timeline ID
#
# Takes a variable name as argument. The result is stored in that variable.
generate_id() {
local -n resvar=$1
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
}
PG_VERSION=${PG_VERSION:-14}
SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
@@ -21,29 +13,29 @@ done
echo "Page server is ready."
echo "Create a tenant and timeline"
generate_id tenant_id
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{\"new_tenant_id\": \"${tenant_id}\"}"
-d "{}"
http://pageserver:9898/v1/tenant/
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
tenant_id=$(curl "${PARAMS[@]}" | sed 's/"//g')
generate_id timeline_id
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}"
-d "{\"tenant_id\":\"${tenant_id}\", \"pg_version\": ${PG_VERSION}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
echo "Overwrite tenant id and timeline id in spec file"
tenant_id=$(echo ${result} | jq -r .tenant_id)
timeline_id=$(echo ${result} | jq -r .timeline_id)
sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}

View File

@@ -0,0 +1,64 @@
# Ingress-Egress Metrics
# Goals
* Track ingress/egress traffic directly from a compute node, which may be sending traffic on
behalf of a custom extension.
* Lay the groundwork for further traffic management on the compute node.
# Background
As we move towards allowing custom extensions on the compute node, we need to cope with the fact
that these extensions are untrusted. They could send or receive things from the open internet or
even try to send bogus requests around on the internal network. Although we currently use the proxy
to measure incoming/outgoing traffic, custom extensions will potentially be able to interact with
the outside without involving the proxy. This has a high potential for abuse, so we need some ability
to measure and lock down usage. This RFC concerns itself with the first part (measuring).
# Possible Implementations
* Service Mesh: There are lots of different implementations, but typically they have a
proxy (proxy-per-node) that all traffic within the cluster goes through, and these proxies
forward metrics to the control plane. This seems more complicated than we need because:
- We only really need this for our compute nodes (not safekeepers and pageservers)
- We control our compute nodes and have full power to customize them (which afaict is not an
assumption that most service meshes can make)
- We already have a control plane, having two is more trouble than it's worth
- Having all traffic go through yet another proxy will add some amount of latency
- "You wanted a banana but what you got was a gorilla holding the banana and the entire jungle."
* eBPF Program: This option essentially implements what we actually need from the service mesh
in its minimal form. The program will have very minimal overhead and will run on each
compute node. It has the advantages of being small, fully customizable, and performant. One
thing to consider is that eBPF is not well supported on macOS, but given that we always run
everything in Linux Docker containers this doesn't seem like an issue.
# In Detail
The eBPF program itself will be written in C and compiled with the eBPF backend as part of the
Compute Image. The program will be loaded into the kernel by `compute_ctl` just before
it starts Postgres. The program will construct an eBPF ArrayMap that contains two elements:
ingress bytes and egress bytes.
```C
struct
{
__uint(type, BPF_MAP_TYPE_ARRAY);
__type(key, u32);
__type(value, u64);
__uint(max_entries, 2);
} ingress_egress_counter_map SEC(".maps");
```
On the kernel side, the eBPF program will intercept all packets,
check the IP address to see whether the packet is entering or leaving the network, and if so atomically increment
the corresponding counter. On the userspace side, `compute_ctl` will `mmap` this array of counters
(it isn't explicitly documented anywhere that you can do this, but I found
[this patch](https://lwn.net/Articles/804180) that lets you mmap array maps). Once per minute, `compute_ctl`
will `atomic_exchange` these counters with `0`, wrap them up in JSON, and send them to the
control plane, in much the same format as in 021-metering.md.
```json
{
"metric" : "compute_ingress_bytes",
"endpoint_id" : "super-gigachad-117",
"event_start_time" : ...,
"event_stop_time" : ...,
"value" : ...
}
```

View File

@@ -18,29 +18,7 @@ use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
/// ```mermaid
/// stateDiagram-v2
///
/// [*] --> Loading: spawn_load()
/// [*] --> Attaching: spawn_attach()
///
/// Loading --> Activating: activate()
/// Attaching --> Activating: activate()
/// Activating --> Active: infallible
///
/// Loading --> Broken: load() failure
/// Attaching --> Broken: attach() failure
///
/// Active --> Stopping: set_stopping(), part of shutdown & detach
/// Stopping --> Broken: late error in remove_tenant_from_memory
///
/// Broken --> [*]: ignore / detach / shutdown
/// Stopping --> [*]: remove_from_memory complete
///
/// Active --> Broken: cfg(testing)-only tenant break point
/// ```
/// A state of a tenant in pageserver's memory.
#[derive(
Clone,
PartialEq,
@@ -48,63 +26,40 @@ use bytes::{BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
/// This tenant is being loaded from local disk
Loading,
/// This tenant is being attached to the pageserver.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
/// This tenant is being downloaded from cloud storage.
Attaching,
/// The tenant is transitioning from Loading/Attaching to Active.
///
/// While in this state, the individual timelines are being activated.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Activating(ActivatingFrom),
/// The tenant has finished activating and is open for business.
///
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
/// Tenant is fully operational
Active,
/// The tenant is recognized by pageserver, but it is being detached or the
/// A tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping,
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
///
/// If the tenant fails to load or attach, it will transition to this state
/// and it is guaranteed that no background tasks are running in its name.
///
/// The other way to transition into this state is from `Stopping` state
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken { reason: String, backtrace: String },
}
impl TenantState {
pub fn attachment_status(&self) -> TenantAttachmentStatus {
use TenantAttachmentStatus::*;
// Below TenantState::Activating is used as "transient" or "transparent" state for
// attachment_status determining.
match self {
// The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
// So, technically, we can return Attached here.
// However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
// But, our attach task might still be fetching the remote timelines, etc.
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
Self::Attaching => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
Self::Loading => Attached,
// We only reach Active after successful load / attach.
// So, call attachment status Attached.
Self::Active => Attached,
@@ -143,15 +98,6 @@ impl std::fmt::Debug for TenantState {
}
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
/// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
Loading,
/// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
Attaching,
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
@@ -172,8 +118,9 @@ pub enum TimelineState {
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_timeline_id: TimelineId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_timeline_id: Option<TimelineId>,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
@@ -184,11 +131,12 @@ pub struct TimelineCreateRequest {
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<TenantId>,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
@@ -236,10 +184,10 @@ pub struct StatusResponse {
}
impl TenantCreateRequest {
pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
pub fn new(new_tenant_id: Option<TenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
config: TenantConfig::default(),
..Default::default()
}
}
}
@@ -883,55 +831,4 @@ mod tests {
err
);
}
#[test]
fn tenantstatus_activating_serde() {
let states = [
TenantState::Activating(ActivatingFrom::Loading),
TenantState::Activating(ActivatingFrom::Attaching),
];
let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
let actual = serde_json::to_string(&states).unwrap();
assert_eq!(actual, expected);
let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
assert_eq!(states.as_slice(), &parsed);
}
#[test]
fn tenantstatus_activating_strum() {
// tests added, because we use these for metrics
let examples = [
(line!(), TenantState::Loading, "Loading"),
(line!(), TenantState::Attaching, "Attaching"),
(
line!(),
TenantState::Activating(ActivatingFrom::Loading),
"Activating",
),
(
line!(),
TenantState::Activating(ActivatingFrom::Attaching),
"Activating",
),
(line!(), TenantState::Active, "Active"),
(line!(), TenantState::Stopping, "Stopping"),
(
line!(),
TenantState::Broken {
reason: "Example".into(),
backtrace: "Looooong backtrace".into(),
},
"Broken",
),
];
for (line, rendered, expected) in examples {
let actual: &'static str = rendered.into();
assert_eq!(actual, expected, "example on {line}");
}
}
}

View File

@@ -1,33 +0,0 @@
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
impl Barrier {
pub async fn wait(self) {
self.0.lock().await.recv().await;
}
pub async fn maybe_wait(barrier: Option<Barrier>) {
if let Some(b) = barrier {
b.wait().await
}
}
}
/// Create new Guard and Barrier pair.
pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
}

View File

@@ -1,5 +1,5 @@
use crate::auth::{Claims, JwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use crate::http::error;
use anyhow::{anyhow, Context};
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
@@ -16,6 +16,8 @@ use std::future::Future;
use std::net::TcpListener;
use std::str::FromStr;
use super::error::ApiError;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
@@ -33,12 +35,8 @@ struct RequestId(String);
/// Adds a tracing info_span! instrumentation around the handler events,
/// logs the request start and end events for non-GET requests and non-200 responses.
///
/// Usage: Replace `my_handler` with `|r| request_span(r, my_handler)`
///
/// Use this to distinguish between logs of different HTTP requests: every request handler wrapped
/// with this will get request info logged in the wrapping span, including the unique request ID.
///
/// This also handles errors, logging them and converting them to an HTTP error response.
/// in this type will get request info logged in the wrapping span, including the unique request ID.
///
/// There could be other ways to implement similar functionality:
///
@@ -56,56 +54,60 @@ struct RequestId(String);
/// tries to achieve with its `.instrument` used in the current approach.
///
/// If needed, a declarative macro to substitute the |r| ... closure boilerplate could be introduced.
pub async fn request_span<R, H>(request: Request<Body>, handler: H) -> R::Output
pub struct RequestSpan<E, R, H>(pub H)
where
R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
H: Fn(Request<Body>) -> R + Send + Sync + 'static;
impl<E, R, H> RequestSpan<E, R, H>
where
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
H: Fn(Request<Body>) -> R + Send + Sync + 'static,
{
let request_id = request.context::<RequestId>().unwrap_or_default().0;
let method = request.method();
let path = request.uri().path();
let request_span = info_span!("request", %method, %path, %request_id);
/// Creates a tracing span around the inner request handler and executes the request handler in the context of that span.
/// Use as `|r| RequestSpan(my_handler).handle(r)` instead of `my_handler` as the request handler to get the span enabled.
pub async fn handle(self, request: Request<Body>) -> Result<Response<Body>, E> {
let request_id = request.context::<RequestId>().unwrap_or_default().0;
let method = request.method();
let path = request.uri().path();
let request_span = info_span!("request", %method, %path, %request_id);
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
info!("Handling request");
}
// No special handling for panics here. There's a `tracing_panic_hook` from another
// module to do that globally.
let res = handler(request).await;
cancellation_guard.disarm();
// Log the result if needed.
//
// We also convert any errors into an Ok response with HTTP error code here.
// `make_router` sets a last-resort error handler that would do the same, but
// we prefer to do it here, before we exit the request span, so that the error
// is still logged with the span.
//
// (Because we convert errors to Ok response, we never actually return an error,
// and we could declare the function to return the never type (`!`). However,
// using `routerify::RouterBuilder` requires a proper error type.)
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
debug!("Request handled, status: {response_status}");
} else {
info!("Request handled, status: {response_status}");
}
Ok(response)
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
info!("Handling request");
}
// Note that we reuse `error::handler` here and do not return an error at all,
// yet cannot use `!` directly in the method signature due to `routerify::RouterBuilder` limitation.
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
//
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
let res = (self.0)(request).await;
cancellation_guard.disarm();
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
debug!("Request handled, status: {response_status}");
} else {
info!("Request handled, status: {response_status}");
}
Ok(response)
}
Err(e) => Ok(error::handler(e.into()).await),
}
Err(err) => Ok(api_error_handler(err)),
}
.instrument(request_span)
.await
}
.instrument(request_span)
.await
}
/// Drop guard to WARN in case the request was dropped before completion.
@@ -205,8 +207,10 @@ pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
.middleware(Middleware::post_with_info(
add_request_id_header_to_response,
))
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.err_handler(route_error_handler)
.get("/metrics", |r| {
RequestSpan(prometheus_metrics_handler).handle(r)
})
.err_handler(error::handler)
}
pub fn attach_openapi_ui(
@@ -216,14 +220,12 @@ pub fn attach_openapi_ui(
ui_mount_path: &'static str,
) -> RouterBuilder<hyper::Body, ApiError> {
router_builder
.get(spec_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(spec)).unwrap())
})
)
.get(ui_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(format!(r#"
.get(spec_mount_path, move |r| {
RequestSpan(move |_| async move { Ok(Response::builder().body(Body::from(spec)).unwrap()) })
.handle(r)
})
.get(ui_mount_path, move |r| RequestSpan( move |_| async move {
Ok(Response::builder().body(Body::from(format!(r#"
<!DOCTYPE html>
<html lang="en">
<head>
@@ -253,8 +255,7 @@ pub fn attach_openapi_ui(
</body>
</html>
"#, spec_mount_path))).unwrap())
})
)
}).handle(r))
}
fn parse_token(header_value: &str) -> Result<&str, ApiError> {

View File

@@ -83,24 +83,13 @@ impl HttpErrorBody {
}
}
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
match err.downcast::<ApiError>() {
Ok(api_error) => api_error_handler(*api_error),
Err(other_error) => {
// We expect all the request handlers to return an ApiError, so this should
// not be reached. But just in case.
error!("Error processing HTTP request: {other_error:?}");
HttpErrorBody::response_from_msg_and_status(
other_error.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub async fn handler(err: routerify::RouteError) -> Response<Body> {
let api_error = err
.downcast::<ApiError>()
.expect("handler should always return api error");
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
// Print a stack trace for Internal Server errors
if let ApiError::InternalServerError(_) = api_error {
if let ApiError::InternalServerError(_) = api_error.as_ref() {
error!("Error processing HTTP request: {api_error:?}");
} else {
error!("Error processing HTTP request: {api_error:#}");

View File

@@ -60,9 +60,6 @@ pub mod tracing_span_assert;
pub mod rate_limit;
/// Simple once-barrier and a guard which keeps barrier awaiting.
pub mod completion;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")

View File

@@ -335,34 +335,13 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// All tenant load operations carry this while they are ongoing; it will be dropped once those
// operations finish either successfully or in some other manner. However, the initial load
// will be then done, and we can start the global background tasks.
let (init_done_tx, init_done_rx) = utils::completion::channel();
// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
))?;
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
init_done_rx.wait().await;
let elapsed = init_started_at.elapsed();
tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed."
);
}
});
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -374,7 +353,6 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
init_done_rx.clone(),
)?;
}
@@ -412,7 +390,6 @@ fn start_pageserver(
);
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let init_done_rx = init_done_rx;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
@@ -428,13 +405,6 @@ fn start_pageserver(
"consumption metrics collection",
true,
async move {
// first wait for initial load to complete before first iteration.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
init_done_rx.wait().await;
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,

View File

@@ -88,7 +88,6 @@
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
@@ -96,7 +95,7 @@ pub struct RequestContext {
/// Desired behavior if the operation requires an on-demand download
/// to proceed.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum DownloadBehavior {
/// Download the layer file. It can take a while.
Download,

View File

@@ -54,13 +54,12 @@ use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
tenant::{self, storage_layer::PersistentLayer, Timeline},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -83,7 +82,6 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
init_done: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -100,9 +98,6 @@ pub fn launch_disk_usage_global_eviction_task(
"disk usage based eviction",
false,
async move {
// wait until initial load is complete, because we cannot evict from loading tenants.
init_done.wait().await;
disk_usage_eviction_task(
&state,
task_config,
@@ -329,7 +324,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
@@ -434,7 +429,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<RemoteLayerDesc>,
layer: Arc<dyn PersistentLayer>,
last_activity_ts: SystemTime,
}

View File

@@ -678,8 +678,6 @@ paths:
application/json:
schema:
type: object
required:
- new_timeline_id
properties:
new_timeline_id:
type: string
@@ -938,8 +936,6 @@ components:
allOf:
- $ref: '#/components/schemas/TenantConfig'
- type: object
required:
- new_tenant_id
properties:
new_tenant_id:
type: string

View File

@@ -11,7 +11,7 @@ use storage_broker::BrokerClientChannel;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::endpoint::request_span;
use utils::http::endpoint::RequestSpan;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -301,7 +301,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
let new_timeline_id = request_data.new_timeline_id;
let new_timeline_id = request_data
.new_timeline_id
.unwrap_or_else(TimelineId::generate);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
@@ -328,7 +330,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Err(err) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create", tenant = %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await
}
@@ -762,8 +764,6 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
}
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let target_tenant_id = request_data.new_tenant_id;
check_permission(&request, None)?;
let _timer = STORAGE_TIME_GLOBAL
@@ -771,10 +771,17 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
.expect("bug")
.start_timer();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let target_tenant_id = request_data
.new_tenant_id
.map(TenantId::from)
.unwrap_or_else(TenantId::generate);
let state = get_state(&request);
@@ -859,7 +866,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned()).await;
tenant.set_broken("broken from test".to_owned());
json_response(StatusCode::OK, ())
}
@@ -1179,7 +1186,7 @@ pub fn make_router(
#[cfg(not(feature = "testing"))]
let handler = cfg_disabled;
move |r| request_span(r, handler)
move |r| RequestSpan(handler).handle(r)
}};
}
@@ -1194,50 +1201,54 @@ pub fn make_router(
)
.context("Failed to initialize router state")?,
))
.get("/v1/status", |r| request_span(r, status_handler))
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.put(
"/v1/failpoints",
testing_api!("manage failpoints", failpoints_handler),
)
.get("/v1/tenant", |r| request_span(r, tenant_list_handler))
.post("/v1/tenant", |r| request_span(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| request_span(r, tenant_status))
.get("/v1/tenant", |r| RequestSpan(tenant_list_handler).handle(r))
.post("/v1/tenant", |r| {
RequestSpan(tenant_create_handler).handle(r)
})
.get("/v1/tenant/:tenant_id", |r| {
RequestSpan(tenant_status).handle(r)
})
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
request_span(r, tenant_size_handler)
RequestSpan(tenant_size_handler).handle(r)
})
.put("/v1/tenant/config", |r| {
request_span(r, update_tenant_config_handler)
RequestSpan(update_tenant_config_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/config", |r| {
request_span(r, get_tenant_config_handler)
RequestSpan(get_tenant_config_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
request_span(r, timeline_list_handler)
RequestSpan(timeline_list_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/timeline", |r| {
request_span(r, timeline_create_handler)
RequestSpan(timeline_create_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/attach", |r| {
request_span(r, tenant_attach_handler)
RequestSpan(tenant_attach_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/detach", |r| {
request_span(r, tenant_detach_handler)
RequestSpan(tenant_detach_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/load", |r| {
request_span(r, tenant_load_handler)
RequestSpan(tenant_load_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/ignore", |r| {
request_span(r, tenant_ignore_handler)
RequestSpan(tenant_ignore_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
request_span(r, timeline_detail_handler)
RequestSpan(timeline_detail_handler).handle(r)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
|r| request_span(r, get_lsn_by_timestamp_handler),
|r| RequestSpan(get_lsn_by_timestamp_handler).handle(r),
)
.put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
request_span(r, timeline_gc_handler)
RequestSpan(timeline_gc_handler).handle(r)
})
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/compact",
@@ -1249,34 +1260,34 @@ pub fn make_router(
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| request_span(r, timeline_download_remote_layers_handler_post),
|r| RequestSpan(timeline_download_remote_layers_handler_post).handle(r),
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| request_span(r, timeline_download_remote_layers_handler_get),
|r| RequestSpan(timeline_download_remote_layers_handler_get).handle(r),
)
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
request_span(r, timeline_delete_handler)
RequestSpan(timeline_delete_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {
request_span(r, layer_map_info_handler)
RequestSpan(layer_map_info_handler).handle(r)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|r| request_span(r, layer_download_handler),
|r| RequestSpan(layer_download_handler).handle(r),
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|r| request_span(r, evict_timeline_layer_handler),
|r| RequestSpan(evict_timeline_layer_handler).handle(r),
)
.put("/v1/disk_usage_eviction/run", |r| {
request_span(r, disk_usage_eviction_run)
RequestSpan(disk_usage_eviction_run).handle(r)
})
.put(
"/v1/tenant/:tenant_id/break",
testing_api!("set tenant state to broken", handle_tenant_break),
)
.get("/v1/panic", |r| request_span(r, always_panic_handler))
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
.post(
"/v1/tracing/event",
testing_api!("emit a tracing event", post_tracing_event_handler),

View File

@@ -45,7 +45,6 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument]
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -20,7 +20,6 @@ use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use std::cmp::min;
@@ -84,7 +83,6 @@ pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod layer_cache;
pub mod metadata;
mod par_fsync;
@@ -449,11 +447,6 @@ pub enum DeleteTimelineError {
Other(#[from] anyhow::Error),
}
pub enum SetStoppingError {
AlreadyStopping,
Broken,
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -652,17 +645,16 @@ impl Tenant {
"attach tenant",
false,
async move {
match tenant_clone.attach(&ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
}
let doit = async {
tenant_clone.attach(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
Ok(_) => {}
Err(e) => {
error!("attach failed, setting tenant state to Broken: {:?}", e);
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(e.to_string());
});
tenant_clone.set_broken(e.to_string());
error!("error attaching tenant: {:?}", e);
}
}
Ok(())
@@ -679,8 +671,6 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
@@ -891,17 +881,14 @@ impl Tenant {
/// If the loading fails for some reason, the Tenant will go into Broken
/// state.
///
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
#[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))]
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
ctx: &RequestContext,
) -> Arc<Tenant> {
debug_assert_current_span_has_tenant_id();
let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
Ok(conf) => conf,
Err(e) => {
@@ -933,27 +920,20 @@ impl Tenant {
"initial tenant load",
false,
async move {
// keep the sender alive as long as we have the initial load ongoing; it will be
// None for loads spawned after init_tenant_mgr.
let (_tx, rx) = if let Some((tx, rx)) = init_done {
(Some(tx), Some(rx))
} else {
(None, None)
let doit = async {
tenant_clone.load(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match tenant_clone.load(&ctx).await {
Ok(()) => {
debug!("load finished, activating");
tenant_clone.activate(broker_client, rx.as_ref(), &ctx);
}
match doit.await {
Ok(()) => {}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(err.to_string());
});
tenant_clone.set_broken(err.to_string());
error!("could not load tenant {tenant_id}: {err:?}");
}
}
Ok(())
info!("initial load for tenant {tenant_id} finished!");
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
@@ -962,6 +942,8 @@ impl Tenant {
}),
);
info!("spawned load into background");
tenant
}
@@ -969,11 +951,10 @@ impl Tenant {
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
debug!("loading tenant task");
info!("loading tenant task");
utils::failpoint_sleep_millis_async!("before-loading-tenant");
@@ -983,109 +964,102 @@ impl Tenant {
//
// Scan the directory, peek into the metadata file of each timeline, and
// collect a list of timelines and their ancestors.
let tenant_id = self.tenant_id;
let conf = self.conf;
let span = info_span!("blocking");
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
for entry in std::fs::read_dir(&timelines_dir).with_context(|| {
format!(
"Failed to list timelines directory for tenant {}",
self.tenant_id
)
})? {
let entry = entry.with_context(|| {
format!("cannot read timeline dir entry for {}", self.tenant_id)
})?;
let timeline_dir = entry.path();
let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || {
let _g = span.entered();
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
let timelines_dir = conf.timelines_path(&tenant_id);
for entry in
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
{
let entry = entry.context("read timeline dir entry")?;
let timeline_dir = entry.path();
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
}
} else if is_uninit_mark(&timeline_dir) {
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
}
} else if is_uninit_mark(&timeline_dir) {
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {}",
timeline_uninit_mark_file.display()
)
})?;
let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id);
})?;
let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else {
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file = self
.conf
.timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
"Found an uninit mark file for timeline {}/{}, removing the timeline and its uninit mark",
self.tenant_id, timeline_id
);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else {
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file =
conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
%timeline_id,
"Found an uninit mark file, removing the timeline and its uninit mark",
);
if let Err(e) = remove_timeline_and_uninit_mark(
&timeline_dir,
&timeline_uninit_mark_file,
) {
error!("Failed to clean up uninit marked timeline: {e:?}");
}
continue;
}
continue;
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(conf, timeline_id, tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
}
}
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load)
})
.await
.context("load spawn_blocking")
.and_then(|res| res)?;
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
let sorted_timelines = tree_sort_timelines(timelines_to_load)?;
// FIXME original collect_timeline_files contained one more check:
// 1. "Timeline has no ancestor and no layer files"
@@ -1095,7 +1069,7 @@ impl Tenant {
.with_context(|| format!("load local timeline {timeline_id}"))?;
}
trace!("Done");
info!("Done");
Ok(())
}
@@ -1462,11 +1436,7 @@ impl Tenant {
Ok(())
}
/// Shuts down a timeline's tasks, removes its in-memory structures, and deletes its
/// data from disk.
///
/// This doesn't currently delete all data from S3, but sets a flag in its
/// index_part.json file to mark it as deleted.
/// Removes timeline-related in-memory data
pub async fn delete_timeline(
&self,
timeline_id: TimelineId,
@@ -1476,11 +1446,7 @@ impl Tenant {
// Transition the timeline into TimelineState::Stopping.
// This should prevent new operations from starting.
//
// Also grab the Timeline's delete_lock to prevent another deletion from starting.
let timeline;
let mut delete_lock_guard;
{
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
// Ensure that there are no child timelines **attached to that pageserver**,
@@ -1498,36 +1464,20 @@ impl Tenant {
Entry::Vacant(_) => return Err(DeleteTimelineError::NotFound),
};
timeline = Arc::clone(timeline_entry.get());
// Prevent two tasks from trying to delete the timeline at the same time.
//
// XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller
// needs to poll until the operation has finished. But for now, we return an
// error, because the control plane knows to retry errors.
delete_lock_guard = timeline.delete_lock.try_lock().map_err(|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
})?;
// If another task finished the deletion just before we acquired the lock,
// return success.
if *delete_lock_guard {
return Ok(());
}
let timeline = Arc::clone(timeline_entry.get());
timeline.set_state(TimelineState::Stopping);
drop(timelines);
}
timeline
};
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
// NB: If this fails half-way through, and is retried, the retry will go through
// all the same steps again. Make sure the code here is idempotent, and don't
// error out if some of the shutdown tasks have already been completed!
// NB: If you call delete_timeline multiple times concurrently, they will
// all go through the motions here. Make sure the code here is idempotent,
// and don't error out if some of the shutdown tasks have already been
// completed!
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
@@ -1568,10 +1518,6 @@ impl Tenant {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
//
// AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
// two tasks from performing the deletion at the same time. The first task
// that starts deletion should run it to completion.
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
@@ -1582,12 +1528,14 @@ impl Tenant {
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents GC or compaction from running at the same time.
// The GC task doesn't register itself with the timeline it's operating on,
// so it might still be running even though we called `shutdown_tasks`.
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. See
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
@@ -1649,27 +1597,37 @@ impl Tenant {
});
// Remove the timeline from the map.
{
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
timelines.remove(&timeline_id).expect(
"timeline that we were deleting was concurrently removed from 'timelines' map",
);
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
// All done! Mark the deletion as completed and release the delete_lock
*delete_lock_guard = true;
drop(delete_lock_guard);
let removed_timeline = timelines.remove(&timeline_id);
if removed_timeline.is_none() {
// This can legitimately happen if there's a concurrent call to this function.
// T1 T2
// lock
// unlock
// lock
// unlock
// remove files
// lock
// remove from map
// unlock
// return
// remove files
// lock
// remove from map observes empty map
// unlock
// return
debug!("concurrent call to this function won the race");
}
drop(timelines);
Ok(())
}
@@ -1686,193 +1644,127 @@ impl Tenant {
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
init_done: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut activating = false;
let mut result = Ok(());
self.state.send_modify(|current_state| {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
}
TenantState::Loading => {
*current_state = TenantState::Activating(ActivatingFrom::Loading);
}
TenantState::Attaching => {
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
}
}
debug!(tenant_id = %self.tenant_id, "Activating tenant");
activating = true;
// Continue outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
});
if activating {
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, init_done);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), ctx);
activated_timelines += 1;
}
self.state.send_modify(move |current_state| {
assert!(
matches!(current_state, TenantState::Activating(_)),
"set_stopping and set_broken wait for us to leave Activating state",
);
*current_state = TenantState::Active;
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
});
}
}
/// Change tenant status to Stopping, to mark that it is being shut down.
///
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
pub async fn set_stopping(&self) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
*current_state = TenantState::Stopping;
// Continue stopping outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
true
}
TenantState::Broken { reason, .. } => {
info!(
"Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
);
err = Some(SetStoppingError::Broken);
false
}
TenantState::Stopping => {
info!("Tenant is already in Stopping state");
err = Some(SetStoppingError::AlreadyStopping);
false
}
});
match (stopping, err) {
(true, None) => {} // continue
(false, Some(err)) => return Err(err),
(true, Some(_)) => unreachable!(
"send_if_modified closure must error out if not transitioning to Stopping"
),
(false, None) => unreachable!(
"send_if_modified closure must return true if transitioning to Stopping"
),
}
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
Ok(())
}
/// Method for tenant::mgr to transition us into Broken state in case of a late failure in
/// `remove_tenant_from_memory`
///
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// In tests, we also use this to set tenants to Broken state on purpose.
pub(crate) async fn set_broken(&self, reason: String) {
let mut rx = self.state.subscribe();
// The load & attach routines own the tenant state until it has reached `Active`.
// So, wait until it's done.
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
if cfg!(feature = "testing") {
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
                } else {
                    unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
                }
                // activate() was called on an already Active tenant. Shouldn't happen.
                result = Err(anyhow::anyhow!("Tenant is already active"));
            }
TenantState::Broken { reason, .. } => {
// This shouldn't happen either
result = Err(anyhow::anyhow!(
"Could not activate tenant because it is in broken state due to: {reason}",
));
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state, skipping activation");
}
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), ctx);
activated_timelines += 1;
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
}
}
});
result
}
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
match current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
}
TenantState::Broken { reason, .. } => {
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state");
}
}
});
}
pub fn set_broken(&self, reason: String) {
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Active => {
// Broken tenants can currently only used for fatal errors that happen
// while loading or attaching a tenant. A tenant that has already been
// activated should never be marked as broken. We cope with it the best
// we can, but it shouldn't happen.
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
}
TenantState::Broken { .. } => {
// This shouldn't happen either
warn!("Tenant is already in Broken state");
}
            // This is the only "expected" path, any other path is a bug.
            TenantState::Stopping => {
                warn!(
                    "Marking Stopping tenant as Broken state, reason: {}",
                    reason
                );
                *current_state = TenantState::broken_from_reason(reason);
            }
TenantState::Loading | TenantState::Attaching => {
info!("Setting tenant as Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
}
}
});
}
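Both versions of these state-transition helpers lean on the same tokio watch-channel idiom: wait for the state to leave the activation phase with Receiver::wait_for, then let exactly one caller win the transition with Sender::send_if_modified. The sketch below shows that idiom in isolation with a toy State enum; it illustrates the pattern only and is not the pageserver's actual types.

use tokio::sync::watch;

#[derive(Clone, Copy, PartialEq, Debug)]
enum State {
    Activating,
    Active,
    Stopping,
}

// Wait until activation is over, then try to be the (single) task that flips
// the state to Stopping. Returns true iff this caller performed the transition.
async fn try_stop(tx: &watch::Sender<State>) -> bool {
    let mut rx = tx.subscribe();
    rx.wait_for(|s| *s != State::Activating)
        .await
        .expect("sender is kept alive by the caller");
    tx.send_if_modified(|s| match s {
        State::Active => {
            *s = State::Stopping;
            true // receivers are notified; we won the transition
        }
        _ => false, // already Stopping or otherwise not eligible; no notification
    })
}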
@@ -1885,7 +1777,7 @@ impl Tenant {
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
TenantState::Loading | TenantState::Attaching => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {

View File

@@ -1,34 +0,0 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
pub struct LayerCache {
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
}
impl LayerCache {
pub fn new() -> Self {
Self {
layers: Mutex::new(HashMap::new()),
}
}
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
self.layers.lock().unwrap();
guard.get(layer_fname).cloned()
}
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
let guard = self.layers.lock().unwrap();
guard.contains_key(layer_fname)
}
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
let mut guard = self.layers.lock().unwrap();
guard.insert(layer_fname, persistent_layer);
}
}

View File

@@ -61,7 +61,6 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayer;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -139,19 +138,24 @@ where
self.layer_map.remove_historic_noflush(layer)
}
    /// Ensure the downloaded layer matches existing layer.
    ///
    /// Returned `Replacement` describes succeeding in checking or the reason why it could not
    /// be done.
    /// Replaces existing layer iff it is the `expected`.
    /// If the expected layer has been removed it will not be inserted by this function.
    ///
    /// Returned `Replacement` describes succeeding in replacement or the reason why it could not
    /// be done.
pub fn ensure_consistent(
&self,
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.ensure_consistent_noflush(expected, new)
self.layer_map.replace_historic_noflush(expected, new)
}
// We will flush on drop anyway, but this method makes it
@@ -305,16 +309,16 @@ where
}
}
pub(self) fn ensure_consistent_noflush(
&self,
pub(self) fn replace_historic_noflush(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
@@ -341,7 +345,17 @@ where
None
};
Ok(Replacement::Replaced { in_buffered: false })
let replaced = self.historic.replace(&key, new.clone(), |existing| {
Self::compare_arced_layers(existing, expected)
});
if let Replacement::Replaced { .. } = &replaced {
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
}
}
Ok(replaced)
}
/// Helper function for BatchedUpdates::drop.
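The replace_historic doc comments above describe a compare-and-swap style update: the layer is swapped only if the stored entry is still the expected one, and the result enum reports what happened. A minimal, self-contained sketch of that idea over a plain HashMap (toy types, not the real LayerMap) could look like this:

use std::collections::HashMap;
use std::hash::Hash;

#[derive(Debug, PartialEq)]
enum Replacement {
    Replaced,
    NotFound,
    Unexpected, // entry exists but is not the expected value
}

// Replace `key`'s value with `new` only if the current value equals `expected`.
fn replace_if_expected<K: Hash + Eq, V: PartialEq>(
    map: &mut HashMap<K, V>,
    key: &K,
    expected: &V,
    new: V,
) -> Replacement {
    match map.get_mut(key) {
        None => Replacement::NotFound,
        Some(current) if *current == *expected => {
            *current = new;
            Replacement::Replaced
        }
        Some(_) => Replacement::Unexpected,
    }
}

The real method additionally keeps the l0_delta_layers index in sync when the replaced layer is an L0 delta layer.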

View File

@@ -10,7 +10,6 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -20,12 +19,9 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
};
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
@@ -67,7 +63,6 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: (completion::Completion, completion::Barrier),
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -124,7 +119,6 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done.clone()),
&ctx,
) {
Ok(tenant) => {
@@ -160,7 +154,6 @@ pub fn schedule_local_tenant_processing(
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -214,14 +207,7 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
broker_client,
remote_storage,
init_done,
ctx,
)
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
};
Ok(tenant)
}
@@ -236,7 +222,6 @@ pub fn schedule_local_tenant_processing(
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument]
pub async fn shutdown_all_tenants() {
// Prevent new tenants from being created.
let tenants_to_shut_down = {
@@ -259,65 +244,15 @@ pub async fn shutdown_all_tenants() {
}
};
    // Set tenant (and its timelines) to Stopping state.
//
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for other tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
    // It's messed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (tenant_id, tenant) in tenants_to_shut_down {
join_set.spawn(
async move {
match tenant.set_stopping().await {
Ok(()) => debug!("tenant successfully stopped"),
Err(SetStoppingError::Broken) => {
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well");
},
Err(SetStoppingError::AlreadyStopping) => {
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
}
}
tenant
}
.instrument(info_span!("set_stopping", %tenant_id)),
);
}
let mut panicked = 0;
while let Some(res) = join_set.join_next().await {
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
Ok(tenant) => tenants_to_freeze_and_flush.push(tenant),
for (_, tenant) in tenants_to_shut_down {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_stopping();
tenants_to_freeze_and_flush.push(tenant);
}
}
if panicked > 0 {
        warn!(panicked, "observed panics while stopping tenants");
}
// Shut down all existing walreceiver connections and stop accepting the new ones.
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
@@ -329,30 +264,12 @@ pub async fn shutdown_all_tenants() {
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.
let mut join_set = tokio::task::JoinSet::new();
for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
join_set.spawn(
async move {
if let Err(err) = tenant.freeze_and_flush().await {
warn!("Could not checkpoint tenant during shutdown: {err:?}");
}
}
.instrument(info_span!("freeze_and_flush", %tenant_id)),
);
}
while let Some(next) = join_set.join_next().await {
match next {
Ok(()) => {}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("no cancelling")
}
Err(join_error) if join_error.is_panic() => { /* reported already */ }
Err(join_error) => warn!("unknown kind of JoinError: {join_error}"),
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}
}
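The shutdown comments above describe fanning the per-tenant work out on a tokio JoinSet and tolerating panics instead of aborting the whole shutdown. A stripped-down sketch of that fan-out/collect pattern (toy task body, not the real tenant shutdown) could look like:

use tokio::task::JoinSet;

async fn shut_down_all(ids: Vec<u64>) {
    let mut join_set = JoinSet::new();
    for id in ids {
        join_set.spawn(async move {
            // per-tenant shutdown work would go here
            id
        });
    }

    let mut panicked = 0;
    while let Some(res) = join_set.join_next().await {
        match res {
            Ok(_id) => {}
            Err(e) if e.is_panic() => panicked += 1, // keep going, just count it
            Err(e) => eprintln!("unexpected join error: {e}"),
        }
    }
    if panicked > 0 {
        eprintln!("observed {panicked} panics while stopping tenants");
    }
}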
@@ -374,7 +291,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -520,7 +437,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -593,7 +510,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -672,23 +589,13 @@ where
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure
}
Err(SetStoppingError::Broken) => {
// continue the procedure, let's hope the closure can deal with broken tenants
}
Err(SetStoppingError::AlreadyStopping) => {
// the tenant is already stopping or broken, don't do anything
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping(),
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
None => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -713,7 +620,7 @@ where
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(e.to_string()).await;
tenant.set_broken(e.to_string());
}
None => {
warn!("Tenant {tenant_id} got removed from memory");

View File

@@ -19,8 +19,14 @@ fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result
Ok(())
}
fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
const PARALLEL_PATH_THRESHOLD: usize = 1;
if paths.len() <= PARALLEL_PATH_THRESHOLD {
for path in paths {
fsync_path(path)?;
}
return Ok(());
}
/// Use at most this number of threads.
/// Increasing this limit will
@@ -30,11 +36,11 @@ fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
let num_threads = paths.len().min(MAX_NUM_THREADS);
let next_path_idx = AtomicUsize::new(0);
std::thread::scope(|s| -> io::Result<()> {
crossbeam_utils::thread::scope(|s| -> io::Result<()> {
let mut handles = vec![];
// Spawn `num_threads - 1`, as the current thread is also a worker.
for _ in 1..num_threads {
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx)));
}
parallel_worker(paths, &next_path_idx)?;
@@ -45,41 +51,5 @@ fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
Ok(())
})
}
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
if paths.len() == 1 {
fsync_path(&paths[0])?;
return Ok(());
}
fsync_in_thread_pool(paths)
}
/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
/// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
const MAX_CONCURRENT_FSYNC: usize = 64;
let mut next = paths.iter().peekable();
let mut js = tokio::task::JoinSet::new();
loop {
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
let next = next.next().expect("just peeked");
let next = next.to_owned();
js.spawn_blocking(move || fsync_path(&next));
}
// now the joinset has been filled up, wait for next to complete
if let Some(res) = js.join_next().await {
res??;
} else {
// last item had already completed
assert!(
next.peek().is_none(),
"joinset emptied, we shouldn't have more work"
);
return Ok(());
}
}
.unwrap()
}
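par_fsync_async above bounds concurrency by keeping at most MAX_CONCURRENT_FSYNC spawn_blocking tasks in a JoinSet at a time. A hypothetical call site (names invented for illustration) would simply hand it the newly written files plus their containing directory:

use std::path::PathBuf;

// Hypothetical usage: fsync freshly written layer files and the directory
// that contains them, without blocking the async executor.
async fn sync_new_layers(new_layers: Vec<PathBuf>, timeline_dir: PathBuf) -> std::io::Result<()> {
    let mut paths = new_layers;
    paths.push(timeline_dir); // the directory entry itself must be durable too
    par_fsync::par_fsync_async(&paths).await
}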

View File

@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayerDesc;
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
@@ -431,6 +431,14 @@ pub trait PersistentLayer: Layer {
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
/// Returns None if the layer file size is not known.
///
/// Should not change over the lifetime of the layer object because
@@ -442,6 +450,16 @@ pub trait PersistentLayer: Layer {
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
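The downcast_remote_layer hook plus the is_remote_layer flag above implement a hand-rolled downcast from Arc<dyn PersistentLayer> to the concrete remote type. A self-contained sketch of the same trick on stand-in types (Layer/Remote here are toys, not the pageserver traits):

use std::sync::Arc;

trait Layer {
    // Defaults: not a remote layer. The concrete remote type overrides both hooks.
    fn is_remote(&self) -> bool {
        false
    }
    fn as_remote(self: Arc<Self>) -> Option<Arc<Remote>> {
        None
    }
}

struct Local;
struct Remote;

impl Layer for Local {}
impl Layer for Remote {
    fn is_remote(&self) -> bool {
        true
    }
    fn as_remote(self: Arc<Self>) -> Option<Arc<Remote>> {
        Some(self)
    }
}

// Free helper mirroring `downcast_remote_layer`: check the flag, then clone the
// Arc and ask the object to hand back its concrete type.
fn downcast_remote(layer: &Arc<dyn Layer>) -> Option<Arc<Remote>> {
    if layer.is_remote() {
        Arc::clone(layer).as_remote()
    } else {
        None
    }
}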

View File

@@ -30,7 +30,6 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -58,7 +57,7 @@ use utils::{
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf, RemoteLayerDesc,
LayerKeyIter, PathOrConf,
};
///
@@ -664,17 +663,6 @@ impl DeltaLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new delta layer.

View File

@@ -26,7 +26,6 @@ use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -54,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -465,17 +464,6 @@ impl ImageLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new image layer.

View File

@@ -1,4 +1,4 @@
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
@@ -25,19 +25,19 @@ use super::{
LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`crate::storage_layer::DeltaLayer`].
///
/// RemoteLayerDesc might be downloaded on-demand during operations which are
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed to download remote layers and during which it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayerDesc {
pub(crate) tenantid: TenantId,
pub(crate) timelineid: TimelineId,
pub(crate) key_range: Range<Key>,
pub(crate) lsn_range: Range<Lsn>,
pub struct RemoteLayer {
tenantid: TenantId,
timelineid: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub file_name: LayerFileName,
@@ -54,7 +54,7 @@ pub struct RemoteLayerDesc {
/// Has `LayerMap::replace` failed for this (true) or not (false).
///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed.
///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
@@ -63,9 +63,9 @@ pub struct RemoteLayerDesc {
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for RemoteLayerDesc {
impl std::fmt::Debug for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayerDesc")
f.debug_struct("RemoteLayer")
.field("file_name", &self.file_name)
.field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.is_incremental)
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayerDesc {
}
}
impl Layer for RemoteLayerDesc {
impl Layer for RemoteLayer {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
@@ -119,7 +119,7 @@ impl Layer for RemoteLayerDesc {
}
}
impl PersistentLayer for RemoteLayerDesc {
impl PersistentLayer for RemoteLayer {
fn get_tenant_id(&self) -> TenantId {
self.tenantid
}
@@ -160,6 +160,14 @@ impl PersistentLayer for RemoteLayerDesc {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn file_size(&self) -> u64 {
self.layer_metadata.file_size()
}
@@ -193,15 +201,15 @@ impl PersistentLayer for RemoteLayerDesc {
}
}
impl RemoteLayerDesc {
impl RemoteLayer {
pub fn new_img(
tenantid: TenantId,
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),
@@ -222,8 +230,8 @@ impl RemoteLayerDesc {
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),

View File

@@ -12,9 +12,8 @@ use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
pub fn start_background_loops(tenant: &Arc<Tenant>) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -25,9 +24,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
compaction_loop(tenant)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
@@ -44,9 +41,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
gc_loop(tenant)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;

View File

@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayerDesc,
LayerAccessStats, LayerFileName, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -77,7 +77,6 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_cache::LayerCache;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -120,7 +119,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -196,9 +195,8 @@ pub struct Timeline {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
/// and [`Tenant::delete_timeline`].
pub(super) layer_removal_cs: tokio::sync::Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -237,13 +235,7 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
/// Prevent two tasks from deleting the timeline at the same time. If held, the
/// timeline is being deleted. If 'true', the timeline has already been deleted.
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
layer_cache: Arc<LayerCache>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -677,7 +669,7 @@ impl Timeline {
}
/// Outermost timeline compaction operation; downloads needed layers.
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
let last_record_lsn = self.get_last_record_lsn();
@@ -766,7 +758,7 @@ impl Timeline {
}
/// Compaction which might need to be retried after downloading remote layers.
async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> {
//
// High level strategy for compaction / image creation:
//
@@ -801,7 +793,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
let layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -835,7 +827,7 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
self.compact_level0(&layer_removal_cs, target_file_size, ctx)
.await?;
timer.stop_and_record();
}
@@ -1010,22 +1002,20 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
if self.layer_cache.contains(&remote_layer_desc.filename()) {
return Ok(Some(false));
}
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
}
self.download_remote_layer(remote_layer_desc).await?;
self.download_remote_layer(remote_layer).await?;
Ok(Some(true))
}
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1052,7 +1042,7 @@ impl Timeline {
pub async fn evict_layers(
&self,
_: &GenericRemoteStorage,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
let remote_client = self.remote_client.clone().expect(
@@ -1087,7 +1077,7 @@ impl Timeline {
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
// ensure that the layers have finished uploading
@@ -1136,12 +1126,12 @@ impl Timeline {
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<RemoteLayerDesc>,
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<bool> {
use super::layer_map::Replacement;
if !self.layer_cache.contains(&local_layer.filename()) {
if local_layer.is_remote_layer() {
            // TODO(issue #3851): consider returning an err here instead of false,
            // which is the same as the outcome of the match later
return Ok(false);
@@ -1168,7 +1158,7 @@ impl Timeline {
let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
@@ -1177,7 +1167,7 @@ impl Timeline {
.access_stats()
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
@@ -1188,7 +1178,6 @@ impl Timeline {
),
});
/*
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
Replacement::Replaced { .. } => {
if let Err(e) = local_layer.delete_resident_layer_file() {
@@ -1239,10 +1228,8 @@ impl Timeline {
false
}
};
*/
// Ok(replaced)
Ok(true)
Ok(replaced)
}
}
@@ -1426,9 +1413,6 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
layer_cache: Arc::new(LayerCache::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1575,12 +1559,9 @@ impl Timeline {
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
let remote_desc = layer.layer_desc();
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1612,9 +1593,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
let remote_desc = layer.layer_desc();
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1659,9 +1638,9 @@ impl Timeline {
async fn create_remote_layers(
&self,
index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
up_to_date_disk_consistent_lsn: Lsn,
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
// Are we missing some files that are present in remote storage?
// Create RemoteLayer instances for them.
let mut local_only_layers = local_layers;
@@ -1740,7 +1719,7 @@ impl Timeline {
continue;
}
let remote_layer = RemoteLayerDesc::new_img(
let remote_layer = RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
imgfilename,
@@ -1768,7 +1747,7 @@ impl Timeline {
);
continue;
}
let remote_layer = RemoteLayerDesc::new_delta(
let remote_layer = RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
deltafilename,
@@ -1925,7 +1904,6 @@ impl Timeline {
// no cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
@@ -2174,7 +2152,7 @@ impl Timeline {
}
}
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
@@ -2190,11 +2168,11 @@ impl Timeline {
fn delete_historic_layer(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<RemoteLayerDesc>,
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
if self.layer_cache.contains(&layer.filename()) {
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
self.metrics
@@ -2443,7 +2421,13 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry.
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer)
{
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
remote_layer // download happens outside the scope of `layers` guard object
} else {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -2466,10 +2450,6 @@ impl Timeline {
}),
));
continue 'outer;
} else {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
layer // download happens outside the scope of `layers` guard object
}
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
@@ -2652,7 +2632,7 @@ impl Timeline {
/// Layer flusher task's main loop.
async fn flush_loop(
self: &Arc<Self>,
&self,
mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
ctx: &RequestContext,
) {
@@ -2741,9 +2721,9 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
#[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
async fn flush_frozen_layer(
self: &Arc<Self>,
&self,
frozen_layer: Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -2763,16 +2743,7 @@ impl Timeline {
.await?
} else {
// normal case, write out a L0 delta layer file.
let this = self.clone();
let frozen_layer = frozen_layer.clone();
let span = tracing::info_span!("blocking");
let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.create_delta_layer(&frozen_layer)
})
.await
.context("create_delta_layer spawn_blocking")
.and_then(|res| res)?;
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
HashMap::from([(delta_path, metadata)])
};
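One of the hunks above offloads create_delta_layer to tokio's blocking pool while keeping it attached to the current tracing span. The essential shape of that pattern, reduced to a self-contained sketch (the closure body is a placeholder):

// Run blocking work off the async executor, keep it under a tracing span,
// and flatten the JoinError/inner-Result nesting back into one Result.
async fn run_blocking_step() -> anyhow::Result<u64> {
    let span = tracing::info_span!("blocking");
    let value = tokio::task::spawn_blocking(move || {
        let _g = span.entered();
        // CPU- or disk-heavy work would go here
        Ok::<u64, anyhow::Error>(42)
    })
    .await
    .map_err(anyhow::Error::new) // the blocking task panicked or was cancelled
    .and_then(|res| res)?;
    Ok(value)
}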
@@ -2876,7 +2847,7 @@ impl Timeline {
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
self: &Arc<Self>,
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
// Write it out
@@ -2892,13 +2863,10 @@ impl Timeline {
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
// this with a single fsync in future refactors.
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
// Then sync the parent directory.
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
par_fsync::par_fsync(&[
new_delta_path.clone(),
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
])?;
// Add it to the layer map
let l = Arc::new(new_delta);
@@ -2909,8 +2877,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(Arc::new(l.layer_desc()));
self.layer_cache.insert(l.filename(), l);
batch_updates.insert_historic(l);
batch_updates.flush();
// update the timeline's physical size
@@ -3123,15 +3090,11 @@ impl Timeline {
let all_paths = image_layers
.iter()
.map(|layer| layer.path())
.chain(std::iter::once(
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
))
.collect::<Vec<_>>();
par_fsync::par_fsync_async(&all_paths)
.await
.context("fsync of newly created layer files")?;
par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.await
.context("fsync of timeline dir")?;
par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?;
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
@@ -3156,9 +3119,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(l.layer_desc()));
let x: Arc<dyn PersistentLayer> = l;
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(l);
}
updates.flush();
drop(layers);
@@ -3171,7 +3132,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
}
/// Top-level failure to compact.
@@ -3181,7 +3142,7 @@ enum CompactionError {
///
/// This should not happen repeatedly, but will be retried once by top-level
/// `Timeline::compact`.
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
DownloadRequired(Vec<Arc<RemoteLayer>>),
/// Compaction cannot be done right now; page reconstruction and so on.
Other(anyhow::Error),
}
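CompactionError::DownloadRequired together with ROUNDS = 2 in Timeline::compact implies a simple retry protocol: attempt compaction, and if it bails out because some layers are only present remotely, download them and retry once. A toy, self-contained sketch of that control flow (stub download/compact functions, not the real ones):

#[derive(Debug)]
enum CompactError {
    DownloadRequired(Vec<String>), // layer names that must be fetched first
    Other(String),
}

async fn download(layer: &str) {
    println!("downloading {layer}");
}

// Stub: the first attempt needs downloads, the second succeeds.
async fn compact_once(attempt: usize) -> Result<(), CompactError> {
    if attempt == 0 {
        Err(CompactError::DownloadRequired(vec!["delta-layer-1".into()]))
    } else {
        Ok(())
    }
}

async fn compact_with_retry() -> Result<(), CompactError> {
    const ROUNDS: usize = 2;
    for round in 0..ROUNDS {
        match compact_once(round).await {
            Ok(()) => return Ok(()),
            Err(CompactError::DownloadRequired(layers)) if round + 1 < ROUNDS => {
                for layer in &layers {
                    download(layer).await;
                }
                // fall through to the next round
            }
            Err(e) => return Err(e),
        }
    }
    Err(CompactError::Other("still requires downloads after retry".into()))
}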
@@ -3198,9 +3159,9 @@ impl Timeline {
    /// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
    /// returned as an error. If the `layer_removal_cs` boundary is changed so that it is no longer
    /// taken at the start of level 0 file compaction, the on-demand download should be revisited as well.
fn compact_level0_phase1(
async fn compact_level0_phase1(
&self,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
@@ -3253,9 +3214,13 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.cloned()
.map(|l| {
l.clone()
.downcast_remote_layer()
.expect("just checked it is remote layer")
})
.collect::<Vec<_>>();
if !remotes.is_empty() {
@@ -3509,13 +3474,13 @@ impl Timeline {
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
layer_paths.pop().unwrap();
}
@@ -3532,26 +3497,17 @@ impl Timeline {
/// as Level 1 files.
///
async fn compact_level0(
self: &Arc<Self>,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
&self,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
let this = self.clone();
let ctx_inner = ctx.clone();
let layer_removal_cs_inner = layer_removal_cs.clone();
let span = tracing::info_span!("blocking");
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
})
.await
.context("compact_level0_phase1 spawn_blocking")
.map_err(CompactionError::Other)
.and_then(|res| res)?;
} = self
.compact_level0_phase1(layer_removal_cs, target_file_size, ctx)
.await?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
@@ -3595,15 +3551,13 @@ impl Timeline {
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let remote_desc = l.layer_desc();
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(remote_desc));
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -3611,7 +3565,7 @@ impl Timeline {
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
self.delete_historic_layer(layer_removal_cs, l, &mut updates)?;
}
updates.flush();
drop(layers);
@@ -3731,7 +3685,7 @@ impl Timeline {
fail_point!("before-timeline-gc");
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
let layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -3751,7 +3705,7 @@ impl Timeline {
let res = self
.gc_timeline(
layer_removal_cs.clone(),
&layer_removal_cs,
horizon_cutoff,
pitr_cutoff,
retain_lsns,
@@ -3770,7 +3724,7 @@ impl Timeline {
async fn gc_timeline(
&self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
@@ -3943,11 +3897,7 @@ impl Timeline {
{
for doomed_layer in layers_to_remove {
layer_names_to_delete.push(doomed_layer.filename());
self.delete_historic_layer(
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
)?; // FIXME: schedule succeeded deletions before returning?
self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1;
}
}
@@ -4076,7 +4026,7 @@ impl Timeline {
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayerDesc>,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4133,12 +4083,10 @@ impl Timeline {
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
/*
let failure = match updates.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => {
@@ -4193,9 +4141,8 @@ impl Timeline {
remote_layer
.download_replacement_failure
.store(true, Relaxed);
} */
}
}
updates.flush();
drop(layers);
@@ -4208,10 +4155,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download
@@ -4225,8 +4169,7 @@ impl Timeline {
drop(permit);
Ok(())
}
.in_current_span(),
}.in_current_span(),
);
receiver.await.context("download task cancelled")?
@@ -4299,7 +4242,7 @@ impl Timeline {
let layers = self.layers.read().unwrap();
layers
.iter_historic_layers()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl))
}
@@ -4374,7 +4317,7 @@ pub struct DiskUsageEvictionInfo {
}
pub struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<RemoteLayerDesc>,
pub layer: Arc<dyn PersistentLayer>,
pub last_activity_ts: SystemTime,
}
@@ -4408,7 +4351,7 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
if !self.layer_cache.contains(&l.filename()) {
if l.is_remote_layer() {
continue;
}

View File

@@ -29,7 +29,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::{PersistentLayer, RemoteLayerDesc},
storage_layer::PersistentLayer,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -184,11 +184,11 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<RemoteLayerDesc>> = {
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if !self.layer_cache.contains(&hist_layer.filename()) {
if hist_layer.is_remote_layer() {
continue;
}

View File

@@ -19,10 +19,8 @@ use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use crate::metrics::BROKER_ITERATION_TIMELINES;
use crate::metrics::BROKER_PULLED_UPDATES;
use crate::metrics::BROKER_PUSHED_UPDATES;
use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -63,14 +61,8 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
BROKER_PUSHED_UPDATES.inc();
}
let elapsed = now.elapsed();
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64);
if elapsed > push_interval / 2 {
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
}
// Log duration every second. Should be about 10MB of logs per day.
info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
sleep(push_interval).await;
}
};

View File

@@ -125,25 +125,6 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_backup_errors_total counter")
});
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_push_update_seconds",
"Seconds to push all timeline updates to the broker",
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
});
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_iteration_timelines",
"Count of timelines pushed to the broker in a single iteration",
TIMELINES_COUNT_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
});
pub const LABEL_UNKNOWN: &str = "unknown";

View File

@@ -156,9 +156,7 @@ class LLVM:
profdata: Path,
objects: List[str],
sources: List[str],
demangler: Optional[Path] = None,
output_file: Optional[Path] = None,
) -> None:
demangler: Optional[Path] = None) -> None:
cwd = self.cargo.cwd
objects = list(intersperse('-object', objects))
@@ -182,18 +180,14 @@ class LLVM:
*objects,
*sources,
]
if output_file is not None:
with output_file.open('w') as outfile:
subprocess.check_call(cmd, cwd=cwd, stdout=outfile)
else:
subprocess.check_call(cmd, cwd=cwd)
subprocess.check_call(cmd, cwd=cwd)
def cov_report(self, **kwargs) -> None:
self._cov(subcommand='report', **kwargs)
def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None:
def cov_export(self, *, kind: str, **kwargs) -> None:
extras = (f'-format={kind}', )
self._cov(subcommand='export', *extras, output_file=output_file, **kwargs)
self._cov(subcommand='export', *extras, **kwargs)
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
extras = [f'-format={kind}']
@@ -289,12 +283,9 @@ class TextReport(Report):
self.llvm.cov_show(kind='text', **self._common_kwargs())
@dataclass
class LcovReport(Report):
output_file: Path
def generate(self) -> None:
self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs())
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
@dataclass
@@ -484,7 +475,7 @@ class State:
'text':
lambda: TextReport(**params),
'lcov':
lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'),
lambda: LcovReport(**params),
'summary':
lambda: SummaryReport(**params),
'github':

View File

@@ -1,5 +1,5 @@
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
// The script parses Allure reports and posts a comment with a summary of the test results to the PR.
//
// The comment is updated on each run with the latest results.
//
@@ -7,7 +7,7 @@
// - uses: actions/github-script@v6
// with:
// script: |
// const script = require("./scripts/comment-test-report.js")
// const script = require("./scripts/pr-comment-test-report.js")
// await script({
// github,
// context,
@@ -35,12 +35,8 @@ class DefaultMap extends Map {
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
    // Whether the script runs in a PR or on a branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
    // Comment body itself
@@ -170,39 +166,22 @@ module.exports = async ({ github, context, fetch, report }) => {
commentBody += autoupdateNotice
let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha
if (isPullRequest) {
createCommentFn = github.rest.issues.createComment
listCommentsFn = github.rest.issues.listComments
updateCommentFn = github.rest.issues.updateComment
issueNumberOrSha = {
issue_number: context.payload.number,
}
} else {
updateCommentFn = github.rest.repos.updateCommitComment
listCommentsFn = github.rest.repos.listCommentsForCommit
createCommentFn = github.rest.repos.createCommitComment
issueNumberOrSha = {
commit_sha: commitSha,
}
}
const { data: comments } = await listCommentsFn({
...issueNumberOrSha,
const { data: comments } = await github.rest.issues.listComments({
issue_number: context.payload.number,
...ownerRepoParams,
})
const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker))
if (comment) {
await updateCommentFn({
await github.rest.issues.updateComment({
comment_id: comment.id,
body: commentBody,
...ownerRepoParams,
})
} else {
await createCommentFn({
await github.rest.issues.createComment({
issue_number: context.payload.number,
body: commentBody,
...issueNumberOrSha,
...ownerRepoParams,
})
}

View File

@@ -1621,8 +1621,6 @@ class NeonPageserver(PgProtocol):
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
]
def start(

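For context, allowed_errors is a list of regexes matched against pageserver log lines so that expected errors do not fail the test at teardown. A rough sketch of that filtering follows; the helper name and the exact log handling are assumptions, not the fixture's actual code.

import re
from typing import Iterable, List

def unexpected_errors(log_lines: Iterable[str], allowed_errors: List[str]) -> List[str]:
    """Return ERROR lines not covered by any allowed_errors pattern."""
    allowed = [re.compile(p) for p in allowed_errors]
    return [
        line
        for line in log_lines
        if "ERROR" in line and not any(p.search(line) for p in allowed)
    ]

# An empty result means the pageserver log is clean for this test.
assert not unexpected_errors(
    ["2023-05-25 ERROR ancestor timeline abc123 is being stopped"],
    [r".*ERROR.*ancestor timeline \S+ is being stopped"],
)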
View File

@@ -155,14 +155,14 @@ class PageserverHttpClient(requests.Session):
return res_json
def tenant_create(
self, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
self, new_tenant_id: Optional[TenantId] = None, conf: Optional[Dict[str, Any]] = None
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id),
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
**(conf or {}),
},
)
@@ -293,13 +293,13 @@ class PageserverHttpClient(requests.Session):
self,
pg_version: PgVersion,
tenant_id: TenantId,
new_timeline_id: TimelineId,
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id),
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
}
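
With both IDs now optional, a test can let the pageserver generate them. A minimal usage sketch under those assumptions follows; it presumes the create-timeline response JSON exposes the generated timeline_id, which is not shown in this hunk.

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder

def test_server_generated_ids(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    client = env.pageserver.http_client()
    tenant_id = client.tenant_create()  # no new_tenant_id: the server picks one
    timeline = client.timeline_create(pg_version=env.pg_version, tenant_id=tenant_id)
    log.info("pageserver generated timeline %s", timeline["timeline_id"])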

View File

@@ -3,7 +3,7 @@ from contextlib import closing
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
from fixtures.pageserver.http import PageserverApiException
from fixtures.types import TenantId, TimelineId
from fixtures.types import TenantId
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
@@ -25,19 +25,21 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
ps.safe_psql("set FOO", password=tenant_token)
ps.safe_psql("set FOO", password=pageserver_token)
new_timeline_id = env.neon_cli.create_branch(
"test_pageserver_auth", tenant_id=env.initial_tenant
)
# tenant can create branches
tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
ancestor_timeline_id=new_timeline_id,
)
# console can create branches for tenant
pageserver_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
ancestor_timeline_id=new_timeline_id,
)
# fail to create branch using token with different tenant_id
@@ -47,19 +49,18 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
invalid_tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
ancestor_timeline_id=new_timeline_id,
)
# create tenant using management token
pageserver_http_client.tenant_create(TenantId.generate())
pageserver_http_client.tenant_create()
# fail to create tenant using tenant token
with pytest.raises(
PageserverApiException,
match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
):
tenant_http_client.tenant_create(TenantId.generate())
tenant_http_client.tenant_create()
def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*could not load tenant.*load local timeline.*",
]
)

View File

@@ -4,12 +4,21 @@ from typing import Any, Dict, List, Optional, Tuple, Type
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import VanillaPostgres
from fixtures.neon_fixtures import (
PortDistributor,
VanillaPostgres,
)
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@pytest.fixture(scope="session")
def httpserver_listen_address(port_distributor: PortDistributor):
port = port_distributor.get_port()
return ("localhost", port)
def handle_db(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in dbs:

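Defining httpserver_listen_address this way makes pytest-httpserver bind to a port handed out by the suite's PortDistributor instead of a random one. A self-contained illustration of the pytest-httpserver calls a test would then use follows; the /billing/usage_events path is made up for the example.

from pytest_httpserver import HTTPServer

def test_collector_endpoint(httpserver: HTTPServer):
    # Register a canned response, then point the code under test at url_for().
    httpserver.expect_request("/billing/usage_events", method="POST").respond_with_json({"ok": True})
    endpoint = httpserver.url_for("/billing/usage_events")
    assert endpoint.startswith("http://")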
View File

@@ -228,6 +228,7 @@ def proxy_with_metric_collector(
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
httpserver_listen_address,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):

View File

@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
# This is before the failures injected by test_remote_failures, so it's a permanent error.
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
env.pageserver.allowed_errors.append(
".*attach failed.*: storage-sync-list-remote-timelines",
".*error attaching tenant: storage-sync-list-remote-timelines",
)
# Attach it. This HTTP request will succeed and launch a
# background task to load the tenant. In that background task,

View File

@@ -647,9 +647,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)

View File

@@ -22,7 +22,6 @@ from fixtures.neon_fixtures import (
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -309,26 +308,27 @@ def test_pageserver_with_empty_tenants(
env.pageserver.allowed_errors.append(
".*marking .* as locally complete, while it doesnt exist in remote index.*"
)
env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
env.pageserver.allowed_errors.append(
".*could not load tenant.*Failed to list timelines directory.*"
)
client = env.pageserver.http_client()
tenant_with_empty_timelines = TenantId.generate()
client.tenant_create(tenant_with_empty_timelines)
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
tenant_with_empty_timelines_dir = client.tenant_create()
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines"
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory"
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline re-initialization after pageserver restart
env.endpoints.stop_all()
@@ -340,16 +340,10 @@ def test_pageserver_with_empty_tenants(
env.pageserver.start()
client = env.pageserver.http_client()
def not_loading():
tenants = client.tenant_list()
assert len(tenants) == 2
assert all(t["state"]["slug"] != "Loading" for t in tenants)
wait_until(10, 0.2, not_loading)
tenants = client.tenant_list()
assert len(tenants) == 2
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
broken_tenant["state"]["slug"] == "Broken"
@@ -360,17 +354,17 @@ def test_pageserver_with_empty_tenants(
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
assert (
loaded_tenant["state"]["slug"] == "Active"
), "Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation"
), "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation"
loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines)
loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines_dir)
assert (
loaded_tenant_status["state"]["slug"] == "Active"
), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active"
), f"Tenant {tenant_with_empty_timelines_dir} without timelines dir should be active"
time.sleep(1) # to allow metrics propagation
@@ -380,7 +374,7 @@ def test_pageserver_with_empty_tenants(
"state": "Broken",
}
active_tenants_metric_filter = {
"tenant_id": str(tenant_with_empty_timelines),
"tenant_id": str(tenant_with_empty_timelines_dir),
"state": "Active",
}
@@ -392,7 +386,7 @@ def test_pageserver_with_empty_tenants(
assert (
tenant_active_count == 1
), f"Tenant {tenant_with_empty_timelines} should have metric as active"
), f"Tenant {tenant_with_empty_timelines_dir} should have metric as active"
tenant_broken_count = int(
ps_metrics.query_one(

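The removed block polled the tenant list with wait_until until the predicate stopped raising. For reference, a generic sketch of such a retry helper is shown below; this is not the fixture's actual implementation, just the usual shape of one.

import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def wait_until(iterations: int, interval: float, func: Callable[[], T]) -> T:
    """Retry func until it returns without raising, or fail after `iterations` tries."""
    last_exc: Optional[Exception] = None
    for _ in range(iterations):
        try:
            return func()
        except Exception as e:
            last_exc = e
            time.sleep(interval)
    raise AssertionError(f"timed out waiting for condition: {last_exc}")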
View File

@@ -371,7 +371,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "timeline deletion is already in progress"
error_msg_re = "another task is already setting the deleted_flag, started at"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500