Dask può essere una libreria flessibile per il calcolo parallelo in Python.
Dask è composta da due parti:
Pianificazione dinamica dei compiti ottimizzata per il calcolo. spesso è quasi come Airflow, Luigi, Celery, o Make, ma ottimizzata per carichi di lavoro interattivi di calcolo.
Raccolte di “Big Data” come array paralleli, dataframe e liste che raggiungono interfacce comuni come NumPy, Pandas o iteratori Python in ambienti più grandi della memoria o distribuiti. Queste collezioni parallele vengono eseguite su task scheduler dinamici.
Dask sottolinea le virtù successive:
Familiari: Fornisce array NumPy parallelizzati e Pandas DataFrame di oggetti DataFrame
Flessibile: Fornisce un’interfaccia di programmazione delle attività per un maggior numero di carichi di lavoro personalizzati e l’integrazione con altri progetti.
Nativo: Permette il calcolo distribuito in puro Python con accesso allo stack PyData.
Veloce: Funziona con basso overhead, bassa latenza e minima serializzazione necessaria per algoritmi numerici veloci
Alzate la scala: Funziona in modo resiliente su cluster con migliaia di nuclei
Scala in basso: Banale da allineare e far funzionare su un computer portatile durante un unico processo
Reattivo: Progettato pensando all’informatica interattiva, fornisce un rapido feedback e una rapida diagnostica per assistere gli esseri umani
importare panda come pd importare dask.dataframe come dd
df = pd.read_csv(‘2015-01-01.csv’) df = dd.read_csv(‘2015-*-*.csv’)
df.groupby(df.user_id).valore.medio() df.groupby(df.user_id).valore.medio().calcolo()
Dask Array imita NumPy – documentazione
importare numpy come np importare dask.array come da
f = h5py.File(‘myfile.hdf5’) f = h5py.File(‘myfile.hdf5’)
x = np.array(f[‘/small-data’]) x = da.from_array(f[‘/big-data’],
pezzi=(1000, 1000))
x – x.media(asse=1) x – x.media(asse=1).calcolo()
Dask Bag imita gli iteratori, Toolz e PySpark – documentazione
importare dask.bag come db
b = db.read_text(‘2015-*-*.json.gz’).mappa(json.loads)
b.pluck(‘nome’).frequenze().topk(10, coppia lambda: coppia[1]).compute()
Dask Delayed imita il codice personalizzato per i loop e gli avvolgimenti – documentazione
da importazione tratteggiata in ritardo
L = []
per fn in nomi di file: # Usa per i loop per costruire il calcolo
dati = ritardato(carico)(fn) # Ritardo nell’esecuzione della funzione
L.append(ritardato(processo)(dati)) # Costruire connessioni tra le variabili
risultato = ritardato(riepilogo)(L)
result.compute()
L’interfaccia concurrent.futures fornisce la presentazione generale dei compiti personalizzati: – documentazione
da dask.distributed import Client
client = Client(‘scheduler:port’)
futures = []
per fn in nomi di file:
futuro = client.submit(carico, fn)
futures.append(futuro)
riepilogo = client.submit(riepilogo, futures)
summary.result()
Bilance da computer portatili a cluster
Dask è comodo su un portatile. Si installa banalmente con conda o pip ed estende le dimensioni di comodi dataset da “si adatta alla memoria” a “si adatta al disco”.
Dask è in grado di scalare fino ad un cluster di 100s di macchine. è resiliente, elastico, locale di dati e a bassa latenza. Per maggiori informazioni, vedere la documentazione sullo scheduler distribuito.
Questa semplice transizione tra cluster da singola macchina a moderato permette agli utenti di iniziare in modo semplice e di crescere quando necessario.
Algoritmi complessi
Dask rappresenta i calcoli paralleli con i grafici dei compiti. Questi grafici aciclici diretti possono avere una struttura arbitraria, che permette sia agli sviluppatori che agli utenti la libertà di creare algoritmi sofisticati e di gestire situazioni disordinate non facilmente gestibili dal paradigma mappa/filtro/gruppo comune nella maggior parte dei framework di data engineering.
Inizialmente avevamo bisogno di questa complessità per creare algoritmi complessi per array n-dimensionali, ma l’abbiamo trovata ugualmente preziosa quando si tratta di gestire situazioni disordinate in problemi quotidiani.