From 2c0c7759ee3ad263a099670bdee8b401a68ee24e Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 9 Apr 2024 16:32:24 +0800 Subject: [PATCH] feat: add checksum for checkpoint data (#3651) * feat: add checksum for checkpoint data Signed-off-by: tison * add test Signed-off-by: tison * clippy Signed-off-by: tison * fix: checksum should calculate on uncompressed data Signed-off-by: tison * address comments Signed-off-by: tison --------- Signed-off-by: tison --- Cargo.lock | 26 +++-- src/mito2/Cargo.toml | 1 + src/mito2/src/error.rs | 4 + src/mito2/src/manifest/storage.rs | 126 +++++++++++++++++---- src/mito2/src/manifest/tests/checkpoint.rs | 49 +++++++- 5 files changed, 176 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6aafe1aa22..96e4593582 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2285,9 +2285,9 @@ dependencies = [ [[package]] name = "crc32fast" -version = "1.3.2" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" dependencies = [ "cfg-if 1.0.0", ] @@ -3777,9 +3777,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.24" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -3970,9 +3970,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.12" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", @@ -5306,6 +5306,7 @@ dependencies = [ "common-test-util", "common-time", "common-wal", + 
"crc32fast", "criterion", "datafusion", "datafusion-common", @@ -11424,6 +11425,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.89" @@ -11579,11 +11586,12 @@ dependencies = [ [[package]] name = "whoami" -version = "1.4.1" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" dependencies = [ - "wasm-bindgen", + "redox_syscall 0.4.1", + "wasite", "web-sys", ] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 2ddf635693..f3a66c9999 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -31,6 +31,7 @@ common-telemetry.workspace = true common-test-util = { workspace = true, optional = true } common-time.workspace = true common-wal.workspace = true +crc32fast = "1" datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 39c1527e08..926b7638a8 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -575,6 +575,9 @@ pub enum Error { #[snafu(display("Invalid region options, {}", reason))] InvalidRegionOptions { reason: String, location: Location }, + + #[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))] + ChecksumMismatch { actual: u32, expected: u32 }, } pub type Result = std::result::Result; @@ -680,6 +683,7 @@ impl ErrorExt for Error { Upload { .. } => StatusCode::StorageUnavailable, BiError { .. 
} => StatusCode::Internal, EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal, + ChecksumMismatch { .. } => StatusCode::Unexpected, } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 32802c128b..fc9467f65f 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -18,6 +18,7 @@ use std::str::FromStr; use common_datasource::compression::CompressionType; use common_telemetry::debug; +use crc32fast::Hasher; use futures::future::try_join_all; use futures::TryStreamExt; use lazy_static::lazy_static; @@ -29,8 +30,8 @@ use store_api::manifest::ManifestVersion; use tokio::sync::Semaphore; use crate::error::{ - CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result, - SerdeJsonSnafu, Utf8Snafu, + ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, + OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu, }; lazy_static! { @@ -46,7 +47,6 @@ const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed; const FETCH_MANIFEST_PARALLELISM: usize = 16; /// Returns the [CompressionType] according to whether to compress manifest files. 
-#[inline] pub const fn manifest_compress_type(compress: bool) -> CompressionType { if compress { DEFAULT_MANIFEST_COMPRESSION_TYPE @@ -55,17 +55,14 @@ pub const fn manifest_compress_type(compress: bool) -> CompressionType { } } -#[inline] pub fn delta_file(version: ManifestVersion) -> String { format!("{version:020}.json") } -#[inline] pub fn checkpoint_file(version: ManifestVersion) -> String { format!("{version:020}.checkpoint") } -#[inline] pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String { if compress_type == CompressionType::Uncompressed { format!("{}{}", path, file) @@ -74,11 +71,30 @@ pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> Strin } } +fn checkpoint_checksum(data: &[u8]) -> u32 { + let mut hasher = Hasher::new(); + hasher.update(data); + hasher.finalize() +} + +fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> { + if let Some(checksum) = wanted { + let calculated_checksum = checkpoint_checksum(data); + ensure!( + checksum == calculated_checksum, + ChecksumMismatchSnafu { + actual: calculated_checksum, + expected: checksum, + } + ); + } + Ok(()) +} + /// Return's the file manifest version from path /// /// # Panics -/// Panics if the file path is not a valid delta or checkpoint file. -#[inline] +/// If the file path is not a valid delta or checkpoint file. 
pub fn file_version(path: &str) -> ManifestVersion { let s = path.split('.').next().unwrap(); s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}")) @@ -88,18 +104,15 @@ pub fn file_version(path: &str) -> ManifestVersion { /// /// for example file /// `00000000000000000000.json.gz` -> `CompressionType::GZIP` -#[inline] pub fn file_compress_type(path: &str) -> CompressionType { let s = path.rsplit('.').next().unwrap_or(""); CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed) } -#[inline] pub fn is_delta_file(file_name: &str) -> bool { DELTA_RE.is_match(file_name) } -#[inline] pub fn is_checkpoint_file(file_name: &str) -> bool { CHECKPOINT_RE.is_match(file_name) } @@ -368,6 +381,7 @@ impl ManifestObjectStore { path: &path, })?; let checkpoint_size = data.len(); + let checksum = checkpoint_checksum(bytes); self.object_store .write(&path, data) .await @@ -380,7 +394,7 @@ impl ManifestObjectStore { let checkpoint_metadata = CheckpointMetadata { size: bytes.len(), version, - checksum: None, + checksum: Some(checksum), extend_metadata: HashMap::new(), }; @@ -398,10 +412,11 @@ impl ManifestObjectStore { Ok(()) } - pub async fn load_checkpoint( + async fn load_checkpoint( &mut self, - version: ManifestVersion, + metadata: CheckpointMetadata, ) -> Result)>> { + let version = metadata.version; let path = self.checkpoint_file_path(version); // Due to backward compatibility, it is possible that the user's checkpoint not compressed, // so if we don't find file by compressed type. fall back to checkpoint not compressed find again. 
@@ -415,6 +430,7 @@ impl ManifestObjectStore { path, }, )?; + verify_checksum(&decompress_data, metadata.checksum)?; // set the checkpoint size self.set_checkpoint_file_size(version, checkpoint_size as u64); Ok(Some(decompress_data)) @@ -441,6 +457,7 @@ impl ManifestObjectStore { compress_type: FALL_BACK_COMPRESS_TYPE, path, })?; + verify_checksum(&decompress_data, metadata.checksum)?; self.set_checkpoint_file_size(version, checkpoint_size as u64); Ok(Some(decompress_data)) } @@ -479,7 +496,7 @@ impl ManifestObjectStore { last_checkpoint_path, checkpoint_metadata ); - self.load_checkpoint(checkpoint_metadata.version).await + self.load_checkpoint(checkpoint_metadata).await } #[cfg(test)] @@ -487,6 +504,54 @@ impl ManifestObjectStore { self.object_store.read(path).await.context(OpenDalSnafu) } + #[cfg(test)] + pub async fn write_last_checkpoint( + &mut self, + version: ManifestVersion, + bytes: &[u8], + ) -> Result<()> { + let path = self.checkpoint_file_path(version); + let data = self + .compress_type + .encode(bytes) + .await + .context(CompressObjectSnafu { + compress_type: self.compress_type, + path: &path, + })?; + + let checkpoint_size = data.len(); + + self.object_store + .write(&path, data) + .await + .context(OpenDalSnafu)?; + + self.set_checkpoint_file_size(version, checkpoint_size as u64); + + let last_checkpoint_path = self.last_checkpoint_path(); + let checkpoint_metadata = CheckpointMetadata { + size: bytes.len(), + version, + checksum: Some(1218259706), + extend_metadata: HashMap::new(), + }; + + debug!( + "Rewrite checkpoint in path: {}, metadata: {:?}", + last_checkpoint_path, checkpoint_metadata + ); + + let bytes = checkpoint_metadata.encode()?; + + // Overwrite the last checkpoint with the modified content + self.object_store + .write(&last_checkpoint_path, bytes.clone()) + .await + .context(OpenDalSnafu)?; + Ok(()) + } + /// Compute the size(Byte) in manifest size map. 
pub(crate) fn total_manifest_size(&self) -> u64 { self.manifest_size_map.values().sum() @@ -509,7 +574,7 @@ struct CheckpointMetadata { pub size: usize, /// The latest version this checkpoint contains. pub version: ManifestVersion, - pub checksum: Option, + pub checksum: Option, pub extend_metadata: HashMap, } @@ -544,6 +609,15 @@ mod tests { ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed) } + fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata { + CheckpointMetadata { + size: 0, + version, + checksum: None, + extend_metadata: Default::default(), + } + } + #[test] // Define this test mainly to prevent future unintentional changes may break the backward compatibility. fn test_compress_file_path_generation() { @@ -608,7 +682,11 @@ mod tests { //delete (,4) logs and keep checkpoint 3. let _ = log_store.delete_until(4, true).await.unwrap(); - let _ = log_store.load_checkpoint(3).await.unwrap().unwrap(); + let _ = log_store + .load_checkpoint(new_checkpoint_metadata_with_version(3)) + .await + .unwrap() + .unwrap(); let _ = log_store.load_last_checkpoint().await.unwrap().unwrap(); let manifests = log_store.scan(0, 11).await.unwrap(); let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); @@ -621,7 +699,11 @@ mod tests { // delete all logs and checkpoints let _ = log_store.delete_until(11, false).await.unwrap(); - assert!(log_store.load_checkpoint(3).await.unwrap().is_none()); + assert!(log_store + .load_checkpoint(new_checkpoint_metadata_with_version(3)) + .await + .unwrap() + .is_none()); assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); let manifests = log_store.scan(0, 11).await.unwrap(); let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); @@ -656,7 +738,7 @@ mod tests { assert_eq!(v, 5); assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes()); - // write compressed data to stimulate compress alogorithom take effect + // write compressed data to 
stimulate compress algorithm take effect for v in 5..10 { log_store .save(v, format!("hello, {v}").as_bytes()) @@ -678,7 +760,11 @@ mod tests { assert_eq!(v, version); assert_eq!(format!("hello, {v}").as_bytes(), bytes); } - let (v, checkpoint) = log_store.load_checkpoint(5).await.unwrap().unwrap(); + let (v, checkpoint) = log_store + .load_checkpoint(new_checkpoint_metadata_with_version(5)) + .await + .unwrap() + .unwrap(); assert_eq!(v, 5); assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes()); let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap(); diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index c80becd5ec..cab8b43d0c 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::sync::Arc; use common_datasource::compression::CompressionType; use store_api::storage::RegionId; use strum::IntoEnumIterator; +use crate::error::Error::ChecksumMismatch; use crate::manifest::action::{ RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList, }; @@ -150,7 +152,8 @@ async fn manager_with_checkpoint_distance_1() { .await .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); - let expected_json = "{\"size\":846,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; + let expected_json = + "{\"size\":846,\"version\":9,\"checksum\":1218259706,\"extend_metadata\":{}}"; assert_eq!(expected_json, raw_json); // reopen the manager @@ -159,6 +162,50 @@ async fn manager_with_checkpoint_distance_1() { assert_eq!(10, manager.manifest().await.manifest_version); } +#[tokio::test] +async fn test_corrupted_data_causing_checksum_error() { + // Initialize manager + common_telemetry::init_default_ut_logging(); + let (_env, manager) = build_manager(1, 
CompressionType::Uncompressed).await; + + // Apply actions + for _ in 0..10 { + manager.update(nop_action()).await.unwrap(); + } + + // Check if there is a checkpoint + assert!(manager + .store() + .await + .load_last_checkpoint() + .await + .unwrap() + .is_some()); + + // Corrupt the last checkpoint data + let mut corrupted_bytes = manager + .store() + .await + .read_file(&manager.store().await.last_checkpoint_path()) + .await + .unwrap(); + corrupted_bytes[0] ^= 1; + + // Overwrite the latest checkpoint data + manager + .store() + .await + .write_last_checkpoint(9, &corrupted_bytes) + .await + .unwrap(); + + // Attempt to load the corrupted checkpoint + let load_corrupted_result = manager.store().await.load_last_checkpoint().await; + + // Check if the result is an error and if it's of type ChecksumMismatch + assert_matches!(load_corrupted_result, Err(ChecksumMismatch { .. })); +} + #[tokio::test] async fn checkpoint_with_different_compression_types() { common_telemetry::init_default_ut_logging();