From 1757061272a6d1e13ae424532baabc22ef9d2965 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 13 Nov 2023 18:02:40 +0800 Subject: [PATCH] feat: introduce metadata struct of puffin (#2725) * feat: introduce metadata struct of puffin Signed-off-by: Zhenchi * fix: fmt Signed-off-by: Zhenchi * fix: address suggestions Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 9 + Cargo.toml | 1 + src/puffin/Cargo.toml | 10 + src/puffin/src/blob_metadata.rs | 335 ++++++++++++++++++++++++++++++++ src/puffin/src/file_metadata.rs | 143 ++++++++++++++ src/puffin/src/lib.rs | 16 ++ 6 files changed, 514 insertions(+) create mode 100644 src/puffin/Cargo.toml create mode 100644 src/puffin/src/blob_metadata.rs create mode 100644 src/puffin/src/file_metadata.rs create mode 100644 src/puffin/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index f66dc5de72..6d27e075c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6560,6 +6560,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "puffin" +version = "0.4.2" +dependencies = [ + "derive_builder 0.12.0", + "serde", + "serde_json", +] + [[package]] name = "pulldown-cmark" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index 0659941d9c..8fa2b7eb74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "src/partition", "src/plugins", "src/promql", + "src/puffin", "src/query", "src/script", "src/servers", diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml new file mode 100644 index 0000000000..6c1520f350 --- /dev/null +++ b/src/puffin/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "puffin" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +derive_builder.workspace = true +serde.workspace = true +serde_json.workspace = true diff --git a/src/puffin/src/blob_metadata.rs b/src/puffin/src/blob_metadata.rs new file mode 100644 index 0000000000..fd7d106e02 --- /dev/null +++ b/src/puffin/src/blob_metadata.rs @@ -0,0 +1,335 @@ +// Copyright 2023 
Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::fmt; +use std::collections::HashMap; + +use derive_builder::Builder; +use serde::{Deserialize, Serialize}; + +/// Blob metadata of Puffin +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)] +#[serde(rename_all = "kebab-case")] +pub struct BlobMetadata { + /// Blob type + #[serde(rename = "type")] + pub blob_type: String, + + /// For Iceberg, it's the list of field IDs the blob was computed for; + /// the order of items is used to compute sketches stored in the blob. + /// + /// For usage outside the context of Iceberg, it can be ignored. + #[builder(default)] + #[serde(default)] + #[serde(rename = "fields")] + pub input_fields: Vec, + + /// For Iceberg, it's the ID of the Iceberg table’s snapshot the blob was computed from. + /// + /// For usage outside the context of Iceberg, it can be ignored. + #[builder(default)] + #[serde(default)] + pub snapshot_id: i64, + + /// For Iceberg, it's the sequence number of the Iceberg table’s snapshot the blob was computed from. + /// + /// For usage outside the context of Iceberg, it can be ignored. + #[builder(default)] + #[serde(default)] + pub sequence_number: i64, + + /// The offset in the file where the blob contents start + pub offset: i64, + + /// The length of the blob stored in the file (after compression, if compressed) + pub length: i64, + + /// See [`CompressionCodec`]. If omitted, the data is assumed to be uncompressed. 
+ #[builder(default, setter(strip_option))] + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub compression_codec: Option, + + /// Storage for arbitrary meta-information about the blob + #[builder(default)] + #[serde(default)] + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, +} + +/// Compression codec used to compress the blob +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CompressionCodec { + /// Single [LZ4 compression frame](https://github.com/lz4/lz4/blob/77d1b93f72628af7bbde0243b4bba9205c3138d9/doc/lz4_Frame_format.md), + /// with content size present + Lz4, + + /// Single [Zstandard compression frame](https://github.com/facebook/zstd/blob/8af64f41161f6c2e0ba842006fe238c664a6a437/doc/zstd_compression_format.md#zstandard-frames), + /// with content size present + Zstd, +} + +impl fmt::Display for CompressionCodec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CompressionCodec::Lz4 => write!(f, "lz4"), + CompressionCodec::Zstd => write!(f, "zstd"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_blob_metadata_builder() { + let mut properties = HashMap::new(); + properties.insert("property1".to_string(), "value1".to_string()); + properties.insert("property2".to_string(), "value2".to_string()); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .input_fields(vec![1, 2, 3]) + .snapshot_id(100) + .sequence_number(200) + .offset(300) + .length(400) + .compression_codec(CompressionCodec::Lz4) + .properties(properties) + .build() + .unwrap(); + + assert_eq!("type1", blob_metadata.blob_type); + assert_eq!(vec![1, 2, 3], blob_metadata.input_fields); + assert_eq!(100, blob_metadata.snapshot_id); + assert_eq!(200, blob_metadata.sequence_number); + assert_eq!(300, blob_metadata.offset); + assert_eq!(400, blob_metadata.length); + 
assert_eq!(Some(CompressionCodec::Lz4), blob_metadata.compression_codec); + assert_eq!( + "value1", + blob_metadata.properties.get("property1").unwrap().as_str() + ); + assert_eq!( + "value2", + blob_metadata.properties.get("property2").unwrap().as_str() + ); + } + + #[test] + fn test_blob_metadata_minimal_builder() { + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(300) + .length(400) + .build() + .unwrap(); + + assert_eq!("type1", blob_metadata.blob_type); + assert_eq!(300, blob_metadata.offset); + assert_eq!(400, blob_metadata.length); + assert_eq!(None, blob_metadata.compression_codec); + assert_eq!(0, blob_metadata.properties.len()); + } + + #[test] + fn test_blob_metadata_missing_field() { + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(300) + .build(); + assert_eq!( + blob_metadata.unwrap_err().to_string(), + "`length` must be initialized" + ); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .length(400) + .build(); + assert_eq!( + blob_metadata.unwrap_err().to_string(), + "`offset` must be initialized" + ); + + let blob_metadata = BlobMetadataBuilder::default() + .offset(300) + .length(400) + .build(); + assert_eq!( + blob_metadata.unwrap_err().to_string(), + "`blob_type` must be initialized" + ); + } + + #[test] + fn test_serialize_deserialize_blob_metadata_with_properties() { + let mut properties = HashMap::new(); + properties.insert(String::from("key1"), String::from("value1")); + properties.insert(String::from("key2"), String::from("value2")); + + let metadata = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::Lz4), + properties: properties.clone(), + }; + + let json = serde_json::to_string(&metadata).unwrap(); + let deserialized: BlobMetadata = 
serde_json::from_str(&json).unwrap(); + + assert_eq!(metadata, deserialized); + assert_eq!(properties, deserialized.properties); + } + + #[test] + fn test_serialize_deserialize_blob_metadata_without_compression_codec() { + let metadata = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: None, + properties: HashMap::new(), + }; + + let expected_json = r#"{"type":"test","fields":[1,2,3],"snapshot-id":12345,"sequence-number":67890,"offset":100,"length":200}"#; + + let json = serde_json::to_string(&metadata).unwrap(); + let deserialized: BlobMetadata = serde_json::from_str(&json).unwrap(); + + assert_eq!(expected_json, json); + assert_eq!(metadata, deserialized); + } + + #[test] + fn test_deserialize_blob_metadata_with_properties() { + let json = r#"{ + "type": "test", + "fields": [1, 2, 3], + "snapshot-id": 12345, + "sequence-number": 67890, + "offset": 100, + "length": 200, + "compression-codec": "lz4", + "properties": { + "key1": "value1", + "key2": "value2" + } + }"#; + + let mut expected_properties = HashMap::new(); + expected_properties.insert(String::from("key1"), String::from("value1")); + expected_properties.insert(String::from("key2"), String::from("value2")); + + let expected = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::Lz4), + properties: expected_properties.clone(), + }; + + let deserialized: BlobMetadata = serde_json::from_str(json).unwrap(); + + assert_eq!(expected, deserialized); + assert_eq!(expected_properties, deserialized.properties); + } + + #[test] + fn test_deserialize_blob_metadata_without_properties() { + let json = r#"{ + "type": "test", + "fields": [1, 2, 3], + "snapshot-id": 12345, + "sequence-number": 67890, + "offset": 100, + "length": 200, + 
"compression-codec": "lz4" + }"#; + + let expected = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::Lz4), + properties: HashMap::new(), + }; + + let deserialized: BlobMetadata = serde_json::from_str(json).unwrap(); + + assert_eq!(expected, deserialized); + } + + #[test] + fn test_deserialize_blob_metadata_with_empty_properties() { + let json = r#"{ + "type": "test", + "fields": [1, 2, 3], + "snapshot-id": 12345, + "sequence-number": 67890, + "offset": 100, + "length": 200, + "compression-codec": "lz4", + "properties": {} + }"#; + + let expected_properties = HashMap::new(); + let expected = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::Lz4), + properties: expected_properties.clone(), + }; + + let deserialized: BlobMetadata = serde_json::from_str(json).unwrap(); + + assert_eq!(expected, deserialized); + assert_eq!(expected_properties, deserialized.properties); + } + + #[test] + fn test_deserialize_invalid_blob_metadata() { + let invalid_json = r#"{ + "type": "test", + "input-fields": [1, 2, 3], + "snapshot-id": "12345", + "sequence-number": 67890, + "offset": 100, + "length": 200, + "compression-codec": "Invalid", + "properties": {} + }"#; + + assert!(serde_json::from_str::(invalid_json).is_err()); + } +} diff --git a/src/puffin/src/file_metadata.rs b/src/puffin/src/file_metadata.rs new file mode 100644 index 0000000000..74eea3aa08 --- /dev/null +++ b/src/puffin/src/file_metadata.rs @@ -0,0 +1,143 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use derive_builder::Builder; +use serde::{Deserialize, Serialize}; + +use crate::blob_metadata::BlobMetadata; + +/// Metadata of a Puffin file +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)] +pub struct FileMetadata { + /// Metadata for each blob in the file + #[builder(default)] + pub blobs: Vec, + + /// Storage for arbitrary meta-information, like writer identification/version + #[builder(default)] + #[serde(default)] + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::blob_metadata::BlobMetadataBuilder; + + #[test] + fn test_file_metadata_builder() { + let mut properties = HashMap::new(); + properties.insert(String::from("key1"), String::from("value1")); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(10) + .length(30) + .build() + .unwrap(); + + let metadata = FileMetadataBuilder::default() + .blobs(vec![blob_metadata.clone()]) + .properties(properties.clone()) + .build() + .unwrap(); + + assert_eq!(properties, metadata.properties); + assert_eq!(vec![blob_metadata], metadata.blobs); + } + + #[test] + fn test_file_metadata_serialization() { + let mut properties = HashMap::new(); + properties.insert(String::from("key1"), String::from("value1")); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(10) + .length(30) + .build() + .unwrap(); + + let metadata = 
FileMetadataBuilder::default() + .blobs(vec![blob_metadata.clone()]) + .properties(properties.clone()) + .build() + .unwrap(); + + let serialized = serde_json::to_string(&metadata).unwrap(); + assert_eq!( + serialized, + r#"{"blobs":[{"type":"type1","fields":[],"snapshot-id":0,"sequence-number":0,"offset":10,"length":30}],"properties":{"key1":"value1"}}"# + ); + } + + #[test] + fn test_file_metadata_deserialization() { + let data = r#"{"blobs":[{"type":"type1","fields":[],"snapshot-id":0,"sequence-number":0,"offset":10,"length":30}],"properties":{"key1":"value1"}}"#; + let deserialized: FileMetadata = serde_json::from_str(data).unwrap(); + + assert_eq!(deserialized.blobs[0].blob_type, "type1"); + assert_eq!(deserialized.blobs[0].offset, 10); + assert_eq!(deserialized.blobs[0].length, 30); + assert_eq!(deserialized.properties.get("key1").unwrap(), "value1"); + } + + #[test] + fn test_empty_properties_not_serialized() { + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(10) + .length(30) + .build() + .unwrap(); + + let metadata = FileMetadataBuilder::default() + .blobs(vec![blob_metadata.clone()]) + .build() + .unwrap(); + + let serialized = serde_json::to_string(&metadata).unwrap(); + assert_eq!( + serialized, + r#"{"blobs":[{"type":"type1","fields":[],"snapshot-id":0,"sequence-number":0,"offset":10,"length":30}]}"# + ); + } + + #[test] + fn test_empty_blobs_serialization() { + let metadata = FileMetadataBuilder::default() + .blobs(vec![]) + .build() + .unwrap(); + + let serialized = serde_json::to_string(&metadata).unwrap(); + assert_eq!(serialized, r#"{"blobs":[]}"#); + } + + #[test] + fn test_missing_blobs_deserialization() { + let data = r#"{"properties":{"key1":"value1"}}"#; + let deserialized = serde_json::from_str::(data); + + assert!(deserialized + .unwrap_err() + .to_string() + .contains("missing field `blobs`")); + } +} diff --git a/src/puffin/src/lib.rs b/src/puffin/src/lib.rs new file mode 100644 index 
0000000000..53ef929514 --- /dev/null +++ b/src/puffin/src/lib.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod blob_metadata; +pub mod file_metadata;