Autor: Trifan Bogdan-Cristian (331CD)
Am dezvoltat backend-ul unui server web în Python cu framework-ul Flask pentru a analiza și procesa date statistice despre activitatea fizică și obezitatea din SUA (2011–2022).
Server-ul este capabil să proceseze simultan mai multe request-uri HTTP, datorită implementării design pattern-ul Replicated Workers (numit și Thread Pool).
Metode pentru procesarea datelor:
| Metoda HTTP | URI | JSON payload |
|---|---|---|
POST |
http://127.0.0.1:5000//api/states_mean |
{"question": "..."} |
POST |
http://127.0.0.1:5000//api/state_mean |
{"question": "...", "state": "..."} |
POST |
http://127.0.0.1:5000//api/best5 |
{"question": "..."} |
POST |
http://127.0.0.1:5000//api/worst5 |
{"question": "..."} |
POST |
http://127.0.0.1:5000//api/global_mean |
{"question": "..."} |
POST |
http://127.0.0.1:5000//api/diff_from_mean |
{"question": "..."} |
POST |
http://127.0.0.1:5000//api/state_diff_from_mean |
{"question": "...", "state": "..."} |
POST |
http://127.0.0.1:5000//api/mean_by_category |
{"question": "..."} |
POST |
http://127.0.0.1:5000//api/state_mean_by_category |
{"question": "...", "state": "..."} |
Metode pentru controlul serverului/server info:
| Metoda HTTP | URI |
|---|---|
GET |
http://127.0.0.1:5000//api/graceful_shutdown |
GET |
http://127.0.0.1:5000//api/num_jobs |
GET |
http://127.0.0.1:5000//api/jobs |
GET |
http://127.0.0.1:5000//api/get_results/<job_id> |
<job_id>este un placeholder: în locul său se va trece un număr întreg
Un lucru cu adevărat interesant pe care l-am învățat în aceast proiect a fost cum să-mi testez API-ul construit (o situație reală la un posibil viitor loc de muncă 🤓).
Am folosit Restfox (alternativa lightweight a lui Postman) pentru a analiza răspunsurile serverului, comportamentul "bazei de date" și logurile.
Dacă nu aș fi rulat request-urile mai întâi secvențial din Restfox,
nu aș fi descoperit dead-lock-uri sau gestionarea greșită a mutex-urilor pe fișiere,
probleme care mi-au limitat procesarea datelor la un singur request pe secundă.
La pornirea server-ului se citește fișierul CSV și se încarcă în memorie doar coloanele de interes, în funcție de care se va realiza selecția ulterioară a datelor.
Metodele clasei DataIngestor
filtrează liniile tabelului în funcție de question și state,
iar mai apoi calculează următoarele operații statistice:
- 📌 Media valorilor pentru fiecare stat
- 🌎 Media globală a valorilor
- 📊 Deviția de la medie
- 🔝 Top 5 cele mai bune/slabe rezultate
Rezultatul acestor funcții, un JSON (sub forma unui dicționar), este inclus în răspunsul cererilor HTTP ce presupun procesări de date.
În programarea paralelă, modelul Replicated Workers (sau Thread Pool) este folosit pentru obținerea de concurență în execuția unui program: în cazul de față, procesarea mai multor request-uri HTTP în același timp de către un server web.
Acest desgin pattern presupune implementarea a două componenta principale:
- Un pool de task-uri de executat, reprezentat de o coadă
-
Structura
Queue()din Python oferă, by default, operații thread-safe
-
- Un grup de workeri (thread-uri)
Numărul de thread-uri create va fi extras dintr-o variabilă de mediu, în absența căreia se vor inițializa atâtea thread-uri câte core-uri are procesorul.
num_threads = int(os.getenv("TP_NUM_OF_THREADS", os.cpu_count()))Thread Pool-ul se ocupă cu gestiunea în paralel a request-urilor de procesare de date,
apelând metodele corespunzătoare din clasa DataIngestor
(celelalte cereri la server fiind executate secvențial).
Pentru tratarea request-ului GET /api/graceful_shutdown,
am definit un Event() la nivelul instantei clasei ThreadPool,
care este activat la primirea acestei cereri HTTP,
declanșând astfel oprirea thread-urilor după ce toate request-urile de procesare de date au fost rezolvate.
Pentru a proteja accesul la fișierele bazei de date în contextul programării paralele,
mi-am implementat propriul Concurrent Hash Map (sub forma unui dictionar și unui Lock() privat)
pentru a stoca, într-un mod dinamic, mutex-uri doar pentru task-urile în curs de procesare.
Accesul la fișier necesită obținerea a două lock-uri: unul pentru Concurrent Hash Map și unul pentru fișier.
Fiecare mutex este activ doar în timpul procesării datelor și se dezalocă imediat după scrierea rezultatului pe disc. Folosirea acestui concept de lifetime (inspirat din Rust), impune ca numărul de mutex-uri pentru fișierele bazei de date să fie cel mult egal cu numărul de thread-uri, economisind astfel memorie.
Thread Pool-ul stochează joburile în procesare drept chei în Concurrent Hash Map, iar lock-urile pentru fișierele respective sunt valorile din dicționar.
Pentru a păstra un istoric persistent la restart al serverului,
am înregistrat activitatea în fișiere webserver.log, stocate pe disc.
⚠️ ATENȚIE! Pornirea serverului presupune resetarea activității de logging, ceea ce înseamnă că fișierele de monitorizare vor fi șterse.
Datorită faptului că mai multe thread-uri ar vrea să scrie simultan activitatea serverului,
am definit un lock privat, la nivelul clasei Logger,
pentru a proteja accesul la fișier.
Metoda log_message() a instantei clasei Logger() primește un mesaj,
pe care îl scrie alături de timestamp-ul curent,
în format 🕚 GMT (Greenwich Mean Time), un standard global, fix și independent de fusurile orare.
În loc să scriu toată activitatea de logging într-un singur fișier mare,
folosesc RotatingFileHandler pentru a impune o limită superioară.
Când webserver.log ajunge la dimensiunea de 10MB,
acesta se va redenumi în webserer.log.1, .2 până la .10.
log_handler = RotatingFileHandler(
"webserver.log",
maxBytes=1010241024, backupCount=10
)Pentru verificarea metodelor clasei DataIngestor,
mi-am creat 2 CSV-uri cu câte 10 intrări:
- Primul pentru querry-urile doar în funcție de "question"
- Al doilea fișier pentru procesările care iau și "state"-ul în considerare
Am încercat să fac clasa de testare cât de generic am putut, astfel încât să testeze metodele în funcție de toate fișierele input output din directoarele aferente. În plus, mi-am definit o singură funcție (de ordin superior) capabilă să testeze toate metodele de procesare (nu câte una pentru fiecate tip de request în parte). Drept urmare, codul meu este mult mai concis și ușor de urmărit.
Din directorul rădăcină al repo-ului:
source venv/bin/activate
PYTHONPATH=. python3 unittests/TestWebserver.py