diff --git a/Cargo.lock b/Cargo.lock
index 63ba289947..a65159d26a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2278,6 +2278,7 @@ dependencies = [
"futures",
"lazy_static",
"object-store",
+ "object_store_opendal",
"orc-rust",
"parquet",
"paste",
@@ -5102,6 +5103,7 @@ dependencies = [
"datatypes",
"futures",
"object-store",
+ "object_store_opendal",
"serde",
"serde_json",
"snafu 0.8.6",
@@ -8320,6 +8322,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datatypes",
+ "derive_more",
"dotenv",
"either",
"futures",
@@ -9074,8 +9077,9 @@ dependencies = [
[[package]]
name = "object_store_opendal"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0eb12a624a41fce745838d0ef3701ff6c47797c13cd18ad3612fd2a3134fdbd8"
dependencies = [
"async-trait",
"bytes",
@@ -9162,8 +9166,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "opendal"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96c9c85ce253ff87225e7669979d877a20c98a06604ec9d6dd5f4473e08f1ae1"
dependencies = [
"ctor",
"opendal-core",
@@ -9183,8 +9188,9 @@ dependencies = [
[[package]]
name = "opendal-core"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4f8607c90e2c963a91467f50fb49fbc7fb3d573f88cea219ca59ccd3740b309"
dependencies = [
"anyhow",
"base64 0.22.1",
@@ -9210,8 +9216,9 @@ dependencies = [
[[package]]
name = "opendal-layer-concurrent-limit"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0d6f81ba6960e3fae1882f253b114b21d7e444e1534f209c7737a79f6243eb6f"
dependencies = [
"futures",
"http 1.3.1",
@@ -9221,8 +9228,9 @@ dependencies = [
[[package]]
name = "opendal-layer-logging"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "58ada45c6d81d1aa4c9305d0c7d4bc317c59c85866a0908a2d75a7a978aa5ee2"
dependencies = [
"log",
"opendal-core",
@@ -9230,8 +9238,9 @@ dependencies = [
[[package]]
name = "opendal-layer-observe-metrics-common"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "628b0228fdbd13c3d9d50eee4341f2eb82ca5b44991e4c68f07c84cc823e2d12"
dependencies = [
"futures",
"http 1.3.1",
@@ -9240,8 +9249,9 @@ dependencies = [
[[package]]
name = "opendal-layer-prometheus"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0487bdb1357097ec8654781bad03ef310282517738e2864ebde69e27aaafc5ec"
dependencies = [
"opendal-core",
"opendal-layer-observe-metrics-common",
@@ -9250,8 +9260,9 @@ dependencies = [
[[package]]
name = "opendal-layer-retry"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b2a25a718afb81fad81cb9a0580a1cb989221fa2317f888c6a37f8dad408eb7"
dependencies = [
"backon",
"log",
@@ -9260,8 +9271,9 @@ dependencies = [
[[package]]
name = "opendal-layer-timeout"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e91f731724c213af81e9d03517859c8fc47b4578e64ad61ae4f099f10fe36e3"
dependencies = [
"opendal-core",
"tokio",
@@ -9269,8 +9281,9 @@ dependencies = [
[[package]]
name = "opendal-layer-tracing"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "90c6fc9df6da1f0dafbdf55fa48525f1643aefbe7da8f46936e869e2a5b8a34f"
dependencies = [
"futures",
"http 1.3.1",
@@ -9280,8 +9293,9 @@ dependencies = [
[[package]]
name = "opendal-service-azblob"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0030644366ef5d8cbe3a4a5822bf99a4aafddc1666e9d24b44d158d9062fc76a"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -9300,8 +9314,9 @@ dependencies = [
[[package]]
name = "opendal-service-azure-common"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b489f13c42e69d69bdd72952b634356ec43a7881a20259b38b540fcecdf4051"
dependencies = [
"http 1.3.1",
"opendal-core",
@@ -9309,8 +9324,9 @@ dependencies = [
[[package]]
name = "opendal-service-fs"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22e89a665fef0e6bd249cf5ea47fc174b7ba892159bee4b9382528b1ca873a2c"
dependencies = [
"bytes",
"log",
@@ -9322,8 +9338,9 @@ dependencies = [
[[package]]
name = "opendal-service-gcs"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "48de101aac565ed06af4b47903c24eafd249075553ec1fb18256751c45148d47"
dependencies = [
"async-trait",
"bytes",
@@ -9342,8 +9359,9 @@ dependencies = [
[[package]]
name = "opendal-service-http"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fb6af628a0bf14075b957179444927e1df40dc7addef382b585a05ef015a077b"
dependencies = [
"http 1.3.1",
"log",
@@ -9353,8 +9371,9 @@ dependencies = [
[[package]]
name = "opendal-service-oss"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "328fa55e8888cbdfe00826bfea2a79042422b720e8369e9e021e46121dea5ace"
dependencies = [
"bytes",
"http 1.3.1",
@@ -9369,8 +9388,9 @@ dependencies = [
[[package]]
name = "opendal-service-s3"
-version = "0.56.0"
-source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "313d46c9f5ae70bca26b7c3e3fbb9b639292625f28af73aa016f47e788af9deb"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -14102,9 +14122,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tar"
-version = "0.4.45"
+version = "0.4.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "22692a6476a21fa75fdfc11d452fda482af402c008cdbaf3476414e122040973"
+checksum = "3f6221d9a6003c78398e3b239969f352578258df48c8eb051caadae0015bc840"
dependencies = [
"filetime",
"libc",
diff --git a/Cargo.toml b/Cargo.toml
index 32407f31cf..56200a24d6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -178,7 +178,7 @@ nalgebra = "0.33"
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
notify = "8.0"
num_cpus = "1.16"
-object_store_opendal = { git = "https://github.com/apache/opendal.git", rev = "4ad2d85296ffa6fdc2882f97d3c760ee243913f7" }
+object_store_opendal = "0.57"
once_cell = "1.18"
opentelemetry-proto = { version = "0.31", features = [
"gen-tonic",
diff --git a/config/config.md b/config/config.md
index 0fae0caaa4..d9cffaf122 100644
--- a/config/config.md
+++ b/config/config.md
@@ -14,6 +14,7 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
+| `auto_create_table` | Bool | `true` | Server-side global switch for auto table creation on write.
When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. |
| `user_provider` | String | Unset | The user provider for authentication.
Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
Set to 0 to disable the limit. Default: "0" (unlimited) |
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.
Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
@@ -230,6 +231,7 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
+| `auto_create_table` | Bool | `true` | Server-side global switch for auto table creation on write.
When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. |
| `user_provider` | String | Unset | The user provider for authentication.
Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
Set to 0 to disable the limit. Default: "0" (unlimited) |
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.
Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
@@ -628,6 +630,7 @@
| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Flow wait for available frontend timeout,
if failed to find available frontend after frontend_scan_timeout elapsed, return error
which prevent flownode from starting |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
+| `flow.batching_mode.experimental_enable_incremental_read` | Bool | `false` | Whether to enable experimental flow incremental source reads.
When disabled, batching flows always execute full-snapshot queries.
Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true'). |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
| `flow.batching_mode.frontend_tls` | -- | -- | -- |
| `flow.batching_mode.frontend_tls.enabled` | Bool | `false` | Whether to enable TLS for client. |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index 2c053e6e8c..ff8a9e4a50 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -31,6 +31,10 @@ node_id = 14
#+experimental_max_filter_num_per_query=20
## Time window merge distance
#+experimental_time_window_merge_threshold=3
+## Whether to enable experimental flow incremental source reads.
+## When disabled, batching flows always execute full-snapshot queries.
+## Can be overridden per flow with WITH (experimental_enable_incremental_read = 'true').
+#+experimental_enable_incremental_read=false
## Read preference of the Frontend client.
#+read_preference="Leader"
[flow.batching_mode.frontend_tls]
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 39f38fbef9..a044aebda6 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -6,6 +6,10 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
+## Server-side global switch for auto table creation on write.
+## When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`.
+#+ auto_create_table = true
+
## The user provider for authentication.
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
## @toml2docs:none-default
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index d5c42e744c..5740e0e1cf 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -6,6 +6,10 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
+## Server-side global switch for auto table creation on write.
+## When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`.
+#+ auto_create_table = true
+
## The user provider for authentication.
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
## @toml2docs:none-default
diff --git a/docs/rfcs/2026-05-28-table-semantic-layer.md b/docs/rfcs/2026-05-28-table-semantic-layer.md
new file mode 100644
index 0000000000..e4d899d704
--- /dev/null
+++ b/docs/rfcs/2026-05-28-table-semantic-layer.md
@@ -0,0 +1,157 @@
+---
+Feature Name: Table Semantic Layer
+Tracking Issue: TBD
+Date: 2026-05-28
+Author: "Dennis Zhuang"
+---
+
+# Summary
+
+Attach a thin layer of semantic metadata to each table so machine consumers — LLM agents, alert generators, dashboard builders, MCP servers, ETL pipelines — can align it with the observability concepts they already know (OTel instrument kinds, Prometheus naming conventions, UCUM units, semantic conventions, severity numbers, OTel ↔ Prometheus translation rules).
+
+The mechanism reuses what already exists in `table_options` (the same slot that today carries `table_data_model` and `otlp_metric_compat`): a reserved `greptime.semantic.*` namespace, plus standard SQL column `COMMENT` for field-level supplements, plus an `information_schema.semantic_tables` view as the discovery entry point. No new protocol, no new DDL keyword.
+
+Per-table identity only. Cross-table relationships are deferred.
+
+# Motivation
+
+GreptimeDB already ingests OTLP metrics / traces / logs and Prometheus remote write. Each protocol carries rich metadata on the wire (instrument kind, temporality, unit, scope, resource, semantic-conventions version), and most of it is dropped when rows land in a table:
+
+- An `opentelemetry_traces` table looks like any wide table; signal type, source, and field provenance must be guessed from naming.
+- The OTel-to-Prometheus translation in v0.16+ actively drops scope attributes and most resource attributes; the table never records *what was dropped*.
+- Prometheus remote write v1 metadata is unreliable by protocol, but downstream tables do not flag whether `counter` typing was *declared* or *inferred* from the `_total` suffix.
+- Mixed-temporality data (OTel delta + Prometheus cumulative in the same table) is unrecoverable from schema alone.
+
+The audience is broader than LLM agents. Alert generators need to choose between `rate()` and absolute thresholds, and need units to pick sensible bounds. Dashboard builders pick visualisations by signal type. MCP servers surface a structured tool catalog instead of free-text descriptions. ETL pipelines need lineage to know whether a `service_name` column is `resource.service.name` or a free-form label. All of them currently guess from column names; the metadata that would remove the guesswork already exists at ingest time, but we do not preserve it.
+
+# Goals
+
+1. Tag every ingested table with a stable identity using existing SQL surfaces — no new protocol, no new DDL keyword.
+2. Record the lossy transformations the ingestion path performs (dropped attributes, scope handling, type inference vs. declaration).
+3. Expose one `information_schema` view as the consumer-facing discovery entry point.
+4. Keep the layer optional and additive — tables without these options keep working unchanged.
+
+# Non-Goals
+
+- Cross-table relationship modelling. Deferred to a follow-up RFC.
+- Bespoke storage. Reuse `table_options` and column `COMMENT`.
+- Semantic enforcement at query time. The layer is descriptive, not coercive.
+- New wire protocol. Upstream standardisation is mentioned only as a future direction.
+
+# Proposal
+
+## Three mechanisms
+
+1. **`greptime.semantic.*` table options** — table-level identity and lineage. Carried inside the existing `table_options` blob. This is the same slot that today carries `table_data_model = 'greptime_trace_v1'` and `otlp_metric_compat = 'prom'`, so the mechanism is generalising what the OTLP trace auto-create path already does.
+2. **Column `COMMENT`** — column-level supplements ("this column is `resource.service.name`"; "this column carries delta values"). Standard SQL.
+3. **`information_schema.semantic_tables` view** — a denormalised projection of the options, registered through the existing `with_extra_table_factories()` hook. Tables without a `greptime.semantic.*` option do not appear in the view.
+
+## Vocabulary
+
+All keys are flat strings under the `greptime.semantic.` prefix; values are strings; unknown keys are tolerated so the vocabulary can grow without coordinated rollouts.
+
+**Common (all signals)**
+
+| Key | Example |
+| --- | --- |
+| `greptime.semantic.signal_type` | `trace` / `log` / `metric` / `event` |
+| `greptime.semantic.source` | `opentelemetry` / `prometheus` / `elasticsearch` / `loki` / `custom` |
+| `greptime.semantic.source_version` | protocol or SDK version, e.g. `v2` (Prom remote write), `1.30.0` (optional) |
+| `greptime.semantic.pipeline` | `greptime_trace_v1` (subsumes the existing `table_data_model` value) |
+
+**Trace**: `greptime.semantic.trace.conventions` (the version of the OpenTelemetry semantic conventions used in this table, lifted from `schema_url`; e.g. `otel-semconv-1.27`), `greptime.semantic.trace.has_events`, `greptime.semantic.trace.has_links`.
+
+**Metric** — v1 assumes one metric type per table, which is how both Prom RW and the post-v0.16 OTel ingestion path land data today; mixed-type tables are a follow-up.
+
+| Key | Example |
+| --- | --- |
+| `greptime.semantic.metric.type` | `counter` / `gauge` / `histogram` / `summary` / `updown_counter` / `gauge_histogram` / `info` / `stateset` |
+| `greptime.semantic.metric.unit` | UCUM, e.g. `s`, `By`, `{request}` |
+| `greptime.semantic.metric.temporality` | `cumulative` / `delta` (OTel only) |
+| `greptime.semantic.metric.monotonic` | `true` / `false` |
+| `greptime.semantic.metric.metadata_quality` | `declared` (OTLP / Prom RW v2 / exposition) or `inferred` (Prom RW v1, name-suffix guess) |
+| `greptime.semantic.metric.original_name` | Pre-translation OTel name when the table name was Prometheus-ised |
+
+`metadata_quality = inferred` is the load-bearing field for confidence-aware tooling: an inferred counter should be re-checked before betting on `rate()`-style semantics.
+
+**Log**: `greptime.semantic.log.severity_scheme` (`otlp` / `syslog` / `custom`), `greptime.semantic.log.body_format` (`string` / `json` / `mixed`).
+
+**Resource / scope preservation**: `greptime.semantic.resource.attributes_preserved` (JSON array string of attrs promoted to columns), `greptime.semantic.resource.attributes_dropped` (boolean), `greptime.semantic.scope.preserved` (boolean). These answer the most common downstream question: "is this data missing because it was dropped, or because it lives on a different column than I think?" List-shaped values use JSON array strings rather than comma-separated text to avoid escaping and ordering ambiguity.
+
+## Conflict and update semantics
+
+Two design decisions are worth pinning down up front, because they constrain everything else:
+
+- **Conflict.** Some table-level keys (`trace.conventions` lifted from `schema_url`, `metric.temporality`, ...) cannot represent the truth when a long-lived table sees rows from multiple sources. v1 records `mixed` or `unknown` rather than a fictitious single value. Downstream consumers must treat any single-valued semantic key as best-effort, not strong evidence.
+- **Update.** Semantic options are stamped at table creation. v1 does not specify an update path; promoting `metadata_quality` from `inferred` to `declared`, refreshing `resource.attributes_preserved`, or revising `trace.conventions` on later writes is deferred. If real usage shows update is needed, it lands as a separate RFC.
+
+## `information_schema.semantic_tables`
+
+A consumer's first SQL on connect:
+
+```sql
+SELECT table_catalog, table_schema, table_name, signal_type, source, pipeline
+FROM information_schema.semantic_tables;
+```
+
+returns one row per semantic-tagged table. The view exposes a stable set of core columns (`table_catalog`, `table_schema`, `table_name`, `signal_type`, `source`, `source_version`, `pipeline`) plus a `semantic_options` JSON column carrying the rest of the `greptime.semantic.*` keys verbatim. Future keys appear inside `semantic_options` without forcing a view-schema change; only widely-used keys are ever promoted to first-class columns.
+
+# Implementation Plan
+
+Four phases, each independently shippable.
+
+1. **Identity.** Stamp `signal_type` and `source` on every auto-create path. The OTLP paths already have natural injection points; Prom remote write is the one non-trivial path because metric-engine logical tables share physical storage (see Open Question 2).
+2. **Metric specifics.** Add type / unit / temporality / monotonic / metadata_quality / original_name at OTel metric and Prom RW ingestion sites; the data is already at hand inside the OTel translator.
+3. **Resource / scope lineage.** Record what the OTel-to-Prometheus translation kept and dropped.
+4. **`information_schema.semantic_tables` view + documentation** as a stable user-facing contract.
+
+# Relationship to OpenTelemetry standardisation
+
+OTel today standardises what producers emit and how data collectors are managed; the read side — what a backend exposes back to clients — is deliberately vendor turf. OTLP is one-way; OpAMP is agent management; OTEP-0243 (App Telemetry Schema) is producer-side; `schema_url` is producer-stated with no reverse. Adjacent precedents — Prometheus `/api/v1/metadata`, Loki labels API, Tempo tags, Jaeger services, ad-hoc MCP servers — are all vendor-specific.
+
+This is a real gap. The shape we propose locally (signal-agnostic, `schema_url`-aware, structured around a small vocabulary) is deliberately close to what a future upstream OTEP for a backend-catalog read API could look like, with Weaver's *Resolved Telemetry Schema* as the natural data model. We do not commit to driving such an OTEP here; we do commit to keeping the local shape close enough that a future upstream proposal does not force a breaking migration.
+
+# Alternatives
+
+- **New DDL syntax (`SEMANTIC trace WITH (...)`).** Cleaner-looking but non-standard and forces every client to learn it. The metadata is not interesting enough to justify a new keyword.
+- **Dedicated `_semantic` system table.** Doubles the storage path for what is static per-table KV and adds lifecycle questions (drop, backfill). A view over `table_options` covers the same access pattern.
+- **Column comments only.** Discovery (`WHERE signal_type = 'trace'`) becomes a full-text problem. Comments are good for column-level supplements, not for identity.
+- **Encode everything into the table name.** What we do today. Every new field becomes a new naming convention.
+
+# Open Questions
+
+1. **Namespace prefix.** `greptime.semantic.*` vs. bare `semantic.*`. v1 picks the vendored prefix; alias or migrate if a community standard later emerges.
+2. **Prom RW injection point.** Metric-engine logical tables share physical storage, so per-logical-table options need a hook that does not exist as cleanly as the OTLP trace branch. A short spike is needed before Phase 1 lands for Prom RW.
+3. **Mixed-type metric tables.** When ingestion modes that pack multiple metric types into one table appear, `metric.type` migrates from table-level to row-level. v1 leaves a `metric.type = 'mixed'` marker and punts.
+4. **Stability surface.** Top-level keys (`signal_type`, `source`) are stable; sub-namespaces (`metric.*`, ...) are evolving until v1.0 of the layer is declared.
+
+# Future Work
+
+- **Cross-table relationships.** Paired trace/services tables, metric/info pairing, JOIN hints. Its own RFC.
+- **Producer SDK/client identity.** An optional `greptime.semantic.source.sdk` key recording the emitting client (e.g. `opentelemetry-go`, `opentelemetry-java`, `opentelemetry-collector`). Because a single table can receive data from multiple SDKs (a shared trace table is the common case), mixed producers collapse to `mixed`, following the same conflict rule as the table-level keys above.
+- **Backfill** for tables created before this feature shipped.
+- **Upstream proposal.** Carry the shape into a community proposal — likely an OTEP for an OTLP-Catalog read API plus an MCP binding — informed by Greptime's local usage data.
+
+# References
+
+OpenTelemetry:
+- [OTLP specification](https://opentelemetry.io/docs/specs/otlp/)
+- [OTel Schemas (`schema_url`)](https://opentelemetry.io/docs/specs/otel/schemas/)
+- [Semantic Conventions](https://opentelemetry.io/docs/specs/semconv/)
+- [OTEP-0243: App Telemetry Schema](https://github.com/open-telemetry/oteps/blob/main/text/0243-app-telemetry-schema-vision-roadmap.md)
+- [OpAMP specification](https://github.com/open-telemetry/opamp-spec/blob/main/specification.md)
+- [Weaver: Resolved Telemetry Schema](https://github.com/open-telemetry/weaver)
+- [2025 Stability Proposal](https://opentelemetry.io/blog/2025/stability-proposal-announcement/)
+
+Prometheus / OpenMetrics:
+- [Prometheus Remote Write 1.0](https://prometheus.io/docs/specs/prw/remote_write_spec/)
+- [Prometheus Remote Write 2.0](https://prometheus.io/docs/specs/prw/remote_write_spec_2_0/)
+- [Prometheus exposition formats](https://prometheus.io/docs/instrumenting/exposition_formats/)
+- [Prometheus HTTP API: `/api/v1/metadata`](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata)
+
+Units and conventions:
+- [UCUM — Unified Code for Units of Measure](https://ucum.org/)
+
+GreptimeDB:
+- [OTLP ingestion guide](https://docs.greptime.com/user-guide/ingest-data/for-observability/opentelemetry/)
+- [Trace data model](https://docs.greptime.com/user-guide/traces/data-model/)
diff --git a/src/catalog/src/kvbackend/table_cache.rs b/src/catalog/src/kvbackend/table_cache.rs
index 42b3fbc74b..13f74a48c9 100644
--- a/src/catalog/src/kvbackend/table_cache.rs
+++ b/src/catalog/src/kvbackend/table_cache.rs
@@ -14,7 +14,9 @@
use std::sync::Arc;
-use common_meta::cache::{CacheContainer, Initializer, TableInfoCacheRef, TableNameCacheRef};
+use common_meta::cache::{
+ CacheContainer, InitStrategy, Initializer, TableInfoCacheRef, TableNameCacheRef,
+};
use common_meta::error::{Result as MetaResult, ValueNotExistSnafu};
use common_meta::instruction::CacheIdent;
use futures::future::BoxFuture;
@@ -38,7 +40,14 @@ pub fn new_table_cache(
) -> TableCache {
let init = init_factory(table_info_cache, table_name_cache);
- CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
+ CacheContainer::with_strategy(
+ name,
+ cache,
+ Box::new(invalidator),
+ init,
+ filter,
+ InitStrategy::VersionChecked,
+ )
}
fn init_factory(
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 3c106bd43f..7f2057bc97 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -79,7 +79,7 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
- plugins::start_datanode_plugins(self.datanode.plugins())
+ plugins::start_datanode_plugins(&self.datanode)
.await
.context(StartDatanodeSnafu)?;
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index 6228cbd3f3..32d9070ec6 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -90,7 +90,7 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
- plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
+ plugins::start_flownode_plugins(&self.flownode)
.await
.context(StartFlownodeSnafu)?;
diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index da2c111e7c..cbc07d10e9 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -95,8 +95,7 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
- let plugins = self.frontend.instance.plugins().clone();
- plugins::start_frontend_plugins(plugins)
+ plugins::start_frontend_plugins(&self.frontend.instance)
.await
.context(error::StartFrontendSnafu)?;
diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs
index bf3cb2f5e7..e30b115ada 100644
--- a/src/cmd/src/metasrv.rs
+++ b/src/cmd/src/metasrv.rs
@@ -68,7 +68,7 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
- plugins::start_metasrv_plugins(self.instance.plugins())
+ plugins::start_metasrv_plugins(&self.instance)
.await
.context(StartMetaServerSnafu)?;
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index e0f2c673ff..7d99e99554 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -164,7 +164,7 @@ impl App for Instance {
.start(self.leader_services_context.clone())
.await?;
- plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
+ plugins::start_frontend_plugins(&self.frontend.instance)
.await
.context(error::StartFrontendSnafu)?;
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 6cffcd67c2..cee29e4456 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -114,6 +114,7 @@ fn test_load_frontend_example_config() {
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
default_column_prefix: Some("greptime".to_string()),
+ auto_create_table: true,
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
@@ -267,6 +268,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
default_column_prefix: Some("greptime".to_string()),
+ auto_create_table: true,
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
sync_period: Some(Duration::from_secs(10)),
diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml
index 470b5371f7..8b4053db2f 100644
--- a/src/common/datasource/Cargo.toml
+++ b/src/common/datasource/Cargo.toml
@@ -33,6 +33,7 @@ datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
+object_store_opendal.workspace = true
orc-rust = { version = "0.8", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index a6a358c9e4..e36f94c0d2 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -316,7 +316,7 @@ pub async fn file_to_stream(
.with_file_compression_type(df_compression)
.build();
- let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
+ let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
let file_opener = config.file_source().create_file_opener(store, &config, 0)?;
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?;
diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs
index 93ab3b4409..a925f73d48 100644
--- a/src/common/datasource/src/file_format/tests.rs
+++ b/src/common/datasource/src/file_format/tests.rs
@@ -44,7 +44,7 @@ struct Test<'a> {
impl Test<'_> {
async fn run(self, store: &ObjectStore) {
- let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
+ let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
let file_opener = self
.file_source
.create_file_opener(store, &self.config, 0)
diff --git a/src/common/datasource/src/object_store/oss.rs b/src/common/datasource/src/object_store/oss.rs
index aded3eca2c..aacc17ac5e 100644
--- a/src/common/datasource/src/object_store/oss.rs
+++ b/src/common/datasource/src/object_store/oss.rs
@@ -27,12 +27,14 @@ const ACCESS_KEY_ID: &str = "access_key_id";
const ACCESS_KEY_SECRET: &str = "access_key_secret";
const ROOT: &str = "root";
const ALLOW_ANONYMOUS: &str = "allow_anonymous";
+const SKIP_SIGNATURE: &str = "skip_signature";
/// Check if the key is supported in OSS configuration.
pub fn is_supported_in_oss(key: &str) -> bool {
[
ROOT,
ALLOW_ANONYMOUS,
+ SKIP_SIGNATURE,
BUCKET,
ENDPOINT,
ACCESS_KEY_ID,
@@ -61,18 +63,23 @@ pub fn build_oss_backend(
builder = builder.access_key_secret(access_key_secret);
}
- if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
- let allow = allow_anonymous.as_str().parse::().map_err(|e| {
+ if let Some((key, value)) = connection
+ .get(SKIP_SIGNATURE)
+ .map(|value| (SKIP_SIGNATURE, value))
+ .or_else(|| {
+ connection
+ .get(ALLOW_ANONYMOUS)
+ .map(|value| (ALLOW_ANONYMOUS, value))
+ })
+ {
+ let skip_signature = value.as_str().parse::().map_err(|e| {
error::InvalidConnectionSnafu {
- msg: format!(
- "failed to parse the option {}={}, {}",
- ALLOW_ANONYMOUS, allow_anonymous, e
- ),
+ msg: format!("failed to parse the option {}={}, {}", key, value, e),
}
.build()
})?;
- if allow {
- builder = builder.allow_anonymous();
+ if skip_signature {
+ builder = builder.skip_signature();
}
}
@@ -93,6 +100,7 @@ mod tests {
fn test_is_supported_in_oss() {
assert!(is_supported_in_oss(ROOT));
assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
+ assert!(is_supported_in_oss(SKIP_SIGNATURE));
assert!(is_supported_in_oss(BUCKET));
assert!(is_supported_in_oss(ENDPOINT));
assert!(is_supported_in_oss(ACCESS_KEY_ID));
diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs
index 0a13d9c6e8..ea2b0c768c 100644
--- a/src/common/datasource/src/test_util.rs
+++ b/src/common/datasource/src/test_util.rs
@@ -103,7 +103,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
test_util::TEST_BATCH_SIZE,
schema.clone(),
FileCompressionType::UNCOMPRESSED,
- Arc::new(object_store::compat::OpendalStore::new(store.clone())),
+ Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
true,
);
@@ -157,7 +157,7 @@ pub async fn setup_stream_to_csv_test(
let csv_opener = csv_source
.create_file_opener(
- Arc::new(object_store::compat::OpendalStore::new(store.clone())),
+ Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
&config,
0,
)
diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs
index f16290937a..c26a0fab76 100644
--- a/src/common/meta/src/cache.rs
+++ b/src/common/meta/src/cache.rs
@@ -17,7 +17,7 @@ mod flow;
mod registry;
mod table;
-pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter};
+pub use container::{CacheContainer, InitStrategy, Initializer, Invalidator, TokenFilter};
pub use flow::{TableFlownodeSetCache, TableFlownodeSetCacheRef, new_table_flownode_set_cache};
pub use registry::{
CacheRegistry, CacheRegistryBuilder, CacheRegistryRef, LayeredCacheRegistry,
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index ddfb0c0759..8a419176c9 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -437,11 +437,13 @@ pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result {
pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
for key in flow_task.flow_options.keys() {
match key.as_str() {
- DEFER_ON_MISSING_SOURCE_KEY | FlowType::FLOW_TYPE_KEY => {}
+ DEFER_ON_MISSING_SOURCE_KEY
+ | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY
+ | FlowType::FLOW_TYPE_KEY => {}
unknown => {
return UnexpectedSnafu {
err_msg: format!(
- "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}"
+ "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}, {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}"
),
}
.fail();
@@ -487,6 +489,9 @@ pub enum FlowType {
Streaming,
}
+pub const FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY: &str =
+ "experimental_enable_incremental_read";
+
impl FlowType {
pub const BATCHING: &str = "batching";
pub const STREAMING: &str = "streaming";
diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs
index a1a6c040f1..7150be39cb 100644
--- a/src/common/meta/src/ddl/tests/create_flow.rs
+++ b/src/common/meta/src/ddl/tests/create_flow.rs
@@ -24,8 +24,9 @@ use table::table_name::TableName;
use crate::ddl::DdlContext;
use crate::ddl::create_flow::{
- CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY, FlowType,
- defer_on_missing_source,
+ CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY,
+ FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType, defer_on_missing_source,
+ validate_flow_options,
};
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
@@ -275,6 +276,22 @@ fn test_defer_on_missing_source_invalid_value() {
);
}
+#[test]
+fn test_validate_flow_options_allows_incremental_read_option() {
+ let mut task = test_create_flow_task(
+ "my_flow",
+ vec![],
+ TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"),
+ false,
+ );
+ task.flow_options.insert(
+ FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
+ "true".to_string(),
+ );
+
+ validate_flow_options(&task).unwrap();
+}
+
#[tokio::test]
async fn test_create_flow_rejects_unknown_option_in_meta_task() {
let mut task = test_create_flow_task(
diff --git a/src/file-engine/Cargo.toml b/src/file-engine/Cargo.toml
index 6c8c9e887d..9d031cb279 100644
--- a/src/file-engine/Cargo.toml
+++ b/src/file-engine/Cargo.toml
@@ -29,6 +29,7 @@ datafusion-expr.workspace = true
datatypes.workspace = true
futures.workspace = true
object-store.workspace = true
+object_store_opendal.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
snafu.workspace = true
diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs
index eec8f8961d..a480a50374 100644
--- a/src/file-engine/src/query/file_stream.rs
+++ b/src/file-engine/src/query/file_stream.rs
@@ -61,7 +61,7 @@ fn build_record_batch_stream(
.with_file_group(FileGroup::new(files))
.build();
- let store = Arc::new(object_store::compat::OpendalStore::new(
+ let store = Arc::new(object_store_opendal::OpendalStore::new(
scan_plan_config.store.clone(),
));
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 580762a142..a8bd139d98 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -23,7 +23,6 @@ use session::ReadPreference;
mod checkpoint;
pub(crate) mod engine;
pub(crate) mod frontend_client;
-mod incremental_filter;
mod state;
mod table_creator;
mod task;
@@ -55,6 +54,10 @@ pub struct BatchingModeOptions {
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
pub experimental_time_window_merge_threshold: usize,
+ /// Whether to enable experimental flow incremental source reads.
+ ///
+ /// When disabled, batching flows always execute full-snapshot queries.
+ pub experimental_enable_incremental_read: bool,
/// Read preference of the Frontend client.
pub read_preference: ReadPreference,
/// TLS option for client connections to frontends.
@@ -72,6 +75,7 @@ impl Default for BatchingModeOptions {
experimental_frontend_scan_timeout: Duration::from_secs(30),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
+ experimental_enable_incremental_read: false,
read_preference: Default::default(),
frontend_tls: None,
}
diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index f37e54d80b..68fb3793e4 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -21,7 +21,7 @@ use std::time::Duration;
use api::v1::flow::DirtyWindowRequests;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
-use common_meta::ddl::create_flow::FlowType;
+use common_meta::ddl::create_flow::{FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::flow::flow_state::FlowStat;
@@ -38,6 +38,7 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parsers::utils::is_tql;
use store_api::metric_engine_consts::is_metric_engine_internal_column;
+use store_api::mito_engine_options::APPEND_MODE_KEY;
use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{RwLock, oneshot};
@@ -428,6 +429,55 @@ async fn get_table_info(
}
impl BatchingEngine {
+ fn batch_opts_for_flow_options(
+ &self,
+ flow_options: &HashMap,
+ ) -> Result, Error> {
+ let mut batch_opts = (*self.batch_opts).clone();
+ if let Some(enable_incremental_read) =
+ flow_options.get(FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY)
+ {
+ batch_opts.experimental_enable_incremental_read = enable_incremental_read
+ .parse::()
+ .map_err(|_| {
+ InvalidQuerySnafu {
+ reason: format!(
+ "Invalid flow option {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}: {enable_incremental_read}"
+ ),
+ }
+ .build()
+ })?;
+ }
+
+ Ok(Arc::new(batch_opts))
+ }
+
+ fn table_options_enable_append_mode(extra_options: &HashMap) -> bool {
+ extra_options
+ .get(APPEND_MODE_KEY)
+ .is_some_and(|value| value.eq_ignore_ascii_case("true"))
+ }
+
+ fn ensure_incremental_source_append_only(
+ batch_opts: &BatchingModeOptions,
+ table_name: &[String; 3],
+ extra_options: &HashMap,
+ ) -> Result<(), Error> {
+ if batch_opts.experimental_enable_incremental_read {
+ ensure!(
+ Self::table_options_enable_append_mode(extra_options),
+ UnsupportedSnafu {
+ reason: format!(
+ "Flow incremental read requires append-only source table, but source table `{}` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read",
+ table_name.join(".")
+ ),
+ }
+ );
+ }
+
+ Ok(())
+ }
+
pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result