Compare commits

..

8 Commits

Author SHA1 Message Date
Konstantin Knizhnik
773a8836cc Update pgxn/neon/file_cache.c
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
2023-10-30 19:14:00 +02:00
Konstantin Knizhnik
da34a9ed89 Add neon_stats view 2023-10-12 09:44:15 +03:00
Konstantin Knizhnik
e4b74e375c Remove wrong comment 2023-10-11 08:48:22 +03:00
Konstantin Knizhnik
c3d1e26361 Add missed neon--1.0--1.1.sql file 2023-10-11 08:40:53 +03:00
Konstantin Knizhnik
665eff5e08 Show information about local file cache in EXPLAIN ANALYZE 2023-10-10 10:11:30 +03:00
Konstantin Knizhnik
7aeec2bc9d Apply review suggestions 2023-10-09 22:23:28 +03:00
Konstantin Knizhnik
bc3a5d571f Use popcount32 to calculate number of used pages 2023-10-09 22:07:10 +03:00
Konstantin Knizhnik
2324e5a3d3 Rebased version of LFC fixes 2023-10-08 15:02:42 +03:00
44 changed files with 707 additions and 1230 deletions

View File

@@ -224,8 +224,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -368,8 +368,8 @@ RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar
FROM build-deps AS plpgsql-check-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.4.0.tar.gz -O plpgsql_check.tar.gz && \
echo "9ba58387a279b35a3bfa39ee611e5684e6cddb2ba046ddb2c5190b3bd2ca254a plpgsql_check.tar.gz" | sha256sum --check && \
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \

View File

@@ -116,7 +116,6 @@ fn main() -> Result<()> {
"attachment_service" => handle_attachment_service(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
"mappings" => handle_mappings(sub_args, &mut env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
_ => bail!("unexpected subcommand {sub_name}"),
};
@@ -817,38 +816,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
Ok(())
}
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match sub_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no mappings subcommand provided"),
};
match sub_name {
"map" => {
let branch_name = sub_args
.get_one::<String>("branch-name")
.expect("branch-name argument missing");
let tenant_id = sub_args
.get_one::<String>("tenant-id")
.map(|x| TenantId::from_str(x))
.expect("tenant-id argument missing")
.expect("malformed tenant-id arg");
let timeline_id = sub_args
.get_one::<String>("timeline-id")
.map(|x| TimelineId::from_str(x))
.expect("timeline-id argument missing")
.expect("malformed timeline-id arg");
env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
Ok(())
}
other => unimplemented!("mappings subcommand {other}"),
}
}
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
@@ -1117,7 +1084,6 @@ fn cli() -> Command {
// --id, when using a pageserver command
let pageserver_id_arg = Arg::new("pageserver-id")
.long("id")
.global(true)
.help("pageserver id")
.required(false);
// --pageserver-id when using a non-pageserver command
@@ -1288,20 +1254,17 @@ fn cli() -> Command {
Command::new("pageserver")
.arg_required_else_help(true)
.about("Manage pageserver")
.arg(pageserver_id_arg)
.subcommand(Command::new("status"))
.subcommand(Command::new("start")
.about("Start local pageserver")
.arg(pageserver_config_args.clone())
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")
.arg(stop_mode_arg.clone())
)
.subcommand(Command::new("restart")
.about("Restart local pageserver")
.arg(pageserver_config_args.clone())
)
.arg(pageserver_id_arg.clone())
.subcommand(Command::new("start").about("Start local pageserver")
.arg(pageserver_id_arg.clone())
.arg(pageserver_config_args.clone()))
.subcommand(Command::new("stop").about("Stop local pageserver")
.arg(pageserver_id_arg.clone())
.arg(stop_mode_arg.clone()))
.subcommand(Command::new("restart").about("Restart local pageserver")
.arg(pageserver_id_arg.clone())
.arg(pageserver_config_args.clone()))
)
.subcommand(
Command::new("attachment_service")
@@ -1358,8 +1321,8 @@ fn cli() -> Command {
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg)
.arg(timeline_id_arg)
.arg(lsn_arg)
.arg(pg_port_arg)
.arg(http_port_arg)
@@ -1372,7 +1335,7 @@ fn cli() -> Command {
.subcommand(
Command::new("stop")
.arg(endpoint_id_arg)
.arg(tenant_id_arg.clone())
.arg(tenant_id_arg)
.arg(
Arg::new("destroy")
.help("Also delete data directory (now optional, should be default in future)")
@@ -1383,18 +1346,6 @@ fn cli() -> Command {
)
)
.subcommand(
Command::new("mappings")
.arg_required_else_help(true)
.about("Manage neon_local branch name mappings")
.subcommand(
Command::new("map")
.about("Create new mapping which cannot exist already")
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
)
)
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
.subcommand(
Command::new("pg")

View File

@@ -442,20 +442,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
trace!("got message {:?}", msg);
let result = self.process_message(handler, msg, &mut query_string).await;
tokio::select!(
biased;
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during response flush");
return Ok(())
},
flush_r = self.flush() => {
flush_r?;
}
);
self.flush().await?;
match result? {
ProcessMsgResult::Continue => {
self.flush().await?;
continue;
}
ProcessMsgResult::Break => break,

View File

@@ -4,7 +4,7 @@
//! allowing multiple api users to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::{borrow::Cow, sync::Arc};
use std::sync::Arc;
use anyhow::Context;
use aws_config::{
@@ -556,20 +556,6 @@ impl RemoteStorage for S3Bucket {
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
// Log a bounded number of the errors within the response:
// these requests can carry 1000 keys so logging each one
// would be too verbose, especially as errors may lead us
// to retry repeatedly.
const LOG_UP_TO_N_ERRORS: usize = 10;
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
tracing::warn!(
"DeleteObjects key {} failed: {}: {}",
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
e.message.as_ref().map(Cow::from).unwrap_or("".into())
);
}
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()

View File

@@ -119,7 +119,6 @@ pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
match api_error {
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
_ => error!("Error processing HTTP request: {api_error:#}"),
}

View File

@@ -11,7 +11,10 @@ use std::sync::{Arc, Barrier};
use bytes::{Buf, Bytes};
use pageserver::{
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
config::PageServerConf,
repository::Key,
walrecord::NeonWalRecord,
walredo::{PostgresRedoManager, WalRedoError},
};
use utils::{id::TenantId, lsn::Lsn};
@@ -149,7 +152,7 @@ impl Drop for JoinOnDrop {
}
}
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()>
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError>
where
I: IntoIterator<Item = Request>,
{
@@ -157,7 +160,7 @@ where
input.into_iter().try_for_each(|req| {
let page = req.execute(manager)?;
assert_eq!(page.remaining(), 8192);
anyhow::Ok(())
Ok::<_, WalRedoError>(())
})
}
@@ -470,7 +473,7 @@ struct Request {
}
impl Request {
fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
fn execute(self, manager: &PostgresRedoManager) -> Result<Bytes, WalRedoError> {
use pageserver::walredo::WalRedoManager;
let Request {

View File

@@ -153,7 +153,7 @@ impl FlushOp {
#[derive(Clone, Debug)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
@@ -212,7 +212,7 @@ where
/// Files ending with this suffix will be ignored and erased
/// during recovery as startup.
const TEMP_SUFFIX: &str = "tmp";
const TEMP_SUFFIX: &str = ".tmp";
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
impl DeletionQueueClient {
pub(crate) fn broken() -> Self {
// Channels whose receivers are immediately dropped.
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
Self {
tx,
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
fn do_push<T>(
async fn do_push<T>(
&self,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
queue: &tokio::sync::mpsc::Sender<T>,
msg: T,
) -> Result<(), DeletionQueueError> {
match queue.send(msg) {
match queue.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// This shouldn't happen, we should shut down all tenants before
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
}
}
pub(crate) fn recover(
pub(crate) async fn recover(
&self,
attached_tenants: HashMap<TenantId, Generation>,
) -> Result<(), DeletionQueueError> {
@@ -453,6 +453,7 @@ impl DeletionQueueClient {
&self.tx,
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
)
.await
}
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
@@ -525,21 +526,6 @@ impl DeletionQueueClient {
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -553,16 +539,17 @@ impl DeletionQueueClient {
objects: Vec::new(),
}),
)
.await
}
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
async fn do_flush<T>(
&self,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
queue: &tokio::sync::mpsc::Sender<T>,
msg: T,
rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), DeletionQueueError> {
self.do_push(queue, msg)?;
self.do_push(queue, msg).await?;
if rx.await.is_err() {
// This shouldn't happen if tenants are shut down before deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -583,18 +570,6 @@ impl DeletionQueueClient {
.await
}
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
/// detach where flushing is nice but not necessary.
///
/// This function provides no guarantees of work being done.
pub fn flush_advisory(&self) {
let (flush_op, _) = FlushOp::new();
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
}
// Wait until all previous deletions are executed
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
@@ -611,7 +586,9 @@ impl DeletionQueueClient {
// Flush any immediate-mode deletions (the above backend flush will only flush
// the executor if deletions had flowed through the backend)
debug!("flush_execute: flushing execution...");
self.flush_immediate().await?;
let (flush_op, rx) = FlushOp::new();
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
.await?;
debug!("flush_execute: finished flushing execution...");
Ok(())
}
@@ -666,10 +643,8 @@ impl DeletionQueue {
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
// enough to avoid this taking pathologically large amount of memory.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
@@ -982,7 +957,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
@@ -1050,7 +1025,7 @@ mod test {
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
// Generation that the control plane thinks is current
let latest_generation = Generation::new(0xdeadbeef);
@@ -1107,7 +1082,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
let tenant_id = ctx.harness.tenant_id;
@@ -1170,7 +1145,9 @@ mod test {
drop(client);
ctx.restart().await;
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
client
.recover(HashMap::from([(tenant_id, now_generation)]))
.await?;
info!("Flush-executing");
client.flush_execute().await?;
@@ -1196,7 +1173,7 @@ pub(crate) mod mock {
};
pub struct ConsumerState {
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
}
@@ -1273,7 +1250,7 @@ pub(crate) mod mock {
}
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed: Arc<AtomicUsize>,
remote_storage: Option<GenericRemoteStorage>,
@@ -1283,7 +1260,7 @@ pub(crate) mod mock {
impl MockDeletionQueue {
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, rx) = tokio::sync::mpsc::channel(16384);
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
let executed = Arc::new(AtomicUsize::new(0));

View File

@@ -13,7 +13,6 @@ use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;
use crate::metrics;
@@ -64,19 +63,7 @@ impl Deleter {
Err(anyhow::anyhow!("failpoint hit"))
});
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
// logging path for issues deleting objects.
backoff::retry(
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|_| false,
3,
10,
"executing deletion batch",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
self.remote_storage.delete_objects(&self.accumulator).await
}
/// Block until everything in accumulator has been executed
@@ -101,10 +88,7 @@ impl Deleter {
self.accumulator.clear();
}
Err(e) => {
if self.cancel.is_cancelled() {
return Err(DeletionQueueError::ShuttingDown);
}
warn!("DeleteObjects request failed: {e:#}, will continue trying");
warn!("DeleteObjects request failed: {e:#}, will retry");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["execute"])

View File

@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
@@ -111,7 +111,7 @@ impl ListWriter {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
cancel: CancellationToken,
) -> Self {
@@ -230,7 +230,6 @@ impl ListWriter {
let list_name_pattern =
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
let temp_extension = format!(".{TEMP_SUFFIX}");
let header_path = self.conf.deletion_header_path();
let mut seqs: Vec<u64> = Vec::new();
while let Some(dentry) = dir.next_entry().await? {
@@ -242,7 +241,7 @@ impl ListWriter {
continue;
}
if dentry_str.ends_with(&temp_extension) {
if dentry_str.ends_with(TEMP_SUFFIX) {
info!("Cleaning up temporary file {dentry_str}");
let absolute_path =
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));

View File

@@ -77,7 +77,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
.iter()
.map(|v| v.parse().unwrap())
.collect::<Vec<_>>();
@@ -136,7 +136,9 @@ impl From<PageReconstructError> for ApiError {
PageReconstructError::AncestorStopping(_) => {
ApiError::ResourceUnavailable(format!("{pre}").into())
}
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}
}
}
}
@@ -162,6 +164,9 @@ impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::NotActive(_) => {
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
@@ -391,9 +396,6 @@ async fn timeline_create_handler(
format!("{err:#}")
))
}
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
}
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}
@@ -570,14 +572,9 @@ async fn tenant_detach_handler(
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
json_response(StatusCode::OK, ())
}
@@ -1034,7 +1031,7 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
mgr::detach_tenant(conf, tenant_id, true)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());

View File

@@ -35,7 +35,6 @@ use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
@@ -65,6 +64,69 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
where
IO: AsyncRead + AsyncWrite + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
};
match msg {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => { break },
FeMessage::Sync => continue,
FeMessage::Terminate => {
let msg = "client terminated connection with Terminate message during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
break;
}
m => {
let msg = format!("unexpected message {m:?}");
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::Other, msg))?;
break;
}
};
yield copy_data_bytes;
}
Ok(None) => {
let msg = "client closed connection during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
pgb.flush().await?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
Err(io_error)?;
}
Err(other) => {
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
}
};
}
}
}
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -222,13 +284,7 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
broker_client,
auth,
connection_ctx,
task_mgr::shutdown_token(),
);
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -262,10 +318,6 @@ struct PageServerHandler {
/// For each query received over the connection,
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
/// A token that should fire when the tenant transitions from
/// attached state, or when the pageserver is shutting down.
cancel: CancellationToken,
}
impl PageServerHandler {
@@ -274,7 +326,6 @@ impl PageServerHandler {
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> Self {
PageServerHandler {
_conf: conf,
@@ -282,91 +333,6 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
cancel,
}
}
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
/// this rather than naked flush() in order to shut down promptly. Without this, we would
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
/// in the flush.
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
tokio::select!(
flush_r = pgb.flush() => {
Ok(flush_r?)
},
_ = self.cancel.cancelled() => {
Err(QueryError::Other(anyhow::anyhow!("Shutting down")))
}
)
}
fn copyin_stream<'a, IO>(
&'a self,
pgb: &'a mut PostgresBackend<IO>,
) -> impl Stream<Item = io::Result<Bytes>> + 'a
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
};
match msg {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => { break },
FeMessage::Sync => continue,
FeMessage::Terminate => {
let msg = "client terminated connection with Terminate message during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
break;
}
m => {
let msg = format!("unexpected message {m:?}");
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::Other, msg))?;
break;
}
};
yield copy_data_bytes;
}
Ok(None) => {
let msg = "client closed connection during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
Err(io_error)?;
}
Err(other) => {
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
}
};
}
}
}
@@ -406,7 +372,7 @@ impl PageServerHandler {
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
self.flush_cancellable(pgb).await?;
pgb.flush().await?;
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
@@ -499,7 +465,7 @@ impl PageServerHandler {
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
self.flush_cancellable(pgb).await?;
pgb.flush().await?;
}
Ok(())
}
@@ -542,9 +508,9 @@ impl PageServerHandler {
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb).await?;
pgb.flush().await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(
&mut copyin_reader,
@@ -597,8 +563,8 @@ impl PageServerHandler {
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
pgb.flush().await?;
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");
@@ -806,7 +772,7 @@ impl PageServerHandler {
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
self.flush_cancellable(pgb).await?;
pgb.flush().await?;
// Send a tarball of the latest layer on the timeline. Compress if not
// fullbackup. TODO Compress in that case too (tests need to be updated)
@@ -858,7 +824,7 @@ impl PageServerHandler {
}
pgb.write_message_noflush(&BeMessage::CopyDone)?;
self.flush_cancellable(pgb).await?;
pgb.flush().await?;
let basebackup_after = started
.elapsed()

View File

@@ -45,7 +45,6 @@ use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
@@ -209,7 +208,7 @@ pub struct Tenant {
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
@@ -407,8 +406,6 @@ pub enum CreateTimelineError {
AlreadyExists,
#[error(transparent)]
AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
AncestorNotActive,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -1590,12 +1587,6 @@ impl Tenant {
.get_timeline(ancestor_timeline_id, false)
.context("Cannot branch off the timeline that's not present in pageserver")?;
// instead of waiting around, just deny the request because ancestor is not yet
// ready for other purposes either.
if !ancestor_timeline.is_active() {
return Err(CreateTimelineError::AncestorNotActive);
}
if let Some(lsn) = ancestor_start_lsn.as_mut() {
*lsn = lsn.align();
@@ -1628,6 +1619,8 @@ impl Tenant {
}
};
loaded_timeline.activate(broker_client, None, ctx);
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
@@ -1639,8 +1632,6 @@ impl Tenant {
})?;
}
loaded_timeline.activate(broker_client, None, ctx);
Ok(loaded_timeline)
}
@@ -2077,15 +2068,6 @@ impl Tenant {
}
}
}
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
self.tenant_conf
.read()
.unwrap()
.location
.attach_mode
.clone()
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -2755,11 +2737,6 @@ impl Tenant {
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
let start_lsn = start_lsn.unwrap_or_else(|| {
let lsn = src_timeline.get_last_record_lsn();
@@ -2767,6 +2744,11 @@ impl Tenant {
lsn
});
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// Create a placeholder for the new branch. This will error
// out if the new timeline ID is already in use.
let timeline_uninit_mark = {
@@ -3448,8 +3430,11 @@ pub mod harness {
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::{
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
walredo::WalRedoManager,
config::PageServerConf,
repository::Key,
tenant::Tenant,
walrecord::NeonWalRecord,
walredo::{WalRedoError, WalRedoManager},
};
use super::*;
@@ -3622,7 +3607,7 @@ pub mod harness {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, WalRedoError> {
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,

View File

@@ -31,7 +31,7 @@ use super::{
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
pub enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
@@ -376,7 +376,7 @@ impl DeleteTenantFlow {
Ok(())
}
pub(crate) async fn should_resume_deletion(
pub async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,

View File

@@ -24,7 +24,7 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
@@ -50,7 +50,7 @@ use super::TenantSharedResources;
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub(crate) enum TenantSlot {
pub enum TenantSlot {
Attached(Arc<Tenant>),
Secondary,
}
@@ -206,7 +206,8 @@ async fn init_load_generations(
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())?;
.recover(generations.clone())
.await?;
}
Ok(Some(generations))
@@ -481,7 +482,7 @@ pub(crate) fn schedule_local_tenant_processing(
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
pub(crate) async fn shutdown_all_tenants() {
pub async fn shutdown_all_tenants() {
shutdown_all_tenants0(&TENANTS).await
}
@@ -593,7 +594,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
// caller will log how long we took
}
pub(crate) async fn create_tenant(
pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -628,14 +629,14 @@ pub(crate) async fn create_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum SetNewTenantConfigError {
pub enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
}
pub(crate) async fn set_new_tenant_config(
pub async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -694,18 +695,6 @@ pub(crate) async fn upsert_location(
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's not point trying to flush deletions
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
@@ -776,7 +765,7 @@ pub(crate) async fn upsert_location(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum GetTenantError {
pub enum GetTenantError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is not active")]
@@ -792,7 +781,7 @@ pub(crate) enum GetTenantError {
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub(crate) async fn get_tenant(
pub async fn get_tenant(
tenant_id: TenantId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
@@ -817,7 +806,7 @@ pub(crate) async fn get_tenant(
}
}
pub(crate) async fn delete_tenant(
pub async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_id: TenantId,
@@ -826,7 +815,7 @@ pub(crate) async fn delete_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTimelineError {
pub enum DeleteTimelineError {
#[error("Tenant {0}")]
Tenant(#[from] GetTenantError),
@@ -834,7 +823,7 @@ pub(crate) enum DeleteTimelineError {
Timeline(#[from] crate::tenant::DeleteTimelineError),
}
pub(crate) async fn delete_timeline(
pub async fn delete_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
_ctx: &RequestContext,
@@ -845,29 +834,23 @@ pub(crate) async fn delete_timeline(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantStateError {
pub enum TenantStateError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is stopping")]
IsStopping(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
pub(crate) async fn detach_tenant(
pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
detach_ignored,
deletion_queue_client,
)
.await?;
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
@@ -892,7 +875,6 @@ async fn detach_tenant0(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -904,10 +886,6 @@ async fn detach_tenant0(
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
@@ -924,7 +902,7 @@ async fn detach_tenant0(
removal_result
}
pub(crate) async fn load_tenant(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -961,7 +939,7 @@ pub(crate) async fn load_tenant(
Ok(())
}
pub(crate) async fn ignore_tenant(
pub async fn ignore_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
@@ -989,7 +967,7 @@ async fn ignore_tenant0(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapListError {
pub enum TenantMapListError {
#[error("tenant map is still initiailizing")]
Initializing,
}
@@ -997,7 +975,7 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1015,7 +993,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
///
/// Downloading all the tenant data is performed in the background, this merely
/// spawns the background task and returns quickly.
pub(crate) async fn attach_tenant(
pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -1052,7 +1030,7 @@ pub(crate) async fn attach_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapInsertError {
pub enum TenantMapInsertError {
#[error("tenant map is still initializing")]
StillInitializing,
#[error("tenant map is shutting down")]
@@ -1215,7 +1193,7 @@ use {
utils::http::error::ApiError,
};
pub(crate) async fn immediate_gc(
pub async fn immediate_gc(
tenant_id: TenantId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,

View File

@@ -31,7 +31,6 @@ pub(super) async fn upload_index_part<'a>(
fail_point!("before-upload-index", |_| {
bail!("failpoint before-upload-index")
});
pausable_failpoint!("before-upload-index-pausable");
let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;

View File

@@ -370,7 +370,7 @@ pub enum PageReconstructError {
/// An error happened replaying WAL records
#[error(transparent)]
WalRedo(anyhow::Error),
WalRedo(#[from] crate::walredo::WalRedoError),
}
impl std::fmt::Debug for PageReconstructError {
@@ -4327,7 +4327,7 @@ impl Timeline {
let img = match self
.walredo_mgr
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.context("Failed to reconstruct a page image")
.context("Failed to reconstruct a page image:")
{
Ok(img) => img,
Err(e) => return Err(PageReconstructError::from(e)),

View File

@@ -18,7 +18,6 @@
//! any WAL records, so that even if an attacker hijacks the Postgres
//! process, he cannot escape out of it.
//!
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
@@ -90,7 +89,7 @@ pub trait WalRedoManager: Send + Sync {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<Bytes>;
) -> Result<Bytes, WalRedoError>;
}
struct ProcessInput {
@@ -101,14 +100,6 @@ struct ProcessInput {
n_requests: usize,
}
enum ApplyWalRecordsError {
WithRequestNo {
error: anyhow::Error,
request_no: usize,
},
NoRequestNo(anyhow::Error),
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
@@ -149,6 +140,20 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
}
}
/// An error happened in WAL redo
#[derive(Debug, thiserror::Error)]
pub enum WalRedoError {
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error("cannot perform WAL redo now")]
InvalidState,
#[error("cannot perform WAL redo for this request")]
InvalidRequest,
#[error("cannot perform WAL redo for this record")]
InvalidRecord,
}
///
/// Public interface of WAL redo manager
///
@@ -166,9 +171,10 @@ impl WalRedoManager for PostgresRedoManager {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, WalRedoError> {
if records.is_empty() {
anyhow::bail!("invalid WAL redo request with no records");
error!("invalid WAL redo request with no records");
return Err(WalRedoError::InvalidRequest);
}
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
@@ -180,13 +186,7 @@ impl WalRedoManager for PostgresRedoManager {
if rec_neon != batch_neon {
let result = if batch_neon {
self.apply_batch_neon(
key,
lsn,
img,
&records[batch_start..i],
(batch_start, records.len()),
)
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
} else {
self.apply_batch_postgres(
key,
@@ -196,7 +196,6 @@ impl WalRedoManager for PostgresRedoManager {
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
(batch_start, records.len()),
)
};
img = Some(result?);
@@ -207,13 +206,7 @@ impl WalRedoManager for PostgresRedoManager {
}
// last batch
if batch_neon {
self.apply_batch_neon(
key,
lsn,
img,
&records[batch_start..],
(batch_start, records.len()),
)
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
} else {
self.apply_batch_postgres(
key,
@@ -223,7 +216,6 @@ impl WalRedoManager for PostgresRedoManager {
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
(batch_start, records.len()),
)
}
}
@@ -268,9 +260,8 @@ impl PostgresRedoManager {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
diag: impl std::fmt::Debug,
) -> anyhow::Result<Bytes> {
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let start_time = Instant::now();
let mut n_attempts = 0u32;
@@ -280,15 +271,15 @@ impl PostgresRedoManager {
// launch the WAL redo process on first use
if proc.is_none() {
self.launch(&mut proc, pg_version)
.context("launch process")?;
self.launch(&mut proc, pg_version)?;
}
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result =
self.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout);
let result = self
.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
let duration = end_time.duration_since(lock_time);
@@ -307,35 +298,26 @@ impl PostgresRedoManager {
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
debug!(
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {} diag={:?}",
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
len,
nbytes,
duration.as_micros(),
lsn,
diag,
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if let Err(e) = result.as_ref() {
let (e, request_no) = match e {
ApplyWalRecordsError::WithRequestNo { error, request_no } => {
(error, Some(*request_no))
}
ApplyWalRecordsError::NoRequestNo(e) => (e, None),
};
error!(
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={} request_no={:?} {:?}: {:?}",
n_attempts,
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}: {}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
base_img_lsn,
lsn,
n_attempts,
request_no,
diag,
e,
utils::error::report_compact_sources(e),
);
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
@@ -354,14 +336,11 @@ impl PostgresRedoManager {
proc.child.kill_and_wait();
}
} else if n_attempts != 0 {
info!(n_attempts, ?diag, "retried walredo succeeded");
info!(n_attempts, "retried walredo succeeded");
}
n_attempts += 1;
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
return result.map_err(|e| match e {
ApplyWalRecordsError::WithRequestNo { error, .. } => error,
ApplyWalRecordsError::NoRequestNo(e) => e,
});
return result;
}
}
}
@@ -375,8 +354,7 @@ impl PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
diag: impl std::fmt::Debug,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, WalRedoError> {
let start_time = Instant::now();
let mut page = BytesMut::new();
@@ -385,7 +363,8 @@ impl PostgresRedoManager {
page.extend_from_slice(&fpi[..]);
} else {
// All the current WAL record types that we can handle require a base image.
anyhow::bail!("invalid neon WAL redo request with no base image");
error!("invalid neon WAL redo request with no base image");
return Err(WalRedoError::InvalidRequest);
}
// Apply all the WAL records in the batch
@@ -398,11 +377,10 @@ impl PostgresRedoManager {
WAL_REDO_TIME.observe(duration.as_secs_f64());
debug!(
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {} diag={:?}",
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
records.len(),
duration.as_micros(),
lsn,
diag,
lsn
);
Ok(page.freeze())
@@ -414,13 +392,14 @@ impl PostgresRedoManager {
page: &mut BytesMut,
_record_lsn: Lsn,
record: &NeonWalRecord,
) -> anyhow::Result<()> {
) -> Result<(), WalRedoError> {
match record {
NeonWalRecord::Postgres {
will_init: _,
rec: _,
} => {
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
error!("tried to pass postgres wal record to neon WAL redo");
return Err(WalRedoError::InvalidRequest);
}
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
@@ -428,7 +407,7 @@ impl PostgresRedoManager {
flags,
} => {
// sanity check that this is modifying the correct relation
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert!(
rel.forknum == VISIBILITYMAP_FORKNUM,
"ClearVisibilityMapFlags record on unexpected rel {}",
@@ -466,7 +445,7 @@ impl PostgresRedoManager {
// same effects as the corresponding Postgres WAL redo function.
NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::Clog,
@@ -516,7 +495,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::ClogSetAborted { xids } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::Clog,
@@ -547,7 +526,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::MultiXactOffsets,
@@ -580,7 +559,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::MultixactMembersCreate { moff, members } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::MultiXactMembers,
@@ -780,7 +759,7 @@ impl PostgresRedoManager {
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> Result<Bytes, ApplyWalRecordsError> {
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -803,9 +782,10 @@ impl PostgresRedoManager {
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"tried to pass neon wal record to postgres WAL redo"
)));
return Err(Error::new(
ErrorKind::Other,
"tried to pass neon wal record to postgres WAL redo",
));
}
}
build_get_page_msg(tag, &mut writebuf);
@@ -827,7 +807,7 @@ impl PostgresRedoManager {
writebuf: &[u8],
mut input: MutexGuard<Option<ProcessInput>>,
wal_redo_timeout: Duration,
) -> Result<Bytes, ApplyWalRecordsError> {
) -> Result<Bytes, std::io::Error> {
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
@@ -848,13 +828,10 @@ impl PostgresRedoManager {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
}?;
if n == 0 {
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo timed out"
)));
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
@@ -863,9 +840,7 @@ impl PostgresRedoManager {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr
.read(&mut errbuf)
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
@@ -880,23 +855,22 @@ impl PostgresRedoManager {
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo process closed its stderr unexpectedly"
)));
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If 'stdin' is writeable, do write.
let in_revents = pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc
.stdin
.write(&writebuf[nwrite..])
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo process closed its stdin unexpectedly"
)));
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
let request_no = proc.n_requests;
@@ -927,10 +901,10 @@ impl PostgresRedoManager {
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
});
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
@@ -946,14 +920,10 @@ impl PostgresRedoManager {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
}?;
if n == 0 {
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo timed out"),
});
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
@@ -962,12 +932,7 @@ impl PostgresRedoManager {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf).map_err(|e| {
ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!(e),
}
})?;
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
@@ -982,26 +947,21 @@ impl PostgresRedoManager {
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stderr unexpectedly"),
});
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[2].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..]).map_err(|e| {
ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!(e),
}
})?;
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
});
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
output

View File

@@ -20,7 +20,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"

View File

@@ -32,11 +32,13 @@
#include "storage/latch.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "storage/fd.h"
#include "storage/pg_shmem.h"
#include "storage/buf_internals.h"
#include "pgstat.h"
/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -65,6 +67,7 @@
typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 bitmap[BLOCKS_PER_CHUNK/32];
@@ -76,6 +79,9 @@ typedef struct FileCacheControl
uint64 generation; /* generation is needed to handle correct hash reenabling */
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
dlist_head lru; /* double linked list for LRU replacement algorithm */
} FileCacheControl;
@@ -91,10 +97,12 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
static shmem_request_hook_type prev_shmem_request_hook;
#endif
void FileCacheMonitorMain(Datum main_arg);
#define LFC_ENABLED() (lfc_ctl->limit != 0)
void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
/*
* Local file cache is mandatory and Neon can work without it.
* Local file cache is optional and Neon can work without it.
* In case of any any errors with this cache, we should disable it but to not throw error.
* Also we should allow re-enable it if source of failure (lack of disk space, permissions,...) is fixed.
* All cache content should be invalidated to avoid reading of stale or corrupted data
@@ -102,49 +110,68 @@ void FileCacheMonitorMain(Datum main_arg);
static void
lfc_disable(char const* op)
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (LFC_ENABLED())
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
if (lfc_desc > 0)
{
/* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
int rc = ftruncate(lfc_desc, 0);
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
}
}
LWLockRelease(lfc_lock);
if (lfc_desc > 0)
close(lfc_desc);
lfc_desc = -1;
lfc_size_limit = 0;
}
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
hash_seq_term(&status);
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
LWLockRelease(lfc_lock);
/*
* This check is done without obtaining lfc_lock, so it is unreliable
*/
static bool
lfc_maybe_disabled(void)
{
return !lfc_ctl || !LFC_ENABLED();
}
static bool
lfc_ensure_opened(void)
{
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0)
if (lfc_desc <= 0 && enabled)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
if (lfc_desc < 0) {
lfc_disable("open");
return false;
}
}
return true;
return enabled;
}
static void
@@ -163,6 +190,7 @@ lfc_shmem_startup(void)
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
{
int fd;
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
@@ -175,10 +203,22 @@ lfc_shmem_startup(void)
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
dlist_init(&lfc_ctl->lru);
/* Remove file cache on restart */
(void)unlink(lfc_path);
/* Recreate file cache on restart */
fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
{
close(fd);
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
}
}
LWLockRelease(AddinShmemInitLock);
}
@@ -195,6 +235,17 @@ lfc_shmem_request(void)
RequestNamedLWLockTranche("lfc_lock", 1);
}
static bool
is_normal_backend(void)
{
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}
static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -210,25 +261,15 @@ static void
lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
if (!is_normal_backend())
return;
if (!lfc_ensure_opened())
return;
/* Open cache file if not done yet */
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
@@ -238,10 +279,12 @@ lfc_change_limit_hook(int newval, void *extra)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m");
#endif
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
elog(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);
}
@@ -255,6 +298,7 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -315,10 +359,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
BufferTag tag;
FileCacheEntry* entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
bool found;
bool found = false;
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -327,8 +371,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
}
LWLockRelease(lfc_lock);
return found;
}
@@ -345,7 +392,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -355,6 +402,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found)
@@ -405,7 +459,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
* In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache.
* In case of error local file cache is disabled (lfc->limit is set to zero).
*/
bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -420,7 +474,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
if (!lfc_ensure_opened())
@@ -432,10 +486,19 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return false;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
/* Page is not cached */
lfc_ctl->misses += 1;
pgBufferUsage.file_cache.misses += 1;
LWLockRelease(lfc_lock);
return false;
}
@@ -456,8 +519,12 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
Assert(LFC_ENABLED());
lfc_ctl->hits += 1;
pgBufferUsage.file_cache.hits += 1;
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
@@ -489,7 +556,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
if (!lfc_ensure_opened())
@@ -497,12 +564,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
CopyNRelFileInfoToBufTag(tag, rinfo);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (found)
@@ -521,13 +593,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* there are should be very large number of concurrent IO operations and them are limited by max_connections,
* we prefer not to complicate code and use second approach.
*/
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page");
}
else
@@ -536,6 +608,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
}
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
@@ -557,6 +630,104 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
}
typedef struct
{
TupleDesc tupdesc;
} NeonGetStatsCtx;
#define NUM_NEON_GET_STATS_COLS 2
#define NUM_NEON_GET_STATS_ROWS 3
PG_FUNCTION_INFO_V1(neon_get_stats);
Datum
neon_get_stats(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
NeonGetStatsCtx* fctx;
MemoryContext oldcontext;
TupleDesc tupledesc;
Datum result;
HeapTuple tuple;
char const* key;
uint64 value;
Datum values[NUM_NEON_GET_STATS_COLS];
bool nulls[NUM_NEON_GET_STATS_COLS];
if (SRF_IS_FIRSTCALL())
{
funcctx = SRF_FIRSTCALL_INIT();
/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Create a user function context for cross-call persistence */
fctx = (NeonGetStatsCtx*) palloc(sizeof(NeonGetStatsCtx));
/* Construct a tuple descriptor for the result rows. */
tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS);
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "ns_key",
TEXTOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "ns_value",
TEXTOID, -1, 0);
fctx->tupdesc = BlessTupleDesc(tupledesc);
funcctx->max_calls = NUM_NEON_GET_STATS_ROWS;
funcctx->user_fctx = fctx;
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
/* Get the saved state */
fctx = (NeonGetStatsCtx*) funcctx->user_fctx;
switch (funcctx->call_cntr)
{
case 0:
key = "file_cache_misses";
if (lfc_ctl)
value = lfc_ctl->misses;
break;
case 1:
key = "file_cache_hits";
if (lfc_ctl)
value = lfc_ctl->hits;
break;
case 2:
key = "file_cache_used";
if (lfc_ctl)
value = lfc_ctl->used;
break;
default:
SRF_RETURN_DONE(funcctx);
}
values[0] = PointerGetDatum(cstring_to_text(key));
nulls[0] = false;
if (lfc_ctl)
{
char buf[64];
snprintf(buf, sizeof buf, "%llu", (long long)value);
nulls[1] = false;
values[1] = PointerGetDatum(cstring_to_text(buf));
}
else
nulls[1] = true;
tuple = heap_form_tuple(fctx->tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
/*
* Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter
*/
PG_FUNCTION_INFO_V1(local_cache_pages);
/*
* Record structure holding the to be exposed cache data.
*/
@@ -580,11 +751,6 @@ typedef struct
LocalCachePagesRec *record;
} LocalCachePagesContext;
/*
* Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter
*/
PG_FUNCTION_INFO_V1(local_cache_pages);
#define NUM_LOCALCACHE_PAGES_ELEM 7
@@ -653,13 +819,15 @@ local_cache_pages(PG_FUNCTION_ARGS)
LWLockAcquire(lfc_lock, LW_SHARED);
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (LFC_ENABLED())
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK/32; i++)
n_pages += pg_popcount32(entry->bitmap[i]);
}
}
hash_seq_term(&status);
fctx->record = (LocalCachePagesRec *)
MemoryContextAllocHuge(CurrentMemoryContext,
sizeof(LocalCachePagesRec) * n_pages);
@@ -671,35 +839,33 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (n_pages != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
/*
* Scan through all the cache entries, saving the relevant fields in the
* fctx->record structure.
*/
uint32 n = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{
fctx->record[n].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
}
}
}
Assert(n_pages == n);
}
hash_seq_term(&status);
Assert(n_pages == funcctx->max_calls);
LWLockRelease(lfc_lock);
}

View File

@@ -0,0 +1,10 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.1'" to load this file. \quit
CREATE FUNCTION neon_get_stats()
RETURNS SETOF RECORD
AS 'MODULE_PATHNAME', 'neon_get_stats'
LANGUAGE C PARALLEL SAFE;
-- Create a view for convenient access.
CREATE VIEW neon_stats AS
SELECT P.* FROM neon_get_stats() AS P (ns_key text, ns_value text);

View File

@@ -1,4 +1,4 @@
# neon extension
comment = 'cloud storage for PostgreSQL'
default_version = '1.0'
default_version = '1.1'
module_pathname = '$libdir/neon'

View File

@@ -721,7 +721,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;
Retry:
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
if (entry != NULL)
@@ -858,11 +858,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!page_server->flush())
{
/* Prefetch set is reset in case of error, so we should try to register our request once again */
goto Retry;
}
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}

View File

@@ -1,6 +1,5 @@
use futures::future::Either;
use proxy::auth;
use proxy::config::HttpConfig;
use proxy::console;
use proxy::http;
use proxy::metrics;
@@ -80,9 +79,6 @@ struct ProxyCliArgs {
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
/// timeout for http connections
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
}
#[tokio::main]
@@ -224,15 +220,12 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
auth::BackendType::Link(Cow::Owned(url))
}
};
let http_config = HttpConfig {
sql_over_http_timeout: args.sql_over_http_timeout,
};
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
}));
Ok(config)

View File

@@ -13,7 +13,6 @@ pub struct ProxyConfig {
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
}
#[derive(Debug)]
@@ -27,10 +26,6 @@ pub struct TlsConfig {
pub common_names: Option<HashSet<String>>,
}
pub struct HttpConfig {
pub sql_over_http_timeout: tokio::time::Duration,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()

View File

@@ -89,10 +89,7 @@ pub mod errors {
Self::Console {
status: http::StatusCode::LOCKED,
ref text,
} => {
!text.contains("written data quota exceeded")
&& !text.contains("the limit for current plan reached")
}
} => !text.contains("quota"),
// retry server errors
Self::Console { status, .. } if status.is_server_error() => true,
_ => false,

View File

@@ -20,7 +20,6 @@ use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
};
use crate::{compute, config};
@@ -419,42 +418,36 @@ async fn connect_to_compute_once(
};
tokio::spawn(
async move {
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
loop {
let message = ready!(connection.poll_message(cx));
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
}
}).await
}
}
})
.instrument(span)
);

View File

@@ -24,9 +24,6 @@ use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use crate::config::HttpConfig;
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -102,9 +99,9 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
// convert to text with escaping
Value::Bool(_) => serde_json::to_string(value).map(Some),
Value::Number(_) => serde_json::to_string(value).map(Some),
Value::Object(_) => serde_json::to_string(value).map(Some),
// here string needs to be escaped, as it is part of the array
Value::Object(_) => json_array_to_pg_array(&Value::String(serde_json::to_string(value)?)),
Value::String(_) => serde_json::to_string(value).map(Some),
// recurse into array
@@ -191,46 +188,28 @@ pub async fn handle(
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
config: &'static HttpConfig,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.sql_over_http_timeout,
handle_inner(request, sni_hostname, conn_pool, session_id),
)
.await;
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let mut response = match result {
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
e.code()
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
});
let code = match code {
Some(c) => c,
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
},
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
config.sql_over_http_timeout.as_secs()
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
error!(message);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
};
@@ -248,13 +227,6 @@ async fn handle_inner(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&["http"])
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
//
// Determine the destination and connection params
//
@@ -613,7 +585,7 @@ fn _pg_array_parse(
}
}
}
'}' if !quote => {
'}' => {
level -= 1;
if level == 0 {
push_checked(&mut entry, &mut entries, elem_type)?;
@@ -697,14 +669,6 @@ mod tests {
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
)]
);
// array of objects
let json = r#"[{"foo": 1},{"bar": 2}]"#;
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
);
}
#[test]
@@ -832,23 +796,4 @@ mod tests {
json!([[[1, 2, 3], [4, 5, 6]]])
);
}
#[test]
fn test_pg_array_parse_json() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
}
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
assert_eq!(
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
json!([{"foo": 1, "bar": 2}])
);
assert_eq!(
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
json!([{"foo": 1}, {"bar": 2}])
);
assert_eq!(
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
json!([[{"foo": 1}, {"bar": 2}]])
);
}
}

View File

@@ -3,10 +3,7 @@ use crate::{
config::ProxyConfig,
error::io_error,
protocol2::{ProxyProtocolAccept, WithClientIp},
proxy::{
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
},
proxy::{handle_client, ClientMode},
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
@@ -205,14 +202,7 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
sql_over_http::handle(
request,
sni_hostname,
conn_pool,
session_id,
&config.http_config,
)
.await
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")
@@ -285,25 +275,23 @@ pub async fn task_main(
let conn_pool = conn_pool.clone();
async move {
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
},
)))
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
}))
}
},
);
@@ -315,41 +303,3 @@ pub async fn task_main(
Ok(())
}
struct MetricService<S> {
inner: S,
}
impl<S> MetricService<S> {
fn new(inner: S) -> MetricService<S> {
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&["http"])
.inc();
MetricService { inner }
}
}
impl<S> Drop for MetricService<S> {
fn drop(&mut self) {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
.with_label_values(&["http"])
.inc();
}
}
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
where
S: hyper::service::Service<Request<ReqBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
self.inner.call(req)
}
}

View File

@@ -7,7 +7,6 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
@@ -39,55 +38,19 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_db_connections_total",
"Number of opened connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_db_connections_total",
"Number of closed connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_client_connections_total",
"Number of opened connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_client_connections_total",
"Number of closed connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of client connections accepted.",
"Number of TCP client connections accepted.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_connections_total",
"Number of client connections closed.",
"Number of TCP client connections closed.",
&["protocol"],
)
.unwrap()
@@ -112,15 +75,6 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_breakdown",
"Number of wake-up failures (per kind).",
&["retry", "kind"],
)
.unwrap()
});
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_io_bytes_per_client",
@@ -254,16 +208,12 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
"handling interactive connection from client"
);
let proto = mode.protocol_label();
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&[proto])
.with_label_values(&[mode.protocol_label()])
.inc();
scopeguard::defer! {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
}
let tls = config.tls_config.as_ref();
@@ -298,7 +248,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
mode.allow_self_signed_compute(config),
);
cancel_map
.with_session(|session| client.connect_to_db(session, mode))
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
.await
}
@@ -447,46 +397,6 @@ impl ConnectMechanism for TcpMechanism<'_> {
}
}
const fn bool_to_str(x: bool) -> &'static str {
if x {
"true"
} else {
"false"
}
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::console::errors::ApiError;
let retry = bool_to_str(retry);
let kind = match e {
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
"quota_exceeded"
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => "api_console_locked",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => "api_console_bad_request",
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
@@ -530,12 +440,10 @@ where
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e.into());
}
// failed to wake up but we can continue to retry
Ok(ControlFlow::Continue(e)) => {
report_error(&e, true);
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
// successfully woke up a compute node and can break the wakeup loop
@@ -774,7 +682,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
async fn connect_to_db(
self,
session: cancellation::Session<'_>,
mode: ClientMode,
allow_cleartext: bool,
) -> anyhow::Result<()> {
let Self {
mut stream,
@@ -790,7 +698,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
};
let auth_result = match creds
.authenticate(&extra, &mut stream, mode.allow_cleartext())
.authenticate(&extra, &mut stream, allow_cleartext)
.await
{
Ok(auth_result) => auth_result,
@@ -816,14 +724,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.or_else(|e| stream.throw_error(e))
.await?;
let proto = mode.protocol_label();
NUM_DB_CONNECTIONS_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm

View File

@@ -374,12 +374,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
#[allow(clippy::mutable_key_type)]
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
["/v1/status", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect()
});
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {

View File

@@ -1464,29 +1464,6 @@ class NeonCli(AbstractNeonCli):
return self.raw_cli(args, check_return_code=check_return_code)
def map_branch(
self, name: str, tenant_id: TenantId, timeline_id: TimelineId
) -> "subprocess.CompletedProcess[str]":
"""
Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
Usually needed when creating branches via PageserverHttpClient and not neon_local.
After creating a name mapping, you can use EndpointFactory.create_start
with this registered branch name.
"""
args = [
"mappings",
"map",
"--branch-name",
name,
"--tenant-id",
str(tenant_id),
"--timeline-id",
str(timeline_id),
]
return self.raw_cli(args, check_return_code=True)
def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
return self.raw_cli(["start"], check_return_code=check_return_code)

View File

@@ -74,14 +74,11 @@ def wait_until_tenant_state(
for _ in range(iterations):
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
else:
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"]["slug"] == expected_state:
return tenant
if tenant["state"]["slug"] == "Broken":
raise RuntimeError(f"tenant became Broken, not {expected_state}")
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
time.sleep(period)

View File

@@ -65,7 +65,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
def start_single_table_workload(table_id: int):
for _ in range(num_iters):
with env.pg.connect(options="-cstatement_timeout=300s").cursor() as cur:
with env.pg.connect().cursor() as cur:
cur.execute(
f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
)

View File

@@ -1,24 +1,14 @@
import random
import threading
import time
from queue import SimpleQueue
from typing import Any, Dict, List, Union
from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PgBin,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.types import Lsn, TimelineId
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError
# Test branch creation
@@ -138,245 +128,3 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
endpoint1 = env.endpoints.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder):
"""
Endpoint should not be possible to create because branch has not been uploaded.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
env.pageserver.allowed_errors.append(
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
initial_branch = "initial_branch"
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder):
"""
Branch should not be possible to create because ancestor has not been uploaded.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
branch_id = TimelineId.generate()
with pytest.raises(RetryError, match="too many 503 error responses"):
ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
branch_id,
ancestor_timeline_id=env.initial_timeline,
)
with pytest.raises(
PageserverApiException,
match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found",
):
ps_http.timeline_detail(env.initial_tenant, branch_id)
# important to note that a task might still be in progress to complete
# the work, but will never get to that because we have the pause
# failpoint
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
"""
If the activate only after upload is used, then retries could become competing.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
env.pageserver.allowed_errors.append(
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
create_root = threading.Thread(target=start_creating_timeline)
branch_id = TimelineId.generate()
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
barrier = threading.Barrier(3)
def try_branch():
barrier.wait()
barrier.wait()
try:
ret = ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
branch_id,
ancestor_timeline_id=env.initial_timeline,
timeout=5,
)
queue.put(ret)
except Exception as e:
queue.put(e)
threads = [threading.Thread(target=try_branch) for _ in range(2)]
try:
create_root.start()
for t in threads:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
barrier.wait()
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
barrier.wait()
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
first = queue.get()
second = queue.get()
log.info(first)
log.info(second)
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
assert isinstance(failed, Exception)
assert isinstance(succeeded, Dict)
# FIXME: there's probably multiple valid status codes:
# - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already exists
# - whatever 409 response says, but that is a subclass of PageserverApiException
assert isinstance(failed, PageserverApiException)
assert succeeded["state"] == "Active"
finally:
# we might still have the failpoint active
env.pageserver.stop(immediate=True)
# pytest should nag if we leave threads unjoined
for t in threads:
t.join()
create_root.join()
def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder):
"""
Currently before RFC#27 we keep and continue uploading branches which were not successfully uploaded before shutdown.
This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test when the rfc is implemented.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
# now without a failpoint
env.pageserver.start()
wait_until_tenant_active(ps_http, env.initial_tenant)
# currently it lives on and will get eventually uploaded, but this will change
detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert detail["state"] == "Active"
def wait_until_paused(env: NeonEnv, failpoint: str):
found = False
msg = f"at failpoint {failpoint}"
for _ in range(20):
time.sleep(1)
found = env.pageserver.log_contains(msg) is not None
if found:
break
assert found

View File

@@ -0,0 +1,74 @@
import os
import random
import threading
import time
from typing import List
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
env = neon_simple_env
cache_dir = os.path.join(env.repo_dir, "file_cache")
os.mkdir(cache_dir)
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
endpoint = env.endpoints.create_start(
"test_local_file_cache_unlink",
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",
"neon.max_file_cache_size='64MB'",
"neon.file_cache_size_limit='10MB'",
],
)
cur = endpoint.connect().cursor()
n_rows = 100000
n_threads = 20
n_updates_per_thread = 10000
n_updates_per_connection = 1000
n_total_updates = n_threads * n_updates_per_thread
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
# Start threads that will perform random UPDATEs. Each UPDATE
# increments the counter on the row, so that we can check at the
# end that the sum of all the counters match the number of updates
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates():
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
for _ in range(n_updates_per_thread):
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
if n_updates_performed % n_updates_per_connection == 0:
cur.close()
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
threads: List[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(), daemon=True)
thread.start()
threads.append(thread)
time.sleep(5)
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
os.rename(cache_dir, new_cache_dir)
for thread in threads:
thread.join()
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows

View File

@@ -10,7 +10,6 @@ of the pageserver are:
"""
import enum
import re
import time
from typing import Optional
@@ -277,28 +276,15 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
assert get_deletion_queue_unexpected_errors(ps_http) == 0
class KeepAttachment(str, enum.Enum):
KEEP = "keep"
LOSE = "lose"
class ValidateBefore(str, enum.Enum):
VALIDATE = "validate"
NO_VALIDATE = "no-validate"
@pytest.mark.parametrize("keep_attachment", [KeepAttachment.KEEP, KeepAttachment.LOSE])
@pytest.mark.parametrize("validate_before", [ValidateBefore.VALIDATE, ValidateBefore.NO_VALIDATE])
@pytest.mark.parametrize("keep_attachment", [True, False])
@pytest.mark.parametrize("validate_before", [True, False])
def test_deletion_queue_recovery(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
keep_attachment: KeepAttachment,
validate_before: ValidateBefore,
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool, validate_before: bool
):
"""
:param keep_attachment: whether to re-attach after restart. Else, we act as if some other
:param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
node took the attachment while we were restarting.
:param validate_before: whether to wait for deletions to be validated before restart. This
:param validate_before: If true, we wait for deletions to be validated before restart. This
makes them elegible to be executed after restart, if the same node keeps the attachment.
"""
neon_env_builder.enable_generations = True
@@ -314,7 +300,7 @@ def test_deletion_queue_recovery(
("deletion-queue-before-execute", "return"),
]
if validate_before == ValidateBefore.NO_VALIDATE:
if not validate_before:
failpoints.append(
# Prevent deletion lists from being validated, we will test that they are
# dropped properly during recovery. 'pause' is okay here because we kill
@@ -334,25 +320,20 @@ def test_deletion_queue_recovery(
assert get_deletion_queue_unexpected_errors(ps_http) == 0
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
if validate_before == ValidateBefore.VALIDATE:
if validate_before:
def assert_validation_complete():
assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)
wait_until(20, 1, assert_validation_complete)
# The validatated keys statistic advances before the header is written, so we
# also wait to see the header hit the disk: this seems paranoid but the race
# can really happen on a heavily overloaded test machine.
def assert_header_written():
assert (env.pageserver.workdir / "deletion" / "header-01").exists()
wait_until(20, 1, assert_header_written)
# A short wait to let the DeletionHeader get written out, as this happens after
# the validated count gets incremented.
time.sleep(1)
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
env.pageserver.stop(immediate=True)
if keep_attachment == KeepAttachment.LOSE:
if not keep_attachment:
some_other_pageserver = 101010
assert env.attachment_service is not None
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
@@ -371,7 +352,7 @@ def test_deletion_queue_recovery(
ps_http.deletion_queue_flush(execute=True)
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
if keep_attachment == KeepAttachment.KEEP or validate_before == ValidateBefore.VALIDATE:
if keep_attachment or validate_before:
# - If we kept the attachment, then our pre-restart deletions should execute
# because on re-attach they were from the immediately preceding generation
# - If we validated before restart, then the deletions should execute because the

View File

@@ -17,8 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
n_restarts = 10
scale = 10
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])

View File

@@ -188,7 +188,7 @@ def test_sql_over_http(static_proxy: NeonProxy):
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200, response.text
assert response.status_code == 200
return response.json()
rows = q("select 42 as answer")["rows"]
@@ -206,12 +206,6 @@ def test_sql_over_http(static_proxy: NeonProxy):
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
assert rows == [{"answer": {"b": 42}}]
rows = q("select $1::jsonb[] as answer", [[{}]])["rows"]
assert rows == [{"answer": [{}]}]
rows = q("select $1::jsonb[] as answer", [[{"foo": 1}, {"bar": 2}]])["rows"]
assert rows == [{"answer": [{"foo": 1}, {"bar": 2}]}]
rows = q("select * from pg_class limit 1")["rows"]
assert len(rows) == 1

View File

@@ -752,9 +752,6 @@ def test_ignore_while_attaching(
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# An endpoint is starting up concurrently with our detach, it can
# experience RPC failure due to shutdown.
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
data_id = 1
data_secret = "very secret secret"

View File

@@ -6,7 +6,6 @@ from pathlib import Path
from typing import List, Optional
import asyncpg
import pytest
import toml
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
@@ -598,10 +597,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
assert res == expected_sum
# Do inserts while restarting postgres and messing with safekeeper addresses.
# The test takes more than default 5 minutes on Postgres 16,
# see https://github.com/neondatabase/neon/issues/5305
@pytest.mark.timeout(600)
# do inserts while restarting postgres and messing with safekeeper addresses
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "389ce36b4b3da7aa654a25e1b3f10b641319a87f",
"postgres-v15": "74cfe3e681836747a31fdbd47bdd14b3d81b0772",
"postgres-v14": "5d5cfee12783f0989a9c9fe13bb40b5585812568"
"postgres-v16": "e6c1ece2efbf52277565936925ae43599ce51730",
"postgres-v15": "327f4e5e7db1050ce1b79e19f3d27ef1cff7acca",
"postgres-v14": "80932ee1ae0b8e645390546ed104c357cddc7d0c"
}