# Parallel computing with Dask

## Authors & Contributors
### Authors
- Tina Odaka, UMR-LOPS Ifremer (France), [@tinaok](https://github.com/tinaok)
- Pier Lorenzo Marasco, Provare LTD (UK), [@pl-marasco](https://github.com/pl-marasco)

### Contributors
- Alejandro Coca-Castro, The Alan Turing Institute, [acocac](https://github.com/acocac)
- Anne Fouilloux, Simula Research Laboratory (Norway), [@annefou](https://github.com/annefou)
- Guillaume Eynard-Bontemps, CNES (France), [@guillaumeeb](https://github.com/guillaumeeb)

<div class="alert alert-info">
<i class="fa-question-circle fa" style="font-size: 22px;color:#666;"></i> <b>Overview</b>
    <br>
    <br>
    <b>Questions</b>
    <ul>
        <li>What is Dask?</li>
        <li>How can I parallelize my data analysis with Dask?</li>
    </ul>
    <b>Objectives</b>
    <ul>
        <li>Learn about Dask</li>
        <li>Learn about Dask Gateway, Dask Client, Scheduler, Workers</li>
        <li>Understand out-of-core and speed-up limitations</li>
    </ul>
</div>

## Context

We will be using [Dask](https://docs.dask.org/) with [Xarray](https://docs.xarray.dev/en/stable/) to parallelize our data analysis.  We continue to use the snow index example.  


## Setup

This episode uses the following Python packages:

- pooch {cite:ps}`e-pooch-Uieda2020`
- s3fs {cite:ps}`e-s3fs-2016`
- xarray {cite:ps}`e-xarray-hoyer2017` with [`netCDF4`](https://pypi.org/project/h5netcdf/) and [`h5netcdf`](https://pypi.org/project/h5netcdf/) engines
- hvplot {cite:ps}`e-holoviews-rudiger2020`
- dask {cite:ps}`e-dask-2016`
- graphviz {cite:ps}`e-graphviz-Ellson2003`
- numpy {cite:ps}`e-numpy-harris2020`
- pandas {cite:ps}`e-pandas-reback2020`
- geopandas {cite:ps}`e-geopandas-jordahl2020`

Please install these packages if not already available in your Python environment (you might want to take a look at [the Setup page of the tutorial](https://pangeo-data.github.io/foss4g-2022/before/setup.html)).

<div class="alert alert-info">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Note</b>
    <br>
    <ul>
        <li>In this notebook, Python packages are imported when we start to use them. However, for best software practices, we recommend you to install and import all the necessary libraries at the top of your Jupyter notebook.
        </li>
    </ul>
</div>

## Parallelize with Dask

We know from previous chapter [chunking_introduction](./chunking_introduction.ipynb) that chunking is key for analyzing large datasets. In this episode, we will learn to parallelize our data analysis using [Dask](https://docs.dask.org/) on our chunked dataset. 

### What is [Dask](https://docs.dask.org/) ?

**Dask** scales the existing Python ecosystem: with very or no changes in your code, you can speed-up computation using Dask or process bigger than memory datasets.

- Dask is a flexible library for parallel computing in Python.
- It is widely used for handling large and complex Earth Science datasets and speed up science.
- Dask is powerful, scalable and flexible. It is the leading platform today for data analytics at scale.
- It scales natively to clusters, cloud, HPC and bridges prototyping up to production.
- The strength of Dask is that is scales and accelerates the existing Python ecosystem e.g. Numpy, Pandas and Scikit-learn with few effort from end-users.

It is interesting to note that at first, [Dask has been created to handle data that is larger than memory, on a single computer](https://coiled.io/blog/history-dask/). It then was extended with Distributed to compute data in parallel over clusters of computers.

#### How does Dask scale and accelerate your data analysis?

[Dask proposes different abstractions to distribute your computation](https://docs.dask.org/en/stable/10-minutes-to-dask.html). In this _Dask Introduction_ section, we will focus on [Dask Array](https://docs.dask.org/en/stable/array.html) which is widely used in Pangeo ecosystem as a back end of Xarray.

As shown in the [previous section](./chunking_introduction.ipynb) Dask Array is based on chunks.
Chunks of a Dask Array are well-known Numpy arrays. By transforming big datasets to Dask Array, making use of chunk, a large array is handled as many smaller Numpy ones and we can compute each of these chunks independently.

![Dask and Numpy](https://examples.dask.org/_images/dask-array-black-text.svg)

<div class="alert alert-info">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Note</b>
    <br>
    <ul>
        <li>`Xarray` uses Dask Arrays instead of Numpy when chunking is enabled, and thus all Xarray operations are performed through Dask, which enables distributed processing. </li>
    </ul>
</div>

#### How does Xarray with Dask distribute data analysis?

When we use chunks with `Xarray`, the real computation is only done when needed or asked for, usually when invoking `compute()` or `load()` functions. Dask generates a **task graph** describing the computations to be done. When using [Dask Distributed](https://distributed.dask.org/en/stable/) a **Scheduler** distributes these tasks across several **Workers**.

![Xarray with dask](../figures/dask-xarray-explained.png)

#### What is a Dask Distributed cluster ?

A Dask Distributed cluster is made of two main components:

- a Scheduler, responsible for handling computations graph and distributing tasks to Workers.
- One or several (up to 1000s) Workers, computing individual tasks and storing results and data into distributed memory (RAM and/or worker's local disk).

A user usually needs __Client__ and __Cluster__ objects as shown below to use Dask Distributed.    

![Dask Distributed Cluster](https://user-images.githubusercontent.com/306380/66413985-27111600-e9be-11e9-9995-8f418ff48f8a.png)

#### Where can we deploy a Dask distributed cluster?

[Dask distributed clusters can be deployed on your laptop or on distributed infrastructures (Cloud, HPC centers, Hadoop, etc.).](https://docs.dask.org/en/stable/deploying.html)  Dask distributed `Cluster` object is responsible of deploying and scaling a Dask Cluster on the underlying resources.

![Dask Cluster deployment](https://docs.dask.org/en/stable/_images/dask-cluster-manager.svg)

:::{tip}
A Dask `Cluster` can be created on a single machine (for instance your laptop) e.g. there is no need to have dedicated computational resources. However, speedup will only be limited to your single machine resources if you do not have dedicated computational resources!
:::

### Dask distributed Client
 
The Dask distributed `Client` is what allows you to interact with Dask distributed Clusters. When using Dask distributed, you always need to create a `Client` object. Once a `Client` has been created, it will be used by default by each call to a Dask API, even if you do not explicitly use it.

No matter the Dask API (e.g. Arrays, Dataframes, Delayed, Futures, etc.) that you use, under the hood, Dask will create a Directed Acyclic Graph (DAG) of tasks by analysing the code. Client will be responsible to submit this DAG to the Scheduler along with the final result you want to compute. The Client will also gather results from the Workers, and aggregate it back in its underlying Python process.

Using `Client()` function with no argument, you will create a local Dask cluster with a number of workers and threads per worker corresponding to the number of cores in the 'local' machine. Here, during the workshop, we are running this notebook in Pangeo EOSC cloud deployment, so the 'local' machine is the jupyterlab you are using at the Cloud, and the number of cores is the number of cores on the cloud computing resources you've been given (not on your laptop).

In [None]:
from dask.distributed import Client

client = Client()   # create a local dask cluster on the local machine.
client

Inspecting the `Cluster Info` section above gives us information about the created cluster: we have 2 or 4 workers and the same number of threads (e.g. 1 thread per worker).

<div class="alert alert-warning">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Go further</b>
    <br>
    <ul>
        <li> You can also create a local cluster with the `LocalCluster` constructor and use `n_workers` 
        and `threads_per_worker` to manually specify the number of processes and threads you want to use. 
        For instance, we could use `n_workers=2` and `threads_per_worker=2`.  </li>
        <li> This is sometimes preferable (in terms of performance), or when you run this tutorial on your PC, 
        you can avoid dask to use all your resources you have on your PC!  </li>
    </ul>
</div>

### Dask Dashboard

Dask comes with a really handy interface: the Dask Dashboard. It is a web interface that you can open in a separated tab of your browser (but not with he link above, you've got to use Jupyterlabs proxy: **/user/todaka/proxy/8787/status**).

We will learn here how to use it through [dask jupyterlab extension](https://github.com/dask/dask-labextension).  

To use Dask Dashboard through jupyterlab extension on Pangeo EOSC infrastructure,
you will just need too look at the html link you have for your jupyterlab, and Dask dashboard port number, as highlighted in the figure below.

![Dask.array](../figures/dashboardlink.png)

![Dask.array](../figures/dasklab.png)

Then click the orange icon indicated in the above figure, and type 'your' dashboard link (normally, you just need to replace 'todaka' to 'your username').  

![Dask.array](../figures/daskclick.png)

You can click several buttons indicated with blue arrows in above figures, then drag and drop to place them as your convenience.  

![Dask.array](../figures/exampledasklab.png)

It's really helpful to understand your computation and how it is distributed.

## Dask Distributed computations on our dataset

Let's open the virtual dataset we've prepared as in previous episode, select a single location over time, visualize the task graph generated by Dask, and observe the Dask Dashboard.

### Read from online kerchunked consolidated dataset

We will access Long Term TimeSeries of NDVI statistics from OpenStack Object Storage using the Zarr metadata generated with kerchunk, prepared in [previous chunking_introduction](./chunking_introduction.ipynb) section using intake

In [None]:
import intake
cat = intake.open_catalog('../data/LTS.yaml')
LTS=cat.LTS.to_dask()

By inspecting any of the variables on the representation above, you'll see that each data array represents __about 85GiB of data__, so much more than the availabe memory on this notebook server, and even on the Dask Cluster we created above. But thanks to chunking, we can still analyze it!

In [None]:
save = LTS.sel(lat=45.50, lon=9.36, method='nearest')['min'].mean()
save

Did you notice something on the Dask Dashboard when running the two previous cells?

We didn't 'compute' anything. We just built a Dask task graph with it's size indicated as count above, but did not ask Dask to return a result.

Here, you can check 'Dask graph' with how many layers of graph you have, to estimate the complexity of your computation.

It is indicated that you have '7 graph'.  this can be optimized with following step

Let's try to plot the dask graph before computation and understand what dask workers will do to compute the value we asked for.

### Optimize the task graph

In [None]:
import dask
(save,) = dask.optimize(save)
save.data

Now our graph is reduced 1. Lets try to visualise it:

In [None]:
save.data.visualize()

### Compute on the dask workers

In [None]:
save.compute()

Calling compute on our Xarray object triggered the execution on Dask Cluster side. 

You should be able to see how Dask is working on Dask Dashboard. 

<div class="alert alert-warning">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Go Further</b>
    <br>
    <ul>
        <li>You can re-open the LTS with chunks=({"time":-1}) option, and try to visualize the task graph, size of each chunk.  How has it changed?  Do you see the difference of task graph using chunks keyword argument when opening the dataset?  </li>
    </ul>
</div>

### Close client to terminate local dask cluster

The `Client` and associated `LocalCluster` object will be automatically closed when your Python session ends. When using Jupyter notebooks, we recommend to close it explicitely whenever you are done with your local Dask cluster.

In [None]:
client.close()

## Scaling your Computation using Dask Gateway.

For this workshop, according to the Pangeo EOSC deployment, you will learn how to use Dask Gateway to manage Dask clusters over Kubernetes, allowing to run our data analysis in parallel e.g. distribute tasks across several workers.

Lets set up your Dask cluster through Dask Gateway.  
As Dask Gateway is configured by default on this infrastructure, you just need to execute the following cells.

In [None]:
from dask_gateway import Gateway
gateway = Gateway()

In [None]:
#WARNING In case you already created gateway cluster, you will see list of your clusters. 
#And this cell will kill all your orphan clusters.
#Please clean them before you make a new cluster using following command 
clusters = gateway.list_clusters()
print(clusters)

for cluster in clusters:
    cluster = gateway.connect(cluster.name)
    cluster.shutdown()

### Create a new Dask cluster with the Dask Gateway

In [None]:
cluster = gateway.new_cluster()
cluster.scale(4)
cluster

Let's setup the Dask Dashboard with your new cluster.

***This time, just click on the link to open the dashboard into another tab.  Then copy and past the link of web site appearing to the dask lab-extension***

![daskgateway click](../figures/daskgatewayclick.png)

### Get a client from the Dask Gateway Cluster

As stated above, creating a Dask `Client` is mandatory in order to perform following Daks computations on your Dask Cluster.


In [None]:
## Please don't execute this cell, it is needed for building the Jupyter Book
cluster = None

In [None]:
from distributed import Client

if cluster:
    client = Client(cluster) # create a dask Gateway cluster
else:
    client = Client()   # create a local dask cluster on the machine.
client

## Global LTS computation

In the previous episode, we used Long-term Timeseries for the region of Lombardy e.g. a very small area that was extracted upfront for simplicity. Now we will use the original dataset that has a global coverage, and work directly on it to extract our AOI and perform computations.

Let's check our LTS data we have loaded before.

In [None]:
LTS

### Fix time coordinate

As observed data are coming with a predefined year. To let xarray automatically align the LTS with the latest NDVI values, the time dimension needs to be shifted to the NDVI values.

In [None]:
import pandas as pd
import numpy as np

dates_2022 = pd.date_range('20220101', '20221231')
time_list = dates_2022[np.isin(dates_2022.day, [1,11,21])]
LTS = LTS.assign_coords(time=time_list)
LTS

### Clip LTS over Lombardia
As in previous episodes, we use a shapefile over Italy to select data over this Area of Interest (AOI).

In [None]:
import geopandas as gpd
try:
    GAUL = gpd.read_file('../data/Italy.geojson')
except:
    GAUL = gpd.read_file('zip+https://mars.jrc.ec.europa.eu/asap/files/gaul1_asap.zip') 


In [None]:
AOI_name = 'Lombardia'
AOI = GAUL[GAUL.name1 == AOI_name]
AOI_poly = AOI.geometry
AOI_poly

We first select a geographical area that covers Lombardia (so that we have a first reduction from the global coverage) and then clip using the shapefile to avoid useless pixels.

In [None]:
LTS_AOI = LTS.sel(lat=slice(46.5,44.5), lon=slice(8.5,11.5))
LTS_AOI.rio.write_crs(4326, inplace=True)

We apply a mask using rio.clip 

In [None]:
LTS_AOI = LTS_AOI.rio.clip(AOI_poly, crs=4326)
LTS_AOI

<div class="alert alert-warning">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Lets keep our LTS_Lombardia on the memory of your Dask cluster, distributed on every workers.</b>
    <br>
    <ul>
        <li> To do that we will use `.persist()`. Please look at the dashboard during the computation.
 </li>
    </ul>
</div>


In [None]:
%%time
LTS_AOI.persist()

## Get NDVI for 2022 over Lombardia

We re-use the file we created during the first episode. If the file is missing it will be downloaded from Zenodo.

In [None]:
import pooch
import xarray as xr

try:
    cgls_ds = xr.open_dataset('../data/C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc')
except:
    cgls_file = pooch.retrieve(
        url="https://zenodo.org/record/6969999/files/C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc",
        known_hash="md5:be3f16913ebbdb4e7af227f971007b22",
        path=f".",)    
    cgls_ds = xr.open_dataset(cgls_file)
cgls_ds

In [None]:
NDVI_AOI = cgls_ds.NDVI.rio.write_crs(4326, inplace=True)

In [None]:
NDVI_AOI = NDVI_AOI.rio.clip(AOI_poly, crs=4326)
NDVI_AOI

The nominal spatial resolution of the Long term statistics is 1km. As the current NDVI product has a nominal spatial resolution of 300m a re projection is needed. RioXarray through RasterIO that wraps the GDAL method can take care of this. More info about all the options can be found [here](https://rasterio.readthedocs.io/en/stable/api/rasterio.warp.html#rasterio.warp.reproject).

In [None]:
NDVI_1k = NDVI_AOI.rio.reproject_match(LTS_AOI)

In [None]:
NDVI_1k = NDVI_1k.rename({'x': 'lon', 'y':'lat'})

In [None]:
VCI = ((NDVI_1k - LTS_AOI['min']) / (LTS_AOI['max'] - LTS_AOI['min'])) * 100
VCI.name = 'VCI'
VCI

In [None]:
%%time 
from hvplot import xarray
VCI.hvplot(x = 'lon', y = 'lat',
           cmap='RdYlGn', clim=(-200,+200), alpha=0.7,
           geo=True, tiles= 'CartoLight',
           title=f'CGLS VCI {AOI_name} {VCI.isel(time=-1).time.dt.date.data}',
           width=400, height=300,
           widget_location='left_top'
           )

<div class="alert alert-warning">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Exercise</b>
    <br>
    <ul>
        <li> Try moving time slider to see how Dask computes and load data on the fly (observe well the dask dashboard!) </li>          
    </ul>
</div>

### Visualize LTS statistics

Let's try to scale out the visualization of LTS statistic datas.  We will set an arbitaly size to see how dask behaves.

In [None]:
size=0  # You can try later, for example size=10, 50, 100 
LTS_plot=LTS.sel(lat=slice(80,20), lon=slice(-15,30+size))#.min(dim='time')
LTS_plot

In [None]:
import holoviews as hv
import hvplot.xarray    
plots = [LTS_plot[z].hvplot.image(x = 'lon', y = 'lat',
           cmap='RdYlGn', clim=(0.0,0.9)
           , alpha=0.7,rasterize=True,
           geo=True, tiles= 'CartoLight',
           width=400, height=300)  for z in ['min','max']]
hv.Layout(plots).cols(1).opts(title='LTS NDVI statistics (Minimum and Maximum)')

<div class="alert alert-warning">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Go Further</b>
    <br>
    <ul>
        <li>Compare the data size and 'used' data size for each worker in dask dashboard  </li>   
        <li>Lets try to zoom.  What happened with your plot? How was the dask dashboard reacted with zooming?   </li>
        <li>What is rastersize=True ? (Hint: https://hvplot.holoviz.org/user_guide/Customization.html#datashading-options)   </li>  
        <li>Let's try to scale out using 'cluster.scale(6)' and use size=10 (or 50, 100...) </li>
    </ul>
</div>


In [None]:
client.close()

In [None]:
cluster.shutdown()

## Packages citation

```{bibliography}
:style: alpha
:filter: topic % "dask" and topic % "package"
:keyprefix: e-
```