Mirror of https://github.com/neondatabase/neon.git, synced 2026-05-17 05:00:38 +00:00

Compare commits: skyzh/rm-f ... dont-blow- (1 commit)

SHA1: 29bb696f3a

Cargo.lock (generated):

@@ -4867,7 +4867,6 @@ dependencies = [
  "tempfile",
  "thiserror",
  "tokio",
- "tokio-stream",
  "tracing",
  "tracing-error",
  "tracing-subscriber",

@@ -222,10 +222,6 @@ fn main() -> Result<()> {
     compute.state_changed.notify_all();
     drop(state);
 
-    // Launch remaining service threads
-    let _monitor_handle = launch_monitor(&compute);
-    let _configurator_handle = launch_configurator(&compute);
-
     // Start Postgres
     let mut delay_exit = false;
     let mut exit_code = None;

@@ -242,6 +238,14 @@ fn main() -> Result<()> {
         }
     };
 
+    // Launch remaining service threads
+    //
+    // NOTE we do this after starting postgres so that these two extra threads
+    // don't blow the cpu budget and throttle the startup process.
+    let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
+    let _configurator_handle =
+        launch_configurator(&compute).expect("cannot launch configurator thread");
+
     // Wait for the child Postgres process forever. In this state Ctrl+C will
     // propagate to Postgres and it will be shut down as well.
     if let Some(mut pg) = pg {

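The NOTE in this hunk is the heart of the change: under a constrained CPU budget, auxiliary threads spawned before Postgres compete with it during startup, so they are now launched only after Postgres is running. A minimal sketch of that ordering; the child command and names are illustrative, not from the patch:

use std::process::Command;
use std::thread;

fn main() -> std::io::Result<()> {
    // Start the latency-critical child process first.
    let mut child = Command::new("sleep").arg("1").spawn()?;

    // Only then spawn auxiliary service threads, so they do not compete
    // with the child's startup for a throttled CPU budget.
    let _helper = thread::Builder::new()
        .name("helper".into())
        .spawn(|| { /* periodic background work would go here */ })?;

    child.wait()?;
    Ok(())
}
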
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 use std::thread;
 
+use anyhow::Result;
 use tracing::{error, info, instrument};
 
 use compute_api::responses::ComputeStatus;

@@ -41,14 +42,13 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
     }
 }
 
-pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
+pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
     let compute = Arc::clone(compute);
 
-    thread::Builder::new()
+    Ok(thread::Builder::new()
         .name("compute-configurator".into())
         .spawn(move || {
             configurator_main_loop(&compute);
             info!("configurator thread is exited");
-        })
-        .expect("cannot launch configurator thread")
+        })?)
 }

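For context on the signature change: `std::thread::Builder::spawn` returns `std::io::Result<JoinHandle<T>>`, so returning `Result<thread::JoinHandle<()>>` and propagating with `?` moves the failure decision to the caller (which, in `main` above, chooses `.expect(...)`). A minimal sketch of the pattern, assuming the `anyhow` crate as in the diff; `launch_worker` and its arguments are illustrative names:

use std::thread;

use anyhow::Result;

/// Spawn a named worker thread, propagating spawn failure to the caller.
fn launch_worker(
    name: &str,
    worker_main: impl FnOnce() + Send + 'static,
) -> Result<thread::JoinHandle<()>> {
    // `Builder::spawn` returns io::Result; `?` converts the error into
    // anyhow::Error instead of panicking inside the helper.
    Ok(thread::Builder::new().name(name.into()).spawn(worker_main)?)
}

fn main() -> Result<()> {
    let handle = launch_worker("demo-worker", || println!("worker running"))?;
    handle.join().expect("worker panicked");
    Ok(())
}
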
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 use std::{thread, time};
 
+use anyhow::Result;
 use chrono::{DateTime, Utc};
 use postgres::{Client, NoTls};
 use tracing::{debug, info};

@@ -104,11 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
 }
 
 /// Launch a separate compute monitor thread and return its `JoinHandle`.
-pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
+pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
     let state = Arc::clone(state);
 
-    thread::Builder::new()
+    Ok(thread::Builder::new()
         .name("compute-monitor".into())
-        .spawn(move || watch_compute_activity(&state))
-        .expect("cannot launch compute monitor thread")
+        .spawn(move || watch_compute_activity(&state))?)
 }

@@ -6,7 +6,6 @@ use once_cell::sync::Lazy;
 use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
 pub use prometheus::opts;
 pub use prometheus::register;
-pub use prometheus::Error;
 pub use prometheus::{core, default_registry, proto};
 pub use prometheus::{exponential_buckets, linear_buckets};
 pub use prometheus::{register_counter_vec, Counter, CounterVec};

@@ -57,9 +57,9 @@ pub fn slru_may_delete_clogsegment(segpage: u32, cutoff_page: u32) -> bool {
 // Multixact utils
 
 pub fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
-    ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32)
-        % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE as u32
-        * pg_constants::MULTIXACT_MEMBERGROUP_SIZE as u32) as usize
+    ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
+        % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
+        * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
 }
 
 pub fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {

@@ -81,41 +81,3 @@ fn mx_offset_to_member_page(xid: u32) -> u32 {
 pub fn mx_offset_to_member_segment(xid: u32) -> i32 {
     (mx_offset_to_member_page(xid) / pg_constants::SLRU_PAGES_PER_SEGMENT) as i32
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_multixid_calc() {
-        // Check that the mx_offset_* functions produce the same values as the
-        // corresponding PostgreSQL C macros (MXOffsetTo*). These test values
-        // were generated by calling the PostgreSQL macros with a little C
-        // program.
-        assert_eq!(mx_offset_to_member_segment(0), 0);
-        assert_eq!(mx_offset_to_member_page(0), 0);
-        assert_eq!(mx_offset_to_flags_offset(0), 0);
-        assert_eq!(mx_offset_to_flags_bitshift(0), 0);
-        assert_eq!(mx_offset_to_member_offset(0), 4);
-        assert_eq!(mx_offset_to_member_segment(1), 0);
-        assert_eq!(mx_offset_to_member_page(1), 0);
-        assert_eq!(mx_offset_to_flags_offset(1), 0);
-        assert_eq!(mx_offset_to_flags_bitshift(1), 8);
-        assert_eq!(mx_offset_to_member_offset(1), 8);
-        assert_eq!(mx_offset_to_member_segment(123456789), 2358);
-        assert_eq!(mx_offset_to_member_page(123456789), 75462);
-        assert_eq!(mx_offset_to_flags_offset(123456789), 4780);
-        assert_eq!(mx_offset_to_flags_bitshift(123456789), 8);
-        assert_eq!(mx_offset_to_member_offset(123456789), 4788);
-        assert_eq!(mx_offset_to_member_segment(u32::MAX - 1), 82040);
-        assert_eq!(mx_offset_to_member_page(u32::MAX - 1), 2625285);
-        assert_eq!(mx_offset_to_flags_offset(u32::MAX - 1), 5160);
-        assert_eq!(mx_offset_to_flags_bitshift(u32::MAX - 1), 16);
-        assert_eq!(mx_offset_to_member_offset(u32::MAX - 1), 5172);
-        assert_eq!(mx_offset_to_member_segment(u32::MAX), 82040);
-        assert_eq!(mx_offset_to_member_page(u32::MAX), 2625285);
-        assert_eq!(mx_offset_to_flags_offset(u32::MAX), 5160);
-        assert_eq!(mx_offset_to_flags_bitshift(u32::MAX), 24);
-        assert_eq!(mx_offset_to_member_offset(u32::MAX), 5176);
-    }
-}

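The arithmetic behind `mx_offset_to_flags_offset` can be checked against the values in the deleted test. With PostgreSQL's stock 8 kB pages, a member group holds 4 member XIDs plus 4 flag bytes (20 bytes per group, so 409 groups per page); these constants are assumed here to mirror `pg_constants`. A sketch, including why the `as u16` cast on the `+` side deserves scrutiny:

// Assumed values of the pg_constants used above (stock 8 kB BLCKSZ).
const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u32 = 4; // 4 member XIDs per group
const MULTIXACT_MEMBERGROUP_SIZE: u32 = 20; // 4 flag bytes + 4 * 4-byte XIDs
const MULTIXACT_MEMBERGROUPS_PER_PAGE: u32 = 8192 / MULTIXACT_MEMBERGROUP_SIZE; // 409

/// Byte offset of a member's flag group within its SLRU page (the `-` variant).
fn mx_offset_to_flags_offset(xid: u32) -> usize {
    ((xid / MULTIXACT_MEMBERS_PER_MEMBERGROUP)
        % MULTIXACT_MEMBERGROUPS_PER_PAGE
        * MULTIXACT_MEMBERGROUP_SIZE) as usize
}

fn main() {
    // Expected values taken from the test removed in the hunk above.
    assert_eq!(mx_offset_to_flags_offset(0), 0);
    assert_eq!(mx_offset_to_flags_offset(123456789), 4780);
    assert_eq!(mx_offset_to_flags_offset(u32::MAX), 5160);

    // Casting `xid / 4` to u16 *before* the modulo (the `+` variant) truncates
    // once xid / 4 exceeds 65535, and then disagrees with the values above:
    let truncated = ((123456789u32 / 4) as u16 % 409 * 20) as usize;
    assert_ne!(truncated, 4780); // it is 2180 here
}

Note that `test_multixid_calc`, removed in the same hunk, is exactly the coverage that would have flagged such a divergence.
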
@@ -42,10 +42,6 @@ workspace_hack.workspace = true
 
 const_format.workspace = true
 
-# to use tokio channels as streams, this is faster to compile than async_stream
-# why is it only here? no other crate should use it, streams are rarely needed.
-tokio-stream = { version = "0.1.14" }
-
 [dev-dependencies]
 byteorder.workspace = true
 bytes.workspace = true

@@ -9,6 +9,7 @@ use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
 use once_cell::sync::Lazy;
 use routerify::ext::RequestExt;
 use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
+use tokio::task::JoinError;
 use tracing::{self, debug, info, info_span, warn, Instrument};
 
 use std::future::Future;

@@ -147,140 +148,26 @@ impl Drop for RequestCancelled {
 }
 
 async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
-    use bytes::{Bytes, BytesMut};
-    use std::io::Write as _;
-    use tokio::sync::mpsc;
-    use tokio_stream::wrappers::ReceiverStream;
-
     SERVE_METRICS_COUNT.inc();
 
-    /// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
-    struct ChannelWriter {
-        buffer: BytesMut,
-        tx: mpsc::Sender<std::io::Result<Bytes>>,
-        written: usize,
-    }
-
-    impl ChannelWriter {
-        fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
-            assert_ne!(buf_len, 0);
-            ChannelWriter {
-                // split about half off the buffer from the start, because we flush depending on
-                // capacity. first flush will come sooner than without this, but now resizes will
-                // have better chance of picking up the "other" half. not guaranteed of course.
-                buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
-                tx,
-                written: 0,
-            }
-        }
-
-        fn flush0(&mut self) -> std::io::Result<usize> {
-            let n = self.buffer.len();
-            if n == 0 {
-                return Ok(0);
-            }
-
-            tracing::trace!(n, "flushing");
-            let ready = self.buffer.split().freeze();
-
-            // not ideal to call from blocking code to block_on, but we are sure that this
-            // operation does not spawn_blocking other tasks
-            let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
-                self.tx.send(Ok(ready)).await.map_err(|_| ())?;
-
-                // throttle sending to allow reuse of our buffer in `write`.
-                self.tx.reserve().await.map_err(|_| ())?;
-
-                // now the response task has picked up the buffer and hopefully started
-                // sending it to the client.
-                Ok(())
-            });
-            if res.is_err() {
-                return Err(std::io::ErrorKind::BrokenPipe.into());
-            }
-            self.written += n;
-            Ok(n)
-        }
-
-        fn flushed_bytes(&self) -> usize {
-            self.written
-        }
-    }
-
-    impl std::io::Write for ChannelWriter {
-        fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
-            let remaining = self.buffer.capacity() - self.buffer.len();
-
-            let out_of_space = remaining < buf.len();
-
-            let original_len = buf.len();
-
-            if out_of_space {
-                let can_still_fit = buf.len() - remaining;
-                self.buffer.extend_from_slice(&buf[..can_still_fit]);
-                buf = &buf[can_still_fit..];
-                self.flush0()?;
-            }
-
-            // assume that this will often under normal operation just move the pointer back to the
-            // beginning of allocation, because previous split off parts are already sent and
-            // dropped.
-            self.buffer.extend_from_slice(buf);
-            Ok(original_len)
-        }
-
-        fn flush(&mut self) -> std::io::Result<()> {
-            self.flush0().map(|_| ())
-        }
-    }
-
-    let started_at = std::time::Instant::now();
-
-    let (tx, rx) = mpsc::channel(1);
-
-    let body = Body::wrap_stream(ReceiverStream::new(rx));
-
-    let mut writer = ChannelWriter::new(128 * 1024, tx);
-
+    let mut buffer = vec![];
     let encoder = TextEncoder::new();
 
+    let metrics = tokio::task::spawn_blocking(move || {
+        // Currently we take a lot of mutexes while collecting metrics, so it's
+        // better to spawn a blocking task to avoid blocking the event loop.
+        metrics::gather()
+    })
+    .await
+    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
+    encoder.encode(&metrics, &mut buffer).unwrap();
+
     let response = Response::builder()
         .status(200)
         .header(CONTENT_TYPE, encoder.format_type())
-        .body(body)
+        .body(Body::from(buffer))
        .unwrap();
 
-    let span = info_span!("blocking");
-    tokio::task::spawn_blocking(move || {
-        let _span = span.entered();
-        let metrics = metrics::gather();
-        let res = encoder
-            .encode(&metrics, &mut writer)
-            .and_then(|_| writer.flush().map_err(|e| e.into()));
-
-        match res {
-            Ok(()) => {
-                tracing::info!(
-                    bytes = writer.flushed_bytes(),
-                    elapsed_ms = started_at.elapsed().as_millis(),
-                    "responded /metrics"
-                );
-            }
-            Err(e) => {
-                tracing::warn!("failed to write out /metrics response: {e:#}");
-                // semantics of this error are quite... unclear. we want to error the stream out to
-                // abort the response to somehow notify the client that we failed.
-                //
-                // though, most likely the reason for failure is that the receiver is already gone.
-                drop(
-                    writer
-                        .tx
-                        .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
-                );
-            }
-        }
-    });
-
     Ok(response)
 }

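For reference, the wiring the removed handler used to stream its response: an mpsc channel of `std::io::Result<Bytes>` chunks, wrapped into a hyper `Body` via tokio-stream's `ReceiverStream`, with the producer running on the blocking thread pool. A self-contained sketch assuming tokio, bytes, tokio-stream, and hyper 0.14 with its `stream` feature (the same crates the deleted code pulled in); the payload is illustrative:

use bytes::Bytes;
use hyper::Body;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    // Capacity 1 gives natural backpressure: the producer blocks until the
    // previous chunk has been taken by the response task.
    let (tx, rx) = mpsc::channel::<std::io::Result<Bytes>>(1);

    // The response body is simply a stream over the receiver half.
    let body = Body::wrap_stream(ReceiverStream::new(rx));

    // The producer runs on the blocking pool and pushes chunks synchronously.
    tokio::task::spawn_blocking(move || {
        for chunk in ["hello ", "world"] {
            if tx.blocking_send(Ok(Bytes::from(chunk))).is_err() {
                break; // receiver (the client) went away
            }
        }
    });

    let collected = hyper::body::to_bytes(body).await.unwrap();
    assert_eq!(&collected[..], b"hello world");
}
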
@@ -1,8 +1,6 @@
 //! This module acts as a switchboard to access different repositories managed by this
 //! page server.
 
-use futures::Future;
-use scopeguard::ScopeGuard;
 use std::collections::{hash_map, HashMap};
 use std::ffi::OsStr;
 use std::path::Path;

@@ -16,7 +14,7 @@ use tokio::task::JoinSet;
 use tracing::*;
 
 use remote_storage::GenericRemoteStorage;
-use utils::{completion, crashsafe};
+use utils::crashsafe;
 
 use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};

@@ -341,61 +339,24 @@ pub async fn create_tenant(
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
 ) -> Result<Arc<Tenant>, TenantMapInsertError> {
-    tenant_map_insert(tenant_id, async {
+    tenant_map_insert(tenant_id, || {
         // We're holding the tenants lock in write mode while doing local IO.
         // If this section ever becomes contentious, introduce a new `TenantState::Creating`
         // and do the work in that state.
-        let tenant_directory = super::create_tenant_files(
-            conf,
-            tenant_conf,
-            &tenant_id,
-            CreateTenantFilesMode::Create,
-        )?;
+        let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create)?;
+        // TODO: tenant directory remains on disk if we bail out from here on.
+        // See https://github.com/neondatabase/neon/issues/4233
 
-        let guard_fs = scopeguard::guard((), |_| {
-            if let Err(e) = std::fs::remove_dir_all(&tenant_directory) {
-                // log the error but not throwing it somewhere
-                warn!("failed to cleanup when tenant {tenant_id} fails to start: {e}");
-            }
-        });
+        let created_tenant =
+            schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
+        // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
+        // See https://github.com/neondatabase/neon/issues/4233
 
-        let created_tenant = schedule_local_tenant_processing(
-            conf,
-            &tenant_directory,
-            broker_client,
-            remote_storage,
-            None,
-            ctx,
-        )?;
-
-        // Put all code that might error into the check function.
-
-        let check = || {
-            fail::fail_point!("tenant-create-fail", |_| {
-                anyhow::bail!("failpoint: tenant-create-fail");
-            });
-
-            let crated_tenant_id = created_tenant.tenant_id();
-            anyhow::ensure!(
+        let crated_tenant_id = created_tenant.tenant_id();
+        anyhow::ensure!(
            tenant_id == crated_tenant_id,
            "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
        );
-
-            Ok(())
-        };
-
-        if let Err(e) = check() {
-            // `schedule_local_tenant_processing` eventually launches the tenant's background task
-            // We need to shut them down before bailing out. Shutdown the tenant in the background.
-            tokio::spawn(async move {
-                let (_guard, shutdown_progress) = completion::channel();
-                created_tenant.shutdown(shutdown_progress, false).await
-            });
-            return Err(e);
-        }
-
-        // Ok, we're good. Disarm the cleanup scopeguards and return the tenant.
-        ScopeGuard::into_inner(guard_fs);
         Ok(created_tenant)
     }).await
 }

@@ -532,7 +493,7 @@ pub async fn load_tenant(
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    tenant_map_insert(tenant_id, async {
+    tenant_map_insert(tenant_id, || {
         let tenant_path = conf.tenant_path(&tenant_id);
         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
         if tenant_ignore_mark.exists() {

@@ -609,61 +570,30 @@ pub async fn attach_tenant(
     remote_storage: GenericRemoteStorage,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    tenant_map_insert(tenant_id, async {
-        let tenant_dir =
-            create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?;
-
-        let guard_fs = scopeguard::guard((), |_| {
-            if let Err(e) = std::fs::remove_dir_all(&tenant_dir) {
-                // log the error but not throwing it somewhere
-                warn!("failed to cleanup when tenant {tenant_id} fails to start: {e}");
-            }
-        });
+    tenant_map_insert(tenant_id, || {
+        let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?;
+        // TODO: tenant directory remains on disk if we bail out from here on.
+        // See https://github.com/neondatabase/neon/issues/4233
 
         // Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
         let marker_file_exists = conf
             .tenant_attaching_mark_file_path(&tenant_id)
             .try_exists()
             .context("check for attach marker file existence")?;
+        anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
+
+        let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
+        // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
+        // See https://github.com/neondatabase/neon/issues/4233
+
+        let attached_tenant_id = attached_tenant.tenant_id();
         anyhow::ensure!(
-            marker_file_exists,
-            "create_tenant_files should have created the attach marker file"
+            tenant_id == attached_tenant_id,
+            "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
         );
-
-        let attached_tenant = schedule_local_tenant_processing(
-            conf,
-            &tenant_dir,
-            broker_client,
-            Some(remote_storage),
-            None,
-            ctx,
-        )?;
-
-        // Put all code that might error in the check function.
-
-        let check = || {
-            let attached_tenant_id = attached_tenant.tenant_id();
-            anyhow::ensure!(
-                tenant_id == attached_tenant_id,
-                "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
-            );
-            Ok(())
-        };
-
-        if let Err(e) = check() {
-            // `schedule_local_tenant_processing` eventually launches the tenant's background task
-            // We need to shut them down before bailing out. Shutdown the tenant in the background.
-            tokio::spawn(async move {
-                let (_guard, shutdown_progress) = completion::channel();
-                attached_tenant.shutdown(shutdown_progress, false).await
-            });
-            return Err(e);
-        }
-
-        // Ok, we're good. Disarm the cleanup scopeguards and return the tenant.
-        ScopeGuard::into_inner(guard_fs);
         Ok(attached_tenant)
-    }).await?;
+    })
+    .await?;
     Ok(())
 }

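Both tenant hunks above drop the same cleanup-on-failure pattern from the `-` side: arm a scopeguard right after creating the directory, and disarm it only once every fallible step has succeeded. A minimal sketch of that pattern using the `scopeguard` crate; the resource and the failing step are stand-ins:

use scopeguard::ScopeGuard;

fn fallible_step() -> anyhow::Result<()> {
    Ok(())
}

fn create_resource() -> anyhow::Result<String> {
    let dir = String::from("/tmp/hypothetical-tenant-dir");

    // Arm the guard: if any `?` below returns early, the closure runs and
    // can clean up the half-created state.
    let cleanup = scopeguard::guard(dir, |dir| {
        eprintln!("cleaning up {dir}");
    });

    fallible_step()?; // on Err, `cleanup` is dropped and its closure fires

    // Success: disarm the guard, returning the wrapped value without
    // running the cleanup closure.
    Ok(ScopeGuard::into_inner(cleanup))
}

fn main() -> anyhow::Result<()> {
    println!("kept {}", create_resource()?);
    Ok(())
}
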
@@ -685,12 +615,12 @@ pub enum TenantMapInsertError {
 ///
 /// NB: the closure should return quickly because the current implementation of tenants map
 /// serializes access through an `RwLock`.
-async fn tenant_map_insert<Fut>(
+async fn tenant_map_insert<F>(
     tenant_id: TenantId,
-    insert_fut: Fut,
+    insert_fn: F,
 ) -> Result<Arc<Tenant>, TenantMapInsertError>
 where
-    Fut: Future<Output = anyhow::Result<Arc<Tenant>>>,
+    F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
 {
     let mut guard = TENANTS.write().await;
     let m = match &mut *guard {

@@ -703,7 +633,7 @@ where
             tenant_id,
             e.get().current_state(),
         )),
-        hash_map::Entry::Vacant(v) => match insert_fut.await {
+        hash_map::Entry::Vacant(v) => match insert_fn() {
            Ok(tenant) => {
                v.insert(tenant.clone());
                Ok(tenant)

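The switch from a `Future` parameter back to an `FnOnce` closure matters because `tenant_map_insert` runs its argument while holding the `TENANTS` write lock: a synchronous closure cannot `.await`, so the lock can never be held across a suspension point. A sketch of the two shapes side by side (illustrative, with `u32` standing in for `Arc<Tenant>`):

use std::future::Future;

// Future-based: the argument is itself awaited, so arbitrary async work
// can run while the caller's lock is held.
async fn insert_with_future<Fut>(insert_fut: Fut) -> anyhow::Result<u32>
where
    Fut: Future<Output = anyhow::Result<u32>>,
{
    insert_fut.await
}

// Closure-based: the argument runs synchronously and cannot await.
async fn insert_with_closure<F>(insert_fn: F) -> anyhow::Result<u32>
where
    F: FnOnce() -> anyhow::Result<u32>,
{
    insert_fn()
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    assert_eq!(insert_with_future(async { Ok(1) }).await?, 1);
    assert_eq!(insert_with_closure(|| Ok(2)).await?, 2);
    Ok(())
}
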
@@ -3452,7 +3452,7 @@ impl Timeline {
         let mut prev: Option<Key> = None;
         for (next_key, _next_lsn, _size) in itertools::process_results(
             deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
-            |iter_iter| iter_iter.kmerge_by(|a, b| a.0 < b.0),
+            |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
         )? {
             if let Some(prev_key) = prev {
                 // just first fast filter

@@ -3492,7 +3492,11 @@ impl Timeline {
                 iter_iter.kmerge_by(|a, b| {
                     if let Ok((a_key, a_lsn, _)) = a {
                         if let Ok((b_key, b_lsn, _)) = b {
-                            (a_key, a_lsn) < (b_key, b_lsn)
+                            match a_key.cmp(b_key) {
+                                Ordering::Less => true,
+                                Ordering::Equal => a_lsn <= b_lsn,
+                                Ordering::Greater => false,
+                            }
                         } else {
                             false
                         }

@@ -3510,7 +3514,11 @@ impl Timeline {
                 iter_iter.kmerge_by(|a, b| {
                     let (a_key, a_lsn, _) = a;
                     let (b_key, b_lsn, _) = b;
-                    (a_key, a_lsn) < (b_key, b_lsn)
+                    match a_key.cmp(b_key) {
+                        Ordering::Less => true,
+                        Ordering::Equal => a_lsn <= b_lsn,
+                        Ordering::Greater => false,
+                    }
                 })
             },
         )?;

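The comparator changes above hinge on `kmerge_by` semantics: its predicate answers "should `a` be yielded before `b`?", so comparing only by key leaves the relative order of equal keys unspecified, while breaking ties by LSN keeps duplicates in `(key, lsn)` order. A small self-contained example with `itertools`; tuples stand in for the real key/LSN types:

use std::cmp::Ordering;

use itertools::Itertools;

fn main() {
    // Two "delta layers", each already sorted by (key, lsn).
    let layer_a = vec![(1u32, 10u64), (2, 10), (2, 30)];
    let layer_b = vec![(1, 20), (2, 20)];

    // The predicate returns true when `a` should come out before `b`.
    let merged: Vec<_> = [layer_a, layer_b]
        .into_iter()
        .kmerge_by(|a, b| match a.0.cmp(&b.0) {
            Ordering::Less => true,
            Ordering::Equal => a.1 <= b.1, // tie-break equal keys by lsn
            Ordering::Greater => false,
        })
        .collect();

    // Equal keys come out in lsn order.
    assert_eq!(merged, vec![(1, 10), (1, 20), (2, 10), (2, 20), (2, 30)]);
}
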
@@ -8,10 +8,6 @@ from fixtures.utils import query_scalar
 # Now this test is very minimalistic -
 # it only checks next_multixact_id field in restored pg_control,
 # since we don't have functions to check multixact internals.
-# We do check that the datadir contents exported from the
-# pageserver match what the running PostgreSQL produced. This
-# is enough to verify that the WAL records are handled correctly
-# in the pageserver.
 #
 def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
     env = neon_simple_env

@@ -22,8 +18,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
     cur = endpoint.connect().cursor()
     cur.execute(
         """
-        CREATE TABLE t1(i int primary key, n_updated int);
-        INSERT INTO t1 select g, 0 from generate_series(1, 50) g;
+        CREATE TABLE t1(i int primary key);
+        INSERT INTO t1 select * from generate_series(1, 100);
         """
     )
 

@@ -33,7 +29,6 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
 
     # Lock entries using parallel connections in a round-robin fashion.
     nclients = 20
-    update_every = 97
     connections = []
     for _ in range(nclients):
         # Do not turn on autocommit. We want to hold the key-share locks.

@@ -41,20 +36,14 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
         connections.append(conn)
 
     # On each iteration, we commit the previous transaction on a connection,
-    # and issue another select. Each SELECT generates a new multixact that
+    # and issue antoher select. Each SELECT generates a new multixact that
     # includes the new XID, and the XIDs of all the other parallel transactions.
     # This generates enough traffic on both multixact offsets and members SLRUs
     # to cross page boundaries.
-    for i in range(20000):
+    for i in range(5000):
         conn = connections[i % nclients]
         conn.commit()
-
-        # Perform some non-key UPDATEs too, to exercise different multixact
-        # member statuses.
-        if i % update_every == 0:
-            conn.cursor().execute(f"update t1 set n_updated = n_updated + 1 where i = {i % 50}")
-        else:
-            conn.cursor().execute("select * from t1 for key share")
+        conn.cursor().execute("select * from t1 for key share")
 
     # We have multixacts now. We can close the connections.
     for c in connections:

@@ -21,7 +21,6 @@ from fixtures.neon_fixtures import (
     RemoteStorageKind,
     available_remote_storages,
 )
-from fixtures.pageserver.http import PageserverApiException
 from fixtures.pageserver.utils import timeline_delete_wait_completed
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import wait_until

@@ -407,33 +406,3 @@ def test_pageserver_with_empty_tenants(
     assert (
         tenant_broken_count == 1
     ), f"Tenant {tenant_without_timelines_dir} should have metric as broken"
-
-
-@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
-def test_failed_tenant_directory_is_removed(
-    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
-):
-    """Tenants which fail to be created are cleaned up from disk and not created"""
-    neon_env_builder.enable_remote_storage(
-        remote_storage_kind=remote_storage_kind,
-        test_name="test_pageserver_create_tenants_fail",
-    )
-
-    env = neon_env_builder.init_start()
-
-    env.pageserver.allowed_errors.append(".*tenant-create-fail.*")
-    env.pageserver.allowed_errors.append(".*tenant-attach-fail.*")
-    env.pageserver.allowed_errors.append(".*Tenant is already in Broken state.*")
-    env.pageserver.allowed_errors.append(".*could not load tenant.*")
-    env.pageserver.allowed_errors.append(".*InternalServerError.*")
-
-    client = env.pageserver.http_client()
-    client.configure_failpoints(("tenant-create-fail", "return"))
-    tenant_id = TenantId.generate()
-    with pytest.raises(PageserverApiException, match="failpoint: tenant-create-fail"):
-        client.tenant_create(tenant_id)
-
-    assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
-    with pytest.raises(PageserverApiException, match="Tenant .* not found"):
-        # the tenant creation is not successful and should not be found
-        client.tenant_status(tenant_id)