Stiahnite si kód:
kafka-project.zip
producer_server_logs.py generuje správy do topicu SERVER_LOGS_RAW_TOPIC (je nastavený v constants.py). NEPÚŠŤAJTE tohto producera, máte ho vo vstupoch preto, aby ste si mohli pozrieť kód. Funguje veľmi podobne ako na druhom kafka cvičení, s nasledovnými zmenami:
- Produkuje záznamy v logu usporiadané podľa času.
- Produkuje cca 20% prázdnych záznamov. Prázdny záznam má hodnoty pre všetky kľúče buď None alebo prázdny reťazec "".
- Produkuje cca 20% nevalidných záznamov (= záznamov s nekompletným identifikátorom). Nevalidný záznam nie je prázdny, ale má hodnotu pre aspoň jeden z kľúčov "event", "user_id", "timestamp" rovnú None alebo prázdnemu reťazcu "".
- Produkuje cca 20% duplicitných záznamov. Záznam pokladáme za unikátny, ak má unikátnu kombináciu (neprázdnych) hodnôt pre trojicu kľúčov: "event", "user_id", "timestamp".
V súbore constants.py si prepíšte {surname} na Vaše priezvisko. Zároveň si cez Kafka CLI tools vytvorte nasledovné topicy (opäť {surname} nahraďte):
- server_logs_cleaned_{surname}
- data_quality_monitor_{surname}
Vašou úlohou je naprogramovať program clean_server_logs.py, ktorý bude vykonávať dve úlohy
- Čistiť dáta z topicu SERVER_LOGS_RAW_TOPIC - t.j. bude odfiltrovávať prázdne, nevalidné a duplicitné záznamy. Všetky ostatné validné záznamy bude posielať do topicu SERVER_LOGS_CLEANED_TOPIC (je nastavený v constant.py).
- Posielať report o počte prázdnych / nevalidných / duplicitných záznamov za určené časové okno do topicu DATA_QUALITY_MONITOR_TOPIC, pričom platí
- Dĺžka časového okna v sekundách je určená v konštante TIME_WINDOW_SIZE_SEC (viď constants.py)
- Rozmyslite si, podľa ktorých časov je pre tento use-case najlepšie počítať časové okno. Máme na výber (1) timestamp zaslaný producerom explicitne v obsahu správy, (2) message.timestamp, ktorý napĺňa Kafka a obsahuje čas príchodu správy do daného brokera a (3) lokálny čas consumera. Zdôvodnite v komentári v kóde Vaše rozhodnutie. Po uplynutí časového okna pošlite do DATA_QUALITY_MONITOR_TOPIC správu vo forme
data_quality_report = {
"timestamp_start": # TODO zaciatok casoveho okna, pouzite format strftime('%Y-%m-%dT%H:%M:%S') aplikovany na premennu typu datetime,
"timestamp_end": # TODO koniec casoveho okna, pouzite format strftime('%Y-%m-%dT%H:%M:%S') aplikovany na premennu typu datetime,
"empty_messages_count": # TODO pocet prazdnych zaznamov
"invalid_messages_count": # TODO pocet nevalidnych zaznamov
"duplicate_messages_count": # TODO pocet duplikatov
}
Následne si treba vynulovať počítadlá pre prázdne/nevalidné/duplicitné správy (počet sa nemá akumulovať, počítajte presne počet daného typu správ za dané časové okno)..
- Nepúšťajte producer_server_logs.py, v topicu SERVER_LOGS_RAW_TOPIC je nagenerovaný dostatočný počet správ. Nakonfigurujte si Vášho consumera s atribútom auto_offset_reset='earliest' a nedávajte ho do consumer groupy, aby Vám pri každom spustení spracoval všetky správy v topicu SERVER_LOGS_RAW_TOPIC od začiatku. Po skonzumovaní a spracovaní jednej správy nastavte v consumerovi delay pomocou time.sleep(1) - takto budete môcť sledovať postupné spracovanie správ.
- Odporúčam si pre účely testovania logovať rozpoznanie typu záznamu (validný / prázdny / nevalidný / duplicitný) na konzolu alebo do súboru a takisto logovať zaslanie reportu do topicu DATA_QUALITY_MONITOR_TOPIC.
Voliteľné:
Keď budete mať program hotový (a korektne fungujúci), spustite si ho, a môžete si vyskúšať popri ňom pustiť aj jednoduchý real-time dashboard pomocou
streamlit run .\data_quality_dashboard.py
(vyžaduje pip install streamlit). Do dashboardu sa konzumujú správy z topicu DATA_QUALITY_MONITOR_TOPIC a zobrazujú sa postupne v grafe - pri aktuálnom nastavení časového okna sa nová hodnota načíta každých 20 sekúnd.