Quasi tutto quello che vuoi sapere sulle dimensioni delle partizioni dei Dataframe di Dask

Tutto ciò che devi sapere sulle dimensioni delle partizioni dei Dataframe di Dask

E come utilizzarlo in modo efficiente nel modello XGBoost

Immagine di anteprima (dell'autore)

Recentemente, io e i miei colleghi abbiamo lavorato su un grande servizio ad alta intensità utilizzando il modello di machine learning Xgboost e Dask come strumento per l’elaborazione distribuita dei dati e la generazione di previsioni. Qui vorrei condividere le nostre scoperte su come abbiamo potenziato l’uso di Dask per la preparazione dei dati e l’adattamento del modello di ML.

Cos’è Dask?

Dask è una libreria per l’elaborazione distribuita di grandi quantità di dati. Il concetto di base è quello di dividere grandi array in piccole parti (partizioni).

Ecco come vengono memorizzati e elaborati i Dask Dataframe: le tabelle possono essere divise in piccoli data frame (simili ai DataFrames di pandas) in modo da evitare di memorizzare l’intera tabella in RAM. L’intera tabella sorgente potrebbe essere troppo grande per essere caricata in memoria, ma le singole partizioni possono esserlo. Inoltre, questa modalità di archiviazione dei dati consente un’efficiente utilizzo di più core di processore per parallelizzare i calcoli.

Allo stesso tempo, le dimensioni di queste partizioni (chunks) sono determinate dallo sviluppatore. Pertanto, lo stesso dataframe può essere diviso in diverse partizioni utilizzando ad esempio ‘Split 1’ o ‘Split 2’ (Figura 1).

Figura 1. Divisione del Dask dataframe in partizioni (immagine dell'autore)

La scelta della dimensione delle partizioni ottimale è cruciale, perché se non è ottimale, l’elaborazione dei dati può rallentare. La dimensione delle partizioni ottimale dipende dalla dimensione complessiva del dataset, così come dalle risorse del server (o del laptop) – il numero di CPU e la RAM disponibile.

Disclaimer: In seguito, per comodità, misureremo la dimensione del dataset in base al numero di righe. Tutte le tabelle saranno composte da 4 colonne (3 feature + 1 target). Nell’implementazione dell’algoritmo nel sistema, abbiamo basato tutte le dipendenze non sul numero di righe nelle tabelle, ma sul numero totale di elementi (righe x colonne).

Il problema

Dask può essere utilizzato per calcolare statistiche semplici e aggregazioni, ma può anche essere utilizzato per addestrare grandi modelli di machine learning (utilizzando molti dati). Ad esempio, XGBoost. Poiché il servizio che stavamo sviluppando potrebbe richiedere di addestrare un modello su 2-10 milioni di record utilizzando solo 8-16 GB di RAM (nel caso di macchine virtuali piccole), abbiamo deciso di condurre degli esperimenti.

Anche nel calcolo di statistiche semplici, la dimensione delle partizioni è molto importante perché può rallentare significativamente l’algoritmo di calcolo in due casi:

  • Le partizioni sono troppo grandi e quindi richiedono troppo tempo e risorse per elaborarle in RAM
  • Le partizioni sono troppo piccole e quindi Dask deve caricare queste tabelle in RAM troppo spesso – si impiega più tempo nella sincronizzazione e nel caricamento/scaricamento che nei calcoli stessi

Quindi, utilizzando le stesse risorse di calcolo, la scelta di una dimensione non ottimale delle partizioni può degradare significativamente le prestazioni del programma (Figura 2). La Figura 2 mostra il tempo impiegato per adattare il modello XGBoost sui dataframe Dask con diverse dimensioni di partizioni. È mostrato il tempo medio di esecuzione su 5 run.

Figura 2. Influenza della dimensione delle partizioni sulla velocità di adattamento del modello XGBoost. Il dataframe originale per questi esperimenti contiene 500.000 righe e 4 colonne (immagine dell'autore)

In questo post, verrà discusso l’algoritmo per la ricerca delle dimensioni ottimali delle partizioni dei Dask Dataframes. Tutte le tabelle mostrate in questo post vengono utilizzate per addestrare il modello di machine learning Dask Xgboost. Condivideremo anche alcuni consigli che potrebbero risultare utili.

Suggerimenti sulla documentazione

Nella documentazione ufficiale di Dask ci sono pagine web con suggerimenti su come gestire correttamente gli oggetti Dask (dataframes, array, ecc.), come ad esempio Dask DataFrames Best Practices.

In particolare, su questa pagina si possono trovare i seguenti consigli:

Dovresti puntare a partizioni che abbiano circa 100MB di dati ciascuna.

Tuttavia, questo consiglio è approssimativo e non tiene conto delle specifiche di calcolo del server, delle dimensioni del dataset di origine e delle specificità della risoluzione del problema.

Setup dell’esperimento

Come già accennato, si presume che la dimensione ottimale della partizione dipenda dalle seguenti tre condizioni:

  • Dimensione del dataset completo;
  • Risorse CPU (numero di processi) che XGBoost e Dask possono utilizzare;
  • Memoria di accesso casuale (RAM) disponibile.

Pertanto, durante gli esperimenti, è stata variata sia il numero di risorse di calcolo che la dimensione del dataset di origine. Sono state prese in considerazione le seguenti situazioni:

  • Dimensione della partizione, migliaia di righe: [5, 10, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000] (13 casi)
  • Dimensione del dataset completo, migliaia di righe: [100, 200, 300, 400, 500, 600, 1000, 2000, 3000, 4000] (10 casi)
  • Risorse CPU (workers): [2, 3, 4] (3 casi)
  • Memoria di accesso casuale (RAM) per worker: [1GB, 2GB, 4GB] (3 casi)

Nota: Worker in Dask è un processo in un computer (su un server) che utilizza le risorse di calcolo ad esso allocate ed esegue in isolamento e in parallelo rispetto ad altri workers.

Pertanto, sono stati analizzati 1170 (13 x 10 x 3 x 3) casi. Per ottenere stime più robuste dei tempi di esecuzione, ogni caso è stato lanciato 5 volte. Le metriche (tempi di esecuzione) sono state poi mediati.

Nell’ambito della ricerca, volevamo scoprire i limiti delle dimensioni del dataset a cui la macchina virtuale non sarebbe stata in grado di gestire il carico per scalare il servizio in modo più efficace.

Risultati

Iniziamo con le visualizzazioni generali ottenute dagli esperimenti. Abbiamo eseguito diverse esecuzioni con diversi numeri di core CPU e quantità di RAM, nonché variando la dimensione del dataset originale e la dimensione delle partizioni. Dopo aver completato gli esperimenti, abbiamo preparato una tabella che mostra solo le soluzioni ottimali (dimensioni delle partizioni). Le dimensioni delle partizioni ottimali sono quelle in cui il tempo di esecuzione, date le condizioni (RAM, CPU e dimensione del dataset di origine), era minimo. Le matrici di correlazione delle metriche raccolte sono mostrate nella Figura 3.

Figura 3. Matrici di correlazione dei risultati sperimentali (immagine dell'autore)

Dal grafico si può notare che l’influenza maggiore sul tempo di esecuzione è ovviamente la dimensione del dataset di origine. Il numero di workers e la quantità di RAM hanno anche un effetto significativo sul tempo di adattamento. La dimensione del chunk ha un effetto relativamente debole. Tuttavia, ciò potrebbe essere dovuto al fatto che la dipendenza tra il tempo di esecuzione e la dimensione della partizione è non lineare, come confermato dalla curva della Figura 2. Inoltre, la Figura 4 conferma che le misurazioni sono state effettuate correttamente, poiché i risultati sono coerenti con le nostre aspettative.

Diamo uno sguardo all’animazione con i grafici 3D (Animazione 1).

Animazione 1. Grafico dei diversi casi di test, dove ogni fotogramma è di dimensioni fisse del dataset di origine. Vengono mostrate le superfici ottimali per ogni caso (dall'autore)

Nell’animazione, i casi ottimali (per ogni combinazione di numero di processi e RAM per worker) sono contrassegnati in rosso. Ciò significa che vengono mostrate le condizioni in cui il tempo di esecuzione dell’algoritmo è stato minimo per una determinata dimensione del dataset, numero di core, RAM e dimensione della partizione. I grafici mostrano anche le superfici ottimali a tratti costanti in grigio (NB: la superficie è globale per tutti i casi).

Dall’animazione si può notare che in alcuni fotogrammi non ci sono dati sperimentali (nessun punto) (Figura 4). Ciò significa che le risorse computazionali proposte non sono state sufficienti per eseguire il modello.

Figura 4. Risultati di modellizzazione per un dataset di 4 milioni di righe. Non ci sono risultati per una RAM inferiore a 4 (immagine dell'autore)

Dalla figura si può osservare che con questa dimensione del dataset, se il numero di core è piccolo, allora dovrebbero essere formate partizioni più grandi. Si noti che questa dipendenza non vale per tutti i casi.

In base ai risultati delle esecuzioni con risorse computazionali insufficienti, è stata preparata la seguente visualizzazione (Figura 5).

Figura 5. Limite del volume dei dati, oltre il quale non è possibile avviare la modellizzazione. Il numero di oggetti è calcolato come il numero di righe nella tabella moltiplicato per il numero di colonne (immagine dell'autore)

Inoltre, in base alle statistiche raccolte sulle esecuzioni fallite, si è giunti a una conclusione (consiglio): se la quantità di memoria è limitata, è più affidabile utilizzare una piccola dimensione di partizione.

Discussione

In base a questa ricerca, sono stati formulati alcuni consigli per una configurazione più efficace del sistema basato sul modello Dask XGBoost. Si tenga presente che questo studio è stato condotto al fine di eseguire Dask in modo più efficiente su server relativamente piccoli (che non dispongono di centinaia di gigabyte di RAM e decine di CPU).

L’esperimento ha rivelato gli iperpiani ottimali. Sono modellati utilizzando processi gaussiani. Sulla base di questo algoritmo, le dimensioni ottimali delle partizioni vengono selezionate automaticamente (Animazione 2).

Animazione 2. Superficie ottimale per diverse condizioni (CPU, RAM e dimensioni del dataset di origine). Ogni fotogramma mostra le condizioni per una particolare dimensione del dataset di origine (dall'autore)

Come si può notare dall’animazione, in media, la dimensione ottimale delle partizioni diminuisce all’aumentare del numero di righe nel dataset di origine.

Conclusione (e consigli)

Spero che abbiate trovato interessante leggere su quale dimensione di partizione si è rivelata ottimale per addestrare il modello XGBoost.

Realizzo che questo articolo è diventato molto “tecnico”. Pertanto, per coloro che sono riusciti a leggerlo fino alla fine, darò alcuni consigli:

  • Se stai misurando il tempo di esecuzione, esegui sempre i calcoli più volte e calcola la media dei risultati poiché i tempi di esecuzione sono stocastici;
  • Se hai dubbi su quale dimensione delle partizioni scegliere, è meglio commettere un errore in modo meno grave (altrimenti l’algoritmo non solo impiegherà molto tempo, ma potrebbe anche bloccarsi con un errore);
  • Per inizializzare un cluster localmente in Dask, si utilizzano i comandi cluster = LocalCluster() e Client(cluster) qui. Consigliamo vivamente di inizializzare un cluster del genere solo una volta nel codice utilizzando il pattern Singleton. Puoi vedere come può essere implementato in Python qui. Altrimenti, inizializzerai un nuovo cluster ad ogni avvio;
  • In media, la dimensione ottimale delle partizioni diminuisce all’aumentare del numero di righe nel dataset sorgente.

La storia su Dask e l’apprendimento automatico è stata presentata da Mikhail Sarafanov e il team di Wiredhut