diff --git a/Cargo.lock b/Cargo.lock index 9e8af77dc0..16d8ef48c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,16 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common", + "generic-array", +] + [[package]] name = "aes" version = "0.8.4" @@ -28,6 +38,22 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "aes-siv" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e08d0cdb774acd1e4dac11478b1a0c0d203134b2aab0ba25eb430de9b18f8b9" +dependencies = [ + "aead", + "aes", + "cipher", + "cmac", + "ctr", + "dbl", + "digest", + "zeroize", +] + [[package]] name = "ahash" version = "0.7.8" @@ -713,6 +739,15 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "ascii-canvas" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef1e3e699d84ab1b0911a1010c5c106aa34ae89aeac103be5ce0c3859db1e891" +dependencies = [ + "term", +] + [[package]] name = "async-broadcast" version = "0.7.1" @@ -1175,6 +1210,21 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base16" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d27c3610c36aee21ce8ac510e6224498de4228ad772a171ed65643a24693a5a8" + +[[package]] +name = "base62" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10e52a7bcb1d6beebee21fb5053af9e3cbb7a7ed1a4909e534040e676437ab1f" +dependencies = [ + "rustversion", +] + [[package]] name = "base64" version = "0.13.1" @@ -1193,6 +1243,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -1261,6 +1321,21 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "bit-set" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" + [[package]] name = "bitflags" version = "1.3.2" @@ -1632,6 +1707,15 @@ dependencies = [ "nom", ] +[[package]] +name = "cfb-mode" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "738b8d467867f80a71351933f70461f5b56f24d5c93e0cf216e59229c968d330" +dependencies = [ + "cipher", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -1674,6 +1758,40 @@ dependencies = [ "num-traits", ] +[[package]] +name = "chacha20" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "chacha20poly1305" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + +[[package]] +name = "charset" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1f927b07c74ba84c7e5fe4db2baeb3e996ab2688992e39ac68ce3220a677c7e" +dependencies = [ + "base64 0.22.1", + "encoding_rs", +] + [[package]] name = "chrono" version = "0.4.38" @@ -1737,6 +1855,12 @@ dependencies = [ "half", ] +[[package]] +name = "cidr" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60" + [[package]] name = "cipher" version = "0.4.4" @@ -1745,6 +1869,7 @@ checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ "crypto-common", "inout", + "zeroize", ] [[package]] @@ -1922,6 +2047,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "cmac" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8543454e3c3f5126effff9cd44d562af4e31fb8ce1cc0d3dcd8f084515dbc1aa" +dependencies = [ + "cipher", + "dbl", + "digest", +] + [[package]] name = "cmake" version = "0.1.51" @@ -2004,6 +2140,17 @@ dependencies = [ "tracing-appender", ] +[[package]] +name = "codespan-reporting" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6d2e5af09e8c8ad56c969f2157a3d4238cebc7c55f0a517728c38f7b200f81" +dependencies = [ + "serde", + "termcolor", + "unicode-width", +] + [[package]] name = "colorchoice" version = "1.0.2" @@ -2610,6 +2757,20 @@ dependencies = [ "serde", ] +[[package]] +name = "community-id" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e701443040497976ce85ba641ef0c4a6b319307b9d93718fc76bb77540bff" +dependencies = [ + "anyhow", + "base64 0.21.7", + "hex", + "lazy_static", + "num_enum 0.6.1", + "sha1", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2738,6 +2899,15 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" +[[package]] +name = "convert_case" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -2945,9 +3115,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", + "rand_core 0.6.4", "typenum", ] +[[package]] +name = "crypto_secretbox" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d6cf87adf719ddf43a805e92c6870a531aedda35ff640442cbaf8674e141e1" +dependencies = [ + "aead", + "cipher", + "generic-array", + "poly1305", + "salsa20", + "subtle", + "zeroize", +] + [[package]] name = "csv" version = "1.3.0" @@ -2969,6 +3155,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "darling" version = "0.14.4" @@ -3600,6 +3795,15 @@ dependencies = [ "sqlparser_derive 0.1.1", ] +[[package]] +name = "dbl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd2735a791158376708f9347fe8faba9667589d82427ef3aed6794a8981de3d9" +dependencies = [ + "generic-array", +] + [[package]] name = "deadpool" version = "0.12.2" @@ -3874,6 +4078,18 @@ dependencies = [ "const-random", ] +[[package]] +name = "dns-lookup" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "windows-sys 0.48.0", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -3889,6 +4105,24 @@ dependencies = [ "litrs", ] +[[package]] +name = "domain" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c84070523f8ba0f9127ff156920f27eb27b302b425efe60bf5f41ec244d1c60" +dependencies = [ + "bytes", + "futures-util", + "moka", + "octseq", + "rand 0.8.5", + "serde", + "smallvec", + "time", + "tokio", + "tracing", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -3961,6 +4195,15 @@ dependencies = [ "serde", ] +[[package]] +name = "ena" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d248bdd43ce613d87415282f69b9bb99d947d290b10962dd6c56233312c2ad5" +dependencies = [ + "log", +] + [[package]] name = "encode_unicode" version = "0.3.6" @@ -3969,9 +4212,9 @@ checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" [[package]] name = "encoding_rs" -version = "0.8.34" +version = "0.8.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" dependencies = [ "cfg-if", ] @@ -4142,6 +4385,17 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fancy-regex" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298" +dependencies = [ + "bit-set", + "regex-automata 0.4.8", + "regex-syntax 0.8.5", +] + [[package]] name = "fast-float" version = "0.2.0" @@ -4256,11 +4510,12 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.34" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" dependencies = [ "crc32fast", + "libz-rs-sys", "libz-sys", "miniz_oxide", ] @@ -4743,6 +4998,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", + "zeroize", ] [[package]] @@ -4887,6 +5143,16 @@ dependencies = [ "tonic-build 0.12.3", ] +[[package]] +name = "grok" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "273797968160270573071022613fc4aa28b91fe68f3eef6c96a1b2a1947ddfbd" +dependencies = [ + "glob", + "onig", +] + [[package]] name = "h2" version = "0.3.26" @@ -5137,6 +5403,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "hostname" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +dependencies = [ + "cfg-if", + "libc", + "windows-link", +] + [[package]] name = "htmlescape" version = "0.3.1" @@ -5686,6 +5963,12 @@ dependencies = [ "serde", ] +[[package]] +name = "indoc" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" + [[package]] name = "inferno" version = "0.11.21" @@ -5726,6 +6009,19 @@ dependencies = [ "str_stack", ] +[[package]] +name = "influxdb-line-protocol" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22fa7ee6be451ea0b1912b962c91c8380835e97cf1584a77e18264e908448dcb" +dependencies = [ + "bytes", + "log", + "nom", + "smallvec", + "snafu 0.7.5", +] + [[package]] name = "influxdb_line_protocol" version = "0.1.0" @@ -6044,6 +6340,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "keccak" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +dependencies = [ + "cpufeatures", +] + [[package]] name = "keyed_priority_queue" version = "0.4.2" @@ -6182,6 +6487,37 @@ dependencies = [ "tracing", ] +[[package]] +name = "lalrpop" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba4ebbd48ce411c1d10fb35185f5a51a7bfa3d8b24b4e330d30c9e3a34129501" +dependencies = [ + "ascii-canvas", + "bit-set", + "ena", + "itertools 0.14.0", + "lalrpop-util", + "petgraph 0.7.1", + "regex", + "regex-syntax 0.8.5", + "sha3", + "string_cache", + "term", + "unicode-xid", + "walkdir", +] + +[[package]] +name = "lalrpop-util" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5baa5e9ff84f1aefd264e6869907646538a52147a755d494517a8007fb48733" +dependencies = [ + "regex-automata 0.4.8", + "rustversion", +] + [[package]] name = "lattices" version = "0.6.1" @@ -6377,6 +6713,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-rs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a" +dependencies = [ + "zlib-rs", +] + [[package]] name = "libz-sys" version = "1.1.20" @@ -6939,9 +7284,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" dependencies = [ "adler2", ] @@ -7373,6 +7718,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "new_debug_unreachable" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" + [[package]] name = "nix" version = "0.25.1" @@ -7412,6 +7763,12 @@ dependencies = [ "libc", ] +[[package]] +name = "nohash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0f889fb66f7acdf83442c35775764b51fed3c606ab9cee51500dbde2cf528ca" + [[package]] name = "nom" version = "7.1.3" @@ -7603,6 +7960,15 @@ dependencies = [ "num_enum_derive 0.5.11", ] +[[package]] +name = "num_enum" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a015b430d3c108a207fd776d2e2196aaf8b1cf8cf93253e3a097ff3085076a1" +dependencies = [ + "num_enum_derive 0.6.1", +] + [[package]] name = "num_enum" version = "0.7.3" @@ -7624,6 +7990,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "num_enum_derive" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96667db765a921f7b295ffee8b60472b686a51d4f21c2ee4ffdb94c7013b65a6" +dependencies = [ + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "num_enum_derive" version = "0.7.3" @@ -7719,6 +8097,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "octseq" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "126c3ca37c9c44cec575247f43a3e4374d8927684f129d2beeb0d2cef262fe12" +dependencies = [ + "bytes", + "serde", + "smallvec", +] + +[[package]] +name = "ofb" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cc40678e045ff4eb1666ea6c0f994b133c31f673c09aed292261b6d5b6963a0" +dependencies = [ + "cipher", +] + [[package]] name = "once_cell" version = "1.20.1" @@ -7734,12 +8132,40 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" +[[package]] +name = "onig" +version = "6.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "336b9c63443aceef14bea841b899035ae3abe89b7c486aaf4c5bd8aafedac3f0" +dependencies = [ + "bitflags 2.9.0", + "libc", + "once_cell", + "onig_sys", +] + +[[package]] +name = "onig_sys" +version = "69.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7f86c6eef3d6df15f23bcfb6af487cbd2fed4e5581d58d5bf1f5f8b7f6727dc" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "oorandom" version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "opendal" version = "0.52.0" @@ -8101,6 +8527,12 @@ dependencies = [ "tonic-build 0.12.3", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "overload" version = "0.1.1" @@ -8229,6 +8661,12 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "parse-size" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487f2ccd1e17ce8c1bfab3a65c89525af41cfad4c8659021a1e9a2aacd73b89b" + [[package]] name = "parse-zoneinfo" version = "0.3.1" @@ -8330,6 +8768,12 @@ dependencies = [ "hmac", ] +[[package]] +name = "peeking_take_while" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e9ed2178b0575fff8e1b83b58ba6f75e727aafac2e1b6c795169ad3b17eb518" + [[package]] name = "pem" version = "3.0.4" @@ -8577,6 +9021,7 @@ dependencies = [ "table", "tokio", "urlencoding", + "vrl", "yaml-rust", ] @@ -8680,6 +9125,17 @@ dependencies = [ "snafu 0.8.5", ] +[[package]] +name = "poly1305" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "portable-atomic" version = "1.9.0" @@ -8777,6 +9233,12 @@ dependencies = [ "zerocopy 0.7.35", ] +[[package]] +name = "precomputed-hash" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" + [[package]] name = "predicates" version = "3.1.3" @@ -9109,6 +9571,17 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "prost-reflect" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5edd582b62f5cde844716e66d92565d7faf7ab1445c8cebce6e00fba83ddb2" +dependencies = [ + "once_cell", + "prost 0.13.5", + "prost-types 0.13.5", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -9176,6 +9649,21 @@ dependencies = [ "autotools", ] +[[package]] +name = "psl" +version = "2.1.112" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c6b4c497a0c6bfb466f75167c728b1a861b0cdc39de9c35b877208a270a9590" +dependencies = [ + "psl-types", +] + +[[package]] +name = "psl-types" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" + [[package]] name = "psm" version = "0.1.24" @@ -9205,6 +9693,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "publicsuffix" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42ea446cab60335f76979ec15e12619a2165b5ae2c12166bef27d283a9fadf" +dependencies = [ + "idna", + "psl-types", +] + [[package]] name = "puffin" version = "0.15.0" @@ -9424,6 +9922,12 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a206a30ce37189d1340e7da2ee0b4d65e342590af676541c23a4f3959ba272e" +[[package]] +name = "quoted_printable" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "640c9bd8497b02465aeef5375144c26062e0dcd5939dfcbb0f5db76cb8c17c73" + [[package]] name = "r-efi" version = "5.2.0" @@ -9668,6 +10172,20 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "regex-filtered" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c11639076bf147be211b90e47790db89f4c22b6c8a9ca6e960833869da67166" +dependencies = [ + "aho-corasick", + "indexmap 2.9.0", + "itertools 0.13.0", + "nohash", + "regex", + "regex-syntax 0.8.5", +] + [[package]] name = "regex-lite" version = "0.1.6" @@ -9891,6 +10409,12 @@ dependencies = [ "serde", ] +[[package]] +name = "roxmltree" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97" + [[package]] name = "rsa" version = "0.9.6" @@ -10623,7 +11147,7 @@ dependencies = [ "futures", "futures-util", "headers", - "hostname", + "hostname 0.3.1", "http 1.1.0", "http-body 1.0.1", "humantime", @@ -10714,6 +11238,17 @@ dependencies = [ "sql", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1" version = "0.10.6" @@ -10736,6 +11271,16 @@ dependencies = [ "digest", ] +[[package]] +name = "sha3" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60" +dependencies = [ + "digest", + "keccak", +] + [[package]] name = "shadow-rs" version = "1.1.1" @@ -11430,6 +11975,18 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a8348af2d9fc3258c8733b8d9d8db2e56f54b2363a4b5b81585c7875ed65e65" +[[package]] +name = "string_cache" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f" +dependencies = [ + "new_debug_unreachable", + "parking_lot 0.12.3", + "phf_shared", + "precomputed-hash", +] + [[package]] name = "stringprep" version = "0.1.5" @@ -11441,6 +11998,15 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strip-ansi-escapes" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8f8038e7e7969abb3f1b7c2a811225e9296da208539e0f79c5251d6cac0025" +dependencies = [ + "vte", +] + [[package]] name = "strsim" version = "0.10.0" @@ -11696,6 +12262,16 @@ dependencies = [ "windows 0.57.0", ] +[[package]] +name = "syslog_loose" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "161028c00842709450114c39db3b29f44c898055ed8833bb9b535aba7facf30e" +dependencies = [ + "chrono", + "nom", +] + [[package]] name = "table" version = "0.15.0" @@ -11932,6 +12508,16 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "term" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a984c8d058c627faaf5e8e2ed493fa3c51771889196de1016cf9c1c6e90d750" +dependencies = [ + "home", + "windows-sys 0.59.0", +] + [[package]] name = "termcolor" version = "1.4.1" @@ -12973,6 +13559,17 @@ dependencies = [ "tz-rs", ] +[[package]] +name = "ua-parser" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7176a413a0b7e94926d11a2054c6db5ac7fa42bf4ebe7e9571152e3f024ddfd" +dependencies = [ + "regex", + "regex-filtered", + "serde", +] + [[package]] name = "ucd-trie" version = "0.1.7" @@ -13050,6 +13647,16 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common", + "subtle", +] + [[package]] name = "unsafe-libyaml" version = "0.2.11" @@ -13091,6 +13698,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcfc827f90e53a02eaef5e535ee14266c1d569214c6aa70133a624d8a3164ba" +[[package]] +name = "utf8-width" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86bd8d4e895da8537e5315b8254664e6b769c4ff3db18321b297a1e7004392e3" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -13190,6 +13803,115 @@ dependencies = [ "serde", ] +[[package]] +name = "vrl" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9ceadaa40aef567a26079ff014ca7a567ba85344f1b81090b5ec7d7bb16a219" +dependencies = [ + "aes", + "aes-siv", + "base16", + "base62", + "base64-simd", + "bytes", + "cbc", + "cfb-mode", + "cfg-if", + "chacha20poly1305", + "charset", + "chrono", + "chrono-tz", + "ciborium", + "cidr", + "clap 4.5.19", + "codespan-reporting", + "community-id", + "convert_case", + "crc", + "crypto_secretbox", + "csv", + "ctr", + "digest", + "dns-lookup", + "domain", + "dyn-clone", + "encoding_rs", + "fancy-regex", + "flate2", + "grok", + "hex", + "hmac", + "hostname 0.4.1", + "iana-time-zone", + "idna", + "indexmap 2.9.0", + "indoc", + "influxdb-line-protocol", + "itertools 0.14.0", + "lalrpop", + "lalrpop-util", + "lz4_flex", + "md-5", + "nom", + "ofb", + "onig", + "ordered-float 4.3.0", + "parse-size", + "peeking_take_while", + "percent-encoding", + "pest", + "pest_derive", + "prost 0.13.5", + "prost-reflect", + "psl", + "psl-types", + "publicsuffix", + "quoted_printable", + "rand 0.8.5", + "regex", + "roxmltree", + "rust_decimal", + "seahash", + "serde", + "serde_json", + "serde_yaml", + "sha-1", + "sha2", + "sha3", + "simdutf8", + "snafu 0.8.5", + "snap", + "strip-ansi-escapes", + "syslog_loose", + "termcolor", + "thiserror 2.0.12", + "tokio", + "tracing", + "ua-parser", + "unicode-segmentation", + "url", + "utf8-width", + "uuid", + "woothee", + "zstd 0.13.2", +] + +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + +[[package]] +name = "vte" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231fdcd7ef3037e8330d8e17e61011a2c244126acc0a982f4040ac3f9f0bc077" +dependencies = [ + "memchr", +] + [[package]] name = "walkdir" version = "2.5.0" @@ -13512,6 +14234,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + [[package]] name = "windows-registry" version = "0.2.0" @@ -13738,6 +14466,16 @@ dependencies = [ "thiserror 1.0.64", ] +[[package]] +name = "woothee" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "896174c6a4779d4d7d4523dd27aef7d46609eda2497e370f6c998325c6bf6971" +dependencies = [ + "lazy_static", + "regex", +] + [[package]] name = "write16" version = "1.0.0" @@ -13941,6 +14679,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "zlib-rs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8" + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml index f022e81f6f..aa72850887 100644 --- a/src/pipeline/Cargo.toml +++ b/src/pipeline/Cargo.toml @@ -59,6 +59,7 @@ sql.workspace = true table.workspace = true tokio.workspace = true urlencoding = "2.1" +vrl = "0.24" yaml-rust = "0.4" [dev-dependencies] diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs index 011f492b63..e9aefefb63 100644 --- a/src/pipeline/benches/processor.rs +++ b/src/pipeline/benches/processor.rs @@ -24,9 +24,9 @@ fn processor_mut( let mut result = Vec::with_capacity(input_values.len()); for v in input_values { - let mut payload = json_to_map(v).unwrap(); + let payload = json_to_map(v).unwrap(); let r = pipeline - .exec_mut(&mut payload)? + .exec_mut(payload)? .into_transformed() .expect("expect transformed result "); result.push(r.0); diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index 60ac3c945c..e002030a89 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -686,6 +686,54 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to compile VRL, {}", msg))] + CompileVrl { + msg: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to execute VRL, {}", msg))] + ExecuteVrl { + msg: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Float is not a number: {}", input_float))] + FloatNaN { + input_float: f64, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid timestamp value: {}", input))] + InvalidTimestamp { + input: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to convert bytes to utf8"))] + BytesToUtf8 { + #[snafu(source)] + error: std::string::FromUtf8Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Please don't use regex in Vrl script"))] + VrlRegexValue { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Vrl script should return `.` in the end"))] + VrlReturnValue { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to cast type, msg: {}", msg))] CastType { msg: String, @@ -866,6 +914,13 @@ impl ErrorExt for Error { | ReachedMaxNestedLevels { .. } | RequiredTableSuffixTemplate | InvalidTableSuffixTemplate { .. } + | CompileVrl { .. } + | ExecuteVrl { .. } + | FloatNaN { .. } + | BytesToUtf8 { .. } + | InvalidTimestamp { .. } + | VrlRegexValue { .. } + | VrlReturnValue { .. } | PipelineMissing { .. } => StatusCode::InvalidArguments, } } diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 9f13a45317..eb48610123 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -156,7 +156,7 @@ impl DispatchedTo { pub enum PipelineExecOutput { Transformed(TransformedOutput), AutoTransform(AutoTransformOutput), - DispatchedTo(DispatchedTo), + DispatchedTo(DispatchedTo, PipelineMap), } #[derive(Debug)] @@ -164,6 +164,7 @@ pub struct TransformedOutput { pub opt: String, pub row: Row, pub table_suffix: Option, + pub pipeline_map: PipelineMap, } #[derive(Debug)] @@ -171,6 +172,7 @@ pub struct AutoTransformOutput { pub table_suffix: Option, // ts_column_name -> unit pub ts_unit_map: HashMap, + pub pipeline_map: PipelineMap, } impl PipelineExecOutput { @@ -188,7 +190,7 @@ impl PipelineExecOutput { // Note: This is a test only function, do not use it in production. pub fn into_dispatched(self) -> Option { - if let Self::DispatchedTo(d) = self { + if let Self::DispatchedTo(d, _) = self { Some(d) } else { None @@ -231,30 +233,31 @@ pub fn simd_json_array_to_map(val: Vec) -> Result Result { + pub fn exec_mut(&self, mut val: PipelineMap) -> Result { // process for processor in self.processors.iter() { - processor.exec_mut(val)?; + val = processor.exec_mut(val)?; } // dispatch, fast return if matched - if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) { - return Ok(PipelineExecOutput::DispatchedTo(rule.into())); + if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) { + return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val)); } if let Some(transformer) = self.transformer() { - let (opt, row) = transformer.transform_mut(val)?; - let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val)); + let (opt, row) = transformer.transform_mut(&mut val)?; + let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val)); Ok(PipelineExecOutput::Transformed(TransformedOutput { opt, row, table_suffix, + pipeline_map: val, })) } else { - let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val)); + let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val)); let mut ts_unit_map = HashMap::with_capacity(4); // get all ts values - for (k, v) in val { + for (k, v) in val.iter() { if let Value::Timestamp(ts) = v { if !ts_unit_map.contains_key(k) { ts_unit_map.insert(k.clone(), ts.get_unit()); @@ -264,6 +267,7 @@ impl Pipeline { Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput { table_suffix, ts_unit_map, + pipeline_map: val, })) } } @@ -318,9 +322,9 @@ transform: type: uint32 "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); - let mut payload = json_to_map(input_value).unwrap(); + let payload = json_to_map(input_value).unwrap(); let result = pipeline - .exec_mut(&mut payload) + .exec_mut(payload) .unwrap() .into_transformed() .unwrap(); @@ -371,7 +375,7 @@ transform: let mut payload = PipelineMap::new(); payload.insert("message".to_string(), Value::String(message)); let result = pipeline - .exec_mut(&mut payload) + .exec_mut(payload) .unwrap() .into_transformed() .unwrap(); @@ -446,9 +450,9 @@ transform: "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); - let mut payload = json_to_map(input_value).unwrap(); + let payload = json_to_map(input_value).unwrap(); let result = pipeline - .exec_mut(&mut payload) + .exec_mut(payload) .unwrap() .into_transformed() .unwrap(); @@ -488,10 +492,10 @@ transform: let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); let schema = pipeline.schemas().unwrap().clone(); - let mut result = json_to_map(input_value).unwrap(); + let result = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut result) + .exec_mut(result) .unwrap() .into_transformed() .unwrap(); diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index 5413dcf379..02681d86fe 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -29,6 +29,7 @@ pub mod select; pub mod simple_extract; pub mod timestamp; pub mod urlencoding; +pub mod vrl; use std::str::FromStr; @@ -58,6 +59,7 @@ use crate::etl::field::{Field, Fields}; use crate::etl::processor::json_parse::JsonParseProcessor; use crate::etl::processor::select::SelectProcessor; use crate::etl::processor::simple_extract::SimpleExtractProcessor; +use crate::etl::processor::vrl::VrlProcessor; use crate::etl::PipelineMap; const FIELD_NAME: &str = "field"; @@ -123,7 +125,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static { fn ignore_missing(&self) -> bool; /// Execute the processor on a vector which be preprocessed by the pipeline - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>; + fn exec_mut(&self, val: PipelineMap) -> Result; } #[derive(Debug)] @@ -146,6 +148,7 @@ pub enum ProcessorKind { Decolorize(DecolorizeProcessor), Digest(DigestProcessor), Select(SelectProcessor), + Vrl(VrlProcessor), } #[derive(Debug, Default)] @@ -227,6 +230,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { json_parse::PROCESSOR_JSON_PARSE => { ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?) } + vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?), select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?), _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(), }; diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index 57e0695098..9959bb6a96 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -249,7 +249,7 @@ impl Processor for CmcdProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let name = field.input_field(); @@ -277,7 +277,7 @@ impl Processor for CmcdProcessor { } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/csv.rs b/src/pipeline/src/etl/processor/csv.rs index bef4655033..720faf1c87 100644 --- a/src/pipeline/src/etl/processor/csv.rs +++ b/src/pipeline/src/etl/processor/csv.rs @@ -189,7 +189,7 @@ impl Processor for CsvProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let name = field.input_field(); @@ -216,7 +216,7 @@ impl Processor for CsvProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs index 1ba4dcd605..4756dcba7f 100644 --- a/src/pipeline/src/etl/processor/date.rs +++ b/src/pipeline/src/etl/processor/date.rs @@ -194,7 +194,7 @@ impl Processor for DateProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -221,7 +221,7 @@ impl Processor for DateProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/decolorize.rs b/src/pipeline/src/etl/processor/decolorize.rs index 69053c8743..0b334dc6a2 100644 --- a/src/pipeline/src/etl/processor/decolorize.rs +++ b/src/pipeline/src/etl/processor/decolorize.rs @@ -102,7 +102,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -122,7 +122,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/digest.rs b/src/pipeline/src/etl/processor/digest.rs index 334bfd6e63..e5ea493689 100644 --- a/src/pipeline/src/etl/processor/digest.rs +++ b/src/pipeline/src/etl/processor/digest.rs @@ -201,7 +201,7 @@ impl crate::etl::processor::Processor for DigestProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -221,7 +221,7 @@ impl crate::etl::processor::Processor for DigestProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index 8034d984d2..c72de8bd40 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -601,7 +601,7 @@ impl Processor for DissectProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -629,7 +629,7 @@ impl Processor for DissectProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs index 32124f92f4..bbaa8df7a5 100644 --- a/src/pipeline/src/etl/processor/epoch.rs +++ b/src/pipeline/src/etl/processor/epoch.rs @@ -163,7 +163,7 @@ impl Processor for EpochProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -183,7 +183,7 @@ impl Processor for EpochProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs index 519a6d5f8c..1f46856d2e 100644 --- a/src/pipeline/src/etl/processor/gsub.rs +++ b/src/pipeline/src/etl/processor/gsub.rs @@ -118,7 +118,7 @@ impl crate::etl::processor::Processor for GsubProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -138,7 +138,7 @@ impl crate::etl::processor::Processor for GsubProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/join.rs b/src/pipeline/src/etl/processor/join.rs index e70007ca95..74de9e99ae 100644 --- a/src/pipeline/src/etl/processor/join.rs +++ b/src/pipeline/src/etl/processor/join.rs @@ -95,7 +95,7 @@ impl Processor for JoinProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -123,7 +123,7 @@ impl Processor for JoinProcessor { } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/json_parse.rs b/src/pipeline/src/etl/processor/json_parse.rs index 7361ee9151..ef3d021b5c 100644 --- a/src/pipeline/src/etl/processor/json_parse.rs +++ b/src/pipeline/src/etl/processor/json_parse.rs @@ -97,7 +97,7 @@ impl Processor for JsonParseProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -117,7 +117,7 @@ impl Processor for JsonParseProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/json_path.rs b/src/pipeline/src/etl/processor/json_path.rs index b403a132fa..c33af79ffe 100644 --- a/src/pipeline/src/etl/processor/json_path.rs +++ b/src/pipeline/src/etl/processor/json_path.rs @@ -125,7 +125,7 @@ impl Processor for JsonPathProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -145,7 +145,7 @@ impl Processor for JsonPathProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/letter.rs b/src/pipeline/src/etl/processor/letter.rs index 7e439b59e3..1bc94b2c20 100644 --- a/src/pipeline/src/etl/processor/letter.rs +++ b/src/pipeline/src/etl/processor/letter.rs @@ -126,7 +126,7 @@ impl Processor for LetterProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -154,7 +154,7 @@ impl Processor for LetterProcessor { } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index d31fe5e189..58fd01e5aa 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -192,7 +192,7 @@ impl Processor for RegexProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); let prefix = field.target_or_input_field(); @@ -220,7 +220,7 @@ impl Processor for RegexProcessor { } } - Ok(()) + Ok(val) } } #[cfg(test)] diff --git a/src/pipeline/src/etl/processor/select.rs b/src/pipeline/src/etl/processor/select.rs index cbaeae0e9a..0cea542949 100644 --- a/src/pipeline/src/etl/processor/select.rs +++ b/src/pipeline/src/etl/processor/select.rs @@ -96,7 +96,7 @@ impl Processor for SelectProcessor { true } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { match self.select_type { SelectType::Include => { let mut include_key_set = HashSet::with_capacity(val.len()); @@ -121,7 +121,7 @@ impl Processor for SelectProcessor { } } - Ok(()) + Ok(val) } } @@ -142,8 +142,9 @@ mod test { p.insert("hello".to_string(), Value::String("world".to_string())); p.insert("hello2".to_string(), Value::String("world2".to_string())); - let result = processor.exec_mut(&mut p); + let result = processor.exec_mut(p); assert!(result.is_ok()); + let p = result.unwrap(); assert_eq!(p.len(), 1); assert_eq!(p.get("hello"), Some(&Value::String("world".to_string()))); } @@ -159,8 +160,9 @@ mod test { p.insert("hello".to_string(), Value::String("world".to_string())); p.insert("hello2".to_string(), Value::String("world2".to_string())); - let result = processor.exec_mut(&mut p); + let result = processor.exec_mut(p); assert!(result.is_ok()); + let p = result.unwrap(); assert_eq!(p.len(), 1); assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string()))); } @@ -176,8 +178,9 @@ mod test { p.insert("hello".to_string(), Value::String("world".to_string())); p.insert("hello2".to_string(), Value::String("world2".to_string())); - let result = processor.exec_mut(&mut p); + let result = processor.exec_mut(p); assert!(result.is_ok()); + let p = result.unwrap(); assert_eq!(p.len(), 1); assert_eq!(p.get("hello"), None); assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string()))); diff --git a/src/pipeline/src/etl/processor/simple_extract.rs b/src/pipeline/src/etl/processor/simple_extract.rs index a8c3b48cdd..fd94692742 100644 --- a/src/pipeline/src/etl/processor/simple_extract.rs +++ b/src/pipeline/src/etl/processor/simple_extract.rs @@ -98,7 +98,7 @@ impl Processor for SimpleExtractProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -118,7 +118,7 @@ impl Processor for SimpleExtractProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs index 6d4c228025..d608625836 100644 --- a/src/pipeline/src/etl/processor/timestamp.rs +++ b/src/pipeline/src/etl/processor/timestamp.rs @@ -298,7 +298,7 @@ impl Processor for TimestampProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -318,7 +318,7 @@ impl Processor for TimestampProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/urlencoding.rs b/src/pipeline/src/etl/processor/urlencoding.rs index 95209bce5a..3eb515484a 100644 --- a/src/pipeline/src/etl/processor/urlencoding.rs +++ b/src/pipeline/src/etl/processor/urlencoding.rs @@ -126,7 +126,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { + fn exec_mut(&self, mut val: PipelineMap) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -153,7 +153,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { } } } - Ok(()) + Ok(val) } } diff --git a/src/pipeline/src/etl/processor/vrl.rs b/src/pipeline/src/etl/processor/vrl.rs new file mode 100644 index 0000000000..eeac4d49a6 --- /dev/null +++ b/src/pipeline/src/etl/processor/vrl.rs @@ -0,0 +1,319 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use chrono_tz::Tz; +use snafu::{OptionExt, ResultExt}; +use vrl::compiler::runtime::Runtime; +use vrl::compiler::{compile, Program, TargetValue}; +use vrl::diagnostic::Formatter; +use vrl::prelude::{Bytes, NotNan, TimeZone}; +use vrl::value::{KeyString, Kind, Secrets, Value as VrlValue}; + +use crate::error::{ + BytesToUtf8Snafu, CompileVrlSnafu, Error, ExecuteVrlSnafu, FloatNaNSnafu, + InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu, +}; +use crate::etl::processor::yaml_string; +use crate::{PipelineMap, Value as PipelineValue}; + +pub(crate) const PROCESSOR_VRL: &str = "vrl"; +const SOURCE: &str = "source"; + +#[derive(Debug)] +pub struct VrlProcessor { + source: String, + program: Program, +} + +impl VrlProcessor { + pub fn new(source: String) -> Result { + let fns = vrl::stdlib::all(); + + let compile_result = compile(&source, &fns).map_err(|e| { + CompileVrlSnafu { + msg: Formatter::new(&source, e).to_string(), + } + .build() + })?; + + let program = compile_result.program; + + // check if the return value is have regex + let result_def = program.final_type_info().result; + let kind = result_def.kind(); + if !kind.is_object() { + return VrlReturnValueSnafu.fail(); + } + check_regex_output(kind)?; + + Ok(Self { source, program }) + } + + pub fn resolve(&self, m: PipelineMap) -> Result { + let pipeline_vrl = m + .into_iter() + .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v))) + .collect::>>()?; + + let mut target = TargetValue { + value: VrlValue::Object(pipeline_vrl), + metadata: VrlValue::Object(BTreeMap::new()), + secrets: Secrets::default(), + }; + + let timezone = TimeZone::Named(Tz::UTC); + let mut runtime = Runtime::default(); + let re = runtime + .resolve(&mut target, &self.program, &timezone) + .map_err(|e| { + ExecuteVrlSnafu { + msg: e.get_expression_error().to_string(), + } + .build() + })?; + + vrl_value_to_pipeline_value(re) + } +} + +impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor { + type Error = Error; + + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + let mut source = String::new(); + for (k, v) in value.iter() { + let key = k + .as_str() + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; + if key == SOURCE { + source = yaml_string(v, SOURCE)?; + } + } + let processor = VrlProcessor::new(source)?; + Ok(processor) + } +} + +impl crate::etl::processor::Processor for VrlProcessor { + fn kind(&self) -> &str { + PROCESSOR_VRL + } + + fn ignore_missing(&self) -> bool { + true + } + + fn exec_mut(&self, val: PipelineMap) -> Result { + let val = self.resolve(val)?; + + if let PipelineValue::Map(m) = val { + Ok(m.values) + } else { + VrlRegexValueSnafu.fail() + } + } +} + +fn pipeline_value_to_vrl_value(v: PipelineValue) -> Result { + match v { + PipelineValue::Null => Ok(VrlValue::Null), + PipelineValue::Int8(x) => Ok(VrlValue::Integer(x as i64)), + PipelineValue::Int16(x) => Ok(VrlValue::Integer(x as i64)), + PipelineValue::Int32(x) => Ok(VrlValue::Integer(x as i64)), + PipelineValue::Int64(x) => Ok(VrlValue::Integer(x)), + PipelineValue::Uint8(x) => Ok(VrlValue::Integer(x as i64)), + PipelineValue::Uint16(x) => Ok(VrlValue::Integer(x as i64)), + PipelineValue::Uint32(x) => Ok(VrlValue::Integer(x as i64)), + PipelineValue::Uint64(x) => Ok(VrlValue::Integer(x as i64)), + PipelineValue::Float32(x) => NotNan::new(x as f64) + .map_err(|_| FloatNaNSnafu { input_float: x }.build()) + .map(VrlValue::Float), + PipelineValue::Float64(x) => NotNan::new(x) + .map_err(|_| FloatNaNSnafu { input_float: x }.build()) + .map(VrlValue::Float), + PipelineValue::Boolean(x) => Ok(VrlValue::Boolean(x)), + PipelineValue::String(x) => Ok(VrlValue::Bytes(Bytes::copy_from_slice(x.as_bytes()))), + PipelineValue::Timestamp(x) => x + .to_datetime() + .context(InvalidTimestampSnafu { + input: x.to_string(), + }) + .map(VrlValue::Timestamp), + PipelineValue::Array(array) => Ok(VrlValue::Array( + array + .into_iter() + .map(pipeline_value_to_vrl_value) + .collect::>>()?, + )), + PipelineValue::Map(m) => { + let values = m + .values + .into_iter() + .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v))) + .collect::>>()?; + Ok(VrlValue::Object(values)) + } + } +} + +fn vrl_value_to_pipeline_value(v: VrlValue) -> Result { + match v { + VrlValue::Bytes(bytes) => String::from_utf8(bytes.to_vec()) + .context(BytesToUtf8Snafu) + .map(PipelineValue::String), + VrlValue::Regex(_) => VrlRegexValueSnafu.fail(), + VrlValue::Integer(x) => Ok(PipelineValue::Int64(x)), + VrlValue::Float(not_nan) => Ok(PipelineValue::Float64(not_nan.into_inner())), + VrlValue::Boolean(b) => Ok(PipelineValue::Boolean(b)), + VrlValue::Timestamp(date_time) => crate::etl::value::Timestamp::from_datetime(date_time) + .context(InvalidTimestampSnafu { + input: date_time.to_string(), + }) + .map(PipelineValue::Timestamp), + VrlValue::Object(bm) => { + let b = bm + .into_iter() + .map(|(k, v)| vrl_value_to_pipeline_value(v).map(|v| (k.to_string(), v))) + .collect::>>()?; + Ok(PipelineValue::Map(b.into())) + } + VrlValue::Array(values) => { + let a = values + .into_iter() + .map(vrl_value_to_pipeline_value) + .collect::>>()?; + Ok(PipelineValue::Array(a.into())) + } + VrlValue::Null => Ok(PipelineValue::Null), + } +} + +fn check_regex_output(output_kind: &Kind) -> Result<()> { + if output_kind.is_regex() { + return VrlRegexValueSnafu.fail(); + } + + if let Some(arr) = output_kind.as_array() { + let k = arr.known(); + for v in k.values() { + check_regex_output(v)? + } + } + + if let Some(obj) = output_kind.as_object() { + let k = obj.known(); + for v in k.values() { + check_regex_output(v)? + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + + use super::*; + use crate::etl::value::Timestamp; + use crate::Map; + + #[test] + fn test_vrl() { + let source = r#" +.name.a = .user_info.name +.name.b = .user_info.name +del(.user_info) +.timestamp = now() +. +"#; + + let v = VrlProcessor::new(source.to_string()); + assert!(v.is_ok()); + let v = v.unwrap(); + + let mut n = PipelineMap::new(); + n.insert( + "name".to_string(), + PipelineValue::String("certain_name".to_string()), + ); + + let mut m = PipelineMap::new(); + m.insert( + "user_info".to_string(), + PipelineValue::Map(Map { values: n }), + ); + + let re = v.resolve(m); + assert!(re.is_ok()); + let re = re.unwrap(); + + assert!(matches!(re, PipelineValue::Map(_))); + assert!(re.get("name").is_some()); + let name = re.get("name").unwrap(); + assert!(matches!(name.get("a").unwrap(), PipelineValue::String(x) if x == "certain_name")); + assert!(matches!(name.get("b").unwrap(), PipelineValue::String(x) if x == "certain_name")); + assert!(re.get("timestamp").is_some()); + let timestamp = re.get("timestamp").unwrap(); + assert!(matches!( + timestamp, + PipelineValue::Timestamp(Timestamp::Nanosecond(_)) + )); + } + + #[test] + fn test_yaml_to_vrl() { + let yaml = r#" +processors: + - vrl: + source: | + .name.a = .user_info.name + .name.b = .user_info.name + del(.user_info) + .timestamp = now() + . +"#; + let y = yaml_rust::YamlLoader::load_from_str(yaml).unwrap(); + let vrl_processor_yaml = y + .first() + .and_then(|x| x.as_hash()) + .and_then(|x| x.get(&yaml_rust::Yaml::String("processors".to_string()))) + .and_then(|x| x.as_vec()) + .and_then(|x| x.first()) + .and_then(|x| x.as_hash()) + .and_then(|x| x.get(&yaml_rust::Yaml::String("vrl".to_string()))) + .and_then(|x| x.as_hash()) + .unwrap(); + + let vrl = VrlProcessor::try_from(vrl_processor_yaml); + assert!(vrl.is_ok()); + let vrl = vrl.unwrap(); + + assert_eq!(vrl.source, ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n"); + } + + #[test] + fn test_regex() { + let source = r#" +.re = r'(?i)^Hello, World!$' +del(.re) +.re = r'(?i)^Hello, World!$' +. +"#; + + let v = VrlProcessor::new(source.to_string()); + assert!(v.is_err()); + } +} diff --git a/src/pipeline/src/etl/value/time.rs b/src/pipeline/src/etl/value/time.rs index 414b5558ca..ad022ed6f9 100644 --- a/src/pipeline/src/etl/value/time.rs +++ b/src/pipeline/src/etl/value/time.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use chrono::{DateTime, Utc}; use common_time::timestamp::TimeUnit; #[derive(Debug, Clone, PartialEq)] @@ -104,6 +105,19 @@ impl Timestamp { Timestamp::Second(_) => TimeUnit::Second, } } + + pub fn to_datetime(&self) -> Option> { + match self { + Timestamp::Nanosecond(v) => Some(DateTime::from_timestamp_nanos(*v)), + Timestamp::Microsecond(v) => DateTime::from_timestamp_micros(*v), + Timestamp::Millisecond(v) => DateTime::from_timestamp_millis(*v), + Timestamp::Second(v) => DateTime::from_timestamp(*v, 0), + } + } + + pub fn from_datetime(dt: DateTime) -> Option { + dt.timestamp_nanos_opt().map(Timestamp::Nanosecond) + } } impl Default for Timestamp { diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index d2435b13f3..6bc0582aea 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -29,9 +29,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { match input_value { serde_json::Value::Array(array) => { for value in array { - let mut intermediate_status = json_to_map(value).unwrap(); + let intermediate_status = json_to_map(value).unwrap(); let row = pipeline - .exec_mut(&mut intermediate_status) + .exec_mut(intermediate_status) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); @@ -39,9 +39,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { } } serde_json::Value::Object(_) => { - let mut intermediate_status = json_to_map(input_value).unwrap(); + let intermediate_status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut intermediate_status) + .exec_mut(intermediate_status) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index 6feb5f1c50..85ec7a6310 100644 --- a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -274,9 +274,9 @@ transform: let yaml_content = pipeline::Content::Yaml(pipeline_yaml); let pipeline: pipeline::Pipeline = pipeline::parse(&yaml_content).expect("failed to parse pipeline"); - let mut result = json_to_map(input_value).unwrap(); + let result = json_to_map(input_value).unwrap(); - let row = pipeline.exec_mut(&mut result); + let row = pipeline.exec_mut(result); assert!(row.is_err()); assert_eq!(row.err().unwrap().to_string(), "No matching pattern found"); diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index fd64e60feb..cd2f771208 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -419,10 +419,10 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); - let mut stats = json_to_map(input_value).unwrap(); + let stats = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut stats) + .exec_mut(stats) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); @@ -488,9 +488,9 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value).unwrap(); + let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -597,9 +597,9 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value).unwrap(); + let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -663,9 +663,9 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value).unwrap(); + let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -703,10 +703,9 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value).unwrap(); - + let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -763,9 +762,9 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value).unwrap(); + let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -804,9 +803,9 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value).unwrap(); + let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -866,18 +865,18 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value1).unwrap(); + let status = json_to_map(input_value1).unwrap(); let dispatched_to = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_dispatched() .expect("expect dispatched result "); assert_eq!(dispatched_to.table_suffix, "http"); assert_eq!(dispatched_to.pipeline.unwrap(), "access_log_pipeline"); - let mut status = json_to_map(input_value2).unwrap(); + let status = json_to_map(input_value2).unwrap(); let row = pipeline - .exec_mut(&mut status) + .exec_mut(status) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -930,8 +929,8 @@ table_suffix: _${logger} let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_map(input_value).unwrap(); - let exec_re = pipeline.exec_mut(&mut status).unwrap(); + let status = json_to_map(input_value).unwrap(); + let exec_re = pipeline.exec_mut(status).unwrap(); let (row, table_name) = exec_re.into_transformed().unwrap(); let values = row.values; diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index 8e8cb7518b..5f3ff67aff 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -120,9 +120,9 @@ async fn run_custom_pipeline( let mut auto_map = HashMap::new(); let mut auto_map_ts_keys = HashMap::new(); - for mut pipeline_map in pipeline_maps { + for pipeline_map in pipeline_maps { let r = pipeline - .exec_mut(&mut pipeline_map) + .exec_mut(pipeline_map) .inspect_err(|_| { METRIC_HTTP_LOGS_TRANSFORM_ELAPSED .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) @@ -135,6 +135,7 @@ async fn run_custom_pipeline( opt, row, table_suffix, + pipeline_map: _val, }) => { let act_table_name = table_suffix_to_table_name(&table_name, table_suffix); push_to_map!(transformed_map, (opt, act_table_name), row, arr_len); @@ -142,6 +143,7 @@ async fn run_custom_pipeline( PipelineExecOutput::AutoTransform(AutoTransformOutput { table_suffix, ts_unit_map, + pipeline_map, }) => { let act_table_name = table_suffix_to_table_name(&table_name, table_suffix); push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len); @@ -150,8 +152,8 @@ async fn run_custom_pipeline( .or_insert_with(HashMap::new) .extend(ts_unit_map); } - PipelineExecOutput::DispatchedTo(dispatched_to) => { - push_to_map!(dispatched, dispatched_to, pipeline_map, arr_len); + PipelineExecOutput::DispatchedTo(dispatched_to, val) => { + push_to_map!(dispatched, dispatched_to, val, arr_len); } } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e5f4834614..e199109d40 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -105,6 +105,7 @@ macro_rules! http_tests { test_pipeline_dispatcher, test_pipeline_suffix_template, test_pipeline_context, + test_pipeline_with_vrl, test_otlp_metrics, test_otlp_traces_v0, @@ -2133,6 +2134,74 @@ table_suffix: _${type} guard.remove_all().await; } +pub async fn test_pipeline_with_vrl(storage_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(storage_type, "test_pipeline_with_vrl").await; + + // handshake + let client = TestClient::new(app).await; + + let pipeline = r#" +processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + ignore_missing: true + - vrl: + source: | + .log_id = .id + del(.id) + . + +transform: + - fields: + - log_id + type: int32 + - field: time + type: time + index: timestamp +"#; + + // 1. create pipeline + let res = client + .post("/v1/events/pipelines/root") + .header("Content-Type", "application/x-yaml") + .body(pipeline) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + + // 2. write data + let data_body = r#" +[ + { + "id": "2436", + "time": "2024-05-25 20:16:37.217" + } +] +"#; + let res = client + .post("/v1/events/logs?db=public&table=d_table&pipeline_name=root") + .header("Content-Type", "application/json") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + validate_data( + "test_pipeline_with_vrl", + &client, + "select * from d_table", + "[[2436,1716668197217000000]]", + ) + .await; + + guard.remove_all().await; +} + pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) =