Merge branch 'problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify' into problame/async-timeline-get/refactor-timeline-initialization-to-avoid-holding-tenants-timelines-lock
.github/workflows/build_and_test.yml
@@ -407,9 +407,7 @@ jobs:
         uses: ./.github/actions/allure-report-generate

       - uses: actions/github-script@v6
-        if: >
-          !cancelled() &&
-          github.event_name == 'pull_request'
+        if: ${{ !cancelled() }}
         with:
           # Retry script for 5XX server errors: https://github.com/actions/github-script#retries
           retries: 5
@@ -419,7 +417,7 @@ jobs:
             reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
           }

-          const script = require("./scripts/pr-comment-test-report.js")
+          const script = require("./scripts/comment-test-report.js")
           await script({
             github,
             context,
@@ -370,6 +370,10 @@ impl PageServerNode {
                 .remove("evictions_low_residence_duration_metric_threshold")
                 .map(|x| x.to_string()),
         };
+
+        // If tenant ID was not specified, generate one
+        let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
+
         let request = models::TenantCreateRequest {
             new_tenant_id,
             config,
@@ -495,6 +499,9 @@ impl PageServerNode {
         ancestor_timeline_id: Option<TimelineId>,
         pg_version: Option<u32>,
     ) -> anyhow::Result<TimelineInfo> {
+        // If timeline ID was not specified, generate one
+        let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
+
         self.http_request(
             Method::POST,
             format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
@@ -1,6 +1,14 @@
 #!/bin/bash
 set -eux

+# Generate a random tenant or timeline ID
+#
+# Takes a variable name as argument. The result is stored in that variable.
+generate_id() {
+    local -n resvar=$1
+    printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
+}
+
 PG_VERSION=${PG_VERSION:-14}

 SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
@@ -13,29 +21,29 @@ done
 echo "Page server is ready."

 echo "Create a tenant and timeline"
+generate_id tenant_id
 PARAMS=(
     -sb
     -X POST
     -H "Content-Type: application/json"
-    -d "{}"
+    -d "{\"new_tenant_id\": \"${tenant_id}\"}"
     http://pageserver:9898/v1/tenant/
 )
-tenant_id=$(curl "${PARAMS[@]}" | sed 's/"//g')
+result=$(curl "${PARAMS[@]}")
+echo $result | jq .

+generate_id timeline_id
 PARAMS=(
     -sb
     -X POST
     -H "Content-Type: application/json"
-    -d "{\"tenant_id\":\"${tenant_id}\", \"pg_version\": ${PG_VERSION}}"
+    -d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}"
     "http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
 )
 result=$(curl "${PARAMS[@]}")
 echo $result | jq .

 echo "Overwrite tenant id and timeline id in spec file"
-tenant_id=$(echo ${result} | jq -r .tenant_id)
-timeline_id=$(echo ${result} | jq -r .timeline_id)

 sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE}
 sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}
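The `generate_id` helper above relies on bash 5.1's `$SRANDOM`, formatting four random 32-bit values into a 32-hex-digit string, which matches the 128-bit tenant/timeline ID format the pageserver expects. A minimal Rust sketch of the same idea (hypothetical helper, assuming the `rand` crate; not the pageserver's actual `TenantId::generate()` implementation):

```rust
// Sketch: a Rust analogue of the script's generate_id().
// Neon tenant/timeline IDs are 128 bits, printed as 32 hex digits;
// the real TenantId/TimelineId internals are not shown in this diff.
fn generate_hex_id() -> String {
    // four random u32 values, each rendered as 8 hex digits, like
    // printf '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
    (0..4)
        .map(|_| format!("{:08x}", rand::random::<u32>()))
        .collect()
}

fn main() {
    println!("{}", generate_hex_id()); // e.g. "9f86d081884c7d659a2feaa0c55ad015"
}
```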
@@ -55,19 +55,38 @@ use bytes::{BufMut, Bytes, BytesMut};
 )]
 #[serde(tag = "slug", content = "data")]
 pub enum TenantState {
-    /// This tenant is being loaded from local disk
+    /// This tenant is being loaded from local disk.
+    ///
+    /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
     Loading,
     /// This tenant is being attached to the pageserver.
+    ///
+    /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
     Attaching,
+    /// The tenant is transitioning from Loading/Attaching to Active.
+    ///
+    /// While in this state, the individual timelines are being activated.
+    ///
+    /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
+    Activating,
     /// The tenant has finished activating and is open for business.
+    ///
+    /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
     Active,
-    /// A tenant is recognized by pageserver, but it is being detached or the
+    /// The tenant is recognized by pageserver, but it is being detached or the
     /// system is being shut down.
+    ///
+    /// Transitions out of this state are possible through `set_broken()`.
     Stopping,
-    /// A tenant is recognized by the pageserver, but can no longer be used for
-    /// any operations, because it failed to be activated.
+    /// The tenant is recognized by the pageserver, but can no longer be used for
+    /// any operations.
+    ///
+    /// If the tenant fails to load or attach, it will transition to this state
+    /// and it is guaranteed that no background tasks are running in its name.
+    ///
+    /// The other way to transition into this state is from `Stopping` state
+    /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
+    /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
     Broken { reason: String, backtrace: String },
 }
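For reference, `#[serde(tag = "slug", content = "data")]` is serde's adjacently tagged representation: the variant name is serialized under `slug` and any variant fields under `data`, which is why the tests further down read `state["slug"]`. A minimal sketch with a hypothetical `State` enum (assuming `serde` and `serde_json` as dependencies):

```rust
use serde::Serialize;

// Sketch of the adjacently-tagged layout used above: the variant name
// goes under "slug", variant fields under "data".
#[derive(Serialize)]
#[serde(tag = "slug", content = "data")]
enum State {
    Active,
    Broken { reason: String, backtrace: String },
}

fn main() {
    // Unit variants serialize with just the tag: {"slug":"Active"}
    println!("{}", serde_json::to_string(&State::Active).unwrap());
    // Struct variants carry their fields under "data":
    // {"slug":"Broken","data":{"reason":"load failed","backtrace":""}}
    println!(
        "{}",
        serde_json::to_string(&State::Broken {
            reason: "load failed".into(),
            backtrace: String::new(),
        })
        .unwrap()
    );
}
```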
@@ -144,9 +163,8 @@ pub enum TimelineState {
 #[serde_as]
 #[derive(Serialize, Deserialize)]
 pub struct TimelineCreateRequest {
-    #[serde(default)]
-    #[serde_as(as = "Option<DisplayFromStr>")]
-    pub new_timeline_id: Option<TimelineId>,
+    #[serde_as(as = "DisplayFromStr")]
+    pub new_timeline_id: TimelineId,
     #[serde(default)]
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub ancestor_timeline_id: Option<TimelineId>,
@@ -157,12 +175,11 @@ pub struct TimelineCreateRequest {
 }

 #[serde_as]
-#[derive(Serialize, Deserialize, Debug, Default)]
+#[derive(Serialize, Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
 pub struct TenantCreateRequest {
-    #[serde(default)]
-    #[serde_as(as = "Option<DisplayFromStr>")]
-    pub new_tenant_id: Option<TenantId>,
+    #[serde_as(as = "DisplayFromStr")]
+    pub new_tenant_id: TenantId,
     #[serde(flatten)]
     pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
 }
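`#[serde_as(as = "DisplayFromStr")]` (from the `serde_with` crate, which the `#[serde_as]` attribute comes from) keeps the wire format a plain string while parsing into the typed ID via `FromStr`; with the `Option` wrapper gone, an absent `new_tenant_id` is now a hard deserialization error instead of `None`. A sketch of that behavior, with `u128` standing in for `TenantId`:

```rust
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

// Sketch of the DisplayFromStr pattern: the JSON carries a string,
// deserialization parses it via FromStr into the typed field.
#[serde_as]
#[derive(Deserialize, Debug)]
struct CreateRequest {
    #[serde_as(as = "DisplayFromStr")]
    new_tenant_id: u128, // stand-in for TenantId, which also implements FromStr
}

fn main() {
    let ok: CreateRequest =
        serde_json::from_str(r#"{"new_tenant_id": "42"}"#).unwrap();
    println!("{ok:?}");
    // Missing field is now a hard error instead of defaulting to None:
    assert!(serde_json::from_str::<CreateRequest>("{}").is_err());
}
```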
@@ -210,10 +227,10 @@ pub struct StatusResponse {
 }

 impl TenantCreateRequest {
-    pub fn new(new_tenant_id: Option<TenantId>) -> TenantCreateRequest {
+    pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
         TenantCreateRequest {
             new_tenant_id,
-            ..Default::default()
+            config: TenantConfig::default(),
         }
     }
 }
@@ -678,6 +678,8 @@ paths:
           application/json:
             schema:
               type: object
+              required:
+                - new_timeline_id
               properties:
                 new_timeline_id:
                   type: string
@@ -936,6 +938,8 @@ components:
       allOf:
         - $ref: '#/components/schemas/TenantConfig'
         - type: object
+          required:
+            - new_tenant_id
           properties:
             new_tenant_id:
               type: string
@@ -301,9 +301,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     check_permission(&request, Some(tenant_id))?;

-    let new_timeline_id = request_data
-        .new_timeline_id
-        .unwrap_or_else(TimelineId::generate);
+    let new_timeline_id = request_data.new_timeline_id;

     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);

@@ -330,7 +328,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
             Err(err) => Err(ApiError::InternalServerError(err)),
         }
     }
-    .instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
+    .instrument(info_span!("timeline_create", tenant = %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
     .await
 }
@@ -764,6 +762,8 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
 }

 async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let request_data: TenantCreateRequest = json_request(&mut request).await?;
+    let target_tenant_id = request_data.new_tenant_id;
     check_permission(&request, None)?;

     let _timer = STORAGE_TIME_GLOBAL
@@ -771,17 +771,10 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
         .expect("bug")
         .start_timer();

-    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
-
-    let request_data: TenantCreateRequest = json_request(&mut request).await?;
-
     let tenant_conf =
         TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;

-    let target_tenant_id = request_data
-        .new_tenant_id
-        .map(TenantId::from)
-        .unwrap_or_else(TenantId::generate);
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);

     let state = get_state(&request);
@@ -394,6 +394,11 @@ pub enum DeleteTimelineError {
     Other(#[from] anyhow::Error),
 }

+pub enum SetStoppingError {
+    AlreadyStopping,
+    Broken,
+}
+
 struct RemoteStartupData {
     index_part: IndexPart,
     remote_metadata: TimelineMetadata,
@@ -1982,20 +1987,26 @@ impl Tenant {
     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
     ///
     /// This function is not cancel-safe!
-    pub async fn set_stopping(&self) {
+    pub async fn set_stopping(&self) -> Result<(), SetStoppingError> {
         let mut rx = self.state.subscribe();

         // cannot stop before we're done activating, so wait out until we're done activating
         rx.wait_for(|state| match state {
-            TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting
+            TenantState::Activating | TenantState::Loading | TenantState::Attaching => {
+                info!(
+                    "waiting for {} to turn Active|Broken|Stopping",
+                    <&'static str>::from(state)
+                );
+                false
+            }
             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
         })
         .await
         .expect("cannot drop self.state while on a &self method");

         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
-        let mut stopping = false;
-        self.state.send_modify(|current_state| match current_state {
+        let mut err = None;
+        let stopping = self.state.send_if_modified(|current_state| match current_state {
             TenantState::Activating | TenantState::Loading | TenantState::Attaching => {
                 unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
             }
@@ -2004,29 +2015,42 @@ impl Tenant {
                 // are created after the transition to Stopping. That's harmless, as the Timelines
                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
                 *current_state = TenantState::Stopping;
-                stopping = true;
+                // Continue stopping outside the closure. We need to grab timelines.lock()
+                // and we plan to turn it into a tokio::sync::Mutex in a future patch.
+                true
             }
             TenantState::Broken { reason, .. } => {
                 info!(
                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
                 );
+                err = Some(SetStoppingError::Broken);
+                false
             }
             TenantState::Stopping => {
                 info!("Tenant is already in Stopping state");
+                err = Some(SetStoppingError::AlreadyStopping);
+                false
             }
         });

-        if stopping {
-            let timelines_accessor = self.timelines.lock().unwrap();
-            let not_broken_timelines = timelines_accessor
-                .values()
-                .filter(|timeline| timeline.current_state() != TimelineState::Broken);
-            for timeline in not_broken_timelines {
-                timeline.set_state(TimelineState::Stopping);
-            }
-        }
+        match (stopping, err) {
+            (true, None) => {} // continue
+            (false, Some(err)) => return Err(err),
+            (true, Some(_)) => unreachable!(
+                "send_if_modified closure must error out if not transitioning to Stopping"
+            ),
+            (false, None) => unreachable!(
+                "send_if_modified closure must return true if transitioning to Stopping"
+            ),
+        }
+
+        let timelines_accessor = self.timelines.lock().unwrap();
+        let not_broken_timelines = timelines_accessor
+            .values()
+            .filter(|timeline| timeline.current_state() != TimelineState::Broken);
+        for timeline in not_broken_timelines {
+            timeline.set_state(TimelineState::Stopping);
+        }
+        Ok(())
     }
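The refactor above leans on `tokio::sync::watch::Sender::send_if_modified`, whose closure returns `true` to commit the new value and wake receivers, or `false` to leave it untouched; since the closure can only return a bool, the error is smuggled out through the captured `err` variable. A standalone sketch of that contract (toy state strings, not the real `TenantState`):

```rust
use tokio::sync::watch;

fn main() {
    let (tx, rx) = watch::channel("Active");

    let mut err: Option<&str> = None;
    let stopping = tx.send_if_modified(|state| {
        if *state == "Active" {
            *state = "Stopping";
            true // value changed: receivers are notified
        } else {
            // side channel for the failure reason, like `err` above
            err = Some("not active");
            false // no change: receivers are not woken
        }
    });

    assert!(stopping && err.is_none());
    assert_eq!(*rx.borrow(), "Stopping");
}
```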
     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
@@ -2041,7 +2065,13 @@ impl Tenant {
         // The load & attach routines own the tenant state until it has reached `Active`.
         // So, wait until it's done.
         rx.wait_for(|state| match state {
-            TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting
+            TenantState::Activating | TenantState::Loading | TenantState::Attaching => {
+                info!(
+                    "waiting for {} to turn Active|Broken|Stopping",
+                    <&'static str>::from(state)
+                );
+                false
+            }
             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
         })
         .await
@@ -10,6 +10,7 @@ use tokio::fs;
 use anyhow::Context;
 use once_cell::sync::Lazy;
 use tokio::sync::RwLock;
+use tokio::task::JoinSet;
 use tracing::*;

 use remote_storage::GenericRemoteStorage;
@@ -20,7 +21,8 @@ use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind};
 use crate::tenant::config::TenantConfOpt;
 use crate::tenant::{
-    create_tenant_files, CreateTenantFilesMode, Tenant, TenantState, TimelineLoadCause,
+    create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
+    TimelineLoadCause,
 };
 use crate::IGNORED_TENANT_FILE_NAME;
@@ -250,11 +252,56 @@ pub async fn shutdown_all_tenants() {
         }
     };

+    // Set tenant (and its timelines) to Stopping state.
+    // Since we can only transition into Stopping state after activation is complete,
+    // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
+    //
+    // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
+    // 1. Lock out any new requests to the tenants.
+    // 2. Signal cancellation to WAL receivers (we wait on it below).
+    // 3. Signal cancellation for other tenant background loops.
+    // 4. ???
+    //
+    // The waiting for the cancellation is not done uniformly.
+    // We certainly wait for WAL receivers to shut down.
+    // That is necessary so that no new data comes in before the freeze_and_flush.
+    // But the tenant background loops are joined-on in our caller.
+    // It's messed up.
+    let mut join_set = JoinSet::new();
     let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
     for (_, tenant) in tenants_to_shut_down {
-        // updates tenant state, forbidding new GC and compaction iterations from starting
-        tenant.set_stopping().await;
-        tenants_to_freeze_and_flush.push(tenant);
+        join_set.spawn(async move {
+            match tenant.set_stopping().await {
+                Ok(()) => Ok(tenant),
+                Err(e) => Err((tenant, e)),
+            }
+        });
     }
+    while let Some(res) = join_set.join_next().await {
+        match res {
+            Err(join_error) if join_error.is_cancelled() => {
+                unreachable!("we are not cancelling any of the futures");
+            }
+            Err(join_error) => {
+                // cannot really do anything, as this panic is likely a bug
+                error!("task that calls set_stopping() panicked, don't know which tenant this is, and probably freeze_and_flush won't work anyways: {join_error:#}");
+            }
+            Ok(retval) => match retval {
+                Ok(tenant) => {
+                    // success
+                    debug!("tenant successfully stopped: {}", tenant.tenant_id);
+                    tenants_to_freeze_and_flush.push(tenant);
+                }
+                // our task_mgr::shutdown_tasks are going to coalesce on that just fine
+                Err((tenant, SetStoppingError::AlreadyStopping)) => {
+                    tenants_to_freeze_and_flush.push(tenant);
+                }
+                Err((tenant, SetStoppingError::Broken)) => {
+                    info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id);
+                    tenants_to_freeze_and_flush.push(tenant);
+                }
+            },
+        }
+    }
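The `JoinSet` pattern used here spawns one task per tenant and drains them with `join_next()`, carrying the tenant back out in both the `Ok` and `Err` payloads so the shutdown path can still `freeze_and_flush` it either way. A condensed sketch (toy ids instead of tenants, assuming tokio with the `rt` and `macros` features):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut join_set = JoinSet::new();
    for id in 1..=3u32 {
        join_set.spawn(async move {
            // stand-in for tenant.set_stopping().await
            if id % 2 == 0 { Ok(id) } else { Err((id, "already stopping")) }
        });
    }

    let mut to_flush = Vec::new();
    while let Some(res) = join_set.join_next().await {
        match res {
            // JoinError means the task itself panicked or was cancelled
            Err(join_error) => eprintln!("task panicked: {join_error}"),
            // keep the id whether set_stopping succeeded or not
            Ok(Ok(id)) | Ok(Err((id, _))) => to_flush.push(id),
        }
    }
    assert_eq!(to_flush.len(), 3);
}
```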
 // Shut down all existing walreceiver connections and stop accepting the new ones.
@@ -270,8 +317,9 @@ pub async fn shutdown_all_tenants() {
     // On error, log it but continue with the shutdown for other tenants.
     for tenant in tenants_to_freeze_and_flush {
         let tenant_id = tenant.tenant_id();
-        debug!("shutdown tenant {tenant_id}");
+        debug!("freeze_and_flush tenant {tenant_id}");

+        // TODO this could probably run in a JoinSet as well?
         if let Err(err) = tenant.freeze_and_flush().await {
             error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
         }
@@ -593,14 +641,23 @@ where
 {
     let tenants_accessor = TENANTS.write().await;
     match tenants_accessor.get(&tenant_id) {
-        Some(tenant) => match tenant.current_state() {
-            TenantState::Attaching
-            | TenantState::Loading
-            | TenantState::Activating
-            | TenantState::Broken { .. }
-            | TenantState::Active => tenant.set_stopping().await,
-            TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
-        },
+        Some(tenant) => {
+            let tenant = Arc::clone(tenant);
+            // don't hold TENANTS lock while set_stopping waits for activation to finish
+            drop(tenants_accessor);
+            match tenant.set_stopping().await {
+                Ok(()) => {
+                    // we won, continue stopping procedure
+                }
+                Err(SetStoppingError::Broken) => {
+                    // continue the procedure, let's hope the closure can deal with broken tenants
+                }
+                Err(SetStoppingError::AlreadyStopping) => {
+                    // the tenant is already stopping or broken, don't do anything
+                    return Err(TenantStateError::IsStopping(tenant_id));
+                }
+            }
+        }
         None => return Err(TenantStateError::NotFound(tenant_id)),
     }
 }
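The key move in this hunk is cloning the `Arc` and dropping the `TENANTS` guard before awaiting `set_stopping()`, so the shared map stays available to other requests while this one waits for activation to finish. A minimal sketch of the pattern (hypothetical `stop_one` helper over a toy `RwLock<Vec<Arc<String>>>`, not the real tenant manager):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Hypothetical helper showing the clone-then-drop-the-guard pattern.
async fn stop_one(tenants: &RwLock<Vec<Arc<String>>>, idx: usize) {
    let guard = tenants.read().await;
    let tenant = Arc::clone(&guard[idx]);
    // Release the lock before the long await below; holding it here
    // would block every other tenant operation until we finished.
    drop(guard);

    // stand-in for tenant.set_stopping().await
    tokio::task::yield_now().await;
    println!("stopped {tenant}");
}

#[tokio::main]
async fn main() {
    let tenants = RwLock::new(vec![Arc::new("tenant-a".to_string())]);
    stop_one(&tenants, 0).await;
}
```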
@@ -1,5 +1,5 @@
 //
-// The script parses Allure reports and posts a comment with a summary of the test results to the PR.
+// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
 //
 // The comment is updated on each run with the latest results.
 //
@@ -7,7 +7,7 @@
 // - uses: actions/github-script@v6
 //   with:
 //     script: |
-//       const script = require("./scripts/pr-comment-test-report.js")
+//       const script = require("./scripts/comment-test-report.js")
 //       await script({
 //         github,
 //         context,
@@ -35,8 +35,12 @@ class DefaultMap extends Map {
 module.exports = async ({ github, context, fetch, report }) => {
     // Marker to find the comment in the subsequent runs
     const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
+    // If we run the script in the PR or in the branch (main/release/...)
+    const isPullRequest = !!context.payload.pull_request
+    // Latest commit in PR or in the branch
+    const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
     // Let users know that the comment is updated automatically
-    const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:</sub></div>`
+    const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
     // GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
     const githubActionsBotId = 41898282
     // Comment body itself
@@ -166,22 +170,39 @@ module.exports = async ({ github, context, fetch, report }) => {

     commentBody += autoupdateNotice

-    const { data: comments } = await github.rest.issues.listComments({
-        issue_number: context.payload.number,
+    let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha
+    if (isPullRequest) {
+        createCommentFn = github.rest.issues.createComment
+        listCommentsFn = github.rest.issues.listComments
+        updateCommentFn = github.rest.issues.updateComment
+        issueNumberOrSha = {
+            issue_number: context.payload.number,
+        }
+    } else {
+        updateCommentFn = github.rest.repos.updateCommitComment
+        listCommentsFn = github.rest.repos.listCommentsForCommit
+        createCommentFn = github.rest.repos.createCommitComment
+        issueNumberOrSha = {
+            commit_sha: commitSha,
+        }
+    }
+
+    const { data: comments } = await listCommentsFn({
+        ...issueNumberOrSha,
         ...ownerRepoParams,
     })

     const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker))
     if (comment) {
-        await github.rest.issues.updateComment({
+        await updateCommentFn({
             comment_id: comment.id,
             body: commentBody,
             ...ownerRepoParams,
         })
     } else {
-        await github.rest.issues.createComment({
-            issue_number: context.payload.number,
+        await createCommentFn({
             body: commentBody,
+            ...issueNumberOrSha,
             ...ownerRepoParams,
         })
     }
@@ -155,14 +155,14 @@ class PageserverHttpClient(requests.Session):
         return res_json

     def tenant_create(
-        self, new_tenant_id: Optional[TenantId] = None, conf: Optional[Dict[str, Any]] = None
+        self, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
     ) -> TenantId:
         if conf is not None:
             assert "new_tenant_id" not in conf.keys()
         res = self.post(
             f"http://localhost:{self.port}/v1/tenant",
             json={
-                "new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
+                "new_tenant_id": str(new_tenant_id),
                 **(conf or {}),
             },
         )
@@ -293,13 +293,13 @@ class PageserverHttpClient(requests.Session):
         self,
         pg_version: PgVersion,
         tenant_id: TenantId,
-        new_timeline_id: Optional[TimelineId] = None,
+        new_timeline_id: TimelineId,
         ancestor_timeline_id: Optional[TimelineId] = None,
         ancestor_start_lsn: Optional[Lsn] = None,
         **kwargs,
     ) -> Dict[Any, Any]:
         body: Dict[str, Any] = {
-            "new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
+            "new_timeline_id": str(new_timeline_id),
             "ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
             "ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
         }
@@ -3,7 +3,7 @@ from contextlib import closing
 import pytest
 from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
 from fixtures.pageserver.http import PageserverApiException
-from fixtures.types import TenantId
+from fixtures.types import TenantId, TimelineId


 def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
@@ -25,21 +25,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
     ps.safe_psql("set FOO", password=tenant_token)
     ps.safe_psql("set FOO", password=pageserver_token)

-    new_timeline_id = env.neon_cli.create_branch(
-        "test_pageserver_auth", tenant_id=env.initial_tenant
-    )
-
     # tenant can create branches
     tenant_http_client.timeline_create(
         pg_version=env.pg_version,
         tenant_id=env.initial_tenant,
-        ancestor_timeline_id=new_timeline_id,
+        new_timeline_id=TimelineId.generate(),
+        ancestor_timeline_id=env.initial_timeline,
     )
     # console can create branches for tenant
     pageserver_http_client.timeline_create(
         pg_version=env.pg_version,
         tenant_id=env.initial_tenant,
-        ancestor_timeline_id=new_timeline_id,
+        new_timeline_id=TimelineId.generate(),
+        ancestor_timeline_id=env.initial_timeline,
     )

     # fail to create branch using token with different tenant_id
@@ -49,18 +47,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
     invalid_tenant_http_client.timeline_create(
         pg_version=env.pg_version,
         tenant_id=env.initial_tenant,
-        ancestor_timeline_id=new_timeline_id,
+        new_timeline_id=TimelineId.generate(),
+        ancestor_timeline_id=env.initial_timeline,
     )

     # create tenant using management token
-    pageserver_http_client.tenant_create()
+    pageserver_http_client.tenant_create(TenantId.generate())

     # fail to create tenant using tenant token
     with pytest.raises(
         PageserverApiException,
         match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
     ):
-        tenant_http_client.tenant_create()
+        tenant_http_client.tenant_create(TenantId.generate())


 def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
@@ -4,21 +4,12 @@ from typing import Any, Dict, List, Optional, Tuple, Type
 import psycopg2
 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import (
-    PortDistributor,
-    VanillaPostgres,
-)
+from fixtures.neon_fixtures import VanillaPostgres
 from pytest_httpserver import HTTPServer
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response


-@pytest.fixture(scope="session")
-def httpserver_listen_address(port_distributor: PortDistributor):
-    port = port_distributor.get_port()
-    return ("localhost", port)
-
-
 def handle_db(dbs, roles, operation):
     if operation["op"] == "set":
         if "old_name" in operation and operation["old_name"] in dbs:
@@ -228,7 +228,6 @@ def proxy_with_metric_collector(
 @pytest.mark.asyncio
 async def test_proxy_metric_collection(
     httpserver: HTTPServer,
-    httpserver_listen_address,
     proxy_with_metric_collector: NeonProxy,
     vanilla_pg: VanillaPostgres,
 ):
@@ -322,21 +322,22 @@ def test_pageserver_with_empty_tenants(

     client = env.pageserver.http_client()

-    tenant_with_empty_timelines_dir = client.tenant_create()
-    temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
+    tenant_with_empty_timelines = TenantId.generate()
+    client.tenant_create(tenant_with_empty_timelines)
+    temp_timelines = client.timeline_list(tenant_with_empty_timelines)
     for temp_timeline in temp_timelines:
         client.timeline_delete(
-            tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
+            tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
         )
     files_in_timelines_dir = sum(
         1
         for _p in Path.iterdir(
-            Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
+            Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines"
         )
     )
     assert (
         files_in_timelines_dir == 0
-    ), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
+    ), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory"

     # Trigger timeline re-initialization after pageserver restart
     env.endpoints.stop_all()
@@ -368,15 +369,17 @@ def test_pageserver_with_empty_tenants(
         broken_tenant_status["state"]["slug"] == "Broken"
     ), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"

-    [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
+    assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
+
+    [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
     assert (
         loaded_tenant["state"]["slug"] == "Active"
-    ), "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation"
+    ), f"Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation"

-    loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines_dir)
+    loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines)
     assert (
         loaded_tenant_status["state"]["slug"] == "Active"
-    ), f"Tenant {tenant_with_empty_timelines_dir} without timelines dir should be active"
+    ), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active"

     time.sleep(1)  # to allow metrics propagation

@@ -386,7 +389,7 @@ def test_pageserver_with_empty_tenants(
         "state": "Broken",
     }
     active_tenants_metric_filter = {
-        "tenant_id": str(tenant_with_empty_timelines_dir),
+        "tenant_id": str(tenant_with_empty_timelines),
         "state": "Active",
     }

@@ -398,7 +401,7 @@ def test_pageserver_with_empty_tenants(

     assert (
         tenant_active_count == 1
-    ), f"Tenant {tenant_with_empty_timelines_dir} should have metric as active"
+    ), f"Tenant {tenant_with_empty_timelines} should have metric as active"

     tenant_broken_count = int(
         ps_metrics.query_one(