In [28]:
import xarray as xr
import s3fs
import zarr
import numpy as np
import dask.array as da
import xarray as xr
from dask.utils import parse_bytes
import math
import pandas as pd
import dask
import os

# Simple MinIO access with s3fs

In [29]:
#!aws configure set aws_access_key_id xxx
#!aws configure set aws_secret_access_key yyy

In [30]:
access_key = !aws configure get aws_access_key_id
access_key = access_key[0]
secret_key = !aws configure get aws_secret_access_key
secret_key = secret_key[0]

In [31]:
client_kwargs={'endpoint_url': 'https://pangeo-eosc-minioapi.vm.fedcloud.eu/'}

#s3 = s3fs.S3FileSystem(anon=False, client_kwargs=client_kwargs) # Works only when using s3 in this Notebook, not with distributed.
s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key, client_kwargs=client_kwargs)

In [32]:
s3_prefix = os.environ['JUPYTERHUB_USER']
# s3_suffix below is what you need to customise to create your own buckets:
s3_suffix = 'test'
s3_bucket = s3_prefix + '-' + s3_suffix + '/'
s3.ls(s3_bucket)

['slunavalero-test/myfile',
 'slunavalero-test/xarray-demo-dask-s3',
 'slunavalero-test/xarray-demo-dask-s3-distributed']

In [33]:
s3.touch(s3_bucket + 'myfile')

{'ResponseMetadata': {'RequestId': '1748E551D9273E59',
  'HostId': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 03 Mar 2023 11:30:00 GMT',
   'content-length': '0',
   'connection': 'keep-alive',
   'accept-ranges': 'bytes',
   'content-security-policy': 'block-all-mixed-content',
   'etag': '"d41d8cd98f00b204e9800998ecf8427e"',
   'strict-transport-security': 'max-age=15724800; includeSubDomains',
   'vary': 'Accept-Encoding',
   'x-amz-id-2': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
   'x-amz-request-id': '1748E551D9273E59',
   'x-content-type-options': 'nosniff',
   'x-xss-protection': '1; mode=block'},
  'RetryAttempts': 0},
 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"'}

In [34]:
s3.ls(s3_bucket)

['slunavalero-test/myfile',
 'slunavalero-test/xarray-demo-dask-s3',
 'slunavalero-test/xarray-demo-dask-s3-distributed']

# Try to write some Zarr

In [35]:
def timeseries(
    chunk_per_worker=5,
    chunk_size="128 MB",
    num_nodes=12,
    worker_per_node=4,
    chunking_scheme=None,
    lat=320,
    lon=384,
    start="1980-01-01",
    freq="1H",
    nan=False,
):
    """ Create synthetic Xarray dataset filled with random
    data.
    Parameters
    ----------
    chunk_per_worker : int
          number of chunk placed per worker.
          see docs.dask.org, best practices, for chunk.
          Best chunk size is around 100M but, each worker can
          have many chunk, which automate the parallelism in dask.
    chunk_size : str
          chunk size in bytes, kilo, mega or any factor of bytes
    num_nodes : int
           number of compute nodes
    worker_per_node: int
           number of dask workers per node
    chunking_scheme : str
           Whether to chunk across time dimension ('temporal') or
           horizontal dimensions (lat, lon) ('spatial').
           If None, automatically determine chunk sizes along all dimensions.
    lat : int
         number of latitude values
    lon : int
         number of longitude values
    start : datetime (or datetime-like string)
        Start of time series
    freq : string
        String like '2s' or '1H' or '12W' for the time series frequency
    nan : bool
         Whether to include nan in generated data
    Examples
    ---------
    >>> from benchmarks.datasets import timeseries
    >>> ds = timeseries('128MB', 5, chunking_scheme='spatial', lat=500, lon=600)
    >>> ds
    <xarray.Dataset>
    Dimensions:  (lat: 500, lon: 600, time: 267)
    Coordinates:
    * time     (time) datetime64[ns] 1980-01-01 1980-01-02 ... 1980-09-23
    * lon      (lon) float64 -180.0 -179.4 -178.8 -178.2 ... 178.8 179.4 180.0
    * lat      (lat) float64 -90.0 -89.64 -89.28 -88.92 ... 88.92 89.28 89.64 90.0
    Data variables:
        sst      (time, lon, lat) float64 dask.array<shape=(267, 600, 500), .....
    Attributes:
        history:  created for compute benchmarking
    """

    dt = np.dtype("f8")
    itemsize = dt.itemsize
    chunk_size = parse_bytes(chunk_size)
    total_bytes = chunk_size * num_nodes * worker_per_node * chunk_per_worker
    # total_bytes = chunk_size * num_nodes * worker_per_node
    size = total_bytes / itemsize
    timesteps = math.ceil(size / (lat * lon))
    shape = (timesteps, lon, lat)
    if chunking_scheme == "temporal":
        x = math.ceil(chunk_size / (lon * lat * itemsize))
        chunks = (x, lon, lat)
    elif chunking_scheme == "spatial":
        x = math.ceil(math.sqrt(chunk_size / (timesteps * itemsize)))
        chunks = (timesteps, x, x)
    else:
        chunks = "auto"

    lats = xr.DataArray(np.linspace(start=-90, stop=90, num=lat), dims=["lat"])
    lons = xr.DataArray(np.linspace(start=-180, stop=180, num=lon), dims=["lon"])
    times = xr.DataArray(pd.date_range(start=start, freq=freq, periods=timesteps), dims=["time"])
    if chunks == "auto":
        with dask.config.set({"array.chunk-size": chunk_size}):
            random_data = randn(shape=shape, chunks=chunks, nan=nan)
    else:
        random_data = randn(shape=shape, chunks=chunks, nan=nan)
    ds = xr.DataArray(
        random_data,
        dims=["time", "lon", "lat"],
        coords={"time": times, "lon": lons, "lat": lats},
        name="sst",
        attrs={"units": "baz units", "description": "a description"},
    ).to_dataset()
    ds.attrs = {"history": "created for compute benchmarking"}

    return ds


def randn(shape, chunks=None, nan=False, seed=0):
    rng = da.random.RandomState(seed)
    x = 5 + 3 * rng.standard_normal(shape, chunks=chunks)
    if nan:
        x = da.where(x < 0, np.nan, x)
    return x

In [36]:
ds = timeseries(chunk_size='16 MiB', chunking_scheme='temporal', chunk_per_worker=1, num_nodes=1)
ds

Unnamed: 0,Array,Chunk
Bytes,64.69 MiB,16.88 MiB
Shape,"(69, 384, 320)","(18, 384, 320)"
Count,3 Graph Layers,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 64.69 MiB 16.88 MiB Shape (69, 384, 320) (18, 384, 320) Count 3 Graph Layers 4 Chunks Type float64 numpy.ndarray",320  384  69,

Unnamed: 0,Array,Chunk
Bytes,64.69 MiB,16.88 MiB
Shape,"(69, 384, 320)","(18, 384, 320)"
Count,3 Graph Layers,4 Chunks
Type,float64,numpy.ndarray


In [37]:
store_s3 = s3fs.S3Map(root=s3_bucket + 'xarray-demo-dask-s3',
                   s3=s3,
                   check=False)
store_s3.clear()

In [38]:
%%time
ds.to_zarr(store=store_s3, mode='w', consolidated=True)

CPU times: user 217 ms, sys: 32.2 ms, total: 249 ms
Wall time: 1.29 s


<xarray.backends.zarr.ZarrStore at 0x7f6b902154a0>

# Try some computation with dask-gateway

In [39]:
from dask_gateway import Gateway
gateway = Gateway()

In [40]:
cluster = gateway.new_cluster(worker_memory=4)
cluster.scale(6)
cluster



AttributeError: 'VBox' object has no attribute '_ipython_display_'

In [41]:
from dask.distributed import Client
client = Client(cluster)
client.wait_for_workers(4)
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /services/dask-gateway/clusters/c-scale-pangeo-dask.c58c80c2374847edac55dc1054895ab8/status,


In [None]:
import dask.array as da

sample = 10_000_000_000  # <- this is huge!
xxyy = da.random.uniform(-1, 1, size=(2, sample))
norm = da.linalg.norm(xxyy, axis=0)
summ = da.sum(norm <= 1)
insiders = summ.compute()
pi = 4 * insiders / sample
print("pi ~= {}".format(pi))

# Try to write with distributed

In [None]:
store_s3 = s3fs.S3Map(root=s3_bucket + 'xarray-demo-dask-s3-distributed',
                   s3=s3,
                   check=False)
store_s3.clear()

In [None]:
%%time
ds.to_zarr(store=store_s3, mode='w', consolidated=True)