Dask может быть гибкой библиотекой для параллельных вычислений на Python.

Dask состоит из двух частей:

Динамическое планирование задач, оптимизированное для вычислений. Часто это почти как Airflow, Luigi, Celery или Make, но оптимизированное для интерактивных вычислительных нагрузок.

Коллекции “больших данных“, такие как параллельные массивы, кадры данных и списки, которые достигают общих интерфейсов, таких как NumPy, Pandas или итераторы Python, для более крупных, чем память, или распределенных сред. Эти параллельные коллекции работают поверх динамических планировщиков задач.

Dask подчеркивает последующие достоинства:

Знакомые: Предоставляет параллельные массивы NumPy и объекты Pandas DataFrame.

Гибкий: Предоставляет интерфейс планирования задач для большего количества пользовательских рабочих нагрузок и интеграции с другими проектами.

Родной: Позволяет выполнять распределенные вычисления на чистом Python с доступом к стеку PyData.

Быстро: Работает с низкими накладными расходами, низкой задержкой и минимальной сериализацией, необходимой для быстрых численных алгоритмов.

Увеличить масштаб: Устойчиво работает на кластерах с 1000 ядрами.

Весы снизились: Тривиально для выстраивания в линию и запуска на ноутбуке во время одного процесса.

Отзывчивый: Разработанный с учетом интерактивных вычислений, он обеспечивает быструю обратную связь и диагностику, чтобы помочь человеку

импорт панд как pd импорт dask.dataframe как dd

df = pd.read_csv(‘2015-01-01.csv’) df = dd.read_csv(‘2015-*-*.csv’)

df.groupby(df.user_id).value.mean() df.groupby(df.user_id).value.mean().compute()

Массивная мимика NumPy – документация

импорт numpy как np импорт dask.array как da

f = h5py.File(‘myfile.hdf5’) f = h5py.File(‘myfile.hdf5’)

x = np.array(f[‘/small-data’]) x = da.from_array(f[‘/big-data’],

куски=(1000, 1000))

x – x.mean(axis=1) x – x.mean(axis=1).compute()

Dask Bag имитирует итераторы, Toolz и PySpark – документация

импортный пакет dask.bag как db

b = db.read_text(‘2015-*-*.json.gz’).map(json.load)

b.pluck(‘name’).frequencies().topk(10, lambda pair: pair[1]).compute()

Задержка выполнения задач мимика для циклов и обёртки пользовательского кода – документация

от задержек с импортом в виде масок

L = []

для fn в названиях фильмов:                  # Используется для циклов для построения вычислений.

data = delay(load)(fn) # Задержка выполнения функции

L.append(задержка(процесс)(данные))  # Построить связи между переменными

результат = задержка(суммирование)(L)

result.compute()

Интерфейс concurrent.futures обеспечивает общую подачу пользовательских задач: – документация

из dask.distribution import Client

клиент = клиент(‘планировщик:порт’)

фьючерсы = []

для fn в названиях фильмов:

future = client.submit(load, fn)

futures.append(future)

summary = client.submit(summarize, futures)

summary.result()

Весы от ноутбуков до кластеров

Маска удобна на ноутбуке. Устанавливается тривиально с помощью конда или пипса и расширяет размеры удобных наборов данных от “помещается в память” до “помещается на диск”.

Dask может масштабироваться до кластера из 100 машин. он упруг, эластичен, локален для данных и имеет низкую задержку. Подробнее см. документацию о распределенном планировщике.

Этот простой переход от одномашинного к умеренному кластеру позволяет пользователям как начинать простое, так и расти, когда это необходимо.

Сложные алгоритмы

Dask представляет параллельные вычисления с графами задач. Эти направленные ациклические графы могут иметь произвольную структуру, что позволяет как разработчикам, так и пользователям свободно создавать сложные алгоритмы и обрабатывать грязные ситуации, нелегко управляемые парадигмой “карта/фильтр/группа”, распространенной в большинстве фреймворков инженерии данных.

Изначально эта сложность была нам нужна для создания сложных алгоритмов для n-мерных массивов, но мы обнаружили, что она одинаково ценна при работе с грязными ситуациями в повседневных задачах.