Overview

Intake-esm is a data cataloging utility built on top of intake, pandas, and xarray. Intake-esm aims to facilitate:

  • the discovery of Earth’s climate and weather datasets.

  • the ingestion of these datasets into xarray dataset containers.

Its basic usage is shown below. To begin, let’s import intake:

import intake

Loading a catalog

At import time, the intake-esm plugin is registered in intake’s registry as esm_datastore and can be accessed with the intake.open_esm_datastore() function. For demonstration purposes, we are going to use the catalog for the Community Earth System Model Large Ensemble (CESM LENS) dataset, which is publicly available in Amazon S3.

Note

You can learn more about the CESM LENS dataset in AWS S3 here

You can load data from an ESM catalog by providing the URL of a valid ESM catalog:

catalog_url = "https://ncar-cesm-lens.s3-us-west-2.amazonaws.com/catalogs/aws-cesm1-le.json"
col = intake.open_esm_datastore(catalog_url)
col

aws-cesm1-le catalog with 56 dataset(s) from 435 asset(s):

unique
variable 77
long_name 74
component 5
experiment 4
frequency 6
vertical_levels 3
spatial_domain 5
units 25
start_time 12
end_time 13
path 420

The summary above tells us that this catalog contains 435 data assets. We can get more information on the individual data assets by inspecting the underlying dataframe, which is created when the catalog is initialized:

col.df.head()
variable long_name component experiment frequency vertical_levels spatial_domain units start_time end_time path
0 FLNS net longwave flux at surface atm 20C daily 1.0 global W/m2 1920-01-01 12:00:00 2005-12-31 12:00:00 s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FLNS....
1 FLNSC clearsky net longwave flux at surface atm 20C daily 1.0 global W/m2 1920-01-01 12:00:00 2005-12-31 12:00:00 s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FLNSC...
2 FLUT upwelling longwave flux at top of model atm 20C daily 1.0 global W/m2 1920-01-01 12:00:00 2005-12-31 12:00:00 s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FLUT....
3 FSNS net solar flux at surface atm 20C daily 1.0 global W/m2 1920-01-01 12:00:00 2005-12-31 12:00:00 s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FSNS....
4 FSNSC clearsky net solar flux at surface atm 20C daily 1.0 global W/m2 1920-01-01 12:00:00 2005-12-31 12:00:00 s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FSNSC...

Finding unique entries for individual columns

To get the unique values for given columns in the catalog, intake-esm provides a unique() method. This method returns a dictionary containing, for each requested column, the count and the list of unique values:

col.unique(columns=["component", "frequency", "experiment"])
{'component': {'count': 5,
  'values': ['ice_nh', 'ocn', 'lnd', 'ice_sh', 'atm']},
 'frequency': {'count': 6,
  'values': ['daily',
   'hourly6-1990-2005',
   'monthly',
   'hourly6-2071-2080',
   'static',
   'hourly6-2026-2035']},
 'experiment': {'count': 4, 'values': ['HIST', 'RCP85', '20C', 'CTRL']}}
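
The structure of that result can be illustrated without intake-esm at all. The following is a minimal sketch, not the library's implementation: the helper unique_summary and the toy rows are hypothetical, and the values are illustrative rather than taken from the real catalog.

```python
# Toy rows standing in for catalog entries (illustrative values only).
rows = [
    {"component": "atm", "frequency": "daily", "experiment": "20C"},
    {"component": "atm", "frequency": "monthly", "experiment": "RCP85"},
    {"component": "lnd", "frequency": "monthly", "experiment": "20C"},
    {"component": "ocn", "frequency": "monthly", "experiment": "CTRL"},
]


def unique_summary(rows, columns):
    """Return {column: {"count": n, "values": [...]}} for the given columns.

    Hypothetical helper that mimics the shape of the dictionary shown above.
    """
    summary = {}
    for column in columns:
        # dict.fromkeys de-duplicates while preserving first-seen order.
        values = list(dict.fromkeys(row[column] for row in rows))
        summary[column] = {"count": len(values), "values": values}
    return summary


result = unique_summary(rows, ["component", "frequency"])
print(result)
```

In this sketch the values appear in first-seen order; the order of the values returned by the real method should not be relied upon.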

Loading datasets

Intake-esm implements convenience utilities for loading the query results into higher-level xarray datasets. The logic for merging/concatenating the query results into these datasets is provided in the input JSON file and is available under the .aggregation_info property:

col.aggregation_info
AggregationInfo(groupby_attrs=['component', 'experiment', 'frequency'], variable_column_name='variable', aggregations=[{'type': 'union', 'attribute_name': 'variable', 'options': {'compat': 'override'}}], agg_columns=['variable'], aggregation_dict={'variable': {'type': 'union', 'options': {'compat': 'override'}}})
col.aggregation_info.aggregations
[{'type': 'union',
  'attribute_name': 'variable',
  'options': {'compat': 'override'}}]
# Dataframe columns used to determine groups of compatible datasets.
col.aggregation_info.groupby_attrs  # or col.groupby_attrs
['component', 'experiment', 'frequency']
# List of columns used to merge/concatenate multiple compatible Datasets into a single Dataset.
col.aggregation_info.agg_columns  # or col.agg_columns
['variable']
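
To make the role of groupby_attrs concrete, here is a minimal sketch in plain Python. It assumes the aggregation settings live under an aggregation_control entry in the catalog JSON (an assumption about the file layout, mirroring the values printed above), and it uses made-up rows. It only shows how rows could be grouped into dataset keys; it does not reproduce how intake-esm merges the data.

```python
import json
from collections import defaultdict

# Assumed shape of the aggregation section of an ESM catalog JSON file,
# mirroring the values printed by col.aggregation_info above.
catalog_json = """
{
  "aggregation_control": {
    "variable_column_name": "variable",
    "groupby_attrs": ["component", "experiment", "frequency"],
    "aggregations": [
      {"type": "union", "attribute_name": "variable",
       "options": {"compat": "override"}}
    ]
  }
}
"""
control = json.loads(catalog_json)["aggregation_control"]
groupby_attrs = control["groupby_attrs"]
variable_column = control["variable_column_name"]

# Toy catalog rows (illustrative values only).
rows = [
    {"variable": "FLNS", "component": "atm", "experiment": "20C", "frequency": "daily"},
    {"variable": "FSNS", "component": "atm", "experiment": "20C", "frequency": "daily"},
    {"variable": "SNOW", "component": "lnd", "experiment": "20C", "frequency": "monthly"},
]

# Rows that share the same groupby_attrs values belong to one dataset.
# The dataset key joins those values with ".".
groups = defaultdict(list)
for row in rows:
    key = ".".join(row[attr] for attr in groupby_attrs)
    groups[key].append(row[variable_column])

for key, variables in groups.items():
    print(key, variables)
```

Each key corresponds to one aggregated dataset, and the variables listed under a key are the ones a "union" aggregation would combine into that dataset.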

To load data assets into xarray datasets, we need to use the to_dataset_dict() method. As the name hints, this method returns a dictionary of aggregated xarray datasets.

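col_subset = col.search(component="lnd", frequency="monthly")  # assumed query; col_subset is not defined earlier on this page
dset_dicts = col_subset.to_dataset_dict(zarr_kwargs={"consolidated": True}, storage_options={"anon": True})  # anonymous access to the public bucket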
--> The keys in the returned dictionary of datasets are constructed as follows:
	'component.experiment.frequency'

Note

The CESM LENS bucket is public, but s3fs attempts authenticated access by default. If no AWS credentials are found and anonymous access is not requested with storage_options={"anon": True}, to_dataset_dict() fails with "NoCredentialsError: Unable to locate credentials", which intake-esm re-raises as "OSError: Failed to open zarr store."

list(dset_dicts.keys())

We can access a particular dataset as follows:

ds = dset_dicts["lnd.20C.monthly"]
print(ds)

Let’s create a quick plot for a slice of the data:

ds.SNOW.isel(time=0, member_id=range(1, 24, 4)).plot(col="member_id", col_wrap=3, robust=True)
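
As a small worked check of the plot call above, the snippet below lists the integer positions that range(1, 24, 4) selects along member_id and the number of facet rows that col_wrap=3 implies. It is plain Python and does not touch the dataset; the variable names are illustrative.

```python
# Integer positions passed to isel(member_id=...) in the plot call.
member_positions = list(range(1, 24, 4))
col_wrap = 3

# Ceiling division gives the number of rows in the facet grid.
n_rows = -(-len(member_positions) // col_wrap)

print(member_positions)  # [1, 5, 9, 13, 17, 21]
print(n_rows)            # 2
```

Note that isel selects by position, so these are positions along the member_id dimension rather than member_id labels.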

import intake_esm  # just to display version information

intake_esm.show_versions()