Why does my Kafka sink output show inconsistent initial values before converging to zero?
I'm running a test that simulates bank transactions, and I'm observing inconsistent behavior at the beginning of the output. The total view should always start at "0", but instead, it oscillates. After a while, it does converge to "0". I'm using SQL code to create tables, views, and materialized views in a streaming database, and Python code to produce messages to a Kafka topic. Here's the SQL and Python code I'm using:
-- SQL code for creating tables and views
create table if not exists transactions (
id int, from_account int, to_account int, amount int, ts timestamp
)
with (
connector='kafka',
topic='transactions',
properties.bootstrap.server='message_queue:29092',
scan.startup.mode='earliest',
scan.startup.timestamp_millis='140000000'
)
row format json;
create view accounts as select from_account as account from transactions union select to_account from transactions;
create materialized view credits as select transactions.to_account as account, sum(transactions.amount) as credits from transactions left join accounts on transactions.to_account = accounts.account group by to_account;
create materialized view debits as select transactions.from_account as account, sum(transactions.amount) as debits from transactions left join accounts on transactions.from_account = accounts.account group by from_account;
create materialized view balance as select credits.account as account, credits - debits as balance from credits inner join debits on credits.account = debits.account;
create materialized view total as select sum(balance) from balance;
create sink total_sink from total
with (
connector='kafka',
properties.bootstrap.server='message_queue:29092',
topic='total',
type='append-only',
force_append_only='true'
);
# Python code for producing messages to Kafka
import random
import json
import datetime
import time
from kashpy.kash import *
c = Cluster("local")
id = 0
random.seed(42)
while True:
row = json.dumps({
'id': id,
'from_account': random.randint(0,9),
'to_account': random.randint(0,9),
'amount': 1,
'ts': datetime.datetime.now().isoformat()
})
print(row)
c.produce("transactions", row, key=str(id))
if id % 1000 == 0:
c.flush()
id += 1
time.sleep(1)
The output on the sink topic starts with inconsistent values like -3
, -4
, etc., before it stabilizes at 0
. I'm looking for an explanation and a way to fix this initial inconsistency.
Ralph Matthias Debusmann
Asked on May 25, 2023
The inconsistency at the beginning of the Kafka sink output is likely due to the way the streaming database handles events and barriers. When a new account is created, it emits an event, which might not fall on the barrier boundary, leading to temporary inconsistency. Once all accounts have been initialized, events tend to be emitted on the barrier, resulting in consistent results. The balance
view, calculated by an inner join
, could miss some accounts if credits
and debits
are not initialized with all accounts from the start. This is why the introduction of the accounts
view helped stabilize the output in Materialize. The current Kafka sink implementation provides eventual consistency rather than strong consistency, but strong consistency could be supported if there is sufficient user demand.