diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 4a297cfacf..28b10ce21c 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -140,5 +140,34 @@ pub async fn get_database_schema( warn!("pg_dump stderr: {}", line) } }); - Ok(initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze())))) + + #[allow(dead_code)] + struct SchemaStream { + // We keep a reference to the child process to ensure it stays alive + // while the stream is being consumed. When SchemaStream is dropped, + // cmd will be dropped, which triggers kill_on_drop and terminates pg_dump + cmd: tokio::process::Child, + stream: S, + } + + impl Stream for SchemaStream + where + S: Stream> + Unpin, + { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Stream::poll_next(std::pin::Pin::new(&mut self.stream), cx) + } + } + + let schema_stream = SchemaStream { + cmd, + stream: initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))), + }; + + Ok(schema_stream) } diff --git a/test_runner/regress/test_compute_catalog.py b/test_runner/regress/test_compute_catalog.py index 50a922a616..3a08671bbf 100644 --- a/test_runner/regress/test_compute_catalog.py +++ b/test_runner/regress/test_compute_catalog.py @@ -82,7 +82,7 @@ def test_compute_catalog(neon_simple_env: NeonEnv): ddl = client.database_schema(database=test_db["name"]) # Check that it looks like a valid PostgreSQL dump - assert "-- PostgreSQL database dump" in ddl + assert "-- PostgreSQL database dump complete" in ddl # Check that it doesn't contain health_check and migration traces. # They are only created in system `postgres` database, so by checking