
OceanDataStore CLI

Mandatory Arguments

| Long version | Short version | Description |
| --- | --- | --- |
| `action` | | Specify the action: `send_to_zarr`, `send_to_icechunk`, `update_zarr`, `update_icechunk` or `list`. |
| `--filepaths` | `-f` | Paths to the files to send or update. |
| `--credentials` | `-c` | Path to the JSON file containing the credentials for the object store. |
| `--bucket` | `-b` | Bucket name. |

Optional Arguments

| Flag | Short version | Description |
| --- | --- | --- |
| `--prefix` | `-p` | Object prefix (default: `None`). |
| `--append-dim` | `-ad` | Append dimension (default: `time_counter`). |
| `--variables` | `-v` | Variables to send (default: `None`, which sends all variables). |
| `--chunk-strategy` | `-cs` | Chunk strategy as a JSON string (default: `None`), e.g. `'{"time_counter": 1, "x": 100, "y": 100}'`. |
| `--dask-configuration` | `-dc` | Path to the JSON file defining the Dask LocalCluster configuration (default: `None`). |
| `--grid-filepath` | `-gf` | Path to the model grid file containing domain information (default: `None`). |
| `--update-coords` | `-uc` | Coordinate dimensions to update as a JSON string (default: `None`), e.g. `'{"nav_lon": "glamt", "nav_lat": "gphit"}'`. |
| `--attributes` | `-at` | Attributes to add to the dataset as a JSON string, e.g. `'{"title": "my_dataset"}'`. |
| `--zarr-version` | `-zv` | Zarr version used to create the Zarr store (default: `3`). Options are `2` (v2) or `3` (v3). |
| `--branch` | `-br` | Branch of the Icechunk repository to commit changes to (default: `main`). |
| `--commit_message` | `-cm` | Commit message recorded when committing changes to the Icechunk repository (default: `"Add new data to my Icechunk repository"`). |
| `--variable-commits` | `-vc` | Flag to send variables to the Icechunk repository using independent commits. |
| `--icechunk-configuration` | `-ic` | Path to the JSON file defining the Icechunk storage and repository configurations (default: `None`). |
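
As an illustration, the command below writes a set of netCDF files to a new Zarr store. The console entry point (shown here as `OceanDataStore`) and all paths, bucket and prefix values are placeholders, not values from this documentation; only the action and flags are taken from the tables above.

OceanDataStore send_to_zarr \
    --filepaths /data/model/grid_T_*.nc \
    --credentials credentials.json \
    --bucket my-ocean-bucket \
    --prefix ORCA025/T1m \
    --append-dim time_counter \
    --chunk-strategy '{"time_counter": 1, "x": 100, "y": 100}' \
    --zarr-version 3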

OceanDataStore Python API

OceanDataStore.object_store_handler.send_to_zarr

send_to_zarr(file, bucket, object_prefix, store_credentials_json, variables=None, append_dim='time_counter', grid_filepath=None, update_coords=None, rechunk=None, attrs=None, dask_config_kwargs=None, dask_cluster_kwargs=None, zarr_version=3)

Write data to a new Zarr store in cloud object storage, with the option of using Dask.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file` | `list[str] \| str \| Dataset` | Regular expression or list of filepaths to netCDF file(s). Users can also pass a single `xarray.Dataset` directly. | *required* |
| `bucket` | `str` | Name of the bucket in the object store. Bucket names can contain only lowercase letters, numbers, dots (.), and hyphens (-). | *required* |
| `object_prefix` | `str` | Prefix to be added to the object names in the object store. | *required* |
| `store_credentials_json` | `str` | Path to the JSON file containing the object store credentials. | *required* |
| `variables` | `Optional[list[str]]` | List of variables to send. If `None`, all variables will be sent. | `None` |
| `append_dim` | `str` | Name of the append dimension. | `'time_counter'` |
| `grid_filepath` | `Optional[str]` | Path to the file containing model grid parameters. | `None` |
| `update_coords` | `Optional[dict]` | Dictionary of coordinate variables to update. | `None` |
| `rechunk` | `Optional[dict]` | Rechunk strategy dictionary. | `None` |
| `attrs` | `Optional[dict]` | Attributes to add to the dataset. | `None` |
| `dask_config_kwargs` | `Optional[dict]` | Dask configuration settings passed to `dask.config.set()`. | `None` |
| `dask_cluster_kwargs` | `Optional[dict]` | Dask cluster configuration settings passed to `LocalCluster()`. | `None` |
| `zarr_version` | `int` | Zarr version to use. | `3` |
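
A minimal usage sketch based on the signature above. The file glob, bucket, prefix, credentials path and variable names are placeholders; the Dask keyword arguments are illustrative values passed through unchanged to dask.config.set() and LocalCluster().

from OceanDataStore.object_store_handler import send_to_zarr

# Write monthly model output to a new Zarr v3 store, rechunking to
# one time slice per chunk and running on a 4-worker local Dask cluster:
send_to_zarr(
    file="/data/model/grid_T_*.nc",                 # placeholder glob pattern
    bucket="my-ocean-bucket",                       # placeholder bucket
    object_prefix="ORCA025/T1m",                    # placeholder prefix
    store_credentials_json="credentials.json",      # placeholder credentials file
    variables=["votemper", "vosaline"],             # placeholder variable names
    append_dim="time_counter",
    rechunk={"time_counter": 1, "x": 100, "y": 100},
    dask_cluster_kwargs={"n_workers": 4, "threads_per_worker": 2},
    zarr_version=3,
)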
Source code in OceanDataStore/object_store_handler.py
def send_to_zarr(
    file: list[str] | str | xr.Dataset,
    bucket: str,
    object_prefix: str,
    store_credentials_json: str,
    variables: Optional[list[str]] = None,
    append_dim: str = "time_counter",
    grid_filepath: Optional[str] = None,
    update_coords: Optional[dict] = None,
    rechunk: Optional[dict] = None,
    attrs: Optional[dict] = None,
    dask_config_kwargs: Optional[dict] = None,
    dask_cluster_kwargs: Optional[dict] = None,
    zarr_version: int = 3
    ) -> None:
    """
    Write data to new Zarr store in cloud object storage with
    option of using dask.

    Parameters
    ----------
    file: list | str | xarray.Dataset
        Regular expression or list of filepaths to netCDF file(s).
        Users can also pass a single xarray.Dataset directly.
    bucket: str
        Name of the bucket in the object store. Bucket names can contain only
        lowercase letters, numbers, dots (.), and hyphens (-).
    object_prefix: str
        Prefix to be added to the object names in the object store.
    store_credentials_json: str
        Path to the JSON file containing the object store credentials.
    variables: list[str], optional
        List of variables to send. If None, all variables will be sent.
    append_dim: str, default="time_counter"
        Name of the append dimension, by default "time_counter".
    grid_filepath: str, optional
        Path to file containing model grid parameter.
    update_coords: dict, optional
        Dictionary of coordinate variables to update.
    rechunk: dict, optional
        Rechunk strategy dictionary, by default None.
    attrs: dict, optional
        Attributes to add to the dataset.
    dask_config_kwargs: dict, optional
        Dask configuration settings passed to dask.config.set().
    dask_cluster_kwargs: dict, optional
        Dask cluster configuration settings passed to LocalCluster().
    zarr_version: int, default=3
        Zarr version to use.
    """
    if dask_cluster_kwargs is not None:
        # === Send to Zarr store with Dask === #
        if dask_config_kwargs is not None:
            dask.config.set(dask_config_kwargs)
            logging.info("Updated dask configuration settings.")

        # Create local dask cluster & client:
        with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client:
            logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}")

            # Catch UserWarnings when rechunking data:
            client.register_worker_plugin(CaptureWarningsPlugin())

            _send_to_zarr(file=file,
                          bucket=bucket,
                          object_prefix=object_prefix,
                          store_credentials_json=store_credentials_json,
                          client=client,
                          variables=variables,
                          append_dim=append_dim,
                          grid_filepath=grid_filepath,
                          update_coords=update_coords,
                          rechunk=rechunk,
                          attrs=attrs,
                          parallel=True,
                          zarr_version=zarr_version
                          )

            # --- Shutdown Store & Dask Cluster --- #
            client.shutdown()
            client.close()
            logging.info("Dask Cluster has been shutdown.")

    else:
        # === Send to Zarr store without Dask === #
        _send_to_zarr(file=file,
                        bucket=bucket,
                        object_prefix=object_prefix,
                        store_credentials_json=store_credentials_json,
                        client=None,
                        variables=variables,
                        append_dim=append_dim,
                        grid_filepath=grid_filepath,
                        update_coords=update_coords,
                        rechunk=rechunk,
                        attrs=attrs,
                        parallel=False,
                        zarr_version=zarr_version
                        )

OceanDataStore.object_store_handler.update_zarr

update_zarr(file, bucket, object_prefix, store_credentials_json, variables=None, append_dim='time_counter', grid_filepath=None, update_coords=None, rechunk=None, attrs=None, dask_config_kwargs=None, dask_cluster_kwargs=None, zarr_version=3)

Update data in an existing Zarr store in cloud object storage, with the option of using Dask.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file` | `list[str] \| str \| Dataset` | Regular expression or list of filepaths to netCDF file(s). Users can also pass a single `xarray.Dataset` directly. | *required* |
| `bucket` | `str` | Name of the bucket in the object store. Bucket names can contain only lowercase letters, numbers, dots (.), and hyphens (-). | *required* |
| `object_prefix` | `str` | Prefix to be added to the object names in the object store. | *required* |
| `store_credentials_json` | `str` | Path to the JSON file containing the object store credentials. | *required* |
| `variables` | `Optional[list[str]]` | List of variables to send to Zarr stores. If `None`, all variables will be sent. | `None` |
| `append_dim` | `str` | Name of the dimension along which to append multifile datasets. | `'time_counter'` |
| `grid_filepath` | `Optional[str]` | Path to the file containing model grid parameters. | `None` |
| `update_coords` | `Optional[dict]` | Dictionary of coordinate variables to update. | `None` |
| `rechunk` | `Optional[dict]` | Rechunk strategy dictionary. | `None` |
| `attrs` | `Optional[dict]` | Attributes to add to the dataset. | `None` |
| `dask_config_kwargs` | `Optional[dict]` | Dask configuration settings passed to `dask.config.set()`. | `None` |
| `dask_cluster_kwargs` | `Optional[dict]` | Dask cluster configuration settings passed to `LocalCluster()`. | `None` |
| `zarr_version` | `int` | Zarr version to use. | `3` |
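
A short sketch of appending further files to an existing store; all paths and names are placeholders. Omitting dask_cluster_kwargs runs the update without Dask, as in the source below.

from OceanDataStore.object_store_handler import update_zarr

# Append two more years of output to the existing store along time_counter:
update_zarr(
    file=["/data/model/grid_T_2001.nc",
          "/data/model/grid_T_2002.nc"],        # placeholder paths
    bucket="my-ocean-bucket",                   # placeholder bucket
    object_prefix="ORCA025/T1m",                # placeholder prefix
    store_credentials_json="credentials.json",  # placeholder credentials file
    append_dim="time_counter",
    zarr_version=3,
)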
Source code in OceanDataStore/object_store_handler.py
def update_zarr(
    file: list[str] | str | xr.Dataset,
    bucket: str,
    object_prefix: str,
    store_credentials_json: str,
    variables: Optional[list[str]] = None,
    append_dim: str = "time_counter",
    grid_filepath: Optional[str] = None,
    update_coords: Optional[dict] = None,
    rechunk: Optional[dict] = None,
    attrs: Optional[dict] = None,
    dask_config_kwargs: Optional[dict] = None,
    dask_cluster_kwargs: Optional[dict] = None,
    zarr_version: int = 3
    ) -> None:
    """
    Update data in existing Zarr store in cloud object
    storage with option of using dask.

    Parameters
    ----------
    file: list | str | xarray.Dataset
        Regular expression or list of filepaths to netCDF file(s).
        Users can also pass a single xarray.Dataset directly.
    bucket: str
        Name of the bucket in the object store. Bucket names can contain only
        lowercase letters, numbers, dots (.), and hyphens (-).
    object_prefix: str
        Prefix to be added to the object names in the object store.
    store_credentials_json: str
        Path to the JSON file containing the object store credentials.
    variables: list, optional
        List of variables to send to Zarr stores.
        If None, all variables will be sent.
    append_dim: str, default='time_counter'
        Name of the dimension to append multifile datasets.
    grid_filepath: str, optional
        Path to file containing model grid parameter.
    update_coords: dict, optional
        Dictionary of coordinate variables to update.
    rechunk: dict, optional
        Rechunk strategy dictionary.
    attrs: dict, optional
        Attributes to add to the dataset.
    dask_config_kwargs: dict, optional
        Dask configuration settings passed to dask.config.set().
    dask_cluster_kwargs: dict, optional
        Dask cluster configuration settings passed to LocalCluster().
    zarr_version: int, default=3
        Zarr version to use.
    """
    if dask_cluster_kwargs is not None:
        # === Update Zarr store with Dask === #
        if dask_config_kwargs is not None:
            dask.config.set(dask_config_kwargs)
            logging.info("Updated dask configuration settings.")

        # Create local dask cluster & client:
        with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client:
            logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}")

            # Catch UserWarnings when rechunking data:
            client.register_worker_plugin(CaptureWarningsPlugin())

            _update_zarr(file=file,
                         bucket=bucket,
                         object_prefix=object_prefix,
                         store_credentials_json=store_credentials_json,
                         client=client,
                         variables=variables,
                         append_dim=append_dim,
                         grid_filepath=grid_filepath,
                         update_coords=update_coords,
                         rechunk=rechunk,
                         attrs=attrs,
                         parallel=True,
                         zarr_version=zarr_version
                         )

            # --- Shutdown Store & Dask Cluster --- #
            client.shutdown()
            client.close()
            logging.info("Dask Cluster has been shutdown.")

    else:
        # === Update Zarr store without Dask === #
        _update_zarr(file=file,
                     bucket=bucket,
                     object_prefix=object_prefix,
                     store_credentials_json=store_credentials_json,
                     client=None,
                     variables=variables,
                     append_dim=append_dim,
                     grid_filepath=grid_filepath,
                     update_coords=update_coords,
                     rechunk=rechunk,
                     attrs=attrs,
                     parallel=False,
                     zarr_version=zarr_version
                     )

OceanDataStore.object_store_handler.send_to_icechunk

send_to_icechunk(file, bucket, object_prefix, store_credentials_json, variables=None, append_dim='time_counter', grid_filepath=None, update_coords=None, rechunk=None, attrs=None, branch='main', commit_message='Add new data to my Icechunk repository', variable_commits=False, dask_config_kwargs=None, dask_cluster_kwargs=None, icechunk_config=None)

Write data to a new Icechunk repository in cloud object storage, with the option of using Dask.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file` | `list[str] \| str \| Dataset` | Regular expression or list of filepaths to netCDF file(s). Users can also pass a single `xarray.Dataset` directly. | *required* |
| `bucket` | `str` | Name of the bucket in the object store. Bucket names can contain only lowercase letters, numbers, dots (.), and hyphens (-). | *required* |
| `object_prefix` | `str` | Prefix to be added to the object names in the object store. | *required* |
| `store_credentials_json` | `str` | Path to the JSON file containing the object store credentials. | *required* |
| `variables` | `Optional[list[str]]` | List of variables to send. If `None`, all variables will be sent. | `None` |
| `append_dim` | `Optional[str]` | Name of the dimension along which to append multifile datasets. | `'time_counter'` |
| `grid_filepath` | `Optional[str]` | Path to the file containing model grid parameters. | `None` |
| `update_coords` | `Optional[dict]` | Dictionary of coordinate variables to update. | `None` |
| `rechunk` | `Optional[dict]` | Rechunk strategy dictionary. | `None` |
| `attrs` | `Optional[dict]` | Attributes to add to the dataset. | `None` |
| `branch` | `str` | Branch on which to write data to the IcechunkStore. | `'main'` |
| `commit_message` | `str` | Commit message when updating the Icechunk repository. | `'Add new data to my Icechunk repository'` |
| `variable_commits` | `bool` | Whether to write each variable to the Icechunk repository using separate commits. | `False` |
| `dask_config_kwargs` | `Optional[dict]` | Dask configuration settings passed to `dask.config.set()`. | `None` |
| `dask_cluster_kwargs` | `Optional[dict]` | Dask cluster configuration settings passed to `LocalCluster()`. | `None` |
| `icechunk_config` | `Optional[dict]` | Icechunk repository configuration. | `None` |
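
A hedged sketch of creating a new Icechunk repository; all paths and names are placeholders. Setting variable_commits=True records each variable as a separate commit on the chosen branch, as documented above.

from OceanDataStore.object_store_handler import send_to_icechunk

# Create a new Icechunk repository on the main branch,
# committing each variable independently:
send_to_icechunk(
    file="/data/model/grid_T_*.nc",             # placeholder glob pattern
    bucket="my-ocean-bucket",                   # placeholder bucket
    object_prefix="ORCA025/T1m",                # placeholder prefix
    store_credentials_json="credentials.json",  # placeholder credentials file
    branch="main",
    commit_message="Add grid_T fields",         # placeholder message
    variable_commits=True,
)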
Source code in OceanDataStore/object_store_handler.py
def send_to_icechunk(
    file: list[str] | str | xr.Dataset,
    bucket: str,
    object_prefix: str,
    store_credentials_json: str,
    variables: Optional[list[str]] = None,
    append_dim: Optional[str] = 'time_counter',
    grid_filepath: Optional[str] = None,
    update_coords: Optional[dict] = None,
    rechunk: Optional[dict] = None,
    attrs: Optional[dict] = None,
    branch: str = "main",
    commit_message: str = "Add new data to my Icechunk repository",
    variable_commits: bool = False,
    dask_config_kwargs: Optional[dict] = None,
    dask_cluster_kwargs: Optional[dict] = None,
    icechunk_config: Optional[dict] = None,
    ) -> None:
    """
    Write data to new Icechunk repository in cloud object storage with
    option of using dask.

    Parameters
    ----------
    file: list | str | xarray.Dataset
        Regular expression or list of filepaths to netCDF file(s).
        Users can also pass a single xarray.Dataset directly.
    bucket: str
        Name of the bucket in the object store. Bucket names can contain only
        lowercase letters, numbers, dots (.), and hyphens (-).
    object_prefix: str
        Prefix to be added to the object names in the object store.
    store_credentials_json: str
        Path to the JSON file containing the object store credentials.
    variables: list[str], optional
        List of variables to send. If None, all variables will be sent.
    append_dim: str, default='time_counter'
        Name of the dimension to append multifile datasets.
    grid_filepath: str, optional
        Path to file containing model grid parameter.
    update_coords: dict, optional
        Dictionary of coordinate variables to update.
    rechunk: dict, optional
        Rechunk strategy dictionary, by default None.
    attrs: dict, optional
        Attributes to add to the dataset.
    branch: str, default="main"
        Branch on which to write data to IcechunkStore.
    commit_message: str, default="Initial commit"
        Commit message when updating the Icechunk repository.
    variable_commits: bool, default=False
        Whether to write each variable to Icechunk repository using
        separate commits.
    dask_config_kwargs: dict, optional
        Dask configuration settings passed to dask.config.set().
    dask_cluster_kwargs: dict, optional
        Dask cluster configuration settings passed to LocalCluster().
    icechunk_config: dict, optional
        Icechunk repository configuration.
    """
    if dask_cluster_kwargs is not None:
        # === Send to Icechunk repo(s) with Dask === #
        if dask_config_kwargs is not None:
            dask.config.set(dask_config_kwargs)
            logging.info("Updated dask configuration settings.")

        # Create local dask cluster & client:
        with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client:
            logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}")

            # Catch UserWarnings when rechunking data:
            client.register_worker_plugin(CaptureWarningsPlugin())

            _send_to_icechunk(file=file,
                              bucket=bucket,
                              object_prefix=object_prefix,
                              store_credentials_json=store_credentials_json,
                              variables=variables,
                              append_dim=append_dim,
                              grid_filepath=grid_filepath,
                              update_coords=update_coords,
                              rechunk=rechunk,
                              attrs=attrs,
                              parallel=True,
                              branch=branch,
                              commit_message=commit_message,
                              variable_commits=variable_commits,
                              icechunk_config=icechunk_config
                              )

            # --- Shutdown Store & Dask Cluster --- #
            client.shutdown()
            client.close()
            logging.info("Dask Cluster has been shutdown.")

    else:
        # === Send to Icechunk repo(s) without Dask === #
        _send_to_icechunk(file=file,
                          bucket=bucket,
                          object_prefix=object_prefix,
                          store_credentials_json=store_credentials_json,
                          variables=variables,
                          append_dim=append_dim,
                          grid_filepath=grid_filepath,
                          update_coords=update_coords,
                          rechunk=rechunk,
                          attrs=attrs,
                          parallel=False,
                          branch=branch,
                          commit_message=commit_message,
                          variable_commits=variable_commits,
                          icechunk_config=icechunk_config
                          )

OceanDataStore.object_store_handler.update_icechunk

update_icechunk(file, bucket, object_prefix, store_credentials_json, variables='all', append_dim='time_counter', grid_filepath=None, update_coords=None, rechunk=None, attrs=None, branch='main', commit_message='Update data in my Icechunk repository', dask_config_kwargs=None, dask_cluster_kwargs=None, icechunk_config=None)

Update data in an existing Icechunk repository in cloud object storage, with the option of using Dask.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file` | `list[str] \| str \| Dataset` | Regular expression or list of filepaths to netCDF file(s). Users can also pass a single `xarray.Dataset` directly. | *required* |
| `bucket` | `str` | Name of the bucket in the object store. Bucket names can contain only lowercase letters, numbers, dots (.), and hyphens (-). | *required* |
| `object_prefix` | `str` | Prefix to be added to the object names in the object store. | *required* |
| `store_credentials_json` | `str` | Path to the JSON file containing the object store credentials. | *required* |
| `variables` | `list[str] \| str` | List of variables to send. If `'all'`, all variables will be sent. | `'all'` |
| `append_dim` | `Optional[str]` | Name of the dimension along which to append multifile datasets. | `'time_counter'` |
| `grid_filepath` | `Optional[str]` | Path to the file containing model grid parameters. | `None` |
| `update_coords` | `Optional[dict]` | Dictionary of coordinate variables to update. | `None` |
| `rechunk` | `Optional[dict]` | Rechunk strategy dictionary. | `None` |
| `attrs` | `Optional[dict]` | Attributes to add to the dataset. | `None` |
| `branch` | `str` | Branch on which to write data to the IcechunkStore. | `'main'` |
| `commit_message` | `str` | Commit message when updating the Icechunk repository. | `'Update data in my Icechunk repository'` |
| `dask_config_kwargs` | `Optional[dict]` | Dask configuration settings passed to `dask.config.set()`. | `None` |
| `dask_cluster_kwargs` | `Optional[dict]` | Dask cluster configuration settings passed to `LocalCluster()`. | `None` |
| `icechunk_config` | `Optional[dict]` | Icechunk repository configuration. | `None` |
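
A minimal sketch of appending new data to an existing repository; all paths and names are placeholders. Leaving variables='all' (the default) updates every variable in the repository.

from OceanDataStore.object_store_handler import update_icechunk

# Append the next file to all variables on the main branch:
update_icechunk(
    file="/data/model/grid_T_2003.nc",          # placeholder path
    bucket="my-ocean-bucket",                   # placeholder bucket
    object_prefix="ORCA025/T1m",                # placeholder prefix
    store_credentials_json="credentials.json",  # placeholder credentials file
    variables="all",
    append_dim="time_counter",
    commit_message="Append 2003 fields",        # placeholder message
)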
Source code in OceanDataStore/object_store_handler.py
def update_icechunk(
    file: list[str] | str | xr.Dataset,
    bucket: str,
    object_prefix: str,
    store_credentials_json: str,
    variables: list[str] | str = 'all',
    append_dim: Optional[str] = 'time_counter',
    grid_filepath: Optional[str] = None,
    update_coords: Optional[dict] = None,
    rechunk: Optional[dict] = None,
    attrs: Optional[dict] = None,
    branch: str = "main",
    commit_message: str = "Update data in my Icechunk repository",
    dask_config_kwargs: Optional[dict] = None,
    dask_cluster_kwargs: Optional[dict] = None,
    icechunk_config: Optional[dict] = None,
    ) -> None:
    """
    Update data in existing Icechunk repository in cloud object
    storage with option of using dask.

    Parameters
    ----------
    file: list | str | xarray.Dataset
        Regular expression or list of filepaths to netCDF file(s).
        Users can also pass a single xarray.Dataset directly.
    bucket: str
        Name of the bucket in the object store. Bucket names can contain only
        lowercase letters, numbers, dots (.), and hyphens (-).
    object_prefix: str
        Prefix to be added to the object names in the object store.
    store_credentials_json: str
        Path to the JSON file containing the object store credentials.
    variables: list | str, default="all"
        List of variables to send. If "all", all variables will be sent.
    append_dim: str, default='time_counter'
        Name of the dimension to append multifile datasets.
    grid_filepath: str, optional
        Path to file containing model grid parameter.
    update_coords: dict, optional
        Dictionary of coordinate variables to update.
    rechunk: dict, optional
        Rechunk strategy dictionary, by default None.
    attrs: dict, optional
        Attributes to add to the dataset.
    branch: str, default="main"
        Branch on which to write data to IcechunkStore.
    commit_message: str, default="Initial commit"
        Commit message when updating the Icechunk repository.
    dask_config_kwargs: dict, optional
        Dask configuration settings passed to dask.config.set().
    dask_cluster_kwargs: dict, optional
        Dask cluster configuration settings passed to LocalCluster().
    icechunk_config: dict, optional
        Icechunk repository configuration.
    """
    # === Update Icechunk repo(s) with Dask === #
    if dask_cluster_kwargs is not None:
        if dask_config_kwargs is not None:
            dask.config.set(dask_config_kwargs)
            logging.info("Updated dask configuration settings.")

        # Create local dask cluster & client:
        with LocalCluster(**dask_cluster_kwargs) as cluster, Client(cluster) as client:
            logging.info(f"Created LocalCluster with {dask_cluster_kwargs["n_workers"]} workers @ Client: {client.dashboard_link}")

            # Catch UserWarnings when rechunking data:
            client.register_worker_plugin(CaptureWarningsPlugin())

            _update_icechunk(file=file,
                             bucket=bucket,
                             object_prefix=object_prefix,
                             store_credentials_json=store_credentials_json,
                             variables=variables,
                             append_dim=append_dim,
                             grid_filepath=grid_filepath,
                             update_coords=update_coords,
                             rechunk=rechunk,
                             attrs=attrs,
                             parallel=True,
                             branch=branch,
                             commit_message=commit_message,
                             icechunk_config=icechunk_config
                             )

            # --- Shutdown Store & Dask Cluster --- #
            client.shutdown()
            client.close()
            logging.info("Dask Cluster has been shutdown.")

    else:
        # === Update Icechunk repo(s) without Dask === #
        _update_icechunk(file=file,
                         bucket=bucket,
                         object_prefix=object_prefix,
                         store_credentials_json=store_credentials_json,
                         variables=variables,
                         append_dim=append_dim,
                         grid_filepath=grid_filepath,
                         update_coords=update_coords,
                         rechunk=rechunk,
                         attrs=attrs,
                         parallel=False,
                         branch=branch,
                         commit_message=commit_message,
                         icechunk_config=icechunk_config
                         )