Dask może być elastyczną biblioteką do obliczeń równoległych w Pythonie.

Dask składa się z dwóch części:

Dynamiczne planowanie zadań zoptymalizowane pod kątem obliczeń. często jest to prawie jak Airflow, Luigi, Selery lub Make, ale zoptymalizowane pod kątem interaktywnych zadań obliczeniowych.

Zbiory “Big Data“, takie jak równoległe tablice, ramki danych i listy, które docierają do wspólnych interfejsów, takich jak NumPy, Pandas lub iteratory Pythona do środowisk większych niż pamięć lub rozproszonych. Te równoległe kolekcje działają w oparciu o dynamiczne schedulery zadań.

Dask kładzie nacisk na kolejne cnoty:

Znane: Dostarcza równoległe obiekty macierzy NumPy i Pandas DataFrame.

Elastyczne: Zapewnia interfejs do planowania zadań dla bardziej niestandardowych obciążeń i integracji z innymi projektami.

Native: Umożliwia obliczenia rozproszone w czystym Pythonie z dostępem do stosu PyData.

Szybko: Pracuje z niskimi kosztami ogólnymi, małymi opóźnieniami i minimalną serializacją niezbędną dla szybkich algorytmów numerycznych.

Skala w górę: Działa elastycznie na klastrach z 1000s rdzeni

Waga w dół: Trywialne ustawianie się w linii i uruchamianie na laptopie podczas jednego procesu

Odpowiadający: Zaprojektowany z myślą o interaktywnych obliczeniach, zapewnia szybkie sprzężenie zwrotne i diagnostykę, aby pomóc ludziom

import pandas jako pd import dask.dataframe jako dd

df = pd.read_csv(‘2015-01-01.csv’) df = dd.read_csv(‘2015-*-*.csv’)

df.groupby(df.user_id).value.mean() df.groupby(df.user_id).value.mean().compute()

Macierze zadań naśladujące NumPy – dokumentacja

import numpy jako np. import dask.array jako da

f = h5py.File(‘myfile.hdf5’) f = h5py.File(‘myfile.hdf5’)

x = np.array(f[‘/małe dane’]) x = da.from_array(f[‘/big-data’],

chunks=(1000, 1000))

x – x.mean(axis=1) x – x.mean(axis=1).compute()

Torebki naśladujące iteratory, Toolz i PySpark – dokumentacja

import dask.bag as db

b = db.read_text(‘2015-*-*.json.gz’).map(json.loads)

b.pluck(‘name’).frequencies().topk(10, lambda pair: pair[1]).compute()

Dask Delayed mimics dla pętli i owijania niestandardowym kodem – dokumentacja

z opóźnionym przywozem z daszkiem

L = []

dla fn in filenames:                  # Użyj do pętli, aby zbudować obliczenia

data = delayed(load)(fn) # Opóźnienie wykonania funkcji

L.append(opóźniony(proces)(dane))  # Buduj połączenia między zmiennymi

wynik = opóźniony (podsumowanie)(L)

result.compute()

Interfejs concurrent.futures umożliwia ogólne składanie niestandardowych zadań: – dokumentacji

z dask.distributed import Klient

client = Client(‘scheduler:port’)

futures = []

dla fn in filenames:

future = client.submit(load, fn)

futures.append(future)

streszczenie = client.submit(streszczenie, futures)

streszczenie.wynik()

Wagi od laptopów do klastrów

Dask jest wygodny na laptopie. Instaluje się go trywialnie z conda lub pipem i rozszerza wymiary wygodnych zestawów danych z “mieści się w pamięci” do “mieści się na dysku”.

Dask może przeskalować się do klastra 100 maszyn. jest sprężysty, elastyczny, dane lokalne i o małym opóźnieniu. Więcej informacji na ten temat można znaleźć w dokumentacji dotyczącej rozproszonego schedulera.

To proste przejście między pojedynczą maszyną a umiarkowanym klastrem umożliwia użytkownikom zarówno proste uruchamianie, jak i rozbudowę w razie potrzeby.

Złożone Algorytmy

Zadanie reprezentuje równoległe obliczenia z wykresami zadań. Te ukierunkowane wykresy acykliczne mogą mieć dowolną strukturę, co pozwala zarówno programistom, jak i użytkownikom na swobodne tworzenie wyrafinowanych algorytmów i radzenie sobie z nieporządanymi sytuacjami, które nie są łatwo zarządzane przez paradygmat mapy/filtra/grupy powszechny w większości frameworków inżynierii danych.

Pierwotnie potrzebowaliśmy tej złożoności do stworzenia złożonych algorytmów dla tablic n-wymiarowych, ale uznaliśmy, że jest ona równie cenna przy rozwiązywaniu nieporządanych sytuacji w codziennych problemach.