Flink brings SQL semantics to stream processing and keeps results continuously up to date by modeling unbounded streams as dynamic tables, executing continuous queries over them, and propagating incremental results as changelogs. Keywords: dynamic tables, continuous queries, changelog.
The technical specification snapshot summarizes the core context
| Parameter | Description |
|---|---|
| Core topic | Flink Table API / SQL |
| Language | Java, SQL |
| License | CC BY-SA 4.0 (as declared in the original source) |
| Core dependencies | Apache Flink, Kafka, Upsert-Kafka, Hudi, Paimon |
Flink unifies stream and batch semantics through dynamic tables
A dynamic table is not a physical table. It is Flink’s logical abstraction for an unbounded data stream. Each event in the stream can be treated as a change to the table. This allows developers to write SQL against data as if they were operating on a static table, instead of orchestrating operators event by event.
The value of this abstraction lies in unification. Batch processing works on static, finite tables, while stream processing works on continuously arriving data. In Flink, both map to relational algebra semantics. As a result, Flink builds a two-way bridge between streams and tables: a stream is the change log of a table, and a table is the higher-level view of a stream.
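The following minimal Java sketch makes the two-way bridge concrete. It assumes a local StreamExecutionEnvironment, and the element values and column names are illustrative rather than part of the original example: a DataStream is interpreted as a dynamic table, queried with SQL, and converted back into a stream.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StreamTableBridge {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // The stream view: events such as (Alice, +1) and (Bob, +1)
        DataStream<Tuple2<String, Integer>> clicks =
                env.fromElements(Tuple2.of("Alice", 1), Tuple2.of("Bob", 1));

        // Stream -> table: every event is interpreted as an insertion into the dynamic table
        Table clicksTable = tableEnv.fromDataStream(clicks).as("user_name", "cnt");
        tableEnv.createTemporaryView("clicks", clicksTable);

        // Table -> stream: the SQL result is emitted again as a stream (insert-only in this case)
        Table alice = tableEnv.sqlQuery("SELECT user_name, cnt FROM clicks WHERE user_name = 'Alice'");
        tableEnv.toDataStream(alice).print();

        env.execute();
    }
}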
The core characteristics of dynamic tables make them suitable for real-time analytics
A dynamic table starts empty and keeps changing as stream data arrives. It supports insert, update, and delete operations, so there is always a current snapshot at any point in time. For developers, the interface is a table; for the runtime, what actually flows through the system is a sequence of changes.
| Perspective | Data representation |
|---|---|
| Stream view | Events such as (Alice, +1) and (Bob, +1) |
| Table view | A logical table that evolves over time and can be queried with SQL |
import org.apache.flink.types.Row;

// After stream data arrives, Flink interprets it as an insertion into the dynamic table
Row row = Row.of("Alice", 1); // Represents one business record
// At the Table/SQL semantic layer, it drives a change in the dynamic table state
This code shows that a dynamic table is not an independent storage object. It is a logical representation built on stream events and state evolution.
Continuous queries are fundamentally incremental computations that never stop
The biggest difference between a continuous query and a batch query is not SQL syntax, but the execution model. A batch query runs once and terminates. A continuous query keeps consuming the input stream and continuously updating the result table. As long as the job keeps running, the result keeps evolving.
This model depends on state. GROUP BY, window aggregations, joins, and deduplication all maintain intermediate state and update results incrementally when new data arrives, rather than replaying the full dataset. This is one of the key reasons Flink can deliver both low latency and high throughput.
Hourly click aggregation directly demonstrates continuous query behavior
The following SQL continuously aggregates a click stream by user and 1-hour tumbling window. When the watermark advances past the window end, Flink emits the window result. If early firing is enabled, it can also produce intermediate updates.
SELECT
user_id,
COUNT(*) AS click_cnt,
TUMBLE_START(ts, INTERVAL '1' HOUR) AS w_start -- Compute the window start time
FROM clicks
GROUP BY
user_id,
TUMBLE(ts, INTERVAL '1' HOUR); -- Aggregate by user and hourly tumbling window
This SQL continuously counts clicks for each user in each hourly window and outputs window-level aggregation results.
A typical output might be +I (Alice, 5, 2024-06-01 10:00). If the window result is revised before the window closes, you may instead see update messages such as -U/+U. This means the SQL result is not a one-time artifact, but a projection of the latest state of the dynamic table.
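The revision pattern is easiest to reproduce with a non-windowed aggregation. The following Java sketch, which assumes a streaming TableEnvironment and replaces the clicks topic with a small in-memory values table, runs a grouped count and prints its changelog so the +I and -U/+U messages become visible.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.row;

public class ChangelogDemo {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Three click events; Alice appears twice, so her count is revised from 1 to 2
        Table clicks = tableEnv.fromValues(row("Alice"), row("Bob"), row("Alice")).as("user_id");
        tableEnv.createTemporaryView("clicks", clicks);

        // The continuous query keeps per-key state and updates the count incrementally
        tableEnv.executeSql("SELECT user_id, COUNT(*) AS click_cnt FROM clicks GROUP BY user_id").print();
        // The printed op column typically shows +I rows first, then -U/+U when Alice's count is revised
    }
}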
The changelog mechanism carries data between dynamic tables and result streams
Changelog is the underlying communication language of Flink Table/SQL. Upstream operators do not repeatedly send the full result table. They send only the parts that changed. This incremental transmission model significantly reduces network, state, and serialization overhead.
Flink uses RowKind to represent four standard change types: +I for insert, -U for the old value before an update, +U for the new value after an update, and -D for delete. Understanding these four message types is foundational to understanding the Flink SQL execution pipeline.
RowKind determines the actual semantics of each record
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
// Build a record with an explicit change type
Row row = Row.ofKind(RowKind.INSERT, 1001, "Jack"); // Indicates an inserted row
System.out.println(row); // Possible output: +I[1001, Jack]
This code shows that Flink does not transmit a plain Row at runtime. It transmits a combination of row data plus change type.
| Changelog type | Enum value | Meaning |
|---|---|---|
| +I | INSERT | Insert a new record |
| -U | UPDATE_BEFORE | The old value before an update |
| +U | UPDATE_AFTER | The new value after an update |
| -D | DELETE | Delete a record |
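To make the table above actionable, here is a hedged sketch of how a changelog consumer could react to each RowKind while maintaining a latest-value view; the two-field layout (field 0 = key, field 1 = value) is an assumption for illustration.
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogApplier {
    private final Map<Object, Object> view = new HashMap<>();

    public void apply(Row change) {
        Object key = change.getField(0);
        switch (change.getKind()) {
            case INSERT:
            case UPDATE_AFTER:
                // +I and +U both carry the value that should be visible from now on
                view.put(key, change.getField(1));
                break;
            case UPDATE_BEFORE:
                // -U marks the old value as invalid; it is typically followed immediately by +U
                view.remove(key);
                break;
            case DELETE:
                // -D removes the record entirely
                view.remove(key);
                break;
        }
    }
}
A consumer that only tracks the latest value per key can usually treat -U as a no-op, because the matching +U overwrites the entry anyway; the explicit remove above just keeps all four semantics visible.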
Flink supports three encoding modes for different update scenarios
Although changelog semantics are fixed, the physical encoding can differ. The three most common modes in Flink are Append-only, Retract, and Upsert. The biggest difference among them is how they express updates and whether the sink can consume them correctly.
Append-only is the simplest mode and sends only +I, which makes it suitable for pure append streams. Retract represents an update as delete old plus insert new. It is highly compatible, but it produces more messages. Upsert relies on a primary key and sends the latest value directly, which is often the preferred choice in production.
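As an illustration only, the three encodings map onto Flink's connector-level ChangelogMode as sketched below; connectors normally declare the mode themselves, so this is for inspection rather than something you configure by hand.
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class EncodingModes {
    public static void main(String[] args) {
        ChangelogMode appendOnly = ChangelogMode.insertOnly(); // only +I
        ChangelogMode upsert = ChangelogMode.upsert();         // +I/+U/-D, interpreted by primary key
        ChangelogMode retract = ChangelogMode.newBuilder()     // full retract stream (same kinds as all())
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();

        System.out.println(appendOnly.contains(RowKind.UPDATE_AFTER)); // false: no updates at all
        System.out.println(upsert.contains(RowKind.UPDATE_BEFORE));    // false: old values are never sent
        System.out.println(retract.contains(RowKind.UPDATE_BEFORE));   // true: updates are delete old + insert new
    }
}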
You should check for a primary key before choosing an encoding mode
| Encoding mode | Update rule | Primary key required | Typical scenario |
|---|---|---|---|
| Append-only | Only +I | No | Pure append writes |
| Retract | -D + +I | No | Update scenarios without a primary key |
| Upsert | +I/+U/-D | Yes | Aggregation results, dimension table results |
// Output the full changelog for debugging the actual changelog shape
tableEnv.toChangelogStream(table).print(); // Emits every change kind (+I/-U/+U/-D) that the query produces
This code helps debug dynamic table results and verify whether the job actually outputs an append stream or an update stream.
Whether the sink understands changelog semantics directly determines correctness
Not every external system supports updates and deletes. If you write results containing -U/+U/-D directly into a system that supports append only, you will get duplicates, dirty data, or broken semantics. Sink compatibility is therefore not optional. It is part of Flink SQL design.
For real-time aggregation, denormalized dimension tables, and operational dashboards, sinks that support update semantics such as Upsert-Kafka, Hudi, and Paimon are usually a better fit. Plain Kafka can forward the changelog as-is, but downstream consumers must parse rowkind themselves.
Upsert-Kafka is a common choice for materializing aggregation results
CREATE TABLE user_clicks_sink (
user_id STRING,
total_clicks BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED -- Declare a primary key to enable Upsert semantics
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user-clicks-result',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO user_clicks_sink
SELECT user_id, COUNT(*)
FROM clicks
GROUP BY user_id; -- Continuously output the latest aggregate value for each user
This SQL continuously writes changing aggregation results into Kafka as key-based overwrites, which avoids storing duplicate historical versions.
{"rowkind":"+I","fields":["Alice",1]}
{"rowkind":"-U","fields":["Alice",1]}
{"rowkind":"+U","fields":["Alice",2]}
This JSON snippet shows the full change sequence commonly seen when plain Kafka is used as a transport layer. Downstream systems must explicitly handle the before-and-after update values.
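As a sketch of what such downstream handling could look like, the following consumer applies the rowkind-tagged records to a latest-value view. It assumes Jackson is on the classpath and uses the {"rowkind": ..., "fields": [user, count]} layout from the sample above, which is illustrative rather than a fixed Flink format.
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RowKindAwareConsumer {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final Map<String, Long> latestCounts = new HashMap<>();

    public void onMessage(String json) throws Exception {
        JsonNode node = MAPPER.readTree(json);
        String rowKind = node.get("rowkind").asText();
        String user = node.get("fields").get(0).asText();
        long count = node.get("fields").get(1).asLong();

        switch (rowKind) {
            case "+I":
            case "+U":
                latestCounts.put(user, count); // new or updated value wins
                break;
            case "-U":
                break;                         // old value; the matching +U follows
            case "-D":
                latestCounts.remove(user);     // the key no longer exists
                break;
        }
    }
}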
Understanding the full execution path helps you design stable real-time data warehouse jobs
A Flink SQL job typically goes through three steps: the input stream is mapped to a dynamic table, the continuous query performs incremental computation on that dynamic table, and the resulting dynamic table is encoded as a changelog and written to an external system. All three steps are essential.
When modeling a pipeline, you should therefore think about three things at the same time: whether the source table is append-only or CDC-based, whether the query produces updates, and whether the target sink supports the required semantics. Many production incidents are caused not by operator bugs, but by semantic misalignment across these three layers.
You should follow clear production rules when materializing results
- Prefer an Append-only path for pure append results.
- Prefer Upsert for aggregations and dimension table results with a primary key.
- Use Retract when updates exist without a primary key, and evaluate the message amplification cost.
- Use toChangelogStream().print() during debugging to verify the actual output.
- Do not write update streams directly into file or log systems that support append only.
FAQ with structured answers
What is the fundamental difference between a Flink dynamic table and a regular database table?
A regular database table is usually a persistent storage object, while a Flink dynamic table is a logical view built from an unbounded stream and maintained through state. The former emphasizes static queries and transactions; the latter emphasizes continuous evolution and incremental computation.
Why do aggregation queries often produce -U/+U instead of only +I?
Because aggregation results keep changing as new data arrives. When the historical result for the same key must be revised, Flink needs to express that the old value is no longer valid and the new value has taken effect. That is why update-style changelog messages appear.
When should you prefer Upsert-Kafka?
You should prefer Upsert-Kafka when the result table has a clear primary key and downstream consumers care only about the latest value rather than the full change history. It reduces the number of update messages and is naturally well suited for real-time aggregations, dimension synchronization, and dashboard workloads.
The core summary ties together Flink’s streaming SQL abstractions
This article systematically explains the three core abstractions in Flink Table/SQL: dynamic tables, continuous queries, and changelog. It highlights RowKind, the three encoding modes—Append, Retract, and Upsert—and sink adaptation principles for systems such as Upsert-Kafka, helping developers build a unified understanding of stream-batch integration and incremental computation.