Dask può essere una libreria flessibile per il calcolo parallelo in Python.

Dask è composta da due parti:

Pianificazione dinamica dei compiti ottimizzata per il calcolo. spesso è quasi come Airflow, Luigi, Celery, o Make, ma ottimizzata per carichi di lavoro interattivi di calcolo.

Raccolte di “Big Data” come array paralleli, dataframe e liste che raggiungono interfacce comuni come NumPy, Pandas o iteratori Python in ambienti più grandi della memoria o distribuiti. Queste collezioni parallele vengono eseguite su task scheduler dinamici.

Dask sottolinea le virtù successive:

Familiari: Fornisce array NumPy parallelizzati e Pandas DataFrame di oggetti DataFrame

Flessibile: Fornisce un’interfaccia di programmazione delle attività per un maggior numero di carichi di lavoro personalizzati e l’integrazione con altri progetti.

Nativo: Permette il calcolo distribuito in puro Python con accesso allo stack PyData.

Veloce: Funziona con basso overhead, bassa latenza e minima serializzazione necessaria per algoritmi numerici veloci

Alzate la scala: Funziona in modo resiliente su cluster con migliaia di nuclei

Scala in basso: Banale da allineare e far funzionare su un computer portatile durante un unico processo

Reattivo: Progettato pensando all’informatica interattiva, fornisce un rapido feedback e una rapida diagnostica per assistere gli esseri umani

importare panda come pd importare dask.dataframe come dd

df = pd.read_csv(‘2015-01-01.csv’) df = dd.read_csv(‘2015-*-*.csv’)

df.groupby(df.user_id).valore.medio() df.groupby(df.user_id).valore.medio().calcolo()

Dask Array imita NumPy – documentazione

importare numpy come np importare dask.array come da

f = h5py.File(‘myfile.hdf5’) f = h5py.File(‘myfile.hdf5’)

x = np.array(f[‘/small-data’]) x = da.from_array(f[‘/big-data’],

pezzi=(1000, 1000))

x – x.media(asse=1) x – x.media(asse=1).calcolo()

Dask Bag imita gli iteratori, Toolz e PySpark – documentazione

importare dask.bag come db

b = db.read_text(‘2015-*-*.json.gz’).mappa(json.loads)

b.pluck(‘nome’).frequenze().topk(10, coppia lambda: coppia[1]).compute()

Dask Delayed imita il codice personalizzato per i loop e gli avvolgimenti – documentazione

da importazione tratteggiata in ritardo

L = []

per fn in nomi di file:                  # Usa per i loop per costruire il calcolo

dati = ritardato(carico)(fn) # Ritardo nell’esecuzione della funzione

L.append(ritardato(processo)(dati))  # Costruire connessioni tra le variabili

risultato = ritardato(riepilogo)(L)

result.compute()

L’interfaccia concurrent.futures fornisce la presentazione generale dei compiti personalizzati: – documentazione

da dask.distributed import Client

client = Client(‘scheduler:port’)

futures = []

per fn in nomi di file:

futuro = client.submit(carico, fn)

futures.append(futuro)

riepilogo = client.submit(riepilogo, futures)

summary.result()

Bilance da computer portatili a cluster

Dask è comodo su un portatile. Si installa banalmente con conda o pip ed estende le dimensioni di comodi dataset da “si adatta alla memoria” a “si adatta al disco”.

Dask è in grado di scalare fino ad un cluster di 100s di macchine. è resiliente, elastico, locale di dati e a bassa latenza. Per maggiori informazioni, vedere la documentazione sullo scheduler distribuito.

Questa semplice transizione tra cluster da singola macchina a moderato permette agli utenti di iniziare in modo semplice e di crescere quando necessario.

Algoritmi complessi

Dask rappresenta i calcoli paralleli con i grafici dei compiti. Questi grafici aciclici diretti possono avere una struttura arbitraria, che permette sia agli sviluppatori che agli utenti la libertà di creare algoritmi sofisticati e di gestire situazioni disordinate non facilmente gestibili dal paradigma mappa/filtro/gruppo comune nella maggior parte dei framework di data engineering.

Inizialmente avevamo bisogno di questa complessità per creare algoritmi complessi per array n-dimensionali, ma l’abbiamo trovata ugualmente preziosa quando si tratta di gestire situazioni disordinate in problemi quotidiani.