# Notebook setup: inline plotting, core monitoring package, and hot-reload.
%matplotlib inline
import pandas as pd
import socket
# Fully-qualified hostname; used by load.set_control() below to pick the
# cluster configuration for this machine.
host = socket.getfqdn()
from core import load, zoom, calc, save,plots,monitor
#reload funcs after updating ./core/*.py
# Force-reload each core submodule so edits to ./core/*.py take effect
# without restarting the notebook kernel.
import importlib
importlib.reload(load)
importlib.reload(zoom)
importlib.reload(calc)
importlib.reload(save)
importlib.reload(plots)
importlib.reload(monitor)
<module 'core.monitor' from '/ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/monitor.py'>
# 'month' = 'JOBID' — almost a month, but not exactly.
# If you submit the job with a job scheduler, see above.
# Below is a list of environment variables one can pass.
#%env local='2'
# local : if True, run a dask local cluster; if not True, give the number of workers
# set in 'local'.
# If no 'local' is given, local will be set automatically to 'True'.
#%env ychunk='2'
#%env tchunk='2'
# controls chunk. 'False' sets no modification from original netcdf file's chunk.
# ychunk=10 will group the original netcdf file to 10 by 10
# tchunk=1 will chunk the time coordinate one by one
#%env control=FWC_SSH
# name of control file to be used for computation/plots/save/
#%env file_exp=
# 'file_exp': Which 'experiment' name is it?
#  this corresponds to the intake catalog name without path and .yaml
#%env year=
# for Validation, this corresponds to the year of path/year/month
# for monitoring, this corresponds to 'date'; using * means do all files in the monitoring directory
# setting it as *0[0-9] & *1[0-9] & *[2-3][0-9], the job can be separated into three lots.
#%env month=
# for monitoring this corresponds to file path path-XIOS.{month}/
#
#%env save= proceed with saving? True or False. Default is set to True
#%env plot= proceed with plotting? True or False. Default is set to True
#%env calc= proceed with computation, or just load the computed result? True or False. Default is set to True
#%env save=False
%%time
# 'savefig': Do we save output in html? or not. keep it true.
savefig=True
# Resolve the run configuration for this host: dask client/cluster handles,
# control-file name, intake catalog URL, the month/year selection, and the
# output/report directories.
client,cluster,control,catalog_url,month,year,daskreport,outputpath = load.set_control(host)
# Ensure the result and dask-report directories exist before any writes.
!mkdir -p $outputpath
!mkdir -p $daskreport
# Display the dask client widget (last expression in the cell).
client
local True using host= irene5651.c-irene.mg1.tgcc.ccc.cea.fr starting dask cluster on local= True workers 16 10000000000 False not local in tgcc rome local cluster starting This code is running on irene5651.c-irene.mg1.tgcc.ccc.cea.fr using SEDNA_ALPHA_MONITOR file experiment, read from ../lib/SEDNA_ALPHA_MONITOR.yaml on year= * on month= 21 outputpath= ../results/SEDNA_ALPHA_MONITOR/21/ daskreport= ../results/dask/2672820irene5651.c-irene.mg1.tgcc.ccc.cea.fr_SEDNA_ALPHA_MONITOR_21IceConce/ CPU times: user 376 ms, sys: 245 ms, total: 621 ms Wall time: 11.7 s
Client
|
Cluster
|
# Read the control file into a DataFrame: one row per diagnostic to compute
# (columns: Value, Inputs, Equation, Zone, Plot, Colourmap, MinMax, Unit, ...).
df=load.controlfile(control)
#Take out 'later' tagged computations
#df=df[~df['Value'].str.contains('later')]
# Display the control table (last expression in the cell).
df
Value | Inputs | Equation | Zone | Plot | Colourmap | MinMax | Unit | Oldname | Unnamed: 10 | |
---|---|---|---|---|---|---|---|---|---|---|
IceConce | icemod.siconc | (data.siconc.where(data.siconc >0)).to_dataset... | ALL | maps | Blues | None | M-4 |
Each computation consists of
%%time
import os
calcswitch=os.environ.get('calc', 'True')
loaddata=((df.Inputs != '').any())
print('calcswitch=',calcswitch,'df.Inputs != nothing',loaddata)
data = load.datas(catalog_url,df.Inputs,month,year,daskreport) if ((calcswitch=='True' )*loaddata) else 0
data
calcswitch= True df.Inputs != nothing True ../lib/SEDNA_ALPHA_MONITOR.yaml using param_xios reading ../lib/SEDNA_ALPHA_MONITOR.yaml using param_xios reading <bound method DataSourceBase.describe of sources: param_xios: args: combine: by_coords concat_dim: y urlpath: /ccc/work/cont003/gen7420/odakatin/CONFIGS/SEDNA/SEDNA-I/SEDNA_Domain_cfg_Tgt_20210423_tsh10m_L1/param_f32/x_*.nc xarray_kwargs: compat: override coords: minimal data_vars: minimal parallel: true description: SEDNA NEMO parameters from MPI output nav_lon lat fails driver: intake_xarray.netcdf.NetCDFSource metadata: catalog_dir: /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/../lib/ > {'name': 'param_xios', 'container': 'xarray', 'plugin': ['netcdf'], 'driver': ['netcdf'], 'description': 'SEDNA NEMO parameters from MPI output nav_lon lat fails', 'direct_access': 'forbid', 'user_parameters': [{'name': 'path', 'description': 'file coordinate', 'type': 'str', 'default': '/ccc/work/cont003/gen7420/odakatin/CONFIGS/SEDNA/MESH/SEDNA_mesh_mask_Tgt_20210423_tsh10m_L1/param'}], 'metadata': {}, 'args': {'urlpath': '/ccc/work/cont003/gen7420/odakatin/CONFIGS/SEDNA/SEDNA-I/SEDNA_Domain_cfg_Tgt_20210423_tsh10m_L1/param_f32/x_*.nc', 'combine': 'by_coords', 'concat_dim': 'y'}} 0 read icemod ['siconc'] using load_data_xios reading icemod using load_data_xios reading <bound method DataSourceBase.describe of sources: data_xios: args: combine: by_coords concat_dim: time_counter,x,y urlpath: /ccc/scratch/cont003/gen7420/talandel/ONGOING-RUNS/SEDNA-ALPHA-XIOS.21/SEDNA-ALPHA_1d_icemod_*_0[0-5][0-9][0-9].nc xarray_kwargs: compat: override coords: minimal data_vars: minimal drop_variables: !!set botpres: null deptht_bounds: null depthu_bounds: null iicestru: null iicestrv: null intstrx: null intstry: null mldkz5: null rhop_sig0: null siages: null sidive: null sisali: null sishea: null sistre: null sitemp: null snthic: null snvolu: null sometauy: null sozotaux: null time_centered_bounds: null time_counter_bounds: null 
utau_atmoce: null utau_iceoce: null uwspd10: null vtau_atmoce: null vtau_iceoce: null vwspd10: null parallel: true preprocess: !!python/name:core.load.prep '' description: SEDNA NEMO outputs from different xios server driver: intake_xarray.netcdf.NetCDFSource metadata: catalog_dir: /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/../lib/ >
distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-07077bbd-a226-4b69-99a7-fe7425d7de11' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-0c472b7d-3d77-498a-891a-cce51f1d9a9b' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File 
"/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-0a274c8c-36a0-4b96-b5e3-e77f334577a7' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-0c606697-3de8-4394-ba08-1fde144ba1de' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-09a4ab38-a1ea-4766-8402-4cf2e0358dc2' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File 
"/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-03e8f2cf-46a0-44ac-9c91-fb7da9cb0d2c' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-0d5b076d-0d9e-4afb-a702-3ea06c45c162' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-0a540741-fce3-4ebf-b754-18eaa4713354' distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent 
call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: 'open_dataset-0acb1096-7ce0-401f-a42d-d13fe0382bb5'
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) <timed exec> in <module> /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/load.py in datas(catalog_url, dfi, month, year, daskreport, lazy) 601 # data=0 602 #else: --> 603 data=outputs(catalog_url,datadict,month,year,daskreport,lazy) 604 for s in paramdict: 605 print('param',s,'will be included in data') /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/load.py in outputs(catalog_url, datadict, month, year, daskreport, lazy) 424 with performance_report(filename=daskreport+"_load_output_"+filename+"_"+month+year+".html"): 425 #ds=load_data_xios_patch(cat,filename,month,catalog_url) --> 426 ds = load_data_xios(cat,filename,items,month,year) if lazy else persist_data_xios(cat,filename,items,month,year) 427 extime=time.time() - start 428 print(' took', extime, 'seconds') /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/load.py in load_data_xios(cat, filename, items, month, year) 321 desc=cat.data_xios(file=filename,month=month,year=year ,xarray_kwargs=xarray_kwargs).describe 322 print('using load_data_xios reading ',desc) --> 323 ds = cat.data_xios(file=filename,month=month,year=year ,xarray_kwargs=xarray_kwargs).to_dask() 324 ds = ds[items] 325 return ds ~/monitor/lib/python3.7/site-packages/intake_xarray/base.py in to_dask(self) 67 def to_dask(self): 68 """Return xarray object where variables are dask arrays""" ---> 69 return self.read_chunked() 70 71 def close(self): ~/monitor/lib/python3.7/site-packages/intake_xarray/base.py in read_chunked(self) 42 def read_chunked(self): 43 """Return xarray object (which will have chunks)""" ---> 44 self._load_metadata() 45 return self._ds 46 ~/monitor/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self) 234 """load metadata only if needed""" 235 if self._schema is None: --> 236 self._schema = self._get_schema() 237 self.dtype = self._schema.dtype 
238 self.shape = self._schema.shape ~/monitor/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self) 16 17 if self._ds is None: ---> 18 self._open_dataset() 19 20 metadata = { ~/monitor/lib/python3.7/site-packages/intake_xarray/netcdf.py in _open_dataset(self) 79 url = fsspec.open_local(url, **self.storage_options) 80 ---> 81 self._ds = _open_dataset(url, chunks=self.chunks, **kwargs) 82 83 def _add_path_to_ds(self, ds): ~/monitor/lib/python3.7/site-packages/xarray/backends/api.py in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, lock, data_vars, coords, combine, autoclose, parallel, join, attrs_file, **kwargs) 954 # calling compute here will return the datasets/file_objs lists, 955 # the underlying datasets will still be stored as dask arrays --> 956 datasets, file_objs = dask.compute(datasets, file_objs) 957 958 # Combine all datasets, closing them in case of a ValueError ~/monitor/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs) 561 postcomputes.append(x.__dask_postcompute__()) 562 --> 563 results = schedule(dsk, keys, **kwargs) 564 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) 565 ~/monitor/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs) 2653 should_rejoin = False 2654 try: -> 2655 results = self.gather(packed, asynchronous=asynchronous, direct=direct) 2656 finally: 2657 for f in futures.values(): ~/monitor/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous) 1968 direct=direct, 1969 local_worker=local_worker, -> 1970 asynchronous=asynchronous, 1971 ) 1972 ~/monitor/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs) 837 else: 838 return sync( --> 839 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs 
840 ) 841 ~/monitor/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs) 338 if error[0]: 339 typ, exc, tb = error[0] --> 340 raise exc.with_traceback(tb) 341 else: 342 return result[0] ~/monitor/lib/python3.7/site-packages/distributed/utils.py in f() 322 if callback_timeout is not None: 323 future = asyncio.wait_for(future, callback_timeout) --> 324 result[0] = yield future 325 except Exception as exc: 326 error[0] = sys.exc_info() ~/monitor/lib/python3.7/site-packages/tornado/gen.py in run(self) 760 761 try: --> 762 value = future.result() 763 except Exception: 764 exc_info = sys.exc_info() ~/monitor/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker) 1827 exc = CancelledError(key) 1828 else: -> 1829 raise exception.with_traceback(traceback) 1830 raise exc 1831 if errors == "skip": ~/monitor/lib/python3.7/site-packages/dask/utils.py in apply() 33 def apply(func, args, kwargs=None): 34 if kwargs: ---> 35 return func(*args, **kwargs) 36 else: 37 return func(*args) ~/monitor/lib/python3.7/site-packages/xarray/backends/api.py in open_dataset() 573 574 with close_on_error(store): --> 575 ds = maybe_decode_store(store, chunks) 576 577 # Ensure source filename always stored in dataset object (GH issue #2550) ~/monitor/lib/python3.7/site-packages/xarray/backends/api.py in maybe_decode_store() 477 drop_variables=drop_variables, 478 use_cftime=use_cftime, --> 479 decode_timedelta=decode_timedelta, 480 ) 481 ~/monitor/lib/python3.7/site-packages/xarray/conventions.py in decode_cf() 598 decode_timedelta=decode_timedelta, 599 ) --> 600 ds = Dataset(vars, attrs=attrs) 601 ds = ds.set_coords(coord_names.union(extra_coords).intersection(vars)) 602 ds._file_obj = file_obj ~/monitor/lib/python3.7/site-packages/xarray/core/dataset.py in __init__() 629 630 variables, coord_names, dims, indexes, _ = merge_data_and_coords( --> 631 data_vars, coords, compat="broadcast_equals" 632 ) 
633 ~/monitor/lib/python3.7/site-packages/xarray/core/merge.py in merge_data_and_coords() 466 indexes = dict(_extract_indexes_from_coords(coords)) 467 return merge_core( --> 468 objects, compat, join, explicit_coords=explicit_coords, indexes=indexes 469 ) 470 ~/monitor/lib/python3.7/site-packages/xarray/core/merge.py in merge_core() 592 coerced, join=join, copy=False, indexes=indexes, fill_value=fill_value 593 ) --> 594 collected = collect_variables_and_indexes(aligned) 595 596 prioritized = _get_priority_vars_and_indexes(aligned, priority_arg, compat=compat) ~/monitor/lib/python3.7/site-packages/xarray/core/merge.py in collect_variables_and_indexes() 276 append_all(coords, indexes) 277 --> 278 variable = as_variable(variable, name=name) 279 if variable.dims == (name,): 280 variable = variable.to_index_variable() ~/monitor/lib/python3.7/site-packages/xarray/core/variable.py in as_variable() 158 "dimensions." % (name, obj.dims) 159 ) --> 160 obj = obj.to_index_variable() 161 162 return obj ~/monitor/lib/python3.7/site-packages/xarray/core/variable.py in to_index_variable() 527 """Return this variable as an xarray.IndexVariable""" 528 return IndexVariable( --> 529 self.dims, self._data, self._attrs, encoding=self._encoding, fastpath=True 530 ) 531 ~/monitor/lib/python3.7/site-packages/xarray/core/variable.py in __init__() 2410 # Unlike in Variable, always eagerly load values into memory 2411 if not isinstance(self._data, PandasIndexAdapter): -> 2412 self._data = PandasIndexAdapter(self._data) 2413 2414 def __dask_tokenize__(self): ~/monitor/lib/python3.7/site-packages/xarray/core/indexing.py in __init__() 1395 1396 def __init__(self, array: Any, dtype: DTypeLike = None): -> 1397 self.array = utils.safe_cast_to_index(array) 1398 if dtype is None: 1399 if isinstance(array, pd.PeriodIndex): ~/monitor/lib/python3.7/site-packages/xarray/core/utils.py in safe_cast_to_index() 102 if hasattr(array, "dtype") and array.dtype.kind == "O": 103 kwargs["dtype"] = object --> 104 
index = pd.Index(np.asarray(array), **kwargs) 105 return _maybe_cast_to_cftimeindex(index) 106 ~/monitor/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray() 81 82 """ ---> 83 return array(a, dtype, copy=False, order=order) 84 85 ~/monitor/lib/python3.7/site-packages/xarray/core/indexing.py in __array__() 566 def __array__(self, dtype=None): 567 array = as_indexable(self.array) --> 568 return np.asarray(array[self.key], dtype=None) 569 570 def transpose(self, order): ~/monitor/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray() 81 82 """ ---> 83 return array(a, dtype, copy=False, order=order) 84 85 ~/monitor/lib/python3.7/site-packages/xarray/coding/variables.py in __array__() 68 69 def __array__(self, dtype=None): ---> 70 return self.func(self.array) 71 72 def __repr__(self): ~/monitor/lib/python3.7/site-packages/xarray/coding/times.py in decode_cf_datetime() 199 200 if ( --> 201 dates[np.nanargmin(num_dates)].year < 1678 202 or dates[np.nanargmax(num_dates)].year >= 2262 203 ): <__array_function__ internals> in nanargmin() ~/monitor/lib/python3.7/site-packages/numpy/lib/nanfunctions.py in nanargmin() 493 """ 494 a, mask = _replace_nan(a, np.inf) --> 495 res = np.argmin(a, axis=axis) 496 if mask is not None: 497 mask = np.all(mask, axis=axis) <__array_function__ internals> in argmin() ~/monitor/lib/python3.7/site-packages/numpy/core/fromnumeric.py in argmin() 1267 1268 """ -> 1269 return _wrapfunc(a, 'argmin', axis=axis, out=out) 1270 1271 ~/monitor/lib/python3.7/site-packages/numpy/core/fromnumeric.py in _wrapfunc() 56 57 try: ---> 58 return bound(*args, **kwds) 59 except TypeError: 60 # A TypeError occurs if the object does have such a method in its ValueError: attempt to get argmin of an empty sequence
%%time
# Run the monitoring pipeline: for each row of the control table df, compute
# the diagnostic from `data`, plot it, and save results under outputpath,
# writing dask performance reports to daskreport.
monitor.auto(df,data,savefig,daskreport,outputpath,file_exp='SEDNA'
)
--------------------------------------------------------------------------- NameError Traceback (most recent call last) <timed eval> in <module> NameError: name 'data' is not defined