fixup! move region to the parquet upload task, and not as part of the request context itself

This commit is contained in:
Conrad Ludgate
2025-06-07 19:54:03 +01:00
parent e78254657a
commit 1f62ee5f5c

View File

@@ -233,17 +233,17 @@ pub async fn worker(
.context("remote storage for disconnect events init")?;
let parquet_config_disconnect = parquet_config.clone();
tokio::try_join!(
worker_inner(storage, rx, parquet_config, region.clone()),
worker_inner(storage, rx, parquet_config, &region),
worker_inner(
storage_disconnect,
rx_disconnect,
parquet_config_disconnect,
region
&region
)
)
.map(|_| ())
} else {
worker_inner(storage, rx, parquet_config, region).await
worker_inner(storage, rx, parquet_config, &region).await
}
}
@@ -263,7 +263,7 @@ async fn worker_inner(
storage: GenericRemoteStorage,
rx: impl Stream<Item = RequestData>,
config: ParquetConfig,
region: String,
region: &str,
) -> anyhow::Result<()> {
#[cfg(any(test, feature = "testing"))]
let storage = if config.test_remote_failures > 0 {
@@ -285,7 +285,7 @@ async fn worker_inner(
let mut len = 0;
while let Some(mut row) = rx.next().await {
row.region = region.clone();
region.clone_into(&mut row.region);
rows.push(row);
let force = last_upload.elapsed() > config.max_duration;
if rows.len() == config.rows_per_group || force {
@@ -573,7 +573,7 @@ mod tests {
.await
.unwrap();
worker_inner(storage, rx, config, "us-east-1".to_string())
worker_inner(storage, rx, config, "us-east-1")
.await
.unwrap();