Compare commits


1 Commit

Author:  Bojan Serafimov
SHA1:    29bb696f3a
Message: Don't distract startup with background threads
Date:    2023-07-20 22:31:38 -04:00
12 changed files with 78 additions and 335 deletions

Cargo.lock (generated)
View File

@@ -4867,7 +4867,6 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"tracing-error",
"tracing-subscriber",

View File

@@ -222,10 +222,6 @@ fn main() -> Result<()> {
compute.state_changed.notify_all();
drop(state);
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;
@@ -242,6 +238,14 @@ fn main() -> Result<()> {
}
};
// Launch remaining service threads
//
// NOTE we do this after starting postgres so that these two extra threads
// don't blow the cpu budget and throttle the startup process.
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Wait for the child Postgres process forever. In this state Ctrl+C will
// propagate to Postgres and it will be shut down as well.
if let Some(mut pg) = pg {
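
The hunks above are the heart of the commit: the monitor and configurator threads used to be spawned before Postgres, and are now spawned right after it, so they do not compete for CPU while startup is latency-critical. A minimal standalone sketch of that ordering, with placeholder process and thread names rather than the real compute_ctl code:

use std::process::{Child, Command};
use std::thread;

fn start_postgres() -> std::io::Result<Child> {
    // Stand-in for the real Postgres startup.
    Command::new("postgres").spawn()
}

fn main() -> std::io::Result<()> {
    // 1. Start the latency-critical child process first.
    let mut pg = start_postgres()?;

    // 2. Only then spawn auxiliary service threads, so they don't eat into
    //    the CPU budget during startup.
    let _monitor = thread::Builder::new()
        .name("compute-monitor".into())
        .spawn(|| { /* poll activity here */ })?;

    // 3. Wait on the child as before; Ctrl+C propagates to it.
    let status = pg.wait()?;
    println!("postgres exited: {status}");
    Ok(())
}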

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus;
@@ -41,14 +42,13 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
thread::Builder::new()
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})
.expect("cannot launch configurator thread")
})?)
}
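
The signature change above swaps an in-function .expect() for error propagation: std::thread::Builder::spawn already returns io::Result<JoinHandle<T>>, so the launcher can hand the error to the caller through anyhow's `?`. A minimal sketch of the same pattern, assuming anyhow for the error type; the names here are illustrative, not the real compute code:

use std::sync::Arc;
use std::thread;

use anyhow::Result;

fn launch_worker(shared: &Arc<String>) -> Result<thread::JoinHandle<()>> {
    let shared = Arc::clone(shared);
    Ok(thread::Builder::new()
        .name("worker".into())
        .spawn(move || {
            // the worker's main loop would go here
            println!("worker sees: {shared}");
        })?)
}

fn main() {
    let shared = Arc::new("config".to_string());
    // The caller now decides what a failed spawn means.
    let _handle = launch_worker(&shared).expect("cannot launch worker thread");
}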

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::{thread, time};
use anyhow::Result;
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tracing::{debug, info};
@@ -104,11 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);
thread::Builder::new()
Ok(thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state))
.expect("cannot launch compute monitor thread")
.spawn(move || watch_compute_activity(&state))?)
}
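
launch_monitor gets the same Result-returning treatment. For completeness, the general shape of such a monitor is a named thread that polls on a fixed interval; the interval and body below are placeholders, not the real watch_compute_activity:

use std::thread;
use std::time::Duration;

fn launch_poller() -> std::io::Result<thread::JoinHandle<()>> {
    thread::Builder::new()
        .name("compute-monitor".into())
        .spawn(|| loop {
            // check Postgres activity, update a last-active timestamp, etc.
            thread::sleep(Duration::from_millis(500));
        })
}

fn main() -> std::io::Result<()> {
    // As in compute_ctl, the handle is kept around but never joined.
    let _handle = launch_poller()?;
    Ok(())
}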

View File

@@ -6,7 +6,6 @@ use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_counter_vec, Counter, CounterVec};

View File

@@ -57,9 +57,9 @@ pub fn slru_may_delete_clogsegment(segpage: u32, cutoff_page: u32) -> bool {
// Multixact utils
pub fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32)
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE as u32
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE as u32) as usize
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
}
pub fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
@@ -81,41 +81,3 @@ fn mx_offset_to_member_page(xid: u32) -> u32 {
pub fn mx_offset_to_member_segment(xid: u32) -> i32 {
(mx_offset_to_member_page(xid) / pg_constants::SLRU_PAGES_PER_SEGMENT) as i32
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multixid_calc() {
// Check that the mx_offset_* functions produce the same values as the
// corresponding PostgreSQL C macros (MXOffsetTo*). These test values
// were generated by calling the PostgreSQL macros with a little C
// program.
assert_eq!(mx_offset_to_member_segment(0), 0);
assert_eq!(mx_offset_to_member_page(0), 0);
assert_eq!(mx_offset_to_flags_offset(0), 0);
assert_eq!(mx_offset_to_flags_bitshift(0), 0);
assert_eq!(mx_offset_to_member_offset(0), 4);
assert_eq!(mx_offset_to_member_segment(1), 0);
assert_eq!(mx_offset_to_member_page(1), 0);
assert_eq!(mx_offset_to_flags_offset(1), 0);
assert_eq!(mx_offset_to_flags_bitshift(1), 8);
assert_eq!(mx_offset_to_member_offset(1), 8);
assert_eq!(mx_offset_to_member_segment(123456789), 2358);
assert_eq!(mx_offset_to_member_page(123456789), 75462);
assert_eq!(mx_offset_to_flags_offset(123456789), 4780);
assert_eq!(mx_offset_to_flags_bitshift(123456789), 8);
assert_eq!(mx_offset_to_member_offset(123456789), 4788);
assert_eq!(mx_offset_to_member_segment(u32::MAX - 1), 82040);
assert_eq!(mx_offset_to_member_page(u32::MAX - 1), 2625285);
assert_eq!(mx_offset_to_flags_offset(u32::MAX - 1), 5160);
assert_eq!(mx_offset_to_flags_bitshift(u32::MAX - 1), 16);
assert_eq!(mx_offset_to_member_offset(u32::MAX - 1), 5172);
assert_eq!(mx_offset_to_member_segment(u32::MAX), 82040);
assert_eq!(mx_offset_to_member_page(u32::MAX), 2625285);
assert_eq!(mx_offset_to_flags_offset(u32::MAX), 5160);
assert_eq!(mx_offset_to_flags_bitshift(u32::MAX), 24);
assert_eq!(mx_offset_to_member_offset(u32::MAX), 5176);
}
}
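
The deleted test above pins the mx_offset_* helpers to values produced by PostgreSQL's MXOffsetTo* macros. The arithmetic can be checked independently using the multixact layout constants from PostgreSQL's multixact.c (assumed here, not taken from this diff: 8192-byte pages, 4 members per group, 20-byte member groups, 32 SLRU pages per segment). This small sketch reproduces two of the asserted values:

const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u32 = 4;
const MULTIXACT_MEMBERGROUP_SIZE: u32 = 20; // 4 flag bytes + 4 * 4-byte member XIDs
const MULTIXACT_MEMBERGROUPS_PER_PAGE: u32 = 8192 / MULTIXACT_MEMBERGROUP_SIZE; // 409
const MULTIXACT_MEMBERS_PER_PAGE: u32 =
    MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP; // 1636
const SLRU_PAGES_PER_SEGMENT: u32 = 32;

fn member_page(xid: u32) -> u32 {
    xid / MULTIXACT_MEMBERS_PER_PAGE
}

fn flags_offset(xid: u32) -> usize {
    ((xid / MULTIXACT_MEMBERS_PER_MEMBERGROUP)
        % MULTIXACT_MEMBERGROUPS_PER_PAGE
        * MULTIXACT_MEMBERGROUP_SIZE) as usize
}

fn main() {
    // Reproduces two of the values asserted in the removed test.
    assert_eq!(member_page(123_456_789) / SLRU_PAGES_PER_SEGMENT, 2358);
    assert_eq!(flags_offset(123_456_789), 4780);
    println!("multixact offset math checks out");
}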

View File

@@ -42,10 +42,6 @@ workspace_hack.workspace = true
const_format.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true
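
For context on why this dependency existed at all: it was only used to adapt a tokio mpsc receiver into a Stream so hyper could stream the /metrics response body, roughly as in the sketch below (assuming hyper's "stream" feature). With the handler simplified in the next file, the adapter is no longer needed.

use bytes::Bytes;
use hyper::Body;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

fn streaming_body() -> (mpsc::Sender<std::io::Result<Bytes>>, Body) {
    let (tx, rx) = mpsc::channel::<std::io::Result<Bytes>>(1);
    // ReceiverStream turns the channel into a Stream of chunks that hyper
    // forwards to the client as they arrive.
    let body = Body::wrap_stream(ReceiverStream::new(rx));
    (tx, body)
}

fn main() {
    let (tx, _body) = streaming_body();
    // A producer task would send encoded chunks through tx; dropping it ends the stream.
    drop(tx);
}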

View File

@@ -9,6 +9,7 @@ use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::task::JoinError;
use tracing::{self, debug, info, info_span, warn, Instrument};
use std::future::Future;
@@ -147,140 +148,26 @@ impl Drop for RequestCancelled {
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
use bytes::{Bytes, BytesMut};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
SERVE_METRICS_COUNT.inc();
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
struct ChannelWriter {
buffer: BytesMut,
tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
}
impl ChannelWriter {
fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
assert_ne!(buf_len, 0);
ChannelWriter {
// split about half off the buffer from the start, because we flush depending on
// capacity. first flush will come sooner than without this, but now resizes will
// have better chance of picking up the "other" half. not guaranteed of course.
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
}
}
fn flush0(&mut self) -> std::io::Result<usize> {
let n = self.buffer.len();
if n == 0 {
return Ok(0);
}
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
// throttle sending to allow reuse of our buffer in `write`.
self.tx.reserve().await.map_err(|_| ())?;
// now the response task has picked up the buffer and hopefully started
// sending it to the client.
Ok(())
});
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
self.written += n;
Ok(n)
}
fn flushed_bytes(&self) -> usize {
self.written
}
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
let remaining = self.buffer.capacity() - self.buffer.len();
let out_of_space = remaining < buf.len();
let original_len = buf.len();
if out_of_space {
let can_still_fit = buf.len() - remaining;
self.buffer.extend_from_slice(&buf[..can_still_fit]);
buf = &buf[can_still_fit..];
self.flush0()?;
}
// assume that this will often under normal operation just move the pointer back to the
// beginning of allocation, because previous split off parts are already sent and
// dropped.
self.buffer.extend_from_slice(buf);
Ok(original_len)
}
fn flush(&mut self) -> std::io::Result<()> {
self.flush0().map(|_| ())
}
}
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
let body = Body::wrap_stream(ReceiverStream::new(rx));
let mut writer = ChannelWriter::new(128 * 1024, tx);
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metrics = tokio::task::spawn_blocking(move || {
// Currently we take a lot of mutexes while collecting metrics, so it's
// better to spawn a blocking task to avoid blocking the event loop.
metrics::gather()
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
encoder.encode(&metrics, &mut buffer).unwrap();
let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(body)
.body(Body::from(buffer))
.unwrap();
let span = info_span!("blocking");
tokio::task::spawn_blocking(move || {
let _span = span.entered();
let metrics = metrics::gather();
let res = encoder
.encode(&metrics, &mut writer)
.and_then(|_| writer.flush().map_err(|e| e.into()));
match res {
Ok(()) => {
tracing::info!(
bytes = writer.flushed_bytes(),
elapsed_ms = started_at.elapsed().as_millis(),
"responded /metrics"
);
}
Err(e) => {
tracing::warn!("failed to write out /metrics response: {e:#}");
// semantics of this error are quite... unclear. we want to error the stream out to
// abort the response to somehow notify the client that we failed.
//
// though, most likely the reason for failure is that the receiver is already gone.
drop(
writer
.tx
.blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
);
}
}
});
Ok(response)
}
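
The net effect of this hunk is that the streaming ChannelWriter machinery is gone and the handler goes back to encoding the whole metrics dump into one in-memory buffer. A minimal sketch of that shape, with error handling simplified; only the hyper and prometheus calls mirror the diff, the rest is assumed:

use hyper::{header::CONTENT_TYPE, Body, Response};
use prometheus::{Encoder, TextEncoder};

async fn metrics_handler() -> Result<Response<Body>, hyper::http::Error> {
    // Collecting the default registry takes locks, so keep it off the event loop.
    let metrics = tokio::task::spawn_blocking(prometheus::gather)
        .await
        .expect("metrics gathering task panicked");

    let encoder = TextEncoder::new();
    let mut buffer = Vec::new();
    encoder
        .encode(&metrics, &mut buffer)
        .expect("encoding into a Vec should not fail");

    Response::builder()
        .status(200)
        .header(CONTENT_TYPE, encoder.format_type())
        .body(Body::from(buffer))
}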

View File

@@ -1,8 +1,6 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use futures::Future;
use scopeguard::ScopeGuard;
use std::collections::{hash_map, HashMap};
use std::ffi::OsStr;
use std::path::Path;
@@ -16,7 +14,7 @@ use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
use utils::{completion, crashsafe};
use utils::crashsafe;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -341,61 +339,24 @@ pub async fn create_tenant(
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, async {
tenant_map_insert(tenant_id, || {
// We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state.
let tenant_directory = super::create_tenant_files(
conf,
tenant_conf,
&tenant_id,
CreateTenantFilesMode::Create,
)?;
let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create)?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let guard_fs = scopeguard::guard((), |_| {
if let Err(e) = std::fs::remove_dir_all(&tenant_directory) {
// log the error but not throwing it somewhere
warn!("failed to cleanup when tenant {tenant_id} fails to start: {e}");
}
});
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant = schedule_local_tenant_processing(
conf,
&tenant_directory,
broker_client,
remote_storage,
None,
ctx,
)?;
// Put all code that might error into the check function.
let check = || {
fail::fail_point!("tenant-create-fail", |_| {
anyhow::bail!("failpoint: tenant-create-fail");
});
let crated_tenant_id = created_tenant.tenant_id();
anyhow::ensure!(
let crated_tenant_id = created_tenant.tenant_id();
anyhow::ensure!(
tenant_id == crated_tenant_id,
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
);
Ok(())
};
if let Err(e) = check() {
// `schedule_local_tenant_processing` eventually launches the tenant's background task
// We need to shut them down before bailing out. Shutdown the tenant in the background.
tokio::spawn(async move {
let (_guard, shutdown_progress) = completion::channel();
created_tenant.shutdown(shutdown_progress, false).await
});
return Err(e);
}
// Ok, we're good. Disarm the cleanup scopeguards and return the tenant.
ScopeGuard::into_inner(guard_fs);
Ok(created_tenant)
}).await
}
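
Among other things, the create_tenant hunk removes the scopeguard-based cleanup: a guard is armed before the fallible steps and disarmed only on success, so the on-disk directory is deleted if anything in between fails. The removed code used exactly that crate; here is the pattern in isolation, with a made-up path and a simulated failure:

use scopeguard::ScopeGuard;

fn create_with_cleanup(make_it_fail: bool) -> anyhow::Result<std::path::PathBuf> {
    let dir = std::env::temp_dir().join("example-tenant-dir");
    std::fs::create_dir_all(&dir)?;

    // Armed: if we bail out below, the directory is removed again.
    let cleanup = scopeguard::guard((), |_| {
        let _ = std::fs::remove_dir_all(&dir);
    });

    if make_it_fail {
        anyhow::bail!("simulated failure after the directory was created");
    }

    // Success: disarm the guard so the directory is kept.
    ScopeGuard::into_inner(cleanup);
    Ok(dir)
}

fn main() {
    assert!(create_with_cleanup(true).is_err());
    assert!(!std::env::temp_dir().join("example-tenant-dir").exists());
    let dir = create_with_cleanup(false).expect("second attempt succeeds");
    println!("kept {}", dir.display());
}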
@@ -532,7 +493,7 @@ pub async fn load_tenant(
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, async {
tenant_map_insert(tenant_id, || {
let tenant_path = conf.tenant_path(&tenant_id);
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
if tenant_ignore_mark.exists() {
@@ -609,61 +570,30 @@ pub async fn attach_tenant(
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, async {
let tenant_dir =
create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?;
let guard_fs = scopeguard::guard((), |_| {
if let Err(e) = std::fs::remove_dir_all(&tenant_dir) {
// log the error but not throwing it somewhere
warn!("failed to cleanup when tenant {tenant_id} fails to start: {e}");
}
});
tenant_map_insert(tenant_id, || {
let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
// Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
let marker_file_exists = conf
.tenant_attaching_mark_file_path(&tenant_id)
.try_exists()
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
let attached_tenant_id = attached_tenant.tenant_id();
anyhow::ensure!(
marker_file_exists,
"create_tenant_files should have created the attach marker file"
tenant_id == attached_tenant_id,
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
);
let attached_tenant = schedule_local_tenant_processing(
conf,
&tenant_dir,
broker_client,
Some(remote_storage),
None,
ctx,
)?;
// Put all code that might error in the check function.
let check = || {
let attached_tenant_id = attached_tenant.tenant_id();
anyhow::ensure!(
tenant_id == attached_tenant_id,
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
);
Ok(())
};
if let Err(e) = check() {
// `schedule_local_tenant_processing` eventually launches the tenant's background task
// We need to shut them down before bailing out. Shutdown the tenant in the background.
tokio::spawn(async move {
let (_guard, shutdown_progress) = completion::channel();
attached_tenant.shutdown(shutdown_progress, false).await
});
return Err(e);
}
// Ok, we're good. Disarm the cleanup scopeguards and return the tenant.
ScopeGuard::into_inner(guard_fs);
Ok(attached_tenant)
}).await?;
})
.await?;
Ok(())
}
@@ -685,12 +615,12 @@ pub enum TenantMapInsertError {
///
/// NB: the closure should return quickly because the current implementation of tenants map
/// serializes access through an `RwLock`.
async fn tenant_map_insert<Fut>(
async fn tenant_map_insert<F>(
tenant_id: TenantId,
insert_fut: Fut,
insert_fn: F,
) -> Result<Arc<Tenant>, TenantMapInsertError>
where
Fut: Future<Output = anyhow::Result<Arc<Tenant>>>,
F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
{
let mut guard = TENANTS.write().await;
let m = match &mut *guard {
@@ -703,7 +633,7 @@ where
tenant_id,
e.get().current_state(),
)),
hash_map::Entry::Vacant(v) => match insert_fut.await {
hash_map::Entry::Vacant(v) => match insert_fn() {
Ok(tenant) => {
v.insert(tenant.clone());
Ok(tenant)
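
The change from `insert_fut: Fut` to `insert_fn: F` is the mechanical core of this file's diff: the setup work now runs as a plain synchronous closure while the tenants map's write lock is held, instead of being awaited there. A simplified model of the new shape, where the map, key, and value types are stand-ins rather than the real Tenant types:

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::RwLock;

async fn map_insert<F>(
    map: &RwLock<HashMap<u64, Arc<String>>>,
    id: u64,
    insert_fn: F,
) -> anyhow::Result<Arc<String>>
where
    F: FnOnce() -> anyhow::Result<Arc<String>>,
{
    let mut guard = map.write().await;
    if guard.contains_key(&id) {
        anyhow::bail!("entry {id} already exists");
    }
    // The closure runs synchronously under the write lock, with no await point.
    let value = insert_fn()?;
    guard.insert(id, value.clone());
    Ok(value)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let map = RwLock::new(HashMap::new());
    let v = map_insert(&map, 7, || Ok(Arc::new("tenant".to_string()))).await?;
    println!("inserted: {v}");
    Ok(())
}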

View File

@@ -3452,7 +3452,7 @@ impl Timeline {
let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 < b.0),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? {
if let Some(prev_key) = prev {
// just first fast filter
@@ -3492,7 +3492,11 @@ impl Timeline {
iter_iter.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
(a_key, a_lsn) < (b_key, b_lsn)
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
@@ -3510,7 +3514,11 @@ impl Timeline {
iter_iter.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
let (b_key, b_lsn, _) = b;
(a_key, a_lsn) < (b_key, b_lsn)
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
})
},
)?;
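
The comparator handed to itertools' kmerge_by decides whether the first element should be taken before the second; answering true on ties (<= rather than a strict <) keeps equal keys in a predictable order across the merged inputs. A tiny self-contained illustration with made-up data:

use std::cmp::Ordering;

use itertools::Itertools;

fn main() {
    let a = vec![(1u32, 10u64), (2, 10)];
    let b = vec![(1u32, 5u64), (3, 1)];

    let merged: Vec<_> = [a, b]
        .into_iter()
        .kmerge_by(|x, y| match x.0.cmp(&y.0) {
            Ordering::Less => true,
            Ordering::Equal => x.1 <= y.1,
            Ordering::Greater => false,
        })
        .collect();

    // Equal keys come out ordered by the secondary field.
    assert_eq!(merged, vec![(1, 5), (1, 10), (2, 10), (3, 1)]);
}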

View File

@@ -8,10 +8,6 @@ from fixtures.utils import query_scalar
# Now this test is very minimalistic -
# it only checks next_multixact_id field in restored pg_control,
# since we don't have functions to check multixact internals.
# We do check that the datadir contents exported from the
# pageserver match what the running PostgreSQL produced. This
# is enough to verify that the WAL records are handled correctly
# in the pageserver.
#
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
@@ -22,8 +18,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
cur = endpoint.connect().cursor()
cur.execute(
"""
CREATE TABLE t1(i int primary key, n_updated int);
INSERT INTO t1 select g, 0 from generate_series(1, 50) g;
CREATE TABLE t1(i int primary key);
INSERT INTO t1 select * from generate_series(1, 100);
"""
)
@@ -33,7 +29,6 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
# Lock entries using parallel connections in a round-robin fashion.
nclients = 20
update_every = 97
connections = []
for _ in range(nclients):
# Do not turn on autocommit. We want to hold the key-share locks.
@@ -41,20 +36,14 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
connections.append(conn)
# On each iteration, we commit the previous transaction on a connection,
# and issue another select. Each SELECT generates a new multixact that
# and issue antoher select. Each SELECT generates a new multixact that
# includes the new XID, and the XIDs of all the other parallel transactions.
# This generates enough traffic on both multixact offsets and members SLRUs
# to cross page boundaries.
for i in range(20000):
for i in range(5000):
conn = connections[i % nclients]
conn.commit()
# Perform some non-key UPDATEs too, to exercise different multixact
# member statuses.
if i % update_every == 0:
conn.cursor().execute(f"update t1 set n_updated = n_updated + 1 where i = {i % 50}")
else:
conn.cursor().execute("select * from t1 for key share")
conn.cursor().execute("select * from t1 for key share")
# We have multixacts now. We can close the connections.
for c in connections:

View File

@@ -21,7 +21,6 @@ from fixtures.neon_fixtures import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
@@ -407,33 +406,3 @@ def test_pageserver_with_empty_tenants(
assert (
tenant_broken_count == 1
), f"Tenant {tenant_without_timelines_dir} should have metric as broken"
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_failed_tenant_directory_is_removed(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
"""Tenants which fail to be created are cleaned up from disk and not created"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_create_tenants_fail",
)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*tenant-create-fail.*")
env.pageserver.allowed_errors.append(".*tenant-attach-fail.*")
env.pageserver.allowed_errors.append(".*Tenant is already in Broken state.*")
env.pageserver.allowed_errors.append(".*could not load tenant.*")
env.pageserver.allowed_errors.append(".*InternalServerError.*")
client = env.pageserver.http_client()
client.configure_failpoints(("tenant-create-fail", "return"))
tenant_id = TenantId.generate()
with pytest.raises(PageserverApiException, match="failpoint: tenant-create-fail"):
client.tenant_create(tenant_id)
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
with pytest.raises(PageserverApiException, match="Tenant .* not found"):
# the tenant creation is not successful and should not be found
client.tenant_status(tenant_id)
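
The deleted test exercised the tenant-create-fail failpoint that the mgr.rs diff also removes. For reference, the fail crate pattern it relied on looks roughly like this on the Rust side; this is a sketch assuming the crate's "failpoints" feature is enabled, with the failpoint configured in-process rather than over the pageserver HTTP API as the test did:

fn create_thing() -> anyhow::Result<()> {
    // Same macro form as in the removed create_tenant code.
    fail::fail_point!("tenant-create-fail", |_| {
        anyhow::bail!("failpoint: tenant-create-fail")
    });
    Ok(())
}

fn main() {
    // Activate the failpoint so the function returns the injected error.
    fail::cfg("tenant-create-fail", "return").unwrap();
    assert!(create_thing().is_err());
}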