mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 17:10:38 +00:00
Compare commits
12 Commits
joonas/imp
...
jcsp/downg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f3670a589 | ||
|
|
5158de70f3 | ||
|
|
aec9188d36 | ||
|
|
acefee9a32 | ||
|
|
bf065aabdf | ||
|
|
fe74fac276 | ||
|
|
b91ac670e1 | ||
|
|
b3195afd20 | ||
|
|
7eaa7a496b | ||
|
|
4772cd6c93 | ||
|
|
010b4d0d5c | ||
|
|
477cb3717b |
@@ -368,8 +368,8 @@ RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar
|
||||
FROM build-deps AS plpgsql-check-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.4.0.tar.gz -O plpgsql_check.tar.gz && \
|
||||
echo "9ba58387a279b35a3bfa39ee611e5684e6cddb2ba046ddb2c5190b3bd2ca254a plpgsql_check.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
|
||||
echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
|
||||
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
|
||||
@@ -116,6 +116,7 @@ fn main() -> Result<()> {
|
||||
"attachment_service" => handle_attachment_service(sub_args, &env),
|
||||
"safekeeper" => handle_safekeeper(sub_args, &env),
|
||||
"endpoint" => handle_endpoint(sub_args, &env),
|
||||
"mappings" => handle_mappings(sub_args, &mut env),
|
||||
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
|
||||
_ => bail!("unexpected subcommand {sub_name}"),
|
||||
};
|
||||
@@ -816,6 +817,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
|
||||
let (sub_name, sub_args) = match sub_match.subcommand() {
|
||||
Some(ep_subcommand_data) => ep_subcommand_data,
|
||||
None => bail!("no mappings subcommand provided"),
|
||||
};
|
||||
|
||||
match sub_name {
|
||||
"map" => {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
.expect("branch-name argument missing");
|
||||
|
||||
let tenant_id = sub_args
|
||||
.get_one::<String>("tenant-id")
|
||||
.map(|x| TenantId::from_str(x))
|
||||
.expect("tenant-id argument missing")
|
||||
.expect("malformed tenant-id arg");
|
||||
|
||||
let timeline_id = sub_args
|
||||
.get_one::<String>("timeline-id")
|
||||
.map(|x| TimelineId::from_str(x))
|
||||
.expect("timeline-id argument missing")
|
||||
.expect("malformed timeline-id arg");
|
||||
|
||||
env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
other => unimplemented!("mappings subcommand {other}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
|
||||
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
|
||||
@@ -1084,6 +1117,7 @@ fn cli() -> Command {
|
||||
// --id, when using a pageserver command
|
||||
let pageserver_id_arg = Arg::new("pageserver-id")
|
||||
.long("id")
|
||||
.global(true)
|
||||
.help("pageserver id")
|
||||
.required(false);
|
||||
// --pageserver-id when using a non-pageserver command
|
||||
@@ -1254,17 +1288,20 @@ fn cli() -> Command {
|
||||
Command::new("pageserver")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage pageserver")
|
||||
.arg(pageserver_id_arg)
|
||||
.subcommand(Command::new("status"))
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.subcommand(Command::new("start").about("Start local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("stop").about("Stop local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone()))
|
||||
.subcommand(Command::new("restart").about("Restart local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.subcommand(Command::new("stop")
|
||||
.about("Stop local pageserver")
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
.subcommand(Command::new("restart")
|
||||
.about("Restart local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("attachment_service")
|
||||
@@ -1321,8 +1358,8 @@ fn cli() -> Command {
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg)
|
||||
.arg(timeline_id_arg)
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
.arg(lsn_arg)
|
||||
.arg(pg_port_arg)
|
||||
.arg(http_port_arg)
|
||||
@@ -1335,7 +1372,7 @@ fn cli() -> Command {
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
.arg(endpoint_id_arg)
|
||||
.arg(tenant_id_arg)
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(
|
||||
Arg::new("destroy")
|
||||
.help("Also delete data directory (now optional, should be default in future)")
|
||||
@@ -1346,6 +1383,18 @@ fn cli() -> Command {
|
||||
)
|
||||
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("mappings")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage neon_local branch name mappings")
|
||||
.subcommand(
|
||||
Command::new("map")
|
||||
.about("Create new mapping which cannot exist already")
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
)
|
||||
)
|
||||
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
|
||||
.subcommand(
|
||||
Command::new("pg")
|
||||
|
||||
@@ -442,10 +442,20 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
trace!("got message {:?}", msg);
|
||||
|
||||
let result = self.process_message(handler, msg, &mut query_string).await;
|
||||
self.flush().await?;
|
||||
tokio::select!(
|
||||
biased;
|
||||
_ = shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
tracing::info!("shutdown request received during response flush");
|
||||
return Ok(())
|
||||
},
|
||||
flush_r = self.flush() => {
|
||||
flush_r?;
|
||||
}
|
||||
);
|
||||
|
||||
match result? {
|
||||
ProcessMsgResult::Continue => {
|
||||
self.flush().await?;
|
||||
continue;
|
||||
}
|
||||
ProcessMsgResult::Break => break,
|
||||
|
||||
@@ -119,6 +119,7 @@ pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
|
||||
|
||||
match api_error {
|
||||
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
|
||||
_ => error!("Error processing HTTP request: {api_error:#}"),
|
||||
}
|
||||
|
||||
@@ -153,7 +153,7 @@ impl FlushOp {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
@@ -212,7 +212,7 @@ where
|
||||
|
||||
/// Files ending with this suffix will be ignored and erased
|
||||
/// during recovery as startup.
|
||||
const TEMP_SUFFIX: &str = ".tmp";
|
||||
const TEMP_SUFFIX: &str = "tmp";
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
|
||||
impl DeletionQueueClient {
|
||||
pub(crate) fn broken() -> Self {
|
||||
// Channels whose receivers are immediately dropped.
|
||||
let (tx, _rx) = tokio::sync::mpsc::channel(1);
|
||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
|
||||
Self {
|
||||
tx,
|
||||
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
|
||||
/// This is cancel-safe. If you drop the future before it completes, the message
|
||||
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
|
||||
/// we decide to do a deletion the decision is always final.
|
||||
async fn do_push<T>(
|
||||
fn do_push<T>(
|
||||
&self,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
queue: &tokio::sync::mpsc::UnboundedSender<T>,
|
||||
msg: T,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
match queue.send(msg).await {
|
||||
match queue.send(msg) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// This shouldn't happen, we should shut down all tenants before
|
||||
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn recover(
|
||||
pub(crate) fn recover(
|
||||
&self,
|
||||
attached_tenants: HashMap<TenantId, Generation>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
@@ -453,7 +453,6 @@ impl DeletionQueueClient {
|
||||
&self.tx,
|
||||
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
|
||||
@@ -526,6 +525,21 @@ impl DeletionQueueClient {
|
||||
return self.flush_immediate().await;
|
||||
}
|
||||
|
||||
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
|
||||
}
|
||||
|
||||
/// When a Tenant has a generation, push_layers is always synchronous because
|
||||
/// the ListValidator channel is an unbounded channel.
|
||||
///
|
||||
/// This can be merged into push_layers when we remove the Generation-less mode
|
||||
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
|
||||
pub(crate) fn push_layers_sync(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(layers.len() as u64);
|
||||
@@ -539,17 +553,16 @@ impl DeletionQueueClient {
|
||||
objects: Vec::new(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
|
||||
async fn do_flush<T>(
|
||||
&self,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
queue: &tokio::sync::mpsc::UnboundedSender<T>,
|
||||
msg: T,
|
||||
rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.do_push(queue, msg).await?;
|
||||
self.do_push(queue, msg)?;
|
||||
if rx.await.is_err() {
|
||||
// This shouldn't happen if tenants are shut down before deletion queue. If we
|
||||
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
|
||||
@@ -570,6 +583,18 @@ impl DeletionQueueClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
|
||||
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
|
||||
/// detach where flushing is nice but not necessary.
|
||||
///
|
||||
/// This function provides no guarantees of work being done.
|
||||
pub fn flush_advisory(&self) {
|
||||
let (flush_op, _) = FlushOp::new();
|
||||
|
||||
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
|
||||
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
|
||||
}
|
||||
|
||||
// Wait until all previous deletions are executed
|
||||
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
|
||||
debug!("flush_execute: flushing to deletion lists...");
|
||||
@@ -586,9 +611,7 @@ impl DeletionQueueClient {
|
||||
// Flush any immediate-mode deletions (the above backend flush will only flush
|
||||
// the executor if deletions had flowed through the backend)
|
||||
debug!("flush_execute: flushing execution...");
|
||||
let (flush_op, rx) = FlushOp::new();
|
||||
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
|
||||
.await?;
|
||||
self.flush_immediate().await?;
|
||||
debug!("flush_execute: finished flushing execution...");
|
||||
Ok(())
|
||||
}
|
||||
@@ -643,8 +666,10 @@ impl DeletionQueue {
|
||||
where
|
||||
C: ControlPlaneGenerationsApi + Send + Sync,
|
||||
{
|
||||
// Deep channel: it consumes deletions from all timelines and we do not want to block them
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
|
||||
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
|
||||
// enough to avoid this taking pathologically large amount of memory.
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
|
||||
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
|
||||
@@ -957,7 +982,7 @@ mod test {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new()).await?;
|
||||
client.recover(HashMap::new())?;
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
@@ -1025,7 +1050,7 @@ mod test {
|
||||
async fn deletion_queue_validation() -> anyhow::Result<()> {
|
||||
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new()).await?;
|
||||
client.recover(HashMap::new())?;
|
||||
|
||||
// Generation that the control plane thinks is current
|
||||
let latest_generation = Generation::new(0xdeadbeef);
|
||||
@@ -1082,7 +1107,7 @@ mod test {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new()).await?;
|
||||
client.recover(HashMap::new())?;
|
||||
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
@@ -1145,9 +1170,7 @@ mod test {
|
||||
drop(client);
|
||||
ctx.restart().await;
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client
|
||||
.recover(HashMap::from([(tenant_id, now_generation)]))
|
||||
.await?;
|
||||
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
|
||||
|
||||
info!("Flush-executing");
|
||||
client.flush_execute().await?;
|
||||
@@ -1173,7 +1196,7 @@ pub(crate) mod mock {
|
||||
};
|
||||
|
||||
pub struct ConsumerState {
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
|
||||
}
|
||||
|
||||
@@ -1250,7 +1273,7 @@ pub(crate) mod mock {
|
||||
}
|
||||
|
||||
pub struct MockDeletionQueue {
|
||||
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
executed: Arc<AtomicUsize>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
@@ -1260,7 +1283,7 @@ pub(crate) mod mock {
|
||||
|
||||
impl MockDeletionQueue {
|
||||
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
let executed = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
@@ -12,7 +12,6 @@ use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::metrics;
|
||||
|
||||
@@ -88,7 +87,10 @@ impl Deleter {
|
||||
self.accumulator.clear();
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("DeleteObjects request failed: {e:#}, will retry");
|
||||
// The RemoteStorage interface doesn't discriminate between
|
||||
// real errors and 503/429 responses, so we log at INFO level
|
||||
// to avoid propagating spurious error-severity logs.
|
||||
info!("DeleteObjects request failed: {e:#}, will retry");
|
||||
metrics::DELETION_QUEUE
|
||||
.remote_errors
|
||||
.with_label_values(&["execute"])
|
||||
|
||||
@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
// Incoming frontend requests to delete some keys
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
|
||||
// Outbound requests to the backend to execute deletion lists we have composed.
|
||||
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
|
||||
@@ -111,7 +111,7 @@ impl ListWriter {
|
||||
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
@@ -230,6 +230,7 @@ impl ListWriter {
|
||||
let list_name_pattern =
|
||||
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
|
||||
|
||||
let temp_extension = format!(".{TEMP_SUFFIX}");
|
||||
let header_path = self.conf.deletion_header_path();
|
||||
let mut seqs: Vec<u64> = Vec::new();
|
||||
while let Some(dentry) = dir.next_entry().await? {
|
||||
@@ -241,7 +242,7 @@ impl ListWriter {
|
||||
continue;
|
||||
}
|
||||
|
||||
if dentry_str.ends_with(TEMP_SUFFIX) {
|
||||
if dentry_str.ends_with(&temp_extension) {
|
||||
info!("Cleaning up temporary file {dentry_str}");
|
||||
let absolute_path =
|
||||
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
|
||||
|
||||
@@ -396,6 +396,9 @@ async fn timeline_create_handler(
|
||||
format!("{err:#}")
|
||||
))
|
||||
}
|
||||
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
|
||||
}
|
||||
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
|
||||
}
|
||||
}
|
||||
@@ -572,9 +575,14 @@ async fn tenant_detach_handler(
|
||||
|
||||
let state = get_state(&request);
|
||||
let conf = state.conf;
|
||||
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
mgr::detach_tenant(
|
||||
conf,
|
||||
tenant_id,
|
||||
detach_ignored.unwrap_or(false),
|
||||
&state.deletion_queue_client,
|
||||
)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
@@ -1031,7 +1039,7 @@ async fn put_tenant_location_config_handler(
|
||||
// The `Detached` state is special, it doesn't upsert a tenant, it removes
|
||||
// its local disk content and drops it from memory.
|
||||
if let LocationConfigMode::Detached = request_data.config.mode {
|
||||
mgr::detach_tenant(conf, tenant_id, true)
|
||||
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
return json_response(StatusCode::OK, ());
|
||||
|
||||
@@ -35,6 +35,7 @@ use std::time::Duration;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::field;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
@@ -64,69 +65,6 @@ use crate::trace::Tracer;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
async_stream::try_stream! {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = "pageserver is shutting down";
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
|
||||
};
|
||||
|
||||
match msg {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => { break },
|
||||
FeMessage::Sync => continue,
|
||||
FeMessage::Terminate => {
|
||||
let msg = "client terminated connection with Terminate message during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
break;
|
||||
}
|
||||
m => {
|
||||
let msg = format!("unexpected message {m:?}");
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))?;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
yield copy_data_bytes;
|
||||
}
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
pgb.flush().await?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
|
||||
Err(io_error)?;
|
||||
}
|
||||
Err(other) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Read the end of a tar archive.
|
||||
///
|
||||
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
|
||||
@@ -284,7 +222,13 @@ async fn page_service_conn_main(
|
||||
// and create a child per-query context when it invokes process_query.
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
|
||||
let mut conn_handler = PageServerHandler::new(
|
||||
conf,
|
||||
broker_client,
|
||||
auth,
|
||||
connection_ctx,
|
||||
task_mgr::shutdown_token(),
|
||||
);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
|
||||
match pgbackend
|
||||
@@ -318,6 +262,10 @@ struct PageServerHandler {
|
||||
/// For each query received over the connection,
|
||||
/// `process_query` creates a child context from this one.
|
||||
connection_ctx: RequestContext,
|
||||
|
||||
/// A token that should fire when the tenant transitions from
|
||||
/// attached state, or when the pageserver is shutting down.
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
@@ -326,6 +274,7 @@ impl PageServerHandler {
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
_conf: conf,
|
||||
@@ -333,6 +282,91 @@ impl PageServerHandler {
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
|
||||
/// this rather than naked flush() in order to shut down promptly. Without this, we would
|
||||
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
|
||||
/// in the flush.
|
||||
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
tokio::select!(
|
||||
flush_r = pgb.flush() => {
|
||||
Ok(flush_r?)
|
||||
},
|
||||
_ = self.cancel.cancelled() => {
|
||||
Err(QueryError::Other(anyhow::anyhow!("Shutting down")))
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
fn copyin_stream<'a, IO>(
|
||||
&'a self,
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
) -> impl Stream<Item = io::Result<Bytes>> + 'a
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
async_stream::try_stream! {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = "pageserver is shutting down";
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
|
||||
};
|
||||
|
||||
match msg {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => { break },
|
||||
FeMessage::Sync => continue,
|
||||
FeMessage::Terminate => {
|
||||
let msg = "client terminated connection with Terminate message during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
break;
|
||||
}
|
||||
m => {
|
||||
let msg = format!("unexpected message {m:?}");
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))?;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
yield copy_data_bytes;
|
||||
}
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should be always ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
|
||||
Err(io_error)?;
|
||||
}
|
||||
Err(other) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -372,7 +406,7 @@ impl PageServerHandler {
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
pgb.flush().await?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
|
||||
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
|
||||
|
||||
@@ -465,7 +499,7 @@ impl PageServerHandler {
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
|
||||
pgb.flush().await?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -508,9 +542,9 @@ impl PageServerHandler {
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
|
||||
timeline
|
||||
.import_basebackup_from_tar(
|
||||
&mut copyin_reader,
|
||||
@@ -563,8 +597,8 @@ impl PageServerHandler {
|
||||
// Import wal provided via CopyData
|
||||
info!("importing wal");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
self.flush_cancellable(pgb).await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
|
||||
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
|
||||
info!("wal import complete");
|
||||
|
||||
@@ -772,7 +806,7 @@ impl PageServerHandler {
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
|
||||
pgb.flush().await?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
|
||||
// Send a tarball of the latest layer on the timeline. Compress if not
|
||||
// fullbackup. TODO Compress in that case too (tests need to be updated)
|
||||
@@ -824,7 +858,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||
pgb.flush().await?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
|
||||
let basebackup_after = started
|
||||
.elapsed()
|
||||
|
||||
@@ -45,6 +45,7 @@ use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::AttachedLocationConfig;
|
||||
use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
@@ -406,6 +407,8 @@ pub enum CreateTimelineError {
|
||||
AlreadyExists,
|
||||
#[error(transparent)]
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
AncestorNotActive,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
@@ -1587,6 +1590,12 @@ impl Tenant {
|
||||
.get_timeline(ancestor_timeline_id, false)
|
||||
.context("Cannot branch off the timeline that's not present in pageserver")?;
|
||||
|
||||
// instead of waiting around, just deny the request because ancestor is not yet
|
||||
// ready for other purposes either.
|
||||
if !ancestor_timeline.is_active() {
|
||||
return Err(CreateTimelineError::AncestorNotActive);
|
||||
}
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
*lsn = lsn.align();
|
||||
|
||||
@@ -1619,8 +1628,6 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
loaded_timeline.activate(broker_client, None, ctx);
|
||||
|
||||
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
|
||||
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
|
||||
// Ok, the timeline is durable in remote storage.
|
||||
@@ -1632,6 +1639,8 @@ impl Tenant {
|
||||
})?;
|
||||
}
|
||||
|
||||
loaded_timeline.activate(broker_client, None, ctx);
|
||||
|
||||
Ok(loaded_timeline)
|
||||
}
|
||||
|
||||
@@ -2068,6 +2077,15 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
|
||||
self.tenant_conf
|
||||
.read()
|
||||
.unwrap()
|
||||
.location
|
||||
.attach_mode
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::control_plane_client::{
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::{
|
||||
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
|
||||
@@ -206,8 +206,7 @@ async fn init_load_generations(
|
||||
if resources.remote_storage.is_some() {
|
||||
resources
|
||||
.deletion_queue_client
|
||||
.recover(generations.clone())
|
||||
.await?;
|
||||
.recover(generations.clone())?;
|
||||
}
|
||||
|
||||
Ok(Some(generations))
|
||||
@@ -695,6 +694,18 @@ pub(crate) async fn upsert_location(
|
||||
|
||||
if let Some(tenant) = shutdown_tenant {
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
@@ -849,8 +860,16 @@ pub async fn detach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<(), TenantStateError> {
|
||||
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
|
||||
let tmp_path = detach_tenant0(
|
||||
conf,
|
||||
&TENANTS,
|
||||
tenant_id,
|
||||
detach_ignored,
|
||||
deletion_queue_client,
|
||||
)
|
||||
.await?;
|
||||
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
|
||||
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
|
||||
let task_tenant_id = None;
|
||||
@@ -875,6 +894,7 @@ async fn detach_tenant0(
|
||||
tenants: &tokio::sync::RwLock<TenantsMap>,
|
||||
tenant_id: TenantId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<Utf8PathBuf, TenantStateError> {
|
||||
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
|
||||
@@ -886,6 +906,10 @@ async fn detach_tenant0(
|
||||
let removal_result =
|
||||
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
|
||||
|
||||
// Flush pending deletions, so that they have a good chance of passing validation
|
||||
// before this tenant is potentially re-attached elsewhere.
|
||||
deletion_queue_client.flush_advisory();
|
||||
|
||||
// Ignored tenants are not present in memory and will bail the removal from memory operation.
|
||||
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
|
||||
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
|
||||
|
||||
@@ -31,6 +31,7 @@ pub(super) async fn upload_index_part<'a>(
|
||||
fail_point!("before-upload-index", |_| {
|
||||
bail!("failpoint before-upload-index")
|
||||
});
|
||||
pausable_failpoint!("before-upload-index-pausable");
|
||||
|
||||
let index_part_bytes =
|
||||
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
|
||||
|
||||
@@ -721,7 +721,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
|
||||
Retry:
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
|
||||
|
||||
if (entry != NULL)
|
||||
@@ -858,7 +858,11 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
if (flush_every_n_requests > 0 &&
|
||||
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
{
|
||||
page_server->flush();
|
||||
if (!page_server->flush())
|
||||
{
|
||||
/* Prefetch set is reset in case of error, so we should try to register our request once again */
|
||||
goto Retry;
|
||||
}
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,10 @@ pub mod errors {
|
||||
Self::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
ref text,
|
||||
} => !text.contains("quota"),
|
||||
} => {
|
||||
!text.contains("written data quota exceeded")
|
||||
&& !text.contains("the limit for current plan reached")
|
||||
}
|
||||
// retry server errors
|
||||
Self::Console { status, .. } if status.is_server_error() => true,
|
||||
_ => false,
|
||||
|
||||
@@ -47,6 +47,7 @@ enum Payload {
|
||||
|
||||
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
|
||||
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
|
||||
const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15);
|
||||
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||
@@ -189,27 +190,44 @@ pub async fn handle(
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
|
||||
|
||||
let result = tokio::time::timeout(
|
||||
HTTP_CONNECTION_TIMEOUT,
|
||||
handle_inner(request, sni_hostname, conn_pool, session_id),
|
||||
)
|
||||
.await;
|
||||
let mut response = match result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = match e.downcast_ref::<tokio_postgres::Error>() {
|
||||
Some(e) => match e.code() {
|
||||
Some(e) => serde_json::to_value(e.code()).unwrap(),
|
||||
Ok(r) => match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
|
||||
e.code()
|
||||
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
|
||||
});
|
||||
let code = match code {
|
||||
Some(c) => c,
|
||||
None => Value::Null,
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
// TODO: this shouldn't always be bad request.
|
||||
json_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
json!({ "message": message, "code": code }),
|
||||
)?
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
let message = format!(
|
||||
"HTTP-Connection timed out, execution time exeeded {} seconds",
|
||||
HTTP_CONNECTION_TIMEOUT.as_secs()
|
||||
);
|
||||
// TODO: this shouldn't always be bad request.
|
||||
error!(message);
|
||||
json_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
json!({ "message": message, "code": code }),
|
||||
StatusCode::GATEWAY_TIMEOUT,
|
||||
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
|
||||
)?
|
||||
}
|
||||
};
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::{
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
|
||||
http::StatusCode,
|
||||
metrics::{Ids, USAGE_METRICS},
|
||||
protocol2::WithClientIp,
|
||||
stream::{PqStream, Stream},
|
||||
@@ -75,6 +76,15 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_connection_failures_breakdown",
|
||||
"Number of wake-up failures (per kind).",
|
||||
&["retry", "kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_io_bytes_per_client",
|
||||
@@ -397,6 +407,46 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
const fn bool_to_str(x: bool) -> &'static str {
|
||||
if x {
|
||||
"true"
|
||||
} else {
|
||||
"false"
|
||||
}
|
||||
}
|
||||
|
||||
fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
use crate::console::errors::ApiError;
|
||||
let retry = bool_to_str(retry);
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
"quota_exceeded"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => "api_console_locked",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => "api_console_bad_request",
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
"api_console_other_server_error"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
|
||||
};
|
||||
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
|
||||
}
|
||||
|
||||
/// Try to connect to the compute node, retrying if necessary.
|
||||
/// This function might update `node_info`, so we take it by `&mut`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -440,10 +490,12 @@ where
|
||||
match handle_try_wake(wake_res, num_retries) {
|
||||
Err(e) => {
|
||||
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
|
||||
report_error(&e, false);
|
||||
return Err(e.into());
|
||||
}
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(ControlFlow::Continue(e)) => {
|
||||
report_error(&e, true);
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
|
||||
@@ -1464,6 +1464,29 @@ class NeonCli(AbstractNeonCli):
|
||||
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
def map_branch(
|
||||
self, name: str, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
"""
|
||||
Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
|
||||
Usually needed when creating branches via PageserverHttpClient and not neon_local.
|
||||
|
||||
After creating a name mapping, you can use EndpointFactory.create_start
|
||||
with this registered branch name.
|
||||
"""
|
||||
args = [
|
||||
"mappings",
|
||||
"map",
|
||||
"--branch-name",
|
||||
name,
|
||||
"--tenant-id",
|
||||
str(tenant_id),
|
||||
"--timeline-id",
|
||||
str(timeline_id),
|
||||
]
|
||||
|
||||
return self.raw_cli(args, check_return_code=True)
|
||||
|
||||
def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
|
||||
return self.raw_cli(["start"], check_return_code=check_return_code)
|
||||
|
||||
|
||||
@@ -74,11 +74,14 @@ def wait_until_tenant_state(
|
||||
for _ in range(iterations):
|
||||
try:
|
||||
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
else:
|
||||
log.debug(f"Tenant {tenant_id} data: {tenant}")
|
||||
if tenant["state"]["slug"] == expected_state:
|
||||
return tenant
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
if tenant["state"]["slug"] == "Broken":
|
||||
raise RuntimeError(f"tenant became Broken, not {expected_state}")
|
||||
|
||||
time.sleep(period)
|
||||
|
||||
|
||||
@@ -1,14 +1,24 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.types import Lsn, TimelineId
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
from requests import RequestException
|
||||
from requests.exceptions import RetryError
|
||||
|
||||
|
||||
# Test branch creation
|
||||
@@ -128,3 +138,245 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
|
||||
endpoint1 = env.endpoints.create_start("b1")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
|
||||
|
||||
|
||||
def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Endpoint should not be possible to create because branch has not been uploaded.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
initial_branch = "initial_branch"
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
|
||||
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
|
||||
|
||||
def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Branch should not be possible to create because ancestor has not been uploaded.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
with pytest.raises(RetryError, match="too many 503 error responses"):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
branch_id,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found",
|
||||
):
|
||||
ps_http.timeline_detail(env.initial_tenant, branch_id)
|
||||
# important to note that a task might still be in progress to complete
|
||||
# the work, but will never get to that because we have the pause
|
||||
# failpoint
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
|
||||
|
||||
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
If the activate only after upload is used, then retries could become competing.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
create_root = threading.Thread(target=start_creating_timeline)
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
|
||||
barrier = threading.Barrier(3)
|
||||
|
||||
def try_branch():
|
||||
barrier.wait()
|
||||
barrier.wait()
|
||||
try:
|
||||
ret = ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
branch_id,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
timeout=5,
|
||||
)
|
||||
queue.put(ret)
|
||||
except Exception as e:
|
||||
queue.put(e)
|
||||
|
||||
threads = [threading.Thread(target=try_branch) for _ in range(2)]
|
||||
|
||||
try:
|
||||
create_root.start()
|
||||
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
barrier.wait()
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
|
||||
barrier.wait()
|
||||
|
||||
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
|
||||
first = queue.get()
|
||||
second = queue.get()
|
||||
|
||||
log.info(first)
|
||||
log.info(second)
|
||||
|
||||
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
|
||||
assert isinstance(failed, Exception)
|
||||
assert isinstance(succeeded, Dict)
|
||||
|
||||
# FIXME: there's probably multiple valid status codes:
|
||||
# - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already exists
|
||||
# - whatever 409 response says, but that is a subclass of PageserverApiException
|
||||
assert isinstance(failed, PageserverApiException)
|
||||
assert succeeded["state"] == "Active"
|
||||
finally:
|
||||
# we might still have the failpoint active
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
# pytest should nag if we leave threads unjoined
|
||||
for t in threads:
|
||||
t.join()
|
||||
create_root.join()
|
||||
|
||||
|
||||
def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Currently before RFC#27 we keep and continue uploading branches which were not successfully uploaded before shutdown.
|
||||
|
||||
This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test when the rfc is implemented.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
t.join()
|
||||
|
||||
# now without a failpoint
|
||||
env.pageserver.start()
|
||||
|
||||
wait_until_tenant_active(ps_http, env.initial_tenant)
|
||||
|
||||
# currently it lives on and will get eventually uploaded, but this will change
|
||||
detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
|
||||
assert detail["state"] == "Active"
|
||||
|
||||
|
||||
def wait_until_paused(env: NeonEnv, failpoint: str):
|
||||
found = False
|
||||
msg = f"at failpoint {failpoint}"
|
||||
for _ in range(20):
|
||||
time.sleep(1)
|
||||
found = env.pageserver.log_contains(msg) is not None
|
||||
if found:
|
||||
break
|
||||
assert found
|
||||
|
||||
@@ -10,6 +10,7 @@ of the pageserver are:
|
||||
"""
|
||||
|
||||
|
||||
import enum
|
||||
import re
|
||||
import time
|
||||
from typing import Optional
|
||||
@@ -276,15 +277,28 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
assert get_deletion_queue_unexpected_errors(ps_http) == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("keep_attachment", [True, False])
|
||||
@pytest.mark.parametrize("validate_before", [True, False])
|
||||
class KeepAttachment(str, enum.Enum):
|
||||
KEEP = "keep"
|
||||
LOSE = "lose"
|
||||
|
||||
|
||||
class ValidateBefore(str, enum.Enum):
|
||||
VALIDATE = "validate"
|
||||
NO_VALIDATE = "no-validate"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("keep_attachment", [KeepAttachment.KEEP, KeepAttachment.LOSE])
|
||||
@pytest.mark.parametrize("validate_before", [ValidateBefore.VALIDATE, ValidateBefore.NO_VALIDATE])
|
||||
def test_deletion_queue_recovery(
|
||||
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool, validate_before: bool
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
keep_attachment: KeepAttachment,
|
||||
validate_before: ValidateBefore,
|
||||
):
|
||||
"""
|
||||
:param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
|
||||
:param keep_attachment: whether to re-attach after restart. Else, we act as if some other
|
||||
node took the attachment while we were restarting.
|
||||
:param validate_before: If true, we wait for deletions to be validated before restart. This
|
||||
:param validate_before: whether to wait for deletions to be validated before restart. This
|
||||
makes them elegible to be executed after restart, if the same node keeps the attachment.
|
||||
"""
|
||||
neon_env_builder.enable_generations = True
|
||||
@@ -300,7 +314,7 @@ def test_deletion_queue_recovery(
|
||||
("deletion-queue-before-execute", "return"),
|
||||
]
|
||||
|
||||
if not validate_before:
|
||||
if validate_before == ValidateBefore.NO_VALIDATE:
|
||||
failpoints.append(
|
||||
# Prevent deletion lists from being validated, we will test that they are
|
||||
# dropped properly during recovery. 'pause' is okay here because we kill
|
||||
@@ -320,20 +334,25 @@ def test_deletion_queue_recovery(
|
||||
assert get_deletion_queue_unexpected_errors(ps_http) == 0
|
||||
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
|
||||
|
||||
if validate_before:
|
||||
if validate_before == ValidateBefore.VALIDATE:
|
||||
|
||||
def assert_validation_complete():
|
||||
assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)
|
||||
|
||||
wait_until(20, 1, assert_validation_complete)
|
||||
# A short wait to let the DeletionHeader get written out, as this happens after
|
||||
# the validated count gets incremented.
|
||||
time.sleep(1)
|
||||
|
||||
# The validatated keys statistic advances before the header is written, so we
|
||||
# also wait to see the header hit the disk: this seems paranoid but the race
|
||||
# can really happen on a heavily overloaded test machine.
|
||||
def assert_header_written():
|
||||
assert (env.pageserver.workdir / "deletion" / "header-01").exists()
|
||||
|
||||
wait_until(20, 1, assert_header_written)
|
||||
|
||||
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
if not keep_attachment:
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
some_other_pageserver = 101010
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
|
||||
@@ -352,7 +371,7 @@ def test_deletion_queue_recovery(
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
|
||||
|
||||
if keep_attachment or validate_before:
|
||||
if keep_attachment == KeepAttachment.KEEP or validate_before == ValidateBefore.VALIDATE:
|
||||
# - If we kept the attachment, then our pre-restart deletions should execute
|
||||
# because on re-attach they were from the immediately preceding generation
|
||||
# - If we validated before restart, then the deletions should execute because the
|
||||
|
||||
@@ -17,6 +17,8 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
|
||||
n_restarts = 10
|
||||
scale = 10
|
||||
|
||||
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
|
||||
@@ -752,6 +752,9 @@ def test_ignore_while_attaching(
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
# An endpoint is starting up concurrently with our detach, it can
|
||||
# experience RPC failure due to shutdown.
|
||||
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
|
||||
|
||||
data_id = 1
|
||||
data_secret = "very secret secret"
|
||||
|
||||
Reference in New Issue
Block a user