diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 5f65b17374..d7ffff0483 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -187,10 +187,6 @@ pub async fn worker( let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx)); let rx = rx.map(RequestData::from); - let storage = GenericRemoteStorage::from_config(&remote_storage_config) - .await - .context("remote storage init")?; - let properties = WriterProperties::builder() .set_data_page_size_limit(config.parquet_upload_page_size) .set_compression(config.parquet_upload_compression); @@ -224,18 +220,18 @@ pub async fn worker( let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx)); let rx_disconnect = rx_disconnect.map(RequestData::from); - let storage_disconnect = - GenericRemoteStorage::from_config(&disconnect_events_storage_config) - .await - .context("remote storage for disconnect events init")?; let parquet_config_disconnect = parquet_config.clone(); tokio::try_join!( - worker_inner(storage, rx, parquet_config), - worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect) + worker_inner(remote_storage_config, rx, parquet_config), + worker_inner( + disconnect_events_storage_config, + rx_disconnect, + parquet_config_disconnect + ) ) .map(|_| ()) } else { - worker_inner(storage, rx, parquet_config).await + worker_inner(remote_storage_config, rx, parquet_config).await } } @@ -251,18 +247,32 @@ struct ParquetConfig { test_remote_failures: u64, } +impl ParquetConfig { + async fn storage( + &self, + storage_config: &RemoteStorageConfig, + ) -> anyhow::Result { + let storage = GenericRemoteStorage::from_config(storage_config) + .await + .context("remote storage init")?; + + #[cfg(any(test, feature = "testing"))] + if self.test_remote_failures > 0 { + return Ok(GenericRemoteStorage::unreliable_wrapper( + storage, + self.test_remote_failures, + )); + } + + Ok(storage) + } +} + async fn worker_inner( - storage: GenericRemoteStorage, + storage_config: RemoteStorageConfig, rx: impl Stream, config: ParquetConfig, ) -> anyhow::Result<()> { - #[cfg(any(test, feature = "testing"))] - let storage = if config.test_remote_failures > 0 { - GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures) - } else { - storage - }; - let mut rx = std::pin::pin!(rx); let mut rows = Vec::with_capacity(config.rows_per_group); @@ -285,7 +295,7 @@ async fn worker_inner( } if len > config.file_size || force { last_upload = time::Instant::now(); - let file = upload_parquet(w, len, &storage).await?; + let file = upload_parquet(w, len, &storage_config, &config).await?; w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?; len = 0; } @@ -298,7 +308,7 @@ async fn worker_inner( } if !w.flushed_row_groups().is_empty() { - let _rtchk: Writer = upload_parquet(w, len, &storage).await?; + let _rtchk: Writer = upload_parquet(w, len, &storage_config, &config).await?; } Ok(()) @@ -340,7 +350,8 @@ where async fn upload_parquet( mut w: SerializedFileWriter>, len: i64, - storage: &GenericRemoteStorage, + storage_config: &RemoteStorageConfig, + config: &ParquetConfig, ) -> anyhow::Result> { let len_uncompressed = w .flushed_row_groups() @@ -377,6 +388,15 @@ async fn upload_parquet( size, compression, "uploading request parquet file" ); + // A bug in azure-sdk means that the identity-token-file that expires after + // 1 hour is not refreshed. This identity-token is used to fetch the actual azure storage + // tokens that last for 24 hours. After this 24 hour period, azure-sdk tries to refresh + // the storage token, but the identity token has now expired. + // + // + // To work around this, we recreate the storage every time. + let storage = config.storage(storage_config).await?; + let year = now.year(); let month = now.month(); let day = now.day(); @@ -431,8 +451,8 @@ mod tests { use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use remote_storage::{ - GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config, - DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT, + RemoteStorageConfig, RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, + DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT, }; use tokio::sync::mpsc; use tokio::time; @@ -559,12 +579,11 @@ mod tests { timeout: std::time::Duration::from_secs(120), small_timeout: std::time::Duration::from_secs(30), }; - let storage = GenericRemoteStorage::from_config(&remote_storage_config) + + worker_inner(remote_storage_config, rx, config) .await .unwrap(); - worker_inner(storage, rx, config).await.unwrap(); - let mut files = WalkDir::new(tmpdir.as_std_path()) .into_iter() .filter_map(|entry| entry.ok())