How to exploit data on Pangeo#

Snow and Cloud Cover Example#

The executable content of this notebook is adapted from EO-College/cubes-and-clouds

Authors & Contributors#

Authors#

  • Michele Claus, Eurac Research (Italy), clausmichele (original author)

  • Pier Lorenzo Marasco, Provare LTD (UK), @pl-marasco (author of the conversion)

Contributors#

  • Alejandro Coca-Castro, The Alan Turing Institute, acocac (author)

  • Anne Fouilloux, Simula Research Laboratory (Norway), @annefou

  • Justus Magin, UMR-LOPS CNRS(France), @keewis

  • Tina Odaka, UMR-LOPS Ifremer (France), @tinaok

Introduction#

In this exercise, we will build a complete the same EO workflow as OpenEO using cloud provided data (STAC Catalogue), processing it locally; from data access to obtaining the result.

We are going to follow these steps in our analysis:

  • Load satellite collections

  • Specify the spatial, temporal extents and the features we are interested in

  • Process the satellite data to retrieve snow cover information

  • Aggregate information in data cubes

  • Visualize and analyse the results

Relevant resources#

Import libraries#

# Data Manipulation and Analysis Libraries
import pandas as pd  
import numpy as np 

# Geospatial Data Handling Libraries
import geopandas as gpd 
from shapely.geometry import mapping  
import pyproj

# Multidimensional and Satellite Data Libraries
import xarray as xr 
import rioxarray as rio
import stackstac

# Data Visualization Libraries
import holoviews as hv
import hvplot.xarray
import hvplot.pandas

# Data parallelization and distributed computing libraries
import dask
from dask.distributed import Client, progress, LocalCluster

# STAC Catalogue Libraries
import pystac_client

Connect to the Dask cluster#

Dask cluster can be deployed on HPC clusters, cloud computing services, or on a local machine. More on how to deploy over different platforms can be found here: https://docs.dask.org/en/stable/deploying.html Here we creates a Dask client, which is essential for managing and executing parallel computations efficiently in the subsequent parts of the notebook.

WARNING ! If you are going to use dask_gateway (on pangeo-EOSC), please activate next cell. If you are using your local PC, or you do not use dask_gateway, leave it as it is.
# Connect to the Dask Gateway
from dask_gateway import Gateway
gateway = Gateway()

#List activated cluster

clusters = gateway.list_clusters()
print(clusters)

#Clean clusters running on your name before starting a new one
for cluster in clusters:
    cluster = gateway.connect(cluster.name)
    cluster.shutdown()

#Create a new cluster and scale it to four workers
cluster = gateway.new_cluster()
cluster.scale(4)
cluster
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[2], line 2
      1 # Connect to the Dask Gateway
----> 2 from dask_gateway import Gateway
      3 gateway = Gateway()
      5 #List activated cluster

ModuleNotFoundError: No module named 'dask_gateway'
WARNING ! Please deactivate cell below if you use dask_gateway (on pangeo-EOSC). If you are using your local PC, or you do not use dask_gateway, leave it as it is.
cluster = None

Get a client from the Dask Cluster#

As stated above,creating a Dask Client is mandatory in order to perform following Daks computations on your Dask Cluster.

from distributed import Client

if cluster:
    client = Client(cluster)  # create a dask Gateway cluster
else:
    cluster = LocalCluster()
    client = Client(cluster)  # create a local dask cluster on the machine.
client

Let’s setup the Dask Dashboard with your new cluster, as explained in the former section (Parallel computing with Dask)

Reminder:

  • If you use Dask-Gateway: just click on the link to open the dashboard into another tab. Then copy and paste the link of web site appearing to the dask lab - extension

Load data#

Catchment outline#

We will use the catchment as our area of interest (AOI) for the analysis. The catchment is defined by a polygon, which we will load from a GeoJSON file. The GeoJSON file contains the geometry of the catchment in the WGS84 coordinate reference system (EPSG:4326) and that has to be defined.

aoi = gpd.read_file('../data/catchment_outline.geojson', crs="EPGS:4326")
aoi_geojson = mapping(aoi.iloc[0].geometry)

Satellite collections#

Search for satellite data using STAC#

We will utilize the STAC API to search for satellite data in this exercise, specifically leveraging the API provided by AWS/Element84. The STAC API operates as a RESTful service, enabling the querying of satellite data with various filters such as spatial range, time period, and other specific metadata. This API is constructed based on the STAC specification, a collaborative, community-driven standard aimed at enhancing the discoverability and usability of satellite data. Numerous data providers, including AWS, Google Earth Engine, and Planet (Copernicus Data Space Ecosystem (CDSE) is coming soon**), among others, have implemented the STAC API, exemplifying its widespread adoption and utility in accessing diverse satellite datasets.

For the purposes of this exercise, we will limit the search to the Sentinel 2 L2A collection, which is a collection of Sentinel 2 data that has been processed to surface reflectance (Top Of Canopy). We will also limit the search to the time period between 1st February 2019 and 10th June 2019 and to the extent of the catchment.

** at the moment of writing the STAC catalog of the CDSE is not yet fully operational.

URL = "https://earth-search.aws.element84.com/v1"
catalog = pystac_client.Client.open(URL)
items = catalog.search(
    intersects=aoi_geojson,
    collections=["sentinel-2-l2a"],
    datetime="2019-02-01/2019-06-10"
    # query={"eo:cloud_cover": {"lt": 60}}, # uncomment to filter by cloud cover
).item_collection()
len(items)

Get bands information#

As the original data provides bands with different names than the original Sentinel 2 bands, we need to get the information about the bands.

# Get bands information
# selected_item = items[1]
# for key, asset in selected_item.assets.items():
#     print(f"{key}: {asset.title}")

Load data#

We will use the stackstac library to load the data. The stackstac library is a library that allows loading data from a STAC API into an xarray dataset. Here we will load the green and swir16 bands (on the original dataset named B03 and B11), which are the bands we will use to calculate the snow cover. We will also load the scl band, which is the scene classification layer, which we will use to mask out clouds. Spatial resolution of 20m is selected for the analysis. The data is loaded in chunks of 2048x2048 pixels.

Stackstac is not the only way to create a xarray dataset from a STAC API. Other libraries can be used, such as xpystac or odc.stac. The choice of the library depends on the use case and specific needs.

stackstac.stack(items)

When the results of the STAC query are compiled into an xarray dataset, the result is a four-dimensional dataset: time, band, x, and y. The ‘band’ dimension comprises the various spectral bands, while ‘x’ and ‘y’ dimensions represent the spatial information. By examining the dataset’s visual representation, we can quickly estimate its total size. Without any filtering, we expect the dataset to be around 5.42 terabytes.

Since we require only certain bands and are focused on the Area of Interest (AOI), we will apply additional filters to the dataset to pare down the data volume to what is strictly necessary.

  • The ‘bounds_latlon’ parameter defines the Area of Interest with four values: the minimum and maximum longitudes and latitudes. We will input the catchment’s boundaries to set our area of interest.

  • The ‘resolution’ parameter determines the dataset’s spatial resolution, requiring a single value. We will select a resolution of 20 meters.

  • The ‘chunksize’ parameter sets the dimensions for data chunking, accepting one value to define chunk size. We will opt for chunks that are 2048 by 2048 pixels. GDAL will handle the data chunking during the loading process as per our specifications.

  • Lastly, the ‘assets’ parameter selects the data bands to be loaded, requiring a list of the band names as strings. We will load the ‘green’ and ‘swir16’ bands for snow cover analysis, along with the ‘scl’ band, the scene classification layer, to filter out clouds

ds = stackstac.stack(items,
                    bounds_latlon=aoi.iloc[0].geometry.bounds,
                    resolution=20,
                    chunksize=2048,
                    assets=['green', 'swir16', 'scl'])

Calculate snow cover#

We will calculate the Normalized Difference Snow Index (NDSI) to calculate the snow cover. The NDSI is calculated as the difference between the green and the swir16 bands divided by the sum of the green and the swir16 bands.

For a matter of clarity we will define the green and the swir16 bands as variables. Other approaches can be used to manage the data, but this is the one we will use in this exercise.

green = ds.sel(band='green')
swir = ds.sel(band='swir16')
scl = ds.sel(band='scl')

Let’s compute the NDSI and mask out the clouds.

ndsi = (green - swir) / (green + swir)
ndsi
Dask Method Differences: `.compute()` vs `.persist()`

Dask provides two primary methods for executing computations: .compute() and .persist(). Below is an overview of each method and their typical use cases.

.compute()#

  • Functionality: Executes the Dask computation and blocks until the result is available. It then collects and returns the final result to the local process.

  • Use Case: Invoke .compute() when you need to bring the computed result into your local memory. It is typically used as the final step in a Dask workflow after all transformations and computations have been defined.

  • Evaluation: Eager - runs immediately and provides results.

.persist()#

  • Functionality: Begins computing the result in the background while immediately returning a new Dask object that represents the ongoing computation.

  • Use Case: Utilize .persist() in a distributed environment when working with large datasets or complex computations that have expensive intermediate steps. This will keep the intermediate results in the cluster’s distributed memory, improving performance for subsequent computations.

  • Evaluation: Lazy - computations are started but the method returns a reference to the future result without waiting for the completion.

Each method plays a crucial role in optimizing and managing the execution of large-scale computations using Dask, particularly when balancing memory usage and computational efficiency in a distributed setting.

ndsi = ndsi.persist()

We will mask out the clouds, which are identified by the values 8 and 9 in the scene classification layer (scl). The scl contains information about the type of land cover. We will mask out the clouds, which are identified by the values 8 and 9 in the scl layer.

More detailed info can be found here: https://sentinel.esa.int/web/sentinel/technical-guides/sentinel-2-msi/level-2a/algorithm-overview

snow = xr.where((ndsi > 0.42) & ~np.isnan(ndsi), 1, ndsi)
snowmap = xr.where((snow <= 0.42) & ~np.isnan(snow), 0, snow)

As the SCL layer contains information about the type of land cover, we will mask out the clouds, which are identified by the values 8 and 9 in the scl layer.

mask = np.logical_not(scl.isin([8, 9, 3])) 
snow_cloud = xr.where(mask, snowmap, 2)

Process snow cover data#

Mask data#

As we are only interested to the snow cover in the catchment, we will mask out the data outside the catchment. To achieve it we need to convert the catchment geometry to the same coordinate reference system as the data. The data is in the UTM32N coordinate reference system (EPSG:32632).

aoi_utm32 = aoi.to_crs(epsg=32632)
geom_utm32 = aoi_utm32.iloc[0]['geometry']

As we are going to use the RioXarray library to mask out the data, we need to add some more information to the data. The RioXarray library is a library that allows to manipulate geospatial data in xarray datasets. Underneath it uses the rasterio library that is a library built on top of GDAL.

We need first to specify the coordinate reference system and the nodata value. Both information can be found in the metadata of the data but we need to reinforce it so that RioXarray can use it.

snow_cloud.rio.write_crs("EPSG:32632", inplace=True)
snow_cloud.rio.set_nodata(np.nan, inplace=True)

Let’s clip the snow_cloud object using the catchment geometry in the UTM32N coordinate reference system.

snowmap_clipped = snow_cloud.rio.clip([geom_utm32])

It’s time to persist the data in memory. We will use the persist method to load the data in memory and keep it there until the end of the analysis.

clipped_date = snowmap_clipped.persist()

Aggregate data#

Data aggregation is a very important step in the analysis. It allows to reduce the amount of data and to make the analysis more efficient. Moreover, as in this case, we are going to aggregate the date to daily values, this will allow use to compute statistic on the data at the basin scale later on.

The groupby method allows to group the data by a specific dimension. We will group the data by the time dimension, aggregating to the date and removing the time information, once the group is obtained we will aggregate the data by taking the maximum value.

clipped_date.time
clipped_date = snowmap_clipped.groupby(snowmap_clipped.time.dt.floor('D')).max(skipna=True)

As the data has been aggregated to daily values, we need to rename the floor method to something more meaningful as date.

clipped_date = clipped_date.rename({'floor': 'date'})
clipped_date = clipped_date.persist()

Visualize data#

We will use the hvplot library to visualize the data. The library allows to visualize data in xarray datasets. It is based on the holoviews library, which is a library that allows to visualize multidimensional data. To visualize the data on a map, we need to specify the coordinate reference system of the data. The data is in the UTM32N coordinate reference system (EPSG:32632). This will allow the library to project the data on a map. More info on the hvplot library can be found here: https://hvplot.holoviz.org/

clipped_date.hvplot.image(
    x='x',
    y='y',
    groupby='date',
    crs=pyproj.CRS.from_epsg(32632),
    cmap='Pastel2',
    clim=(-1, 2),
    frame_width=500,
    frame_height=500,
    title='Snowmap',
    geo=True, tiles='OSM')

Calculate snow cover with apply_ufunc#

Calculate snow cover using Xarray's apply_ufunc
  • The procedure for computing snow cover can also be summed up as following python function.
  • We first verify that Green, swir16 and scr are in the order of 0,1,2 th variable in band variable. Then we simply copy and past all the python codes in a function.
def calculate_ndsi_snow_cloud(data):
    green = data[0]
    swir = data[1]
    scl = data[2]
    ndsi = (green - swir) / (green + swir)
    ndsi_mask = ( ndsi > 0.4 )& ~np.isnan(ndsi)
    snow = np.where(ndsi_mask, 1, ndsi)
    snowmap = np.where((snow <= 0.42) & ~np.isnan(snow), 0, snow)
    mask = ~( (scl == 8) | (scl == 9) | (scl == 3) )
    snow_cloud = np.where(mask, snowmap, 2)
    return snow_cloud
Apply mask then persist the data, then apply_ufunc to perform computation.
  • The masking procedure can also applied before the computation.
  • Xarray's apply_ufunc is passed to each chunk of Xarray.DataArray
  • chunksize for stackstac is specified so that it is only chunked (sliced) in time. All data from band, x and y are in one chunk (slice)
%%time
da = stackstac.stack(items,
                    bounds_latlon=aoi.iloc[0].geometry.bounds,
                    resolution=20,
                    chunksize=(1,-1,-1,-1),
                    assets=['green', 'swir16', 'scl'])
#Mask data
geom_utm32 = aoi.to_crs(epsg=32632).iloc[0]['geometry']
da.rio.write_crs("EPSG:32632", inplace=True)
da.rio.set_nodata(np.nan, inplace=True)
da = da.rio.clip([geom_utm32])

snow_cloud_clipped=xr.apply_ufunc(
    calculate_ndsi_snow_cloud
    ,da
    ,input_core_dims=[["band","y","x"]]
    ,output_core_dims=[["y","x"]]
    ,exclude_dims=set(["band"])
    ,vectorize=True
    ,dask="parallelized"
    ,output_dtypes=[da.dtype]
    ).assign_attrs({'long_name': 'snow_cloud'}).to_dataset(name='snow_cloud')

snow_cloud_clipped
#snow_cloud_clipped_date = snow_cloud_clipped.groupby(snow_cloud_clipped.time.dt.floor('D')).max(skipna=True).rename({'floor': 'date'})
#snow_cloud_clipped_date
Inspect the data dimentions!
  • How did changed from input (da) to output(snow_cloud_clipped)?
  • What is setted as input_core_dims?
  • What is setted as output_core_dims?
  • What is setted as exclude_dims?
  • Did you see 'time' dimension?
  • We will get back to apply_ufunc with next OpenEO example.

Compute statistics#

Our objective is to monitor a specific area over a period of time, ensuring the data quality meets our standards. To achieve this, we determine the proportion of clouds in the watershed at each interval. This cloud coverage data serves to refine our timeline: we exclude any interval where cloud cover exceeds 25%.

Our primary focus is on quantifying the Snow Covered Area (SCA) in the watershed. We tally the number of pixels depicting snow for each interval and calculate the SCA by multiplying the snowy pixels by the pixel’s area. To ascertain the extent of snow coverage, we compare the snowy pixel count to the watershed’s total pixel count, resulting in the percentage of the area that is snow-laden. This percentage is crucial to our analysis.

We need to gather the total pixel counts for the entire watershed, as well as those specific to cloud and snow coverages.

# number of cloud pixels
cloud = xr.where(clipped_date == 2, 1, np.nan).count(dim=['x', 'y']).persist()
# number of all pixels per each single date
aot_total = clipped_date.count(dim=['x', 'y']).persist()
# Cloud fraction per each single date expressed in % 
cloud_fraction = (cloud / aot_total * 100).persist()
# Visualize cloud fraction
cloud_fraction.hvplot.line(title='Cloud cover %', ylabel="&") * hv.HLine(25).opts(
    color='red',
    line_dash='dashed',
    line_width=2.0,
)

We are going to get the same information for the snow cover.

snow = xr.where(clipped_date == 1, 1, np.nan).count(dim=['x', 'y']).persist()
snow_fraction = (snow / aot_total * 100).persist()
# visualize snow fraction
snow_fraction.hvplot.line(title='Snow cover area (%)', ylabel="%")
# mask out cloud fraction > 30% 
masked_cloud_fraction = cloud_fraction < 30
snow_selected = snow_fraction.sel(date=masked_cloud_fraction)
snow_selected.name = 'SCA'
snow_selected.hvplot.line(title="Snow fraction")

Let’s compare the date with the discharge data.

discharge_ds = pd.read_csv('../data/ADO_DSC_ITH1_0025.csv', sep=',', index_col='Time', parse_dates=True)

Let’s refine a little bit the data so that we can compare it with the snow cover data.

start_date = pd.to_datetime("2019/02/01")
end_date = pd.to_datetime("2019/06/30")
# filter discharge data to start and end dates
discharge_ds = discharge_ds.loc[start_date:end_date]
discharge_ds.discharge_m3_s.hvplot(title='Discharge volume', ylabel='Discharge (m$^3$/s)') * snow_selected.hvplot(ylabel='Snow cover area (%)')  

Conclusion#

In this analysis, we have comprehensively examined the features, capabilities, and limitations of two prominent geospatial data processing frameworks: OpenEO and Pangeo. OpenEO offers a unified API that simplifies the process of accessing and processing earth observation data across various backends, allowing users to interact with different data sources seamlessly. Its standardized interface is a strong asset, making it accessible to a wide range of users, from researchers to application developers.

On the other hand, Pangeo excels in facilitating big data geoscience. Its robust ecosystem, built around existing Python libraries like Dask and Xarray, makes it a powerful tool for large-scale data analysis and visualization. Pangeo’s community-driven approach and open-source nature foster collaboration and innovation, promoting a dynamic and adaptable framework.

Each platform has its own set of advantages and constraints. OpenEO simplifies interoperability and enhances accessibility, making it particularly beneficial for users who wish to work with diverse data sources without delving deeply into the complexities of each backend. Pangeo, with its emphasis on leveraging existing Python tools and libraries, is particularly potent for those comfortable with Python and seeking to perform extensive, scalable analyses.

Choosing between OpenEO and Pangeo ultimately depends on the specific requirements and constraints of a project. Considerations such as the user’s familiarity with Python, the necessity for interoperability across various data backends, and the scale of data processing required should guide the decision-making process.