diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 05f1713359..b739e55566 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -240,9 +240,7 @@ impl<'a> Drop for CopyInReader<'a> { impl<'a> Read for CopyInReader<'a> { fn read(&mut self, buf: &mut [u8]) -> io::Result { - loop { - // TODO check if shutdown was requested? - + while !thread_mgr::is_shutdown_requested() { // Return from buffer if nonempty if self.buf_begin < self.buf.len() { let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin); @@ -287,6 +285,10 @@ impl<'a> Read for CopyInReader<'a> { } } } + + // Shutting down + // TODO is this fine? + Ok(0) } } @@ -545,19 +547,22 @@ impl PageServerHandler { base_lsn: Lsn, _end_lsn: Lsn, ) -> anyhow::Result<()> { + thread_mgr::associate_with(Some(tenant_id), Some(timeline_id)); let _enter = info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered(); - // TODO thread_mgr::associate_with? // Create empty timeline info!("creating new timeline"); let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?; - let repartition_distance = repo.get_checkpoint_distance(); // TODO + let repartition_distance = repo.get_checkpoint_distance(); let mut datadir_timeline = DatadirTimeline::::new(timeline, repartition_distance); // TODO mark timeline as not ready until it reaches end_lsn? + // This would prevent compute node from connecting to it and writing conflicting wal. + + // TODO leave clean state on error // Import basebackup provided via CopyData info!("importing basebackup"); @@ -577,18 +582,20 @@ impl PageServerHandler { start_lsn: Lsn, end_lsn: Lsn, ) -> anyhow::Result<()> { + thread_mgr::associate_with(Some(tenant_id), Some(timeline_id)); let _enter = info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered(); - // TODO thread_mgr::associate_with? let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; let timeline = repo.get_timeline_load(timeline_id)?; - let repartition_distance = repo.get_checkpoint_distance(); // TODO + let repartition_distance = repo.get_checkpoint_distance(); let mut datadir_timeline = DatadirTimeline::::new(timeline, repartition_distance); // TODO ensure start_lsn matches current lsn + // TODO leave clean state on error + // Import wal provided via CopyData info!("importing wal"); pgb.write_message(&BeMessage::CopyInResponse)?; @@ -876,6 +883,7 @@ impl postgres_backend::Handler for PageServerHandler { self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("import basebackup ") { + // Import the `base` section (everything but the wal) of a basebackup. let (_, params_raw) = query_string.split_at("import basebackup ".len()); let params = params_raw.split_whitespace().collect::>(); ensure!(params.len() == 4); @@ -889,6 +897,7 @@ impl postgres_backend::Handler for PageServerHandler { self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("import wal ") { + // Import the `pg_wal` section of a basebackup. let (_, params_raw) = query_string.split_at("import wal ".len()); let params = params_raw.split_whitespace().collect::>(); ensure!(params.len() == 4);