mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
Compare commits
8 Commits
problame/w
...
lfc_fixes2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
773a8836cc | ||
|
|
da34a9ed89 | ||
|
|
e4b74e375c | ||
|
|
c3d1e26361 | ||
|
|
665eff5e08 | ||
|
|
7aeec2bc9d | ||
|
|
bc3a5d571f | ||
|
|
2324e5a3d3 |
@@ -224,8 +224,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
|
||||
FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
|
||||
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
|
||||
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -368,8 +368,8 @@ RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar
|
||||
FROM build-deps AS plpgsql-check-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
|
||||
echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.4.0.tar.gz -O plpgsql_check.tar.gz && \
|
||||
echo "9ba58387a279b35a3bfa39ee611e5684e6cddb2ba046ddb2c5190b3bd2ca254a plpgsql_check.tar.gz" | sha256sum --check && \
|
||||
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
|
||||
@@ -116,7 +116,6 @@ fn main() -> Result<()> {
|
||||
"attachment_service" => handle_attachment_service(sub_args, &env),
|
||||
"safekeeper" => handle_safekeeper(sub_args, &env),
|
||||
"endpoint" => handle_endpoint(sub_args, &env),
|
||||
"mappings" => handle_mappings(sub_args, &mut env),
|
||||
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
|
||||
_ => bail!("unexpected subcommand {sub_name}"),
|
||||
};
|
||||
@@ -817,38 +816,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
|
||||
let (sub_name, sub_args) = match sub_match.subcommand() {
|
||||
Some(ep_subcommand_data) => ep_subcommand_data,
|
||||
None => bail!("no mappings subcommand provided"),
|
||||
};
|
||||
|
||||
match sub_name {
|
||||
"map" => {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
.expect("branch-name argument missing");
|
||||
|
||||
let tenant_id = sub_args
|
||||
.get_one::<String>("tenant-id")
|
||||
.map(|x| TenantId::from_str(x))
|
||||
.expect("tenant-id argument missing")
|
||||
.expect("malformed tenant-id arg");
|
||||
|
||||
let timeline_id = sub_args
|
||||
.get_one::<String>("timeline-id")
|
||||
.map(|x| TimelineId::from_str(x))
|
||||
.expect("timeline-id argument missing")
|
||||
.expect("malformed timeline-id arg");
|
||||
|
||||
env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
other => unimplemented!("mappings subcommand {other}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
|
||||
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
|
||||
@@ -1117,7 +1084,6 @@ fn cli() -> Command {
|
||||
// --id, when using a pageserver command
|
||||
let pageserver_id_arg = Arg::new("pageserver-id")
|
||||
.long("id")
|
||||
.global(true)
|
||||
.help("pageserver id")
|
||||
.required(false);
|
||||
// --pageserver-id when using a non-pageserver command
|
||||
@@ -1288,20 +1254,17 @@ fn cli() -> Command {
|
||||
Command::new("pageserver")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage pageserver")
|
||||
.arg(pageserver_id_arg)
|
||||
.subcommand(Command::new("status"))
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.subcommand(Command::new("stop")
|
||||
.about("Stop local pageserver")
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
.subcommand(Command::new("restart")
|
||||
.about("Restart local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.subcommand(Command::new("start").about("Start local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("stop").about("Stop local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone()))
|
||||
.subcommand(Command::new("restart").about("Restart local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(pageserver_config_args.clone()))
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("attachment_service")
|
||||
@@ -1358,8 +1321,8 @@ fn cli() -> Command {
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
.arg(branch_name_arg)
|
||||
.arg(timeline_id_arg)
|
||||
.arg(lsn_arg)
|
||||
.arg(pg_port_arg)
|
||||
.arg(http_port_arg)
|
||||
@@ -1372,7 +1335,7 @@ fn cli() -> Command {
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
.arg(endpoint_id_arg)
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(tenant_id_arg)
|
||||
.arg(
|
||||
Arg::new("destroy")
|
||||
.help("Also delete data directory (now optional, should be default in future)")
|
||||
@@ -1383,18 +1346,6 @@ fn cli() -> Command {
|
||||
)
|
||||
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("mappings")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage neon_local branch name mappings")
|
||||
.subcommand(
|
||||
Command::new("map")
|
||||
.about("Create new mapping which cannot exist already")
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
)
|
||||
)
|
||||
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
|
||||
.subcommand(
|
||||
Command::new("pg")
|
||||
|
||||
@@ -442,20 +442,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
trace!("got message {:?}", msg);
|
||||
|
||||
let result = self.process_message(handler, msg, &mut query_string).await;
|
||||
tokio::select!(
|
||||
biased;
|
||||
_ = shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
tracing::info!("shutdown request received during response flush");
|
||||
return Ok(())
|
||||
},
|
||||
flush_r = self.flush() => {
|
||||
flush_r?;
|
||||
}
|
||||
);
|
||||
|
||||
self.flush().await?;
|
||||
match result? {
|
||||
ProcessMsgResult::Continue => {
|
||||
self.flush().await?;
|
||||
continue;
|
||||
}
|
||||
ProcessMsgResult::Break => break,
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! allowing multiple api users to independently work with the same S3 bucket, if
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::{
|
||||
@@ -556,20 +556,6 @@ impl RemoteStorage for S3Bucket {
|
||||
.deleted_objects_total
|
||||
.inc_by(chunk.len() as u64);
|
||||
if let Some(errors) = resp.errors {
|
||||
// Log a bounded number of the errors within the response:
|
||||
// these requests can carry 1000 keys so logging each one
|
||||
// would be too verbose, especially as errors may lead us
|
||||
// to retry repeatedly.
|
||||
const LOG_UP_TO_N_ERRORS: usize = 10;
|
||||
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
|
||||
tracing::warn!(
|
||||
"DeleteObjects key {} failed: {}: {}",
|
||||
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.message.as_ref().map(Cow::from).unwrap_or("".into())
|
||||
);
|
||||
}
|
||||
|
||||
return Err(anyhow::format_err!(
|
||||
"Failed to delete {} objects",
|
||||
errors.len()
|
||||
|
||||
@@ -119,7 +119,6 @@ pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
|
||||
|
||||
match api_error {
|
||||
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
|
||||
_ => error!("Error processing HTTP request: {api_error:#}"),
|
||||
}
|
||||
|
||||
@@ -11,7 +11,10 @@ use std::sync::{Arc, Barrier};
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver::{
|
||||
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
|
||||
config::PageServerConf,
|
||||
repository::Key,
|
||||
walrecord::NeonWalRecord,
|
||||
walredo::{PostgresRedoManager, WalRedoError},
|
||||
};
|
||||
use utils::{id::TenantId, lsn::Lsn};
|
||||
|
||||
@@ -149,7 +152,7 @@ impl Drop for JoinOnDrop {
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()>
|
||||
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError>
|
||||
where
|
||||
I: IntoIterator<Item = Request>,
|
||||
{
|
||||
@@ -157,7 +160,7 @@ where
|
||||
input.into_iter().try_for_each(|req| {
|
||||
let page = req.execute(manager)?;
|
||||
assert_eq!(page.remaining(), 8192);
|
||||
anyhow::Ok(())
|
||||
Ok::<_, WalRedoError>(())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -470,7 +473,7 @@ struct Request {
|
||||
}
|
||||
|
||||
impl Request {
|
||||
fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
|
||||
fn execute(self, manager: &PostgresRedoManager) -> Result<Bytes, WalRedoError> {
|
||||
use pageserver::walredo::WalRedoManager;
|
||||
|
||||
let Request {
|
||||
|
||||
@@ -153,7 +153,7 @@ impl FlushOp {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
@@ -212,7 +212,7 @@ where
|
||||
|
||||
/// Files ending with this suffix will be ignored and erased
|
||||
/// during recovery as startup.
|
||||
const TEMP_SUFFIX: &str = "tmp";
|
||||
const TEMP_SUFFIX: &str = ".tmp";
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
|
||||
impl DeletionQueueClient {
|
||||
pub(crate) fn broken() -> Self {
|
||||
// Channels whose receivers are immediately dropped.
|
||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (tx, _rx) = tokio::sync::mpsc::channel(1);
|
||||
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
|
||||
Self {
|
||||
tx,
|
||||
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
|
||||
/// This is cancel-safe. If you drop the future before it completes, the message
|
||||
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
|
||||
/// we decide to do a deletion the decision is always final.
|
||||
fn do_push<T>(
|
||||
async fn do_push<T>(
|
||||
&self,
|
||||
queue: &tokio::sync::mpsc::UnboundedSender<T>,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
msg: T,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
match queue.send(msg) {
|
||||
match queue.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// This shouldn't happen, we should shut down all tenants before
|
||||
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn recover(
|
||||
pub(crate) async fn recover(
|
||||
&self,
|
||||
attached_tenants: HashMap<TenantId, Generation>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
@@ -453,6 +453,7 @@ impl DeletionQueueClient {
|
||||
&self.tx,
|
||||
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
|
||||
@@ -525,21 +526,6 @@ impl DeletionQueueClient {
|
||||
return self.flush_immediate().await;
|
||||
}
|
||||
|
||||
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
|
||||
}
|
||||
|
||||
/// When a Tenant has a generation, push_layers is always synchronous because
|
||||
/// the ListValidator channel is an unbounded channel.
|
||||
///
|
||||
/// This can be merged into push_layers when we remove the Generation-less mode
|
||||
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
|
||||
pub(crate) fn push_layers_sync(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(layers.len() as u64);
|
||||
@@ -553,16 +539,17 @@ impl DeletionQueueClient {
|
||||
objects: Vec::new(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
|
||||
async fn do_flush<T>(
|
||||
&self,
|
||||
queue: &tokio::sync::mpsc::UnboundedSender<T>,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
msg: T,
|
||||
rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.do_push(queue, msg)?;
|
||||
self.do_push(queue, msg).await?;
|
||||
if rx.await.is_err() {
|
||||
// This shouldn't happen if tenants are shut down before deletion queue. If we
|
||||
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
|
||||
@@ -583,18 +570,6 @@ impl DeletionQueueClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
|
||||
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
|
||||
/// detach where flushing is nice but not necessary.
|
||||
///
|
||||
/// This function provides no guarantees of work being done.
|
||||
pub fn flush_advisory(&self) {
|
||||
let (flush_op, _) = FlushOp::new();
|
||||
|
||||
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
|
||||
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
|
||||
}
|
||||
|
||||
// Wait until all previous deletions are executed
|
||||
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
|
||||
debug!("flush_execute: flushing to deletion lists...");
|
||||
@@ -611,7 +586,9 @@ impl DeletionQueueClient {
|
||||
// Flush any immediate-mode deletions (the above backend flush will only flush
|
||||
// the executor if deletions had flowed through the backend)
|
||||
debug!("flush_execute: flushing execution...");
|
||||
self.flush_immediate().await?;
|
||||
let (flush_op, rx) = FlushOp::new();
|
||||
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
|
||||
.await?;
|
||||
debug!("flush_execute: finished flushing execution...");
|
||||
Ok(())
|
||||
}
|
||||
@@ -666,10 +643,8 @@ impl DeletionQueue {
|
||||
where
|
||||
C: ControlPlaneGenerationsApi + Send + Sync,
|
||||
{
|
||||
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
|
||||
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
|
||||
// enough to avoid this taking pathologically large amount of memory.
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
// Deep channel: it consumes deletions from all timelines and we do not want to block them
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
|
||||
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
|
||||
@@ -982,7 +957,7 @@ mod test {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new())?;
|
||||
client.recover(HashMap::new()).await?;
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
@@ -1050,7 +1025,7 @@ mod test {
|
||||
async fn deletion_queue_validation() -> anyhow::Result<()> {
|
||||
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new())?;
|
||||
client.recover(HashMap::new()).await?;
|
||||
|
||||
// Generation that the control plane thinks is current
|
||||
let latest_generation = Generation::new(0xdeadbeef);
|
||||
@@ -1107,7 +1082,7 @@ mod test {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new())?;
|
||||
client.recover(HashMap::new()).await?;
|
||||
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
@@ -1170,7 +1145,9 @@ mod test {
|
||||
drop(client);
|
||||
ctx.restart().await;
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
|
||||
client
|
||||
.recover(HashMap::from([(tenant_id, now_generation)]))
|
||||
.await?;
|
||||
|
||||
info!("Flush-executing");
|
||||
client.flush_execute().await?;
|
||||
@@ -1196,7 +1173,7 @@ pub(crate) mod mock {
|
||||
};
|
||||
|
||||
pub struct ConsumerState {
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
|
||||
}
|
||||
|
||||
@@ -1273,7 +1250,7 @@ pub(crate) mod mock {
|
||||
}
|
||||
|
||||
pub struct MockDeletionQueue {
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
executed: Arc<AtomicUsize>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
@@ -1283,7 +1260,7 @@ pub(crate) mod mock {
|
||||
|
||||
impl MockDeletionQueue {
|
||||
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
let executed = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
@@ -13,7 +13,6 @@ use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use utils::backoff;
|
||||
|
||||
use crate::metrics;
|
||||
|
||||
@@ -64,19 +63,7 @@ impl Deleter {
|
||||
Err(anyhow::anyhow!("failpoint hit"))
|
||||
});
|
||||
|
||||
// A backoff::retry is used here for two reasons:
|
||||
// - To provide a backoff rather than busy-polling the API on errors
|
||||
// - To absorb transient 429/503 conditions without hitting our error
|
||||
// logging path for issues deleting objects.
|
||||
backoff::retry(
|
||||
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|
||||
|_| false,
|
||||
3,
|
||||
10,
|
||||
"executing deletion batch",
|
||||
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
|
||||
)
|
||||
.await
|
||||
self.remote_storage.delete_objects(&self.accumulator).await
|
||||
}
|
||||
|
||||
/// Block until everything in accumulator has been executed
|
||||
@@ -101,10 +88,7 @@ impl Deleter {
|
||||
self.accumulator.clear();
|
||||
}
|
||||
Err(e) => {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
warn!("DeleteObjects request failed: {e:#}, will continue trying");
|
||||
warn!("DeleteObjects request failed: {e:#}, will retry");
|
||||
metrics::DELETION_QUEUE
|
||||
.remote_errors
|
||||
.with_label_values(&["execute"])
|
||||
|
||||
@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
// Incoming frontend requests to delete some keys
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
|
||||
// Outbound requests to the backend to execute deletion lists we have composed.
|
||||
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
|
||||
@@ -111,7 +111,7 @@ impl ListWriter {
|
||||
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
@@ -230,7 +230,6 @@ impl ListWriter {
|
||||
let list_name_pattern =
|
||||
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
|
||||
|
||||
let temp_extension = format!(".{TEMP_SUFFIX}");
|
||||
let header_path = self.conf.deletion_header_path();
|
||||
let mut seqs: Vec<u64> = Vec::new();
|
||||
while let Some(dentry) = dir.next_entry().await? {
|
||||
@@ -242,7 +241,7 @@ impl ListWriter {
|
||||
continue;
|
||||
}
|
||||
|
||||
if dentry_str.ends_with(&temp_extension) {
|
||||
if dentry_str.ends_with(TEMP_SUFFIX) {
|
||||
info!("Cleaning up temporary file {dentry_str}");
|
||||
let absolute_path =
|
||||
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
|
||||
|
||||
@@ -77,7 +77,7 @@ impl State {
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
@@ -136,7 +136,9 @@ impl From<PageReconstructError> for ApiError {
|
||||
PageReconstructError::AncestorStopping(_) => {
|
||||
ApiError::ResourceUnavailable(format!("{pre}").into())
|
||||
}
|
||||
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
|
||||
PageReconstructError::WalRedo(pre) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(pre))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -162,6 +164,9 @@ impl From<TenantStateError> for ApiError {
|
||||
fn from(tse: TenantStateError) -> ApiError {
|
||||
match tse {
|
||||
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
|
||||
TenantStateError::NotActive(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant not yet active".into())
|
||||
}
|
||||
TenantStateError::IsStopping(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant is stopping".into())
|
||||
}
|
||||
@@ -391,9 +396,6 @@ async fn timeline_create_handler(
|
||||
format!("{err:#}")
|
||||
))
|
||||
}
|
||||
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
|
||||
}
|
||||
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
|
||||
}
|
||||
}
|
||||
@@ -570,14 +572,9 @@ async fn tenant_detach_handler(
|
||||
|
||||
let state = get_state(&request);
|
||||
let conf = state.conf;
|
||||
mgr::detach_tenant(
|
||||
conf,
|
||||
tenant_id,
|
||||
detach_ignored.unwrap_or(false),
|
||||
&state.deletion_queue_client,
|
||||
)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
@@ -1034,7 +1031,7 @@ async fn put_tenant_location_config_handler(
|
||||
// The `Detached` state is special, it doesn't upsert a tenant, it removes
|
||||
// its local disk content and drops it from memory.
|
||||
if let LocationConfigMode::Detached = request_data.config.mode {
|
||||
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
|
||||
mgr::detach_tenant(conf, tenant_id, true)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
return json_response(StatusCode::OK, ());
|
||||
|
||||
@@ -35,7 +35,6 @@ use std::time::Duration;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::field;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
@@ -65,6 +64,69 @@ use crate::trace::Tracer;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
async_stream::try_stream! {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = "pageserver is shutting down";
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
|
||||
};
|
||||
|
||||
match msg {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => { break },
|
||||
FeMessage::Sync => continue,
|
||||
FeMessage::Terminate => {
|
||||
let msg = "client terminated connection with Terminate message during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
break;
|
||||
}
|
||||
m => {
|
||||
let msg = format!("unexpected message {m:?}");
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))?;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
yield copy_data_bytes;
|
||||
}
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
pgb.flush().await?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
|
||||
Err(io_error)?;
|
||||
}
|
||||
Err(other) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Read the end of a tar archive.
|
||||
///
|
||||
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
|
||||
@@ -222,13 +284,7 @@ async fn page_service_conn_main(
|
||||
// and create a child per-query context when it invokes process_query.
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(
|
||||
conf,
|
||||
broker_client,
|
||||
auth,
|
||||
connection_ctx,
|
||||
task_mgr::shutdown_token(),
|
||||
);
|
||||
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
|
||||
match pgbackend
|
||||
@@ -262,10 +318,6 @@ struct PageServerHandler {
|
||||
/// For each query received over the connection,
|
||||
/// `process_query` creates a child context from this one.
|
||||
connection_ctx: RequestContext,
|
||||
|
||||
/// A token that should fire when the tenant transitions from
|
||||
/// attached state, or when the pageserver is shutting down.
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
@@ -274,7 +326,6 @@ impl PageServerHandler {
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
_conf: conf,
|
||||
@@ -282,91 +333,6 @@ impl PageServerHandler {
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
|
||||
/// this rather than naked flush() in order to shut down promptly. Without this, we would
|
||||
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
|
||||
/// in the flush.
|
||||
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
tokio::select!(
|
||||
flush_r = pgb.flush() => {
|
||||
Ok(flush_r?)
|
||||
},
|
||||
_ = self.cancel.cancelled() => {
|
||||
Err(QueryError::Other(anyhow::anyhow!("Shutting down")))
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
fn copyin_stream<'a, IO>(
|
||||
&'a self,
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
) -> impl Stream<Item = io::Result<Bytes>> + 'a
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
async_stream::try_stream! {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = "pageserver is shutting down";
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
|
||||
};
|
||||
|
||||
match msg {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => { break },
|
||||
FeMessage::Sync => continue,
|
||||
FeMessage::Terminate => {
|
||||
let msg = "client terminated connection with Terminate message during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
break;
|
||||
}
|
||||
m => {
|
||||
let msg = format!("unexpected message {m:?}");
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))?;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
yield copy_data_bytes;
|
||||
}
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
|
||||
Err(io_error)?;
|
||||
}
|
||||
Err(other) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,7 +372,7 @@ impl PageServerHandler {
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
|
||||
|
||||
@@ -499,7 +465,7 @@ impl PageServerHandler {
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -542,9 +508,9 @@ impl PageServerHandler {
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
timeline
|
||||
.import_basebackup_from_tar(
|
||||
&mut copyin_reader,
|
||||
@@ -597,8 +563,8 @@ impl PageServerHandler {
|
||||
// Import wal provided via CopyData
|
||||
info!("importing wal");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
|
||||
pgb.flush().await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
|
||||
info!("wal import complete");
|
||||
|
||||
@@ -806,7 +772,7 @@ impl PageServerHandler {
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
// Send a tarball of the latest layer on the timeline. Compress if not
|
||||
// fullbackup. TODO Compress in that case too (tests need to be updated)
|
||||
@@ -858,7 +824,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let basebackup_after = started
|
||||
.elapsed()
|
||||
|
||||
@@ -45,7 +45,6 @@ use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::AttachedLocationConfig;
|
||||
use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
@@ -209,7 +208,7 @@ pub struct Tenant {
|
||||
|
||||
/// The remote storage generation, used to protect S3 objects from split-brain.
|
||||
/// Does not change over the lifetime of the [`Tenant`] object.
|
||||
///
|
||||
///
|
||||
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
|
||||
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
|
||||
generation: Generation,
|
||||
@@ -407,8 +406,6 @@ pub enum CreateTimelineError {
|
||||
AlreadyExists,
|
||||
#[error(transparent)]
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
AncestorNotActive,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
@@ -1590,12 +1587,6 @@ impl Tenant {
|
||||
.get_timeline(ancestor_timeline_id, false)
|
||||
.context("Cannot branch off the timeline that's not present in pageserver")?;
|
||||
|
||||
// instead of waiting around, just deny the request because ancestor is not yet
|
||||
// ready for other purposes either.
|
||||
if !ancestor_timeline.is_active() {
|
||||
return Err(CreateTimelineError::AncestorNotActive);
|
||||
}
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
*lsn = lsn.align();
|
||||
|
||||
@@ -1628,6 +1619,8 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
loaded_timeline.activate(broker_client, None, ctx);
|
||||
|
||||
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
|
||||
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
|
||||
// Ok, the timeline is durable in remote storage.
|
||||
@@ -1639,8 +1632,6 @@ impl Tenant {
|
||||
})?;
|
||||
}
|
||||
|
||||
loaded_timeline.activate(broker_client, None, ctx);
|
||||
|
||||
Ok(loaded_timeline)
|
||||
}
|
||||
|
||||
@@ -2077,15 +2068,6 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
|
||||
self.tenant_conf
|
||||
.read()
|
||||
.unwrap()
|
||||
.location
|
||||
.attach_mode
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
|
||||
@@ -2755,11 +2737,6 @@ impl Tenant {
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
|
||||
let start_lsn = start_lsn.unwrap_or_else(|| {
|
||||
let lsn = src_timeline.get_last_record_lsn();
|
||||
@@ -2767,6 +2744,11 @@ impl Tenant {
|
||||
lsn
|
||||
});
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// Create a placeholder for the new branch. This will error
|
||||
// out if the new timeline ID is already in use.
|
||||
let timeline_uninit_mark = {
|
||||
@@ -3448,8 +3430,11 @@ pub mod harness {
|
||||
|
||||
use crate::deletion_queue::mock::MockDeletionQueue;
|
||||
use crate::{
|
||||
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
|
||||
walredo::WalRedoManager,
|
||||
config::PageServerConf,
|
||||
repository::Key,
|
||||
tenant::Tenant,
|
||||
walrecord::NeonWalRecord,
|
||||
walredo::{WalRedoError, WalRedoManager},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
@@ -3622,7 +3607,7 @@ pub mod harness {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
_pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let s = format!(
|
||||
"redo for {} to get to {}, with {} and {} records",
|
||||
key,
|
||||
|
||||
@@ -31,7 +31,7 @@ use super::{
|
||||
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTenantError {
|
||||
pub enum DeleteTenantError {
|
||||
#[error("GetTenant {0}")]
|
||||
Get(#[from] GetTenantError),
|
||||
|
||||
@@ -376,7 +376,7 @@ impl DeleteTenantFlow {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn should_resume_deletion(
|
||||
pub async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::control_plane_client::{
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::{
|
||||
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
|
||||
@@ -50,7 +50,7 @@ use super::TenantSharedResources;
|
||||
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
|
||||
/// having a properly acquired generation (Secondary doesn't need a generation)
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum TenantSlot {
|
||||
pub enum TenantSlot {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary,
|
||||
}
|
||||
@@ -206,7 +206,8 @@ async fn init_load_generations(
|
||||
if resources.remote_storage.is_some() {
|
||||
resources
|
||||
.deletion_queue_client
|
||||
.recover(generations.clone())?;
|
||||
.recover(generations.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Some(generations))
|
||||
@@ -481,7 +482,7 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
/// management API. For example, it could attach the tenant on a different pageserver.
|
||||
/// We would then be in split-brain once this pageserver restarts.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn shutdown_all_tenants() {
|
||||
pub async fn shutdown_all_tenants() {
|
||||
shutdown_all_tenants0(&TENANTS).await
|
||||
}
|
||||
|
||||
@@ -593,7 +594,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
|
||||
// caller will log how long we took
|
||||
}
|
||||
|
||||
pub(crate) async fn create_tenant(
|
||||
pub async fn create_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
@@ -628,14 +629,14 @@ pub(crate) async fn create_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SetNewTenantConfigError {
|
||||
pub enum SetNewTenantConfigError {
|
||||
#[error(transparent)]
|
||||
GetTenant(#[from] GetTenantError),
|
||||
#[error(transparent)]
|
||||
Persist(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn set_new_tenant_config(
|
||||
pub async fn set_new_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
@@ -694,18 +695,6 @@ pub(crate) async fn upsert_location(
|
||||
|
||||
if let Some(tenant) = shutdown_tenant {
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
@@ -776,7 +765,7 @@ pub(crate) async fn upsert_location(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum GetTenantError {
|
||||
pub enum GetTenantError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
@@ -792,7 +781,7 @@ pub(crate) enum GetTenantError {
|
||||
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
|
||||
///
|
||||
/// This method is cancel-safe.
|
||||
pub(crate) async fn get_tenant(
|
||||
pub async fn get_tenant(
|
||||
tenant_id: TenantId,
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
@@ -817,7 +806,7 @@ pub(crate) async fn get_tenant(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
pub async fn delete_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenant_id: TenantId,
|
||||
@@ -826,7 +815,7 @@ pub(crate) async fn delete_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTimelineError {
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("Tenant {0}")]
|
||||
Tenant(#[from] GetTenantError),
|
||||
|
||||
@@ -834,7 +823,7 @@ pub(crate) enum DeleteTimelineError {
|
||||
Timeline(#[from] crate::tenant::DeleteTimelineError),
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
pub async fn delete_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
_ctx: &RequestContext,
|
||||
@@ -845,29 +834,23 @@ pub(crate) async fn delete_timeline(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantStateError {
|
||||
pub enum TenantStateError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is stopping")]
|
||||
IsStopping(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn detach_tenant(
|
||||
pub async fn detach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<(), TenantStateError> {
|
||||
let tmp_path = detach_tenant0(
|
||||
conf,
|
||||
&TENANTS,
|
||||
tenant_id,
|
||||
detach_ignored,
|
||||
deletion_queue_client,
|
||||
)
|
||||
.await?;
|
||||
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
|
||||
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
|
||||
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
|
||||
let task_tenant_id = None;
|
||||
@@ -892,7 +875,6 @@ async fn detach_tenant0(
|
||||
tenants: &tokio::sync::RwLock<TenantsMap>,
|
||||
tenant_id: TenantId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<Utf8PathBuf, TenantStateError> {
|
||||
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
|
||||
@@ -904,10 +886,6 @@ async fn detach_tenant0(
|
||||
let removal_result =
|
||||
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
|
||||
|
||||
// Flush pending deletions, so that they have a good chance of passing validation
|
||||
// before this tenant is potentially re-attached elsewhere.
|
||||
deletion_queue_client.flush_advisory();
|
||||
|
||||
// Ignored tenants are not present in memory and will bail the removal from memory operation.
|
||||
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
|
||||
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
|
||||
@@ -924,7 +902,7 @@ async fn detach_tenant0(
|
||||
removal_result
|
||||
}
|
||||
|
||||
pub(crate) async fn load_tenant(
|
||||
pub async fn load_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
@@ -961,7 +939,7 @@ pub(crate) async fn load_tenant(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn ignore_tenant(
|
||||
pub async fn ignore_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), TenantStateError> {
|
||||
@@ -989,7 +967,7 @@ async fn ignore_tenant0(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantMapListError {
|
||||
pub enum TenantMapListError {
|
||||
#[error("tenant map is still initiailizing")]
|
||||
Initializing,
|
||||
}
|
||||
@@ -997,7 +975,7 @@ pub(crate) enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().await;
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
@@ -1015,7 +993,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
|
||||
///
|
||||
/// Downloading all the tenant data is performed in the background, this merely
|
||||
/// spawns the background task and returns quickly.
|
||||
pub(crate) async fn attach_tenant(
|
||||
pub async fn attach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
@@ -1052,7 +1030,7 @@ pub(crate) async fn attach_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantMapInsertError {
|
||||
pub enum TenantMapInsertError {
|
||||
#[error("tenant map is still initializing")]
|
||||
StillInitializing,
|
||||
#[error("tenant map is shutting down")]
|
||||
@@ -1215,7 +1193,7 @@ use {
|
||||
utils::http::error::ApiError,
|
||||
};
|
||||
|
||||
pub(crate) async fn immediate_gc(
|
||||
pub async fn immediate_gc(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
gc_req: TimelineGcRequest,
|
||||
|
||||
@@ -31,7 +31,6 @@ pub(super) async fn upload_index_part<'a>(
|
||||
fail_point!("before-upload-index", |_| {
|
||||
bail!("failpoint before-upload-index")
|
||||
});
|
||||
pausable_failpoint!("before-upload-index-pausable");
|
||||
|
||||
let index_part_bytes =
|
||||
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
|
||||
|
||||
@@ -370,7 +370,7 @@ pub enum PageReconstructError {
|
||||
|
||||
/// An error happened replaying WAL records
|
||||
#[error(transparent)]
|
||||
WalRedo(anyhow::Error),
|
||||
WalRedo(#[from] crate::walredo::WalRedoError),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
@@ -4327,7 +4327,7 @@ impl Timeline {
|
||||
let img = match self
|
||||
.walredo_mgr
|
||||
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
|
||||
.context("Failed to reconstruct a page image")
|
||||
.context("Failed to reconstruct a page image:")
|
||||
{
|
||||
Ok(img) => img,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
//! any WAL records, so that even if an attacker hijacks the Postgres
|
||||
//! process, he cannot escape out of it.
|
||||
//!
|
||||
use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
@@ -90,7 +89,7 @@ pub trait WalRedoManager: Send + Sync {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Bytes>;
|
||||
) -> Result<Bytes, WalRedoError>;
|
||||
}
|
||||
|
||||
struct ProcessInput {
|
||||
@@ -101,14 +100,6 @@ struct ProcessInput {
|
||||
n_requests: usize,
|
||||
}
|
||||
|
||||
enum ApplyWalRecordsError {
|
||||
WithRequestNo {
|
||||
error: anyhow::Error,
|
||||
request_no: usize,
|
||||
},
|
||||
NoRequestNo(anyhow::Error),
|
||||
}
|
||||
|
||||
struct ProcessOutput {
|
||||
stdout: ChildStdout,
|
||||
pending_responses: VecDeque<Option<Bytes>>,
|
||||
@@ -149,6 +140,20 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
/// An error happened in WAL redo
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WalRedoError {
|
||||
#[error(transparent)]
|
||||
IoError(#[from] std::io::Error),
|
||||
|
||||
#[error("cannot perform WAL redo now")]
|
||||
InvalidState,
|
||||
#[error("cannot perform WAL redo for this request")]
|
||||
InvalidRequest,
|
||||
#[error("cannot perform WAL redo for this record")]
|
||||
InvalidRecord,
|
||||
}
|
||||
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
@@ -166,9 +171,10 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
if records.is_empty() {
|
||||
anyhow::bail!("invalid WAL redo request with no records");
|
||||
error!("invalid WAL redo request with no records");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
}
|
||||
|
||||
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
|
||||
@@ -180,13 +186,7 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
|
||||
if rec_neon != batch_neon {
|
||||
let result = if batch_neon {
|
||||
self.apply_batch_neon(
|
||||
key,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..i],
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
@@ -196,7 +196,6 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
&records[batch_start..i],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
};
|
||||
img = Some(result?);
|
||||
@@ -207,13 +206,7 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
}
|
||||
// last batch
|
||||
if batch_neon {
|
||||
self.apply_batch_neon(
|
||||
key,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..],
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
@@ -223,7 +216,6 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
&records[batch_start..],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -268,9 +260,8 @@ impl PostgresRedoManager {
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
diag: impl std::fmt::Debug,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let start_time = Instant::now();
|
||||
let mut n_attempts = 0u32;
|
||||
@@ -280,15 +271,15 @@ impl PostgresRedoManager {
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if proc.is_none() {
|
||||
self.launch(&mut proc, pg_version)
|
||||
.context("launch process")?;
|
||||
self.launch(&mut proc, pg_version)?;
|
||||
}
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
let result =
|
||||
self.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout);
|
||||
let result = self
|
||||
.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
|
||||
.map_err(WalRedoError::IoError);
|
||||
|
||||
let end_time = Instant::now();
|
||||
let duration = end_time.duration_since(lock_time);
|
||||
@@ -307,35 +298,26 @@ impl PostgresRedoManager {
|
||||
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
|
||||
|
||||
debug!(
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {} diag={:?}",
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
|
||||
len,
|
||||
nbytes,
|
||||
duration.as_micros(),
|
||||
lsn,
|
||||
diag,
|
||||
lsn
|
||||
);
|
||||
|
||||
// If something went wrong, don't try to reuse the process. Kill it, and
|
||||
// next request will launch a new one.
|
||||
if let Err(e) = result.as_ref() {
|
||||
let (e, request_no) = match e {
|
||||
ApplyWalRecordsError::WithRequestNo { error, request_no } => {
|
||||
(error, Some(*request_no))
|
||||
}
|
||||
ApplyWalRecordsError::NoRequestNo(e) => (e, None),
|
||||
};
|
||||
error!(
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={} request_no={:?} {:?}: {:?}",
|
||||
n_attempts,
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}: {}",
|
||||
records.len(),
|
||||
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
nbytes,
|
||||
base_img_lsn,
|
||||
lsn,
|
||||
n_attempts,
|
||||
request_no,
|
||||
diag,
|
||||
e,
|
||||
utils::error::report_compact_sources(e),
|
||||
);
|
||||
// self.stdin only holds stdin & stderr as_raw_fd().
|
||||
// Dropping it as part of take() doesn't close them.
|
||||
@@ -354,14 +336,11 @@ impl PostgresRedoManager {
|
||||
proc.child.kill_and_wait();
|
||||
}
|
||||
} else if n_attempts != 0 {
|
||||
info!(n_attempts, ?diag, "retried walredo succeeded");
|
||||
info!(n_attempts, "retried walredo succeeded");
|
||||
}
|
||||
n_attempts += 1;
|
||||
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
|
||||
return result.map_err(|e| match e {
|
||||
ApplyWalRecordsError::WithRequestNo { error, .. } => error,
|
||||
ApplyWalRecordsError::NoRequestNo(e) => e,
|
||||
});
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -375,8 +354,7 @@ impl PostgresRedoManager {
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
diag: impl std::fmt::Debug,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut page = BytesMut::new();
|
||||
@@ -385,7 +363,8 @@ impl PostgresRedoManager {
|
||||
page.extend_from_slice(&fpi[..]);
|
||||
} else {
|
||||
// All the current WAL record types that we can handle require a base image.
|
||||
anyhow::bail!("invalid neon WAL redo request with no base image");
|
||||
error!("invalid neon WAL redo request with no base image");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
}
|
||||
|
||||
// Apply all the WAL records in the batch
|
||||
@@ -398,11 +377,10 @@ impl PostgresRedoManager {
|
||||
WAL_REDO_TIME.observe(duration.as_secs_f64());
|
||||
|
||||
debug!(
|
||||
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {} diag={:?}",
|
||||
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
|
||||
records.len(),
|
||||
duration.as_micros(),
|
||||
lsn,
|
||||
diag,
|
||||
lsn
|
||||
);
|
||||
|
||||
Ok(page.freeze())
|
||||
@@ -414,13 +392,14 @@ impl PostgresRedoManager {
|
||||
page: &mut BytesMut,
|
||||
_record_lsn: Lsn,
|
||||
record: &NeonWalRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalRedoError> {
|
||||
match record {
|
||||
NeonWalRecord::Postgres {
|
||||
will_init: _,
|
||||
rec: _,
|
||||
} => {
|
||||
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
|
||||
error!("tried to pass postgres wal record to neon WAL redo");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
}
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
@@ -428,7 +407,7 @@ impl PostgresRedoManager {
|
||||
flags,
|
||||
} => {
|
||||
// sanity check that this is modifying the correct relation
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
assert!(
|
||||
rel.forknum == VISIBILITYMAP_FORKNUM,
|
||||
"ClearVisibilityMapFlags record on unexpected rel {}",
|
||||
@@ -466,7 +445,7 @@ impl PostgresRedoManager {
|
||||
// same effects as the corresponding Postgres WAL redo function.
|
||||
NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::Clog,
|
||||
@@ -516,7 +495,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::ClogSetAborted { xids } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::Clog,
|
||||
@@ -547,7 +526,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::MultiXactOffsets,
|
||||
@@ -580,7 +559,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::MultixactMembersCreate { moff, members } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::MultiXactMembers,
|
||||
@@ -780,7 +759,7 @@ impl PostgresRedoManager {
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, ApplyWalRecordsError> {
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
@@ -803,9 +782,10 @@ impl PostgresRedoManager {
|
||||
{
|
||||
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
||||
} else {
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"tried to pass neon wal record to postgres WAL redo"
|
||||
)));
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
"tried to pass neon wal record to postgres WAL redo",
|
||||
));
|
||||
}
|
||||
}
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
@@ -827,7 +807,7 @@ impl PostgresRedoManager {
|
||||
writebuf: &[u8],
|
||||
mut input: MutexGuard<Option<ProcessInput>>,
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, ApplyWalRecordsError> {
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
let proc = input.as_mut().unwrap();
|
||||
let mut nwrite = 0usize;
|
||||
let stdout_fd = proc.stdout_fd;
|
||||
@@ -848,13 +828,10 @@ impl PostgresRedoManager {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"WAL redo timed out"
|
||||
)));
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
@@ -863,9 +840,7 @@ impl PostgresRedoManager {
|
||||
let mut errbuf: [u8; 16384] = [0; 16384];
|
||||
let mut stderr_guard = self.stderr.lock().unwrap();
|
||||
let stderr = stderr_guard.as_mut().unwrap();
|
||||
let len = stderr
|
||||
.read(&mut errbuf)
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
let len = stderr.read(&mut errbuf)?;
|
||||
|
||||
// The message might not be split correctly into lines here. But this is
|
||||
// good enough, the important thing is to get the message to the log.
|
||||
@@ -880,23 +855,22 @@ impl PostgresRedoManager {
|
||||
continue;
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"WAL redo process closed its stderr unexpectedly"
|
||||
)));
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
}
|
||||
|
||||
// If 'stdin' is writeable, do write.
|
||||
let in_revents = pollfds[0].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
nwrite += proc
|
||||
.stdin
|
||||
.write(&writebuf[nwrite..])
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
|
||||
} else if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"WAL redo process closed its stdin unexpectedly"
|
||||
)));
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdin unexpectedly",
|
||||
));
|
||||
}
|
||||
}
|
||||
let request_no = proc.n_requests;
|
||||
@@ -927,10 +901,10 @@ impl PostgresRedoManager {
|
||||
//
|
||||
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
|
||||
// That's where we kill the child process.
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
|
||||
});
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
}
|
||||
let n_processed_responses = output.n_processed_responses;
|
||||
while n_processed_responses + output.pending_responses.len() <= request_no {
|
||||
@@ -946,14 +920,10 @@ impl PostgresRedoManager {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo timed out"),
|
||||
});
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
@@ -962,12 +932,7 @@ impl PostgresRedoManager {
|
||||
let mut errbuf: [u8; 16384] = [0; 16384];
|
||||
let mut stderr_guard = self.stderr.lock().unwrap();
|
||||
let stderr = stderr_guard.as_mut().unwrap();
|
||||
let len = stderr.read(&mut errbuf).map_err(|e| {
|
||||
ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!(e),
|
||||
}
|
||||
})?;
|
||||
let len = stderr.read(&mut errbuf)?;
|
||||
|
||||
// The message might not be split correctly into lines here. But this is
|
||||
// good enough, the important thing is to get the message to the log.
|
||||
@@ -982,26 +947,21 @@ impl PostgresRedoManager {
|
||||
continue;
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo process closed its stderr unexpectedly"),
|
||||
});
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
}
|
||||
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = pollfds[2].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..]).map_err(|e| {
|
||||
ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!(e),
|
||||
}
|
||||
})?;
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
|
||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
|
||||
});
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
}
|
||||
}
|
||||
output
|
||||
|
||||
@@ -20,7 +20,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
|
||||
|
||||
@@ -32,11 +32,13 @@
|
||||
#include "storage/latch.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
/*
|
||||
* Local file cache is used to temporary store relations pages in local file system.
|
||||
@@ -65,6 +67,7 @@
|
||||
typedef struct FileCacheEntry
|
||||
{
|
||||
BufferTag key;
|
||||
uint32 hash;
|
||||
uint32 offset;
|
||||
uint32 access_count;
|
||||
uint32 bitmap[BLOCKS_PER_CHUNK/32];
|
||||
@@ -76,6 +79,9 @@ typedef struct FileCacheControl
|
||||
uint64 generation; /* generation is needed to handle correct hash reenabling */
|
||||
uint32 size; /* size of cache file in chunks */
|
||||
uint32 used; /* number of used chunks */
|
||||
uint32 limit; /* shared copy of lfc_size_limit */
|
||||
uint64 hits;
|
||||
uint64 misses;
|
||||
dlist_head lru; /* double linked list for LRU replacement algorithm */
|
||||
} FileCacheControl;
|
||||
|
||||
@@ -91,10 +97,12 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
|
||||
void FileCacheMonitorMain(Datum main_arg);
|
||||
#define LFC_ENABLED() (lfc_ctl->limit != 0)
|
||||
|
||||
void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
|
||||
|
||||
/*
|
||||
* Local file cache is mandatory and Neon can work without it.
|
||||
* Local file cache is optional and Neon can work without it.
|
||||
* In case of any any errors with this cache, we should disable it but to not throw error.
|
||||
* Also we should allow re-enable it if source of failure (lack of disk space, permissions,...) is fixed.
|
||||
* All cache content should be invalidated to avoid reading of stale or corrupted data
|
||||
@@ -102,49 +110,68 @@ void FileCacheMonitorMain(Datum main_arg);
|
||||
static void
|
||||
lfc_disable(char const* op)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
FileCacheEntry* entry;
|
||||
|
||||
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||
|
||||
/* Invalidate hash */
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
FileCacheEntry* entry;
|
||||
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
|
||||
}
|
||||
lfc_ctl->generation += 1;
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->used = 0;
|
||||
lfc_ctl->limit = 0;
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
|
||||
if (lfc_desc > 0)
|
||||
{
|
||||
/* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
|
||||
int rc = ftruncate(lfc_desc, 0);
|
||||
if (rc < 0)
|
||||
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
|
||||
}
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
|
||||
if (lfc_desc > 0)
|
||||
close(lfc_desc);
|
||||
|
||||
lfc_desc = -1;
|
||||
lfc_size_limit = 0;
|
||||
}
|
||||
|
||||
/* Invalidate hash */
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
}
|
||||
hash_seq_term(&status);
|
||||
lfc_ctl->generation += 1;
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->used = 0;
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
/*
|
||||
* This check is done without obtaining lfc_lock, so it is unreliable
|
||||
*/
|
||||
static bool
|
||||
lfc_maybe_disabled(void)
|
||||
{
|
||||
return !lfc_ctl || !LFC_ENABLED();
|
||||
}
|
||||
|
||||
static bool
|
||||
lfc_ensure_opened(void)
|
||||
{
|
||||
bool enabled = !lfc_maybe_disabled();
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc <= 0)
|
||||
if (lfc_desc <= 0 && enabled)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
|
||||
|
||||
if (lfc_desc < 0) {
|
||||
lfc_disable("open");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return enabled;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -163,6 +190,7 @@ lfc_shmem_startup(void)
|
||||
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
|
||||
if (!found)
|
||||
{
|
||||
int fd;
|
||||
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
|
||||
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
|
||||
info.keysize = sizeof(BufferTag);
|
||||
@@ -175,10 +203,22 @@ lfc_shmem_startup(void)
|
||||
lfc_ctl->generation = 0;
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->used = 0;
|
||||
lfc_ctl->hits = 0;
|
||||
lfc_ctl->misses = 0;
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
|
||||
/* Remove file cache on restart */
|
||||
(void)unlink(lfc_path);
|
||||
/* Recreate file cache on restart */
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
|
||||
lfc_ctl->limit = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
close(fd);
|
||||
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
|
||||
}
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
}
|
||||
@@ -195,6 +235,17 @@ lfc_shmem_request(void)
|
||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||
}
|
||||
|
||||
static bool
|
||||
is_normal_backend(void)
|
||||
{
|
||||
/*
|
||||
* Stats collector detach shared memory, so we should not try to access shared memory here.
|
||||
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
|
||||
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
|
||||
*/
|
||||
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
|
||||
}
|
||||
|
||||
static bool
|
||||
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
@@ -210,25 +261,15 @@ static void
|
||||
lfc_change_limit_hook(int newval, void *extra)
|
||||
{
|
||||
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
|
||||
/*
|
||||
* Stats collector detach shared memory, so we should not try to access shared memory here.
|
||||
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
|
||||
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
|
||||
*/
|
||||
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
|
||||
|
||||
if (!is_normal_backend())
|
||||
return;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc <= 0)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
||||
if (lfc_desc < 0) {
|
||||
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
|
||||
lfc_size_limit = 0; /* disable file cache */
|
||||
return;
|
||||
}
|
||||
}
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
|
||||
{
|
||||
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
|
||||
@@ -238,10 +279,12 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
|
||||
elog(LOG, "Failed to punch hole in file: %m");
|
||||
#endif
|
||||
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
|
||||
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
|
||||
lfc_ctl->used -= 1;
|
||||
}
|
||||
lfc_ctl->limit = new_size;
|
||||
elog(DEBUG1, "set local file cache limit to %d", new_size);
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
@@ -255,6 +298,7 @@ lfc_init(void)
|
||||
if (!process_shared_preload_libraries_in_progress)
|
||||
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
||||
|
||||
|
||||
DefineCustomIntVariable("neon.max_file_cache_size",
|
||||
"Maximal size of Neon local file cache",
|
||||
NULL,
|
||||
@@ -315,10 +359,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
BufferTag tag;
|
||||
FileCacheEntry* entry;
|
||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
||||
bool found;
|
||||
bool found = false;
|
||||
uint32 hash;
|
||||
|
||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return false;
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
@@ -327,8 +371,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
return found;
|
||||
}
|
||||
@@ -345,7 +392,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
||||
uint32 hash;
|
||||
|
||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return;
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
@@ -355,6 +402,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
if (!LFC_ENABLED())
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
|
||||
|
||||
if (!found)
|
||||
@@ -405,7 +459,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
/*
|
||||
* Try to read page from local cache.
|
||||
* Returns true if page is found in local cache.
|
||||
* In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache.
|
||||
* In case of error local file cache is disabled (lfc->limit is set to zero).
|
||||
*/
|
||||
bool
|
||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
@@ -420,7 +474,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
|
||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return false;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
@@ -432,10 +486,19 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
if (!LFC_ENABLED())
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return false;
|
||||
}
|
||||
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
|
||||
{
|
||||
/* Page is not cached */
|
||||
lfc_ctl->misses += 1;
|
||||
pgBufferUsage.file_cache.misses += 1;
|
||||
LWLockRelease(lfc_lock);
|
||||
return false;
|
||||
}
|
||||
@@ -456,8 +519,12 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
/* Place entry to the head of LRU list */
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
if (lfc_ctl->generation == generation)
|
||||
{
|
||||
Assert(LFC_ENABLED());
|
||||
lfc_ctl->hits += 1;
|
||||
pgBufferUsage.file_cache.hits += 1;
|
||||
Assert(entry->access_count > 0);
|
||||
if (--entry->access_count == 0)
|
||||
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
|
||||
@@ -489,7 +556,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
||||
uint32 hash;
|
||||
|
||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
@@ -497,12 +564,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
tag.forkNum = forkNum;
|
||||
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
if (!LFC_ENABLED())
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
|
||||
|
||||
if (found)
|
||||
@@ -521,13 +593,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
* there are should be very large number of concurrent IO operations and them are limited by max_connections,
|
||||
* we prefer not to complicate code and use second approach.
|
||||
*/
|
||||
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
|
||||
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
|
||||
{
|
||||
/* Cache overflow: evict least recently used chunk */
|
||||
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
|
||||
Assert(victim->access_count == 0);
|
||||
entry->offset = victim->offset; /* grab victim's chunk */
|
||||
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
|
||||
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
|
||||
elog(DEBUG2, "Swap file cache page");
|
||||
}
|
||||
else
|
||||
@@ -536,6 +608,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
|
||||
}
|
||||
entry->access_count = 1;
|
||||
entry->hash = hash;
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
}
|
||||
|
||||
@@ -557,6 +630,104 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct
|
||||
{
|
||||
TupleDesc tupdesc;
|
||||
} NeonGetStatsCtx;
|
||||
|
||||
#define NUM_NEON_GET_STATS_COLS 2
|
||||
#define NUM_NEON_GET_STATS_ROWS 3
|
||||
|
||||
PG_FUNCTION_INFO_V1(neon_get_stats);
|
||||
Datum
|
||||
neon_get_stats(PG_FUNCTION_ARGS)
|
||||
{
|
||||
FuncCallContext *funcctx;
|
||||
NeonGetStatsCtx* fctx;
|
||||
MemoryContext oldcontext;
|
||||
TupleDesc tupledesc;
|
||||
Datum result;
|
||||
HeapTuple tuple;
|
||||
char const* key;
|
||||
uint64 value;
|
||||
Datum values[NUM_NEON_GET_STATS_COLS];
|
||||
bool nulls[NUM_NEON_GET_STATS_COLS];
|
||||
|
||||
if (SRF_IS_FIRSTCALL())
|
||||
{
|
||||
funcctx = SRF_FIRSTCALL_INIT();
|
||||
|
||||
/* Switch context when allocating stuff to be used in later calls */
|
||||
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
||||
|
||||
/* Create a user function context for cross-call persistence */
|
||||
fctx = (NeonGetStatsCtx*) palloc(sizeof(NeonGetStatsCtx));
|
||||
|
||||
/* Construct a tuple descriptor for the result rows. */
|
||||
tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS);
|
||||
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "ns_key",
|
||||
TEXTOID, -1, 0);
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "ns_value",
|
||||
TEXTOID, -1, 0);
|
||||
|
||||
fctx->tupdesc = BlessTupleDesc(tupledesc);
|
||||
funcctx->max_calls = NUM_NEON_GET_STATS_ROWS;
|
||||
funcctx->user_fctx = fctx;
|
||||
|
||||
/* Return to original context when allocating transient memory */
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
||||
funcctx = SRF_PERCALL_SETUP();
|
||||
|
||||
/* Get the saved state */
|
||||
fctx = (NeonGetStatsCtx*) funcctx->user_fctx;
|
||||
|
||||
switch (funcctx->call_cntr)
|
||||
{
|
||||
case 0:
|
||||
key = "file_cache_misses";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->misses;
|
||||
break;
|
||||
case 1:
|
||||
key = "file_cache_hits";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->hits;
|
||||
break;
|
||||
case 2:
|
||||
key = "file_cache_used";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->used;
|
||||
break;
|
||||
default:
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
|
||||
values[0] = PointerGetDatum(cstring_to_text(key));
|
||||
nulls[0] = false;
|
||||
if (lfc_ctl)
|
||||
{
|
||||
char buf[64];
|
||||
snprintf(buf, sizeof buf, "%llu", (long long)value);
|
||||
nulls[1] = false;
|
||||
values[1] = PointerGetDatum(cstring_to_text(buf));
|
||||
}
|
||||
else
|
||||
nulls[1] = true;
|
||||
|
||||
tuple = heap_form_tuple(fctx->tupdesc, values, nulls);
|
||||
result = HeapTupleGetDatum(tuple);
|
||||
SRF_RETURN_NEXT(funcctx, result);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Function returning data from the local file cache
|
||||
* relation node/tablespace/database/blocknum and access_counter
|
||||
*/
|
||||
PG_FUNCTION_INFO_V1(local_cache_pages);
|
||||
|
||||
/*
|
||||
* Record structure holding the to be exposed cache data.
|
||||
*/
|
||||
@@ -580,11 +751,6 @@ typedef struct
|
||||
LocalCachePagesRec *record;
|
||||
} LocalCachePagesContext;
|
||||
|
||||
/*
|
||||
* Function returning data from the local file cache
|
||||
* relation node/tablespace/database/blocknum and access_counter
|
||||
*/
|
||||
PG_FUNCTION_INFO_V1(local_cache_pages);
|
||||
|
||||
#define NUM_LOCALCACHE_PAGES_ELEM 7
|
||||
|
||||
@@ -653,13 +819,15 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK/32; i++)
|
||||
n_pages += pg_popcount32(entry->bitmap[i]);
|
||||
}
|
||||
}
|
||||
hash_seq_term(&status);
|
||||
fctx->record = (LocalCachePagesRec *)
|
||||
MemoryContextAllocHuge(CurrentMemoryContext,
|
||||
sizeof(LocalCachePagesRec) * n_pages);
|
||||
@@ -671,35 +839,33 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
/* Return to original context when allocating transient memory */
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
/*
|
||||
* Scan through all the buffers, saving the relevant fields in the
|
||||
* fctx->record structure.
|
||||
*
|
||||
* We don't hold the partition locks, so we don't get a consistent
|
||||
* snapshot across all buffers, but we do grab the buffer header
|
||||
* locks, so the information of each buffer is self-consistent.
|
||||
*/
|
||||
n_pages = 0;
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
if (n_pages != 0)
|
||||
{
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
/*
|
||||
* Scan through all the cache entries, saving the relevant fields in the
|
||||
* fctx->record structure.
|
||||
*/
|
||||
uint32 n = 0;
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
{
|
||||
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
|
||||
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n_pages].forknum = entry->key.forkNum;
|
||||
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
|
||||
fctx->record[n_pages].accesscount = entry->access_count;
|
||||
n_pages += 1;
|
||||
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
|
||||
{
|
||||
fctx->record[n].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
|
||||
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n].forknum = entry->key.forkNum;
|
||||
fctx->record[n].blocknum = entry->key.blockNum + i;
|
||||
fctx->record[n].accesscount = entry->access_count;
|
||||
n += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert(n_pages == n);
|
||||
}
|
||||
hash_seq_term(&status);
|
||||
Assert(n_pages == funcctx->max_calls);
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
|
||||
10
pgxn/neon/neon--1.0--1.1.sql
Normal file
10
pgxn/neon/neon--1.0--1.1.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
\echo Use "ALTER EXTENSION neon UPDATE TO '1.1'" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION neon_get_stats()
|
||||
RETURNS SETOF RECORD
|
||||
AS 'MODULE_PATHNAME', 'neon_get_stats'
|
||||
LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
-- Create a view for convenient access.
|
||||
CREATE VIEW neon_stats AS
|
||||
SELECT P.* FROM neon_get_stats() AS P (ns_key text, ns_value text);
|
||||
@@ -1,4 +1,4 @@
|
||||
# neon extension
|
||||
comment = 'cloud storage for PostgreSQL'
|
||||
default_version = '1.0'
|
||||
default_version = '1.1'
|
||||
module_pathname = '$libdir/neon'
|
||||
|
||||
@@ -721,7 +721,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
Retry:
|
||||
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
|
||||
|
||||
if (entry != NULL)
|
||||
@@ -858,11 +858,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
if (flush_every_n_requests > 0 &&
|
||||
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
{
|
||||
if (!page_server->flush())
|
||||
{
|
||||
/* Prefetch set is reset in case of error, so we should try to register our request once again */
|
||||
goto Retry;
|
||||
}
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use futures::future::Either;
|
||||
use proxy::auth;
|
||||
use proxy::config::HttpConfig;
|
||||
use proxy::console;
|
||||
use proxy::http;
|
||||
use proxy::metrics;
|
||||
@@ -80,9 +79,6 @@ struct ProxyCliArgs {
|
||||
/// Allow self-signed certificates for compute nodes (for testing)
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
allow_self_signed_compute: bool,
|
||||
/// timeout for http connections
|
||||
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
|
||||
sql_over_http_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -224,15 +220,12 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
auth::BackendType::Link(Cow::Owned(url))
|
||||
}
|
||||
};
|
||||
let http_config = HttpConfig {
|
||||
sql_over_http_timeout: args.sql_over_http_timeout,
|
||||
};
|
||||
|
||||
let config = Box::leak(Box::new(ProxyConfig {
|
||||
tls_config,
|
||||
auth_backend,
|
||||
metric_collection,
|
||||
allow_self_signed_compute: args.allow_self_signed_compute,
|
||||
http_config,
|
||||
}));
|
||||
|
||||
Ok(config)
|
||||
|
||||
@@ -13,7 +13,6 @@ pub struct ProxyConfig {
|
||||
pub auth_backend: auth::BackendType<'static, ()>,
|
||||
pub metric_collection: Option<MetricCollectionConfig>,
|
||||
pub allow_self_signed_compute: bool,
|
||||
pub http_config: HttpConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -27,10 +26,6 @@ pub struct TlsConfig {
|
||||
pub common_names: Option<HashSet<String>>,
|
||||
}
|
||||
|
||||
pub struct HttpConfig {
|
||||
pub sql_over_http_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
impl TlsConfig {
|
||||
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
|
||||
self.config.clone()
|
||||
|
||||
@@ -89,10 +89,7 @@ pub mod errors {
|
||||
Self::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
ref text,
|
||||
} => {
|
||||
!text.contains("written data quota exceeded")
|
||||
&& !text.contains("the limit for current plan reached")
|
||||
}
|
||||
} => !text.contains("quota"),
|
||||
// retry server errors
|
||||
Self::Console { status, .. } if status.is_server_error() => true,
|
||||
_ => false,
|
||||
|
||||
@@ -20,7 +20,6 @@ use tokio_postgres::AsyncMessage;
|
||||
use crate::{
|
||||
auth, console,
|
||||
metrics::{Ids, MetricCounter, USAGE_METRICS},
|
||||
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
|
||||
};
|
||||
use crate::{compute, config};
|
||||
|
||||
@@ -419,42 +418,36 @@ async fn connect_to_compute_once(
|
||||
};
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
}
|
||||
}).await
|
||||
}
|
||||
}
|
||||
})
|
||||
.instrument(span)
|
||||
);
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ use url::Url;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::http::json::json_response;
|
||||
|
||||
use crate::config::HttpConfig;
|
||||
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
|
||||
@@ -102,9 +99,9 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
|
||||
// convert to text with escaping
|
||||
Value::Bool(_) => serde_json::to_string(value).map(Some),
|
||||
Value::Number(_) => serde_json::to_string(value).map(Some),
|
||||
Value::Object(_) => serde_json::to_string(value).map(Some),
|
||||
|
||||
// here string needs to be escaped, as it is part of the array
|
||||
Value::Object(_) => json_array_to_pg_array(&Value::String(serde_json::to_string(value)?)),
|
||||
Value::String(_) => serde_json::to_string(value).map(Some),
|
||||
|
||||
// recurse into array
|
||||
@@ -191,46 +188,28 @@ pub async fn handle(
|
||||
sni_hostname: Option<String>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
config: &'static HttpConfig,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let result = tokio::time::timeout(
|
||||
config.sql_over_http_timeout,
|
||||
handle_inner(request, sni_hostname, conn_pool, session_id),
|
||||
)
|
||||
.await;
|
||||
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
|
||||
|
||||
let mut response = match result {
|
||||
Ok(r) => match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
|
||||
e.code()
|
||||
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
|
||||
});
|
||||
let code = match code {
|
||||
Some(c) => c,
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = match e.downcast_ref::<tokio_postgres::Error>() {
|
||||
Some(e) => match e.code() {
|
||||
Some(e) => serde_json::to_value(e.code()).unwrap(),
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
// TODO: this shouldn't always be bad request.
|
||||
json_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
json!({ "message": message, "code": code }),
|
||||
)?
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
let message = format!(
|
||||
"HTTP-Connection timed out, execution time exeeded {} seconds",
|
||||
config.sql_over_http_timeout.as_secs()
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
error!(message);
|
||||
// TODO: this shouldn't always be bad request.
|
||||
json_response(
|
||||
StatusCode::GATEWAY_TIMEOUT,
|
||||
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
|
||||
StatusCode::BAD_REQUEST,
|
||||
json!({ "message": message, "code": code }),
|
||||
)?
|
||||
}
|
||||
};
|
||||
@@ -248,13 +227,6 @@ async fn handle_inner(
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Response<Body>> {
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
}
|
||||
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
@@ -613,7 +585,7 @@ fn _pg_array_parse(
|
||||
}
|
||||
}
|
||||
}
|
||||
'}' if !quote => {
|
||||
'}' => {
|
||||
level -= 1;
|
||||
if level == 0 {
|
||||
push_checked(&mut entry, &mut entries, elem_type)?;
|
||||
@@ -697,14 +669,6 @@ mod tests {
|
||||
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
|
||||
)]
|
||||
);
|
||||
// array of objects
|
||||
let json = r#"[{"foo": 1},{"bar": 2}]"#;
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]).unwrap();
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -832,23 +796,4 @@ mod tests {
|
||||
json!([[[1, 2, 3], [4, 5, 6]]])
|
||||
);
|
||||
}
|
||||
#[test]
|
||||
fn test_pg_array_parse_json() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
|
||||
}
|
||||
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
|
||||
json!([{"foo": 1, "bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
|
||||
json!([{"foo": 1}, {"bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
|
||||
json!([[{"foo": 1}, {"bar": 2}]])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,10 +3,7 @@ use crate::{
|
||||
config::ProxyConfig,
|
||||
error::io_error,
|
||||
protocol2::{ProxyProtocolAccept, WithClientIp},
|
||||
proxy::{
|
||||
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
|
||||
},
|
||||
proxy::{handle_client, ClientMode},
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
@@ -205,14 +202,7 @@ async fn ws_handler(
|
||||
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
|
||||
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
sql_over_http::handle(
|
||||
request,
|
||||
sni_hostname,
|
||||
conn_pool,
|
||||
session_id,
|
||||
&config.http_config,
|
||||
)
|
||||
.await
|
||||
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
|
||||
Response::builder()
|
||||
.header("Allow", "OPTIONS, POST")
|
||||
@@ -285,25 +275,23 @@ pub async fn task_main(
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
|
||||
move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
},
|
||||
)))
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
}))
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -315,41 +303,3 @@ pub async fn task_main(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MetricService<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> MetricService<S> {
|
||||
fn new(inner: S) -> MetricService<S> {
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
MetricService { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Drop for MetricService<S> {
|
||||
fn drop(&mut self) {
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
|
||||
where
|
||||
S: hyper::service::Service<Request<ReqBody>>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ use crate::{
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
|
||||
http::StatusCode,
|
||||
metrics::{Ids, USAGE_METRICS},
|
||||
protocol2::WithClientIp,
|
||||
stream::{PqStream, Stream},
|
||||
@@ -39,55 +38,19 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
const ERR_PROTO_VIOLATION: &str = "protocol violation";
|
||||
|
||||
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_opened_db_connections_total",
|
||||
"Number of opened connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_db_connections_total",
|
||||
"Number of closed connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_opened_client_connections_total",
|
||||
"Number of opened connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_client_connections_total",
|
||||
"Number of closed connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_accepted_connections_total",
|
||||
"Number of client connections accepted.",
|
||||
"Number of TCP client connections accepted.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_connections_total",
|
||||
"Number of client connections closed.",
|
||||
"Number of TCP client connections closed.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
@@ -112,15 +75,6 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_connection_failures_breakdown",
|
||||
"Number of wake-up failures (per kind).",
|
||||
&["retry", "kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_io_bytes_per_client",
|
||||
@@ -254,16 +208,12 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
"handling interactive connection from client"
|
||||
);
|
||||
|
||||
let proto = mode.protocol_label();
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
// The `closed` counter will increase when this future is destroyed.
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.with_label_values(&[mode.protocol_label()])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
|
||||
}
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
@@ -298,7 +248,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
mode.allow_self_signed_compute(config),
|
||||
);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session, mode))
|
||||
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -447,46 +397,6 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
const fn bool_to_str(x: bool) -> &'static str {
|
||||
if x {
|
||||
"true"
|
||||
} else {
|
||||
"false"
|
||||
}
|
||||
}
|
||||
|
||||
fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
use crate::console::errors::ApiError;
|
||||
let retry = bool_to_str(retry);
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
"quota_exceeded"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => "api_console_locked",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => "api_console_bad_request",
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
"api_console_other_server_error"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
|
||||
};
|
||||
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
|
||||
}
|
||||
|
||||
/// Try to connect to the compute node, retrying if necessary.
|
||||
/// This function might update `node_info`, so we take it by `&mut`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -530,12 +440,10 @@ where
|
||||
match handle_try_wake(wake_res, num_retries) {
|
||||
Err(e) => {
|
||||
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
|
||||
report_error(&e, false);
|
||||
return Err(e.into());
|
||||
}
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(ControlFlow::Continue(e)) => {
|
||||
report_error(&e, true);
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
@@ -774,7 +682,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
async fn connect_to_db(
|
||||
self,
|
||||
session: cancellation::Session<'_>,
|
||||
mode: ClientMode,
|
||||
allow_cleartext: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let Self {
|
||||
mut stream,
|
||||
@@ -790,7 +698,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
};
|
||||
|
||||
let auth_result = match creds
|
||||
.authenticate(&extra, &mut stream, mode.allow_cleartext())
|
||||
.authenticate(&extra, &mut stream, allow_cleartext)
|
||||
.await
|
||||
{
|
||||
Ok(auth_result) => auth_result,
|
||||
@@ -816,14 +724,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
let proto = mode.protocol_label();
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
}
|
||||
|
||||
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
// PqStream input buffer. Normally there is none, but our serverless npm
|
||||
|
||||
@@ -374,12 +374,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
if conf.http_auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
|
||||
["/v1/status", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect()
|
||||
});
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
|
||||
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
|
||||
if ALLOWLIST_ROUTES.contains(request.uri()) {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -1464,29 +1464,6 @@ class NeonCli(AbstractNeonCli):
|
||||
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
def map_branch(
|
||||
self, name: str, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
"""
|
||||
Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
|
||||
Usually needed when creating branches via PageserverHttpClient and not neon_local.
|
||||
|
||||
After creating a name mapping, you can use EndpointFactory.create_start
|
||||
with this registered branch name.
|
||||
"""
|
||||
args = [
|
||||
"mappings",
|
||||
"map",
|
||||
"--branch-name",
|
||||
name,
|
||||
"--tenant-id",
|
||||
str(tenant_id),
|
||||
"--timeline-id",
|
||||
str(timeline_id),
|
||||
]
|
||||
|
||||
return self.raw_cli(args, check_return_code=True)
|
||||
|
||||
def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
|
||||
return self.raw_cli(["start"], check_return_code=check_return_code)
|
||||
|
||||
|
||||
@@ -74,14 +74,11 @@ def wait_until_tenant_state(
|
||||
for _ in range(iterations):
|
||||
try:
|
||||
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
else:
|
||||
log.debug(f"Tenant {tenant_id} data: {tenant}")
|
||||
if tenant["state"]["slug"] == expected_state:
|
||||
return tenant
|
||||
if tenant["state"]["slug"] == "Broken":
|
||||
raise RuntimeError(f"tenant became Broken, not {expected_state}")
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
|
||||
time.sleep(period)
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
|
||||
|
||||
def start_single_table_workload(table_id: int):
|
||||
for _ in range(num_iters):
|
||||
with env.pg.connect(options="-cstatement_timeout=300s").cursor() as cur:
|
||||
with env.pg.connect().cursor() as cur:
|
||||
cur.execute(
|
||||
f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
|
||||
)
|
||||
|
||||
@@ -1,24 +1,14 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, List, Union
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.types import Lsn, TimelineId
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
from requests import RequestException
|
||||
from requests.exceptions import RetryError
|
||||
|
||||
|
||||
# Test branch creation
|
||||
@@ -138,245 +128,3 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
|
||||
endpoint1 = env.endpoints.create_start("b1")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
|
||||
|
||||
|
||||
def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Endpoint should not be possible to create because branch has not been uploaded.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
initial_branch = "initial_branch"
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
|
||||
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
|
||||
|
||||
def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Branch should not be possible to create because ancestor has not been uploaded.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
with pytest.raises(RetryError, match="too many 503 error responses"):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
branch_id,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found",
|
||||
):
|
||||
ps_http.timeline_detail(env.initial_tenant, branch_id)
|
||||
# important to note that a task might still be in progress to complete
|
||||
# the work, but will never get to that because we have the pause
|
||||
# failpoint
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
|
||||
|
||||
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
If the activate only after upload is used, then retries could become competing.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
create_root = threading.Thread(target=start_creating_timeline)
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
|
||||
barrier = threading.Barrier(3)
|
||||
|
||||
def try_branch():
|
||||
barrier.wait()
|
||||
barrier.wait()
|
||||
try:
|
||||
ret = ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
branch_id,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
timeout=5,
|
||||
)
|
||||
queue.put(ret)
|
||||
except Exception as e:
|
||||
queue.put(e)
|
||||
|
||||
threads = [threading.Thread(target=try_branch) for _ in range(2)]
|
||||
|
||||
try:
|
||||
create_root.start()
|
||||
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
barrier.wait()
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
|
||||
barrier.wait()
|
||||
|
||||
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
|
||||
first = queue.get()
|
||||
second = queue.get()
|
||||
|
||||
log.info(first)
|
||||
log.info(second)
|
||||
|
||||
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
|
||||
assert isinstance(failed, Exception)
|
||||
assert isinstance(succeeded, Dict)
|
||||
|
||||
# FIXME: there's probably multiple valid status codes:
|
||||
# - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already exists
|
||||
# - whatever 409 response says, but that is a subclass of PageserverApiException
|
||||
assert isinstance(failed, PageserverApiException)
|
||||
assert succeeded["state"] == "Active"
|
||||
finally:
|
||||
# we might still have the failpoint active
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
# pytest should nag if we leave threads unjoined
|
||||
for t in threads:
|
||||
t.join()
|
||||
create_root.join()
|
||||
|
||||
|
||||
def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Currently before RFC#27 we keep and continue uploading branches which were not successfully uploaded before shutdown.
|
||||
|
||||
This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test when the rfc is implemented.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
t.join()
|
||||
|
||||
# now without a failpoint
|
||||
env.pageserver.start()
|
||||
|
||||
wait_until_tenant_active(ps_http, env.initial_tenant)
|
||||
|
||||
# currently it lives on and will get eventually uploaded, but this will change
|
||||
detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
|
||||
assert detail["state"] == "Active"
|
||||
|
||||
|
||||
def wait_until_paused(env: NeonEnv, failpoint: str):
|
||||
found = False
|
||||
msg = f"at failpoint {failpoint}"
|
||||
for _ in range(20):
|
||||
time.sleep(1)
|
||||
found = env.pageserver.log_contains(msg) is not None
|
||||
if found:
|
||||
break
|
||||
assert found
|
||||
|
||||
74
test_runner/regress/test_local_file_cache.py
Normal file
74
test_runner/regress/test_local_file_cache.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import os
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
cache_dir = os.path.join(env.repo_dir, "file_cache")
|
||||
os.mkdir(cache_dir)
|
||||
|
||||
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_local_file_cache_unlink",
|
||||
config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
f"neon.file_cache_path='{cache_dir}/file.cache'",
|
||||
"neon.max_file_cache_size='64MB'",
|
||||
"neon.file_cache_size_limit='10MB'",
|
||||
],
|
||||
)
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
|
||||
n_rows = 100000
|
||||
n_threads = 20
|
||||
n_updates_per_thread = 10000
|
||||
n_updates_per_connection = 1000
|
||||
n_total_updates = n_threads * n_updates_per_thread
|
||||
|
||||
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
|
||||
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
|
||||
|
||||
# Start threads that will perform random UPDATEs. Each UPDATE
|
||||
# increments the counter on the row, so that we can check at the
|
||||
# end that the sum of all the counters match the number of updates
|
||||
# performed (plus the initial 1 on each row).
|
||||
#
|
||||
# Furthermore, each thread will reconnect between every 1000 updates.
|
||||
def run_updates():
|
||||
n_updates_performed = 0
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
for _ in range(n_updates_per_thread):
|
||||
id = random.randint(1, n_rows)
|
||||
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
|
||||
n_updates_performed += 1
|
||||
if n_updates_performed % n_updates_per_connection == 0:
|
||||
cur.close()
|
||||
conn.close()
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
threads: List[threading.Thread] = []
|
||||
for _i in range(n_threads):
|
||||
thread = threading.Thread(target=run_updates, args=(), daemon=True)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
|
||||
os.rename(cache_dir, new_cache_dir)
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows
|
||||
@@ -10,7 +10,6 @@ of the pageserver are:
|
||||
"""
|
||||
|
||||
|
||||
import enum
|
||||
import re
|
||||
import time
|
||||
from typing import Optional
|
||||
@@ -277,28 +276,15 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
assert get_deletion_queue_unexpected_errors(ps_http) == 0
|
||||
|
||||
|
||||
class KeepAttachment(str, enum.Enum):
|
||||
KEEP = "keep"
|
||||
LOSE = "lose"
|
||||
|
||||
|
||||
class ValidateBefore(str, enum.Enum):
|
||||
VALIDATE = "validate"
|
||||
NO_VALIDATE = "no-validate"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("keep_attachment", [KeepAttachment.KEEP, KeepAttachment.LOSE])
|
||||
@pytest.mark.parametrize("validate_before", [ValidateBefore.VALIDATE, ValidateBefore.NO_VALIDATE])
|
||||
@pytest.mark.parametrize("keep_attachment", [True, False])
|
||||
@pytest.mark.parametrize("validate_before", [True, False])
|
||||
def test_deletion_queue_recovery(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
keep_attachment: KeepAttachment,
|
||||
validate_before: ValidateBefore,
|
||||
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool, validate_before: bool
|
||||
):
|
||||
"""
|
||||
:param keep_attachment: whether to re-attach after restart. Else, we act as if some other
|
||||
:param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
|
||||
node took the attachment while we were restarting.
|
||||
:param validate_before: whether to wait for deletions to be validated before restart. This
|
||||
:param validate_before: If true, we wait for deletions to be validated before restart. This
|
||||
makes them elegible to be executed after restart, if the same node keeps the attachment.
|
||||
"""
|
||||
neon_env_builder.enable_generations = True
|
||||
@@ -314,7 +300,7 @@ def test_deletion_queue_recovery(
|
||||
("deletion-queue-before-execute", "return"),
|
||||
]
|
||||
|
||||
if validate_before == ValidateBefore.NO_VALIDATE:
|
||||
if not validate_before:
|
||||
failpoints.append(
|
||||
# Prevent deletion lists from being validated, we will test that they are
|
||||
# dropped properly during recovery. 'pause' is okay here because we kill
|
||||
@@ -334,25 +320,20 @@ def test_deletion_queue_recovery(
|
||||
assert get_deletion_queue_unexpected_errors(ps_http) == 0
|
||||
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
|
||||
|
||||
if validate_before == ValidateBefore.VALIDATE:
|
||||
if validate_before:
|
||||
|
||||
def assert_validation_complete():
|
||||
assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)
|
||||
|
||||
wait_until(20, 1, assert_validation_complete)
|
||||
|
||||
# The validatated keys statistic advances before the header is written, so we
|
||||
# also wait to see the header hit the disk: this seems paranoid but the race
|
||||
# can really happen on a heavily overloaded test machine.
|
||||
def assert_header_written():
|
||||
assert (env.pageserver.workdir / "deletion" / "header-01").exists()
|
||||
|
||||
wait_until(20, 1, assert_header_written)
|
||||
# A short wait to let the DeletionHeader get written out, as this happens after
|
||||
# the validated count gets incremented.
|
||||
time.sleep(1)
|
||||
|
||||
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
if not keep_attachment:
|
||||
some_other_pageserver = 101010
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
|
||||
@@ -371,7 +352,7 @@ def test_deletion_queue_recovery(
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
|
||||
|
||||
if keep_attachment == KeepAttachment.KEEP or validate_before == ValidateBefore.VALIDATE:
|
||||
if keep_attachment or validate_before:
|
||||
# - If we kept the attachment, then our pre-restart deletions should execute
|
||||
# because on re-attach they were from the immediately preceding generation
|
||||
# - If we validated before restart, then the deletions should execute because the
|
||||
|
||||
@@ -17,8 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
|
||||
n_restarts = 10
|
||||
scale = 10
|
||||
|
||||
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
|
||||
@@ -188,7 +188,7 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.status_code == 200
|
||||
return response.json()
|
||||
|
||||
rows = q("select 42 as answer")["rows"]
|
||||
@@ -206,12 +206,6 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
|
||||
assert rows == [{"answer": {"b": 42}}]
|
||||
|
||||
rows = q("select $1::jsonb[] as answer", [[{}]])["rows"]
|
||||
assert rows == [{"answer": [{}]}]
|
||||
|
||||
rows = q("select $1::jsonb[] as answer", [[{"foo": 1}, {"bar": 2}]])["rows"]
|
||||
assert rows == [{"answer": [{"foo": 1}, {"bar": 2}]}]
|
||||
|
||||
rows = q("select * from pg_class limit 1")["rows"]
|
||||
assert len(rows) == 1
|
||||
|
||||
|
||||
@@ -752,9 +752,6 @@ def test_ignore_while_attaching(
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
# An endpoint is starting up concurrently with our detach, it can
|
||||
# experience RPC failure due to shutdown.
|
||||
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
|
||||
|
||||
data_id = 1
|
||||
data_secret = "very secret secret"
|
||||
|
||||
@@ -6,7 +6,6 @@ from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
|
||||
@@ -598,10 +597,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
|
||||
assert res == expected_sum
|
||||
|
||||
|
||||
# Do inserts while restarting postgres and messing with safekeeper addresses.
|
||||
# The test takes more than default 5 minutes on Postgres 16,
|
||||
# see https://github.com/neondatabase/neon/issues/5305
|
||||
@pytest.mark.timeout(600)
|
||||
# do inserts while restarting postgres and messing with safekeeper addresses
|
||||
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 5d5cfee127...80932ee1ae
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 74cfe3e681...327f4e5e7d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 389ce36b4b...e6c1ece2ef
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "389ce36b4b3da7aa654a25e1b3f10b641319a87f",
|
||||
"postgres-v15": "74cfe3e681836747a31fdbd47bdd14b3d81b0772",
|
||||
"postgres-v14": "5d5cfee12783f0989a9c9fe13bb40b5585812568"
|
||||
"postgres-v16": "e6c1ece2efbf52277565936925ae43599ce51730",
|
||||
"postgres-v15": "327f4e5e7db1050ce1b79e19f3d27ef1cff7acca",
|
||||
"postgres-v14": "80932ee1ae0b8e645390546ed104c357cddc7d0c"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user