From 4ab18444ecd88e1d18c01b54779e933a4428c3be Mon Sep 17 00:00:00 2001
From: Andrew Rudenko
Date: Tue, 11 Feb 2025 08:02:13 +0100
Subject: [PATCH] compute_ctl: database_schema should keep process::Child as part of returned value (#10273)

## Problem

/database_schema endpoint returns incomplete output from `pg_dump`

## Summary of changes

The Tokio process was not used properly. The returned stream does not
include `process::Child`, and the process is scheduled to be killed
immediately after the `get_database_schema` call when `cmd` goes out of
scope.

The solution in this PR is to return a special Stream implementation
that retains `process::Child`.
---
 compute_tools/src/catalog.rs                | 31 ++++++++++++++++++++-
 test_runner/regress/test_compute_catalog.py |  2 +-
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs
index 4a297cfacf..28b10ce21c 100644
--- a/compute_tools/src/catalog.rs
+++ b/compute_tools/src/catalog.rs
@@ -140,5 +140,34 @@ pub async fn get_database_schema(
             warn!("pg_dump stderr: {}", line)
         }
     });
-    Ok(initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))))
+
+    #[allow(dead_code)]
+    struct SchemaStream<S> {
+        // We keep a reference to the child process to ensure it stays alive
+        // while the stream is being consumed. When SchemaStream is dropped,
+        // cmd will be dropped, which triggers kill_on_drop and terminates pg_dump
+        cmd: tokio::process::Child,
+        stream: S,
+    }
+
+    impl<S> Stream for SchemaStream<S>
+    where
+        S: Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin,
+    {
+        type Item = Result<bytes::Bytes, std::io::Error>;
+
+        fn poll_next(
+            mut self: std::pin::Pin<&mut Self>,
+            cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Option<Self::Item>> {
+            Stream::poll_next(std::pin::Pin::new(&mut self.stream), cx)
+        }
+    }
+
+    let schema_stream = SchemaStream {
+        cmd,
+        stream: initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))),
+    };
+
+    Ok(schema_stream)
 }
diff --git a/test_runner/regress/test_compute_catalog.py b/test_runner/regress/test_compute_catalog.py
index 50a922a616..3a08671bbf 100644
--- a/test_runner/regress/test_compute_catalog.py
+++ b/test_runner/regress/test_compute_catalog.py
@@ -82,7 +82,7 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
     ddl = client.database_schema(database=test_db["name"])
 
     # Check that it looks like a valid PostgreSQL dump
-    assert "-- PostgreSQL database dump" in ddl
+    assert "-- PostgreSQL database dump complete" in ddl
 
     # Check that it doesn't contain health_check and migration traces.
     # They are only created in system `postgres` database, so by checking