Pratiche migliori per la Data Science, Parte 1 – Testa le tue query

Best Practices for Data Science, Part 1 - Test Your Queries

Come assicurarsi che le nostre query facciano quello che ci aspettiamo — e altri vantaggi futuri.

Generato con Midjourney

Il campo della Scienza dei Dati ha le sue radici nella Matematica, nelle Statistiche e nella Scienze dell’Informatica. Sebbene si sia sviluppato considerevolmente negli ultimi decenni, è solo negli ultimi 10-15 anni che è diventato una figura di spicco come ruolo ben definito all’interno delle organizzazioni e come campo indipendente nel settore tecnologico.

Essendo una professione relativamente giovane, le migliori pratiche nella Scienza dei Dati non hanno avuto sufficiente tempo per consolidarsi e non sono ben documentate. Questo è in contrasto con il campo tangenziale dell’Ingegneria del Software, che è molto più maturo e ricco di guide, strutture e metodologie che nel tempo si sono rivelate vantaggiose.

Sarebbe logico pensare che gli Scienziati dei Dati trarrebbero vantaggio dall’intersezione e dalla stretta collaborazione con gli Ingegneri del Software, specialmente per quanto riguarda la pratica. Purtroppo, spesso non è il caso, poiché molti Scienziati dei Dati sono o ignari di queste metodologie o non sono disposti a impararle, sostenendo che non siano rilevanti o non rientrino nel loro campo di responsabilità.

In questa serie di blog, vorrei condividere consigli, trucchi e approcci sistematici che possono essere utilizzati nel lavoro degli Scienziati dei Dati, con l’obiettivo di aumentare la correttezza e la stabilità del nostro codice, gestire meglio i nostri modelli e migliorare il lavoro di squadra.

La premessa

Partiamo da uno scenario che chiunque lavori con grandi quantità di dati si trova ad affrontare prima o poi, e alcuni di noi potrebbero addirittura affrontarlo quotidianamente:

Stai usando PySpark e vorresti estrarre alcune informazioni da una grande tabella. Non puoi memorizzare in memoria le enormi quantità di dati rilevanti, quindi sei costretto a fare tutte le trasformazioni, aggregazioni, join e quant’altro nel linguaggio delle query.

Inizi a scrivere la query e sei contento perché PySpark rende facile utilizzare un’API pythonica ed elegante anche quando la query è troppo complicata da spiegare ad altri esseri umani. Anche se decidi di utilizzare l’interfaccia SQL, lo fai con gioia.

Poi ti accorgi di aver dimenticato una colonna chiave in una chiamata groupBy e torni indietro per correggerla.

Poi ti accorgi che una delle funzioni di finestra manca della clausola orderBy.

Poi decidi che questo numero magico che hai usato nella quarta riga dovrebbe essere 1.25 invece di 1.2.

Fini per rileggere queste 20-50 righe della query avanti e indietro ripetutamente per 20-30 minuti, modificandola leggermente mentre ti avvicini alla struttura finale della query.

Quindi… Esegui la query e fallisce.

Attraversi ancora una volta le righe e le righe di codice che hai appena scritto, cercando di capire quali necessità logiche hai perso e risolvendole una per una.

Alla fine, la query viene eseguita e restituisce alcuni risultati.

Ma…

Chi ti assicura che questi risultati riflettano effettivamente ciò che stavi cercando di ottenere tutto questo tempo e che corrispondano al processo che hai in mente al momento?

È qui che il testing ci viene in aiuto.

Testing?

Sì. Ciò che facciamo è:

  1. Creato manualmente un piccolo dataset.
  2. Calcolato manualmente i risultati che desideriamo ottenere con la query.
  3. Applicato la query che abbiamo scritto a quel piccolo dataset.
  4. Confrontato i risultati della query con i nostri calcoli.

Se c’è una discrepanza, dobbiamo correggere qualcosa: o i nostri calcoli manuali erano errati o la query non esegue quello che ci aspettiamo. D’altra parte, se i risultati corrispondono, possiamo procedere al passaggio successivo.

Ora ti guiderò passo dopo passo nella struttura che utilizzo quando scrivo questi test.

Preparare l’ambiente

Iniziamo creando l’ambiente (detto anche fixture) di cui abbiamo bisogno per lavorare con PySpark. Potremmo eseguire numerosi casi di test in ogni esecuzione, quindi configuriamo la sessione PySpark a livello di modulo. Altrimenti, potremmo dover avviare e interrompere una sessione per ogni test, e questo comporta un costo non trascurabile.

Utilizzo il modulo unittest di Python, tuttavia se tu o altri membri del tuo team state utilizzando pytest o nose o qualsiasi altro framework di testing, confido che troverete un modo per eseguire queste azioni.

unittest ha due hook, setUpModule e tearDownModule, che vengono eseguiti prima e dopo i test, rispettivamente. Utilizzeremo questi hook per avviare e interrompere la nostra sessione PySpark.

# test_pyspark.py

import pysparkimport unittestspark: pyspark.sql.SparkSession | None = Nonedef setUpModule():    global spark    spark = get_spark_session('local')def tearDownModule():    global spark    if spark is None:        return    try:        spark.stop()    finally:        spark = None

Mi piace che la mia funzione per la creazione della sessione sia riutilizzabile, quindi eccola (compilero’ l’opzione non locale quando sarà necessario):

# query_pyspark.py

import pysparkdef get_spark_session(scope='local'):    if scope == 'local':        return (            pyspark.sql.SparkSession.builder            .appName('unit-tests')            .master('local[4]')        ).getOrCreate()    else:        ...  # TODO

Se il progetto diventa più grande, metterei questa funzione in un file di utilità specifico di PySpark, ma per ora manteniamo il progetto semplice e piatto.

Il nostro primo test

La prima cosa che testerò ora è se ottengo effettivamente una sessione quando eseguo questo test. Ecco il test:

# test_pyspark.py

class TestPysparkQueries(unittest.TestCase):    def test_session_created(self):        self.assertIsNotNone(spark)

E sai una cosa, eseguo il test (PyCharm ti consente di farlo direttamente dal codice, come puoi vedere dai pulsanti “play” verdi accanto al codice) e ottengo un messaggio OK:

Creazione e testing dei nostri dati

A questo punto possiamo iniziare a parlare dei dati. Dovresti avere a disposizione un piccolo dataset che copra diversi casi e che tu riesca ancora a gestire. In termini di dimensioni effettive, voterei solitamente per 20-50 righe a seconda del dominio e della complessità della query. Se sono coinvolti raggruppamenti, opta per 5-10 gruppi diversi.

A scopo didattico, ho creato un dataset di nomi e date di nascita. Assumerò per semplicità che tutte le persone con lo stesso cognome siano fratelli/sorelle. Ho anche introdotto casualità nell’ordine delle righe, per evitare che le query sensibili all’ordine ottengano la risposta corretta senza considerare l’ordine direttamente. I dati sono simili a questo:

Adesso è il momento di caricare i dati nella nostra sessione PySpark. Ma prima, creiamo un test di verifica per questo. A proposito, creare un test e solo successivamente scrivere il codice che fa passare il test fa parte della metodologia dello sviluppo guidato dai test (TDD), tuttavia non la predicizzo per i Data Scientist, solo la parte di testing.

In un test di verifica possiamo testare i nomi delle colonne, possiamo testare la dimensione del dataset, possiamo fare entrambe le cose o possiamo ideare test più approfonditi. Possiamo persino scrivere un test che confronta il file CSV con le righe del DataFrame una per una.

Più siamo rigorosi nella scrittura dei test, più saremo sicuri in seguito che il codice sia corretto, ma renderà anche più difficile apportare modifiche in futuro, ad esempio se vogliamo aggiungere/cambiare una riga nel dataset per testare un caso limite specifico?

Bilanciare questi fattori di velocità e correttezza fa parte dell’arte piuttosto che della scienza nel nostro campo di lavoro e verrà naturale con il tempo e la pratica.

# test_pyspark.py

    def test_load_data(self):        df = get_family_data()        self.assertEqual(25, df.count())

Successivamente, scriviamo la funzione per caricare i dati:

# query_pyspark.py

def get_family_data():    return (        get_spark_session(scope='local')        .read.csv(os.path.join(os.path.dirname(__file__),                   '../assets/data_sample.csv'))    )

E quando eseguo il test… Fallisce? Ma come è possibile?

Dopo aver controllato nuovamente il numero di righe e aver verificato che siano 25, finisco per aggiungere header=True al codice, e i test passano (non preoccuparti, risparmierò il finto dramma negli esempi successivi):

# query_pyspark.py

def get_family_data():    return (        get_spark_session(scope='local')        .read.csv(os.path.join(os.path.dirname(__file__),                   '../assets/data_sample.csv'), header=True)    )

Testare le nostre query

Ora è il momento del test specifico della query. Supponiamo che voglia ottenere il figlio più grande di ogni famiglia. Esamino i dati nel dataset manualmente (o utilizzo un foglio di calcolo ordinato) per trovare l’insieme esatto di nomi che mi aspetto di ottenere e lo inserisco nel mio test:

# test_pyspark.py

    def test_elder_child_query(self):        df = get_elder_child(get_family_data())        elders = {_.elder_child for _ in df.toLocalIterator()}        self.assertEqual(elders, {'Gus', 'Rita', 'Sam', 'Trent', 'Ursula'})

Ecco il codice che fa passare il test:

# query_pyspark.py

def get_elder_child(family_df: pyspark.sql.DataFrame):    return (        family_df        .orderBy(f.col('date_born').desc())        .groupby('last_name')        .agg(f.first('first_name').alias('elder_child'))    )

Nonostante ti abbia risparmiato il dramma, ti dirò che ho dovuto correggere la query diverse volte prima di far passare il test. Ad esempio, ho raggruppato per first_name, ho aggregato i valori di last_name e ho dimenticato di ordinare in modo decrescente.

Nel mio lavoro, i test mi hanno salvato da perdere credibilità molte volte.

Siamo finiti? Assolutamente no.

Dovremmo pensare a casi estremi, come ad esempio se abbiamo dei gemelli. Ci sono famiglie senza figli? E se i nostri dati non sono affidabili, che succede con i valori nulli?

Per ognuna di queste opzioni andremo nel nostro dataset, lo modificheremo per creare un caso del genere, quindi aggiorneremo il nostro test e il nostro codice.

Se incontriamo questi casi speciali attraverso bug che emergono successivamente (anche se non li abbiamo pensati noi stessi), faremo lo stesso: modificheremo il dataset per riflettere questi casi e continueremo da lì.

Dovremmo anche scrivere test per le altre query e incontreremo diversi tipi di test. Nel test precedente ci siamo preoccupati dell’insieme di risultati, ma cosa succede se vogliamo testare una semplice trasformazione 1:1, ad esempio f(riga) = y. Dobbiamo considerare la non determinazione di Spark in termini di ordine delle righe.

Ad esempio, supponiamo che vogliamo ottenere le iniziali dei nomi nel nostro dataset.

Un’opzione sarebbe quella di ordinare il DataFrame e fidarci di questo ordine quando confrontiamo l’uguaglianza con la nostra lista fatta a mano:

# query_pyspark.py

def get_initials_col():    return (        f.concat(            f.substring('first_name', 0, 1),            f.lit('. '),            f.substring('last_name', 0, 1),            f.lit('.'),        )    ).alias('initials')

# test_pyspark.py

    def test_get_initials_col_1_by_1(self):        df = (            get_family_data()            .withColumn('initials', get_initials_col())            .orderBy('date_born')        )        expected_list = ['V. A.', 'W. W.', 'X. M.', 'Y. T.', 'Z. C.', 'I. M.', 'J. T.', 'K. C.', 'L. A.', 'M. W.',                         'N. M.', 'O. T.', 'P. C.', 'Q. A.', 'A. A.', 'B. W.', 'C. M.', 'E. T.', 'F. C.', 'G. A.',                         'H. W.', 'R. W.', 'S. M.', 'T. T.', 'U. C.']        for expected, actual in zip(expected_list, [_.initials for _ in df.toLocalIterator()]):            self.assertEqual(expected, actual)

Un’altra opzione sarebbe quella di scrivere una funzione nativa che svolga lo stesso lavoro e testarla bene. Quindi possiamo applicarla agli input dopo aver caricato i risultati in memoria e scrivere un test per verificare l’uguaglianza su ogni riga. Ecco un esempio:

# query_pyspark.py

def get_initials(first_name, last_name):    return f'{first_name[:1]}. {last_name[:1]}.'

# test_pyspark.py

    def test_get_initials(self):        self.assertEqual('B. H.', get_initials('Bob', 'Hope'))        self.assertEqual('C. C.', get_initials('Charlie', 'Chaplin'))        self.assertEqual('J. L.', get_initials('Jonathan', 'Livingstone'))    def test_get_initials_col_support_function(self):        df = (            get_family_data()            .withColumn('initials', get_initials_col())        )        for row in df.toLocalIterator():            self.assertEqual(get_initials(row.first_name, row.last_name), row.initials)

Delle due opzioni preferirei sicuramente la seconda, poiché è più flessibile in quanto non si basa direttamente sui dati, ma tramite la funzione di supporto – che viene testata senza alcun accoppiamento con il dataset.

Ovviamente, se la funzione non è troppo pesante per la tua query, potresti preferire usarla come UDF, mantenendo bassa la complessità del codice.

Aspetta, c’è di più?

Certo. Ci sono molti casi diversi, come i risultati di join e di funzioni di finestra, ma confido che gli esempi sopra siano sufficienti per far capire che il testing è uno strumento importante e una scelta valida di metodologia quando si scrivono query, anche per Data Scientist come me.

Nota, ho scelto di mostrare come potrebbe funzionare mentre si lavora con PySpark poiché è uno strumento comune per i big data, tuttavia questo pattern non è limitato solo a PySpark né solo ai database di grandi dimensioni. Infatti, dovrebbe funzionare con qualsiasi database. Puoi anche utilizzare questa metodologia con strumenti simili a database in memoria, come pandas.

Finché puoi:

  1. Connetterti alla fonte dei dati.
  2. Caricare/mockare i dati nella fonte dei dati.
  3. Eseguire la query.
  4. Recuperare e elaborare il risultato della query.

Sei a posto. Se lo strumento che stai utilizzando non ti consente di eseguire una di queste operazioni, potresti voler ripensare all’utilizzo di questo strumento.

E sai una cosa, ci sono anche dei benefici nascosti nel testare il tuo codice. Supponiamo che tu scopra che una delle tue funzioni non sta performando bene in termini di tempo di esecuzione o consumo di memoria, e decida di provare ad ottimizzarla o rifattorizzarla. Ora puoi utilizzare i tuoi test esistenti come garanzia che il tuo nuovo codice produca lo stesso output del precedente. Senza questi test, personalmente avrei paura di cambiare anche una singola riga di codice, terrorizzato dal fatto di rompere qualche dipendenza a valle.

Sommario

Il testing è un modo potente ed importante per assicurare la correttezza del codice e rendere più facile gestire eventuali refactoring.

In futuri post, darò altri esempi di ciò che trovo essere buone pratiche nel campo della Data Science. Affronteremo argomenti come come lavorare insieme sullo stesso modello senza intralciarci reciprocamente, come gestire le versioni dei dataset, come osservare le performance del nostro codice in ambienti di produzione e molto altro ancora.

Resta sintonizzato.

FAQ

D: Aspetta, cosa?

R: Sentiti libero di avviare una conversazione qui o altrove sulle nozioni presentate in questa serie di blog.

D: Cosa succede se nella mia query ho bisogno di migliaia di righe per testare?

R: Scrivi una versione parametrica della query, ad esempio def get_n_smalles_children(family_df, n): … e rendi il parametro sufficientemente piccolo. Un’altra opzione è simulare i dati in modo programmato, tuttavia ciò presenta anche nuove domande e sfide a loro volta.

D: Cosa succede se modifico continuamente la query? Significa che devo modificare anche i test?

R: Idealmente non dovresti modificare la query nel tempo, ma sono consapevole della natura esplorativa del nostro campo. Quindi la risposta è sì. Questo è uno dei motivi per cui potresti sentire che la tua velocità è ridotta quando scrivi test. Tuttavia, la velocità è contrapposta all’accuratezza/correttezza. Puoi ricorrere a scrivere i test successivamente nel processo, quando la struttura della query è più solidificata.

Q: Come eseguo i test se non uso PyCharm?

A: Aggiungi le linee magiche di seguito alla fine del file dei test e eseguilo con python test_pyspark.py. Non dimenticare di assicurarti che la directory radice del codice sia inclusa in PYTHONPATH affinché gli import funzionino (PyCharm lo fa automaticamente).

if __name__ == '__main__':    unittest.main()

Q: Cosa succede se non voglio (o non posso) tenere i miei dati in un file .csv?

A: Qualsiasi metodo di archiviazione e caricamento dei dati che funzioni per te va bene, cerca solo di mantenerli ordinati. Per dataset molto piccoli ho usato dict-to-DataFrame (o se preferisci json-to-DataFrame), per dataset più grandi ho utilizzato tabelle memorizzate permanentemente su Hadoop.

Q: Non sono le funzioni di esempio che hai dato sopra, tipo, super semplici?

A: Beh, sì. Questo è il mio approccio all’insegnamento: dare esempi semplici e renderli gradualmente più complessi. Purtroppo i margini di questo post sono troppo piccoli per contenere la parte successiva.

Q: Hai il codice sopra in qualche repository in modo che possa usarlo come riferimento?

A: Sì, sì lo ho.