%matplotlib inline
import pandas as pd
import socket
# Fully qualified domain name of the machine running this notebook;
# it is passed to load.set_control() further down.
host = socket.getfqdn()
from core import load, zoom, calc, save,plots,monitor
# Reload the core modules so that edits made to ./core/*.py are picked up.
import importlib
importlib.reload(load)
importlib.reload(zoom)
importlib.reload(calc)
importlib.reload(save)
importlib.reload(plots)
importlib.reload(monitor)
<module 'core.monitor' from '/ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/monitor.py'>
# 'month' = 'JOBID': almost the month, but not exactly.
# If you submit the job with a job scheduler, see above.
# Below is the list of environment variables one can pass.
#%env local='2'
# local: if True, run a dask local cluster; otherwise, set 'local' to the
# number of workers to use.
# If 'local' is not given, it is automatically set to 'True'.
#%env ychunk='2'
#%env tchunk='2'
# Controls the chunking. 'False' leaves the original netCDF file's chunking unmodified.
# ychunk=10 will group the original netCDF file's chunks 10 by 10.
# tchunk=1 will chunk the time coordinate one by one.
#%env control=Ints_monitor
# Name of the control file to be used for computation/plots/save.
#%env file_exp=
# 'file_exp': which 'experiment' name is it?
# This corresponds to the intake catalog name without the path and the .yaml extension.
#%env year=
# For validation, this corresponds to the year in path/year/month.
# For monitoring, this corresponds to 'date'; using * means process all files in the monitoring directory.
# By setting it to *0[0-9], *1[0-9] and *[2-3][0-9], the job can be split into three lots.
#%env month=
# For monitoring, this corresponds to the file path path-XIOS.{month}/
#
#%env save= proceed with saving? True or False. Default is True.
#%env plot= proceed with plotting? True or False. Default is True.
#%env calc= proceed with the computation, or just load the already computed result? True or False. Default is True.
%%time
# 'savefig': whether to save the output as HTML. Keep it True.
savefig=True
# Build the run configuration for this host. The call returns the dask
# client and cluster together with the control, catalog, month/year and
# report/output path settings used by the cells below.
client,cluster,control,catalog_url,month,year,daskreport,outputpath = load.set_control(host)
# Create the output and dask-report directories if they do not exist yet.
!mkdir -p $outputpath
!mkdir -p $daskreport
# Last expression of the cell: display the dask client.
client
local True using host= irene4388.c-irene.mg1.tgcc.ccc.cea.fr starting dask cluster on local= True workers 16 10000000000 False not local in tgcc rome local cluster starting This code is running on irene4388.c-irene.mg1.tgcc.ccc.cea.fr using SEDNA_ALPHA_MONITOR file experiment, read from ../lib/SEDNA_ALPHA_MONITOR.yaml on year= *2 on month= 23 outputpath= ../results/rome_SEDNA_ALPHA_MONITOR/23/ daskreport= ../results/dask/2514002irene4388.c-irene.mg1.tgcc.ccc.cea.fr_SEDNA_ALPHA_MONITOR_23AW_maxtemp_depth_moni/ CPU times: user 309 ms, sys: 237 ms, total: 546 ms Wall time: 8.58 s
Client
|
Cluster
|
# Read the control file into a table.
df = load.controlfile(control)
# Drop every row whose 'Value' entry carries the 'later' tag.
df = df.loc[~df['Value'].str.contains('later')]
# Last expression of the cell: display the remaining rows.
df
Value | Inputs | Equation | Zone | Plot | Colourmap | MinMax | Unit | Oldname | Unnamed: 10 | |
---|---|---|---|---|---|---|---|---|---|---|
AW_maxtemp_depth | gridT.votemper,gridS.vosaline,param.mask,param... | calc.AWTD4(data) | ALL | AWTD_map | jet | (0,800) | m | M-5 |
Each computation consists of
%%time
# Open the inputs listed in the control table for the selected month/year.
# NOTE: if too many files are read, some dates can be taken out here
# with a `sel` on the resulting dataset.
data = load.datas(
    catalog_url,
    df["Inputs"],
    month,
    year,
    daskreport,
)
# Last expression of the cell: display the loaded dataset.
data
../lib/SEDNA_ALPHA_MONITOR.yaml using param_xios reading ../lib/SEDNA_ALPHA_MONITOR.yaml using param_xios reading <bound method DataSourceBase.describe of sources: param_xios: args: combine: by_coords concat_dim: y urlpath: /ccc/work/cont003/gen7420/odakatin/CONFIGS/SEDNA/SEDNA-I/SEDNA_Domain_cfg_Tgt_20210423_tsh10m_L1/param_f32/x_*.nc xarray_kwargs: compat: override coords: minimal data_vars: minimal parallel: true description: SEDNA NEMO parameters from MPI output nav_lon lat fails driver: intake_xarray.netcdf.NetCDFSource metadata: catalog_dir: /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/../lib/ > {'name': 'param_xios', 'container': 'xarray', 'plugin': ['netcdf'], 'driver': ['netcdf'], 'description': 'SEDNA NEMO parameters from MPI output nav_lon lat fails', 'direct_access': 'forbid', 'user_parameters': [{'name': 'path', 'description': 'file coordinate', 'type': 'str', 'default': '/ccc/work/cont003/gen7420/odakatin/CONFIGS/SEDNA/MESH/SEDNA_mesh_mask_Tgt_20210423_tsh10m_L1/param'}], 'metadata': {}, 'args': {'urlpath': '/ccc/work/cont003/gen7420/odakatin/CONFIGS/SEDNA/SEDNA-I/SEDNA_Domain_cfg_Tgt_20210423_tsh10m_L1/param_f32/x_*.nc', 'combine': 'by_coords', 'concat_dim': 'y'}} 0 read gridS ['vosaline'] using load_data_xios reading gridS using load_data_xios reading <bound method DataSourceBase.describe of sources: data_xios: args: combine: by_coords concat_dim: time_counter,x,y urlpath: /ccc/scratch/cont003/gen7420/talandel/ONGOING-RUNS/SEDNA-ALPHA-XIOS.23/SEDNA-ALPHA_1d_gridS_*2_0[0-5][0-9][0-9].nc xarray_kwargs: compat: override coords: minimal data_vars: minimal drop_variables: !!set deptht_bounds: null depthu_bounds: null nav_lat: null nav_lon: null time_centerd: null time_centered_bounds: null time_counter_bounds: null parallel: true preprocess: !!python/name:core.load.prep '' description: SEDNA NEMO outputs from different xios server driver: intake_xarray.netcdf.NetCDFSource metadata: catalog_dir: 
/ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/../lib/ > took 137.05811429023743 seconds 0 merging gridS ['vosaline'] 1 read gridT ['votemper'] using load_data_xios reading gridT using load_data_xios reading <bound method DataSourceBase.describe of sources: data_xios: args: combine: by_coords concat_dim: time_counter,x,y urlpath: /ccc/scratch/cont003/gen7420/talandel/ONGOING-RUNS/SEDNA-ALPHA-XIOS.23/SEDNA-ALPHA_1d_gridT_*2_0[0-5][0-9][0-9].nc xarray_kwargs: compat: override coords: minimal data_vars: minimal drop_variables: !!set deptht_bounds: null depthu_bounds: null nav_lat: null nav_lon: null time_centerd: null time_centered_bounds: null time_counter_bounds: null parallel: true preprocess: !!python/name:core.load.prep '' description: SEDNA NEMO outputs from different xios server driver: intake_xarray.netcdf.NetCDFSource metadata: catalog_dir: /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/../lib/ > took 16.337703943252563 seconds 1 merging gridT ['votemper'] took 0.2422797679901123 seconds param mask2d will be included in data param nav_lon will be included in data param mask will be included in data param depth will be included in data param nav_lat will be included in data sum_num (13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 
12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12) start rechunking with (130, 122, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 48) end of y_rechunk CPU times: user 30.5 s, sys: 18.7 s, total: 49.2 s Wall time: 2min 46s
<xarray.Dataset> Dimensions: (t: 3, x: 6560, y: 6540, z: 150) Coordinates: * t (t) object 2004-07-02 12:00:00 ... 2004-07-22 12:00:00 * y (y) int64 1 2 3 4 5 6 7 8 ... 6534 6535 6536 6537 6538 6539 6540 * x (x) int64 1 2 3 4 5 6 7 8 ... 6554 6555 6556 6557 6558 6559 6560 * z (z) int64 1 2 3 4 5 6 7 8 9 ... 143 144 145 146 147 148 149 150 mask2d (y, x) bool dask.array<chunksize=(130, 6560), meta=np.ndarray> nav_lon (y, x) float32 dask.array<chunksize=(130, 6560), meta=np.ndarray> mask (z, y, x) bool dask.array<chunksize=(150, 130, 6560), meta=np.ndarray> depth (z, y, x) float32 dask.array<chunksize=(150, 130, 6560), meta=np.ndarray> nav_lat (y, x) float32 dask.array<chunksize=(130, 6560), meta=np.ndarray> Data variables: vosaline (t, z, y, x) float32 dask.array<chunksize=(1, 150, 130, 6560), meta=np.ndarray> votemper (t, z, y, x) float32 dask.array<chunksize=(1, 150, 130, 6560), meta=np.ndarray>
array([cftime.DatetimeNoLeap(2004, 7, 2, 12, 0, 0, 0), cftime.DatetimeNoLeap(2004, 7, 12, 12, 0, 0, 0), cftime.DatetimeNoLeap(2004, 7, 22, 12, 0, 0, 0)], dtype=object)
array([ 1, 2, 3, ..., 6538, 6539, 6540])
array([ 1, 2, 3, ..., 6558, 6559, 6560])
array([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150])
|
|
|
|
|
|
|
%%time
# Run monitor.auto on the control table and the loaded data.
# NOTE(review): in the recorded run this call ended with KilledWorker while
# saving, after workers repeatedly exceeded their memory budget; the cause
# appears to lie in core/save.py, not in this cell.
monitor.auto(df, data, savefig, daskreport, outputpath, file_exp='SEDNA')
True True ../nc_results/rome_SEDNA_ALPHA_MONITOR/23/ switch:calcswitch,saveswitch,plotswitch True True False dtaa= calc.AWTD4(data) #3 Start computing count: <xarray.Dataset> Dimensions: () Data variables: vosaline int64 dask.array<chunksize=(), meta=np.ndarray> votemper int64 dask.array<chunksize=(), meta=np.ndarray>
<xarray.Dataset> Dimensions: () Data variables: vosaline int64 dask.array<chunksize=(), meta=np.ndarray> votemper int64 dask.array<chunksize=(), meta=np.ndarray>
|
|
nbytes: 1415884024 count: <xarray.Dataset> Dimensions: () Data variables: AWT int64 dask.array<chunksize=(), meta=np.ndarray> AWD int64 dask.array<chunksize=(), meta=np.ndarray>
<xarray.Dataset> Dimensions: (t: 3, x: 6560, y: 6540) Coordinates: * t (t) object 2004-07-02 12:00:00 ... 2004-07-22 12:00:00 * y (y) int64 1 2 3 4 5 6 7 8 ... 6534 6535 6536 6537 6538 6539 6540 * x (x) int64 1 2 3 4 5 6 7 8 ... 6554 6555 6556 6557 6558 6559 6560 mask2d (y, x) bool dask.array<chunksize=(130, 6560), meta=np.ndarray> nav_lon (y, x) float32 dask.array<chunksize=(130, 6560), meta=np.ndarray> nav_lat (y, x) float32 dask.array<chunksize=(130, 6560), meta=np.ndarray> Data variables: AWT (t, y, x) float32 dask.array<chunksize=(1, 130, 6560), meta=np.ndarray> AWD (t, y, x) float32 dask.array<chunksize=(1, 130, 6560), meta=np.ndarray>
array([cftime.DatetimeNoLeap(2004, 7, 2, 12, 0, 0, 0), cftime.DatetimeNoLeap(2004, 7, 12, 12, 0, 0, 0), cftime.DatetimeNoLeap(2004, 7, 22, 12, 0, 0, 0)], dtype=object)
array([ 1, 2, 3, ..., 6538, 6539, 6540])
array([ 1, 2, 3, ..., 6558, 6559, 6560])
|
|
|
|
|
filename='SEDNA_AWTD_map_ALL_AW_maxtemp_depth' outputpath='../nc_results/rome_SEDNA_ALPHA_MONITOR/23/' saving starting start saving data saving data in a file before ds.persist
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. 
Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: "('open_dataset-concatenate-fe92b59066cddda074aca66d4f1e29e8', 0, 503, 0)" distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Restarting worker distributed.nanny - WARNING - Restarting worker distributed.core - ERROR - Exception while handling op heartbeat_worker Traceback (most recent call last): File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/core.py", line 496, in handle_comm result = handler(comm, **msg) File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 
3431, in heartbeat_worker parent._tasks[key]: duration for key, duration in executing.items() File "/ccc/cont003/home/ifremer/odakatin/monitor/lib/python3.7/site-packages/distributed/scheduler.py", line 3431, in <dictcomp> parent._tasks[key]: duration for key, duration in executing.items() KeyError: "('open_dataset-concatenate-8b35e92294f81bb419e65088be8f44ee', 0, 0, 501, 0)" distributed.nanny - WARNING - Restarting worker
--------------------------------------------------------------------------- KilledWorker Traceback (most recent call last) <timed eval> in <module> /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/monitor.py in auto(df, val, savefig, daskreport, outputpath, file_exp) 59 print('outputpath=\''+nc_outputpath+'\'', 'saving starting') 60 with performance_report(filename=daskreport+"_save_"+step.Value+".html"): ---> 61 save.datas(data,plot=step.Plot,path=nc_outputpath,filename=filename) 62 # 5. Plot 63 cmap=step.Colourmap /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/save.py in datas(data, plot, path, filename) 17 print('save computed data at',savedfile,'completed') 18 else : ---> 19 twoD(data,path,filename) 20 return None 21 /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/save.py in twoD(data, path, filename) 46 print('saving data in a file') 47 filesave=path+filename ---> 48 return to_mfnetcdf_map(data,prefix=filesave, nested=True) 49 50 def twoD_onefile(data /ccc/work/cont003/gen7420/odakatin/monitor-sedna/notebook/core/save.py in to_mfnetcdf_map(ds, prefix, nested) 196 template=ds 197 print('before ds.persist') --> 198 ds.compute() 199 print('after ds.persist') 200 mapped=xr.map_blocks( ~/monitor/lib/python3.7/site-packages/xarray/core/dataset.py in compute(self, **kwargs) 904 """ 905 new = self.copy(deep=False) --> 906 return new.load(**kwargs) 907 908 def _persist_inplace(self, **kwargs) -> "Dataset": ~/monitor/lib/python3.7/site-packages/xarray/core/dataset.py in load(self, **kwargs) 739 740 # evaluate all the dask arrays simultaneously --> 741 evaluated_data = da.compute(*lazy_data.values(), **kwargs) 742 743 for k, data in zip(lazy_data, evaluated_data): ~/monitor/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs) 561 postcomputes.append(x.__dask_postcompute__()) 562 --> 563 results = schedule(dsk, keys, **kwargs) 564 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) 565 
~/monitor/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs) 2653 should_rejoin = False 2654 try: -> 2655 results = self.gather(packed, asynchronous=asynchronous, direct=direct) 2656 finally: 2657 for f in futures.values(): ~/monitor/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous) 1968 direct=direct, 1969 local_worker=local_worker, -> 1970 asynchronous=asynchronous, 1971 ) 1972 ~/monitor/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs) 837 else: 838 return sync( --> 839 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs 840 ) 841 ~/monitor/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs) 338 if error[0]: 339 typ, exc, tb = error[0] --> 340 raise exc.with_traceback(tb) 341 else: 342 return result[0] ~/monitor/lib/python3.7/site-packages/distributed/utils.py in f() 322 if callback_timeout is not None: 323 future = asyncio.wait_for(future, callback_timeout) --> 324 result[0] = yield future 325 except Exception as exc: 326 error[0] = sys.exc_info() ~/monitor/lib/python3.7/site-packages/tornado/gen.py in run(self) 760 761 try: --> 762 value = future.result() 763 except Exception: 764 exc_info = sys.exc_info() ~/monitor/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker) 1827 exc = CancelledError(key) 1828 else: -> 1829 raise exception.with_traceback(traceback) 1830 raise exc 1831 if errors == "skip": KilledWorker: ("('open_dataset-concatenate-e6d8ff9f2d38e0042c95464fa9096fce', 308, 0)", <Worker 'tcp://127.0.0.1:40931', name: 10, memory: 0, processing: 856>)