# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
import pygimli as pg
import comet
import custEM as ce
import numpy as np
import json
import platform
import os
import gc
import sys
import resource
# import psutil  # only needed for the commented-out current_mem() below
import logging
from mpi4py import MPI
from custEM.misc import logger_print as lp, mpi_print
"""
Utility functions used for analyzing model and computational parameters in
custEM.
"""
[docs]
def get_logger(log_level, profiler, out_path):
"""
Initialize logger for all custEM prints during simulations. The information
in the command promt will be printed corresponding to the specified
*debug_level* level in the MOD class. Furthermore, a *log* file is stored
in the export directory of the results if *profiler=True*,
using always the *debug* level for the log files.
Required arguments
------------------
- log_level, type str or int
specify log_level, see MOD class description
- profiler, type bool
flag to set if *log*-field should be created or not
- out_path, type bool
path where *log*-file should be created
"""
# create logger
logger = logging.getLogger('custEM')
logger.propagate = False
# create formatter and add it to the handlers
formatter = logging.Formatter('%(message)s')
if df.MPI.rank(df.MPI.comm_world) == 0 and len(logger.handlers) == 0:
if os.path.exists(out_path + '_debug.log'):
os.remove(out_path + '_debug.log')
df.MPI.barrier(df.MPI.comm_world)
# create console handler for logger.
sh = logging.StreamHandler(stream=sys.stdout)
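    # map the given log level to the numeric constants of the logging module
    # (DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50)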
if type(log_level) is int and log_level in [10, 20, 30, 40, 50]:
ll = log_level
elif type(log_level) is str and 'debug' in log_level.lower():
ll = 10
elif type(log_level) is str and 'info' in log_level.lower():
ll = 20
elif type(log_level) is str and 'warning' in log_level.lower():
ll = 30
elif type(log_level) is str and 'error' in log_level.lower():
ll = 40
elif type(log_level) is str and 'critical' in log_level.lower():
ll = 50
else:
mpi_print('Fatal Error! Specify a valid log level. Aborting ...',
post_dash=True)
raise SystemExit
logger.setLevel(ll)
sh.setLevel(ll)
sh.setFormatter(formatter)
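    # avoid attaching duplicate handlers if get_logger() is called repeatedly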
if len(logger.handlers) != 2:
logger.addHandler(sh)
else:
logger.handlers[0].setLevel(ll)
if profiler:
# create file handler for logger.
fh = logging.FileHandler(out_path + '_debug.log')
fh.setLevel(level=logging.DEBUG)
fh.setFormatter(formatter)
if len(logger.handlers) != 2:
logger.addHandler(fh)
    return logger


def max_mem(total=True, to_store=False, logger=None, fmt='GiB'):
"""
Print the maximum memory requirements, either for each MPI process or
in total.
Keyword arguments
-----------------
- total=True, type bool
Flag if memory consumption should be printed in total or per process
- to_store = False, type bool
set **True** if value should be returned
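
    Example
    -------
    A minimal usage sketch::

        max_mem()                       # print the total peak usage in GiB
        peak = max_mem(total=False, to_store=True, fmt='MiB')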
"""
    # ru_maxrss is reported in KiB on Linux (but in bytes on macOS)
    if fmt == 'GiB':
        mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024**2
    elif fmt == 'MiB':
        mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
    else:
        mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
if total:
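        # gather the per-process peak values on the root process and sum them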
all_mem = np.sum(np.array(MPI.COMM_WORLD.gather(mem, root=0),
dtype=float))
        if df.MPI.rank(df.MPI.comm_world) > 0:
            all_mem = 0  # gather returns None on non-root processes
        elif fmt == 'GiB':
            all_mem = int(all_mem) + 1  # round up to the next full GiB
        if to_store:
            return all_mem
        if logger is None:
            if df.MPI.rank(df.MPI.comm_world) == 0:
                print(' - current maximum memory usage (' + fmt +
                      ') --> ' + str(all_mem) + ' - ')
        else:
            lp(logger, 20,
               ' - current maximum memory usage (' + fmt + ') --> ' +
               str(all_mem) + ' - ', pre_dash=False)
    else:
        mem = int(mem) + 1  # round up to the next full unit
        if to_store:
            return mem
        if logger is None:
            print(' - current maximum memory usage of process ' +
                  str(df.MPI.rank(df.MPI.comm_world)) + ' (' + fmt +
                  ') --> ' + str(mem) + ' - ')
        else:
            lp(logger, 20,
               ' - current maximum memory usage of process ' +
               str(df.MPI.rank(df.MPI.comm_world)) + ' (' + fmt +
               ') --> ' + str(mem) + ' - ', pre_dash=False, root_only=False)
# def current_mem(total=True, fmt='MiB'):
# if fmt != 'MiB':
# print('Error! Only MiB supported as format for current_mem() function')
# raise SystemExit
# # return the memory usage in MB
# df.MPI.barrier(df.MPI.comm_world)
# process = psutil.Process(os.getpid())
# mem = process.memory_info()[0] / float(2 ** 20)
# all_mem = np.sum(np.array(MPI.COMM_WORLD.gather(mem, root=0),
# dtype=float))
# if total:
# if df.MPI.rank(df.MPI.comm_world) == 0:
# print(' - currently using {0:12.3f} MiB of memory - '.format(
# all_mem))
# else:
# print(' - process ' + str(df.MPI.rank(df.MPI.comm_world)) +
# ' is currently using {0:12.3f} MiB of memory - '.format(
# mem))


def export_config_file(PP):
"""
Write model parameters - mainly MP and FE instance dictionaries -
to file in the specified export directory (*out_dir*) using JSON.
Required arguments
------------------
- PP, type class
PostProcessing instance
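
    Example
    -------
    A minimal usage sketch, assuming *PP* is a PostProcessing instance of a
    finished simulation::

        export_config_file(PP)   # writes <export_dir>_config.json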
"""
if PP.MP.mpi_rank == 0:
to_del = ['sigma_func', 'sigma_inv_func', 'mu_func', 'mu_inv_func',
'eps_func', 'eps_inv_func', 'dsigma_func', 'dsigma_inv_func',
'sigma0_func', 'sigma0_inv_func', 'path', 'mpi_cw', 'mpi_cs']
A = PP.MP.__dict__.copy()
for item in to_del:
if item in A:
del A[item]
if A['sigma_tensor_flag']:
del A['sigma']
del A['sigma_0']
del A['sigma_air']
del A['sigma_ground']
del A['delta_sigma']
A['logger'] = PP.MP.logger.level
A['topo'] = str(A['topo'])
A['bc'] = PP.FE.__dict__['bc']
A['s_type'] = PP.FE.__dict__['s_type']
A['grounding'] = PP.FE.__dict__['grounding']
if PP.FE.__dict__['tx'] is None:
all_tx = None
else:
all_tx = []
for tx in PP.FE.__dict__['tx']:
if type(tx) is list:
all_tx.append(tx)
else:
all_tx.append(tx.tolist())
A['tx'] = all_tx
if type(A['currents']) is not list and PP.MP.currents is not None:
A['currents'] = PP.MP.currents.tolist()
A['n_tx'] = PP.FE.__dict__['n_tx']
A['origin'] = PP.FE.__dict__['origin']
A['radius'] = PP.FE.__dict__['radius']
A['start'] = PP.FE.__dict__['start']
A['stop'] = PP.FE.__dict__['stop']
A['length'] = PP.FE.__dict__['length']
A['azimuth'] = PP.FE.__dict__['azimuth']
if 'max_length' in PP.FE.__dict__:
A['max_length'] = PP.FE.__dict__['max_length']
if 'n_segs' in PP.FE.__dict__:
A['n_segs'] = PP.FE.__dict__['n_segs']
if type(A['omegas']) is not list:
A['omegas'] = A['omegas'].tolist()
if type(A['frequencies']) is not list:
A['frequencies'] = A['frequencies'].tolist()
with open(PP.export_dir + "_config.json", "w") as outfile:
json.dump(A, outfile, indent=0)
lp(PP.MP.logger, 20,
'... storing parameter file in export directory ...',
pre_dash=False)


def export_resource_file(PP):
"""
Write consumed computational resources and times to file in the
specified export directory (*out_dir*) using JSON.
Required arguments
------------------
- PP, type class
PostProcessing instance
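
    Example
    -------
    A minimal usage sketch, assuming *PP* is a PostProcessing instance of a
    finished simulation::

        export_resource_file(PP)   # writes <export_dir>_resource.json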
"""
A = dict()
max_memory = max_mem(to_store=True)
n_mpi = MPI.COMM_WORLD.size
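    # initialize global edge (dim 1) and facet (dim 2) numbering so that
    # global entity counts are available on distributed meshes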
PP.FS.mesh.init_global(1)
PP.FS.mesh.init_global(2)
n_nodes = PP.FS.mesh.num_entities_global(0)
n_edges = PP.FS.mesh.num_entities_global(1)
n_faces = PP.FS.mesh.num_entities_global(2)
n_cells = PP.FS.mesh.num_entities_global(3)
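    # collect per-cell volumes and radius ratios as mesh-quality indicators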
    all_cells = list(df.cells(PP.FS.mesh))
volumes = np.array([cell.volume() for cell in all_cells])
radius_ratios = np.array([cell.radius_ratio() for cell in all_cells])
if PP.MP.mpi_rank == 0:
        try:
            n_threads = int(os.environ['OMP_NUM_THREADS'])
        except KeyError:
            n_threads = 0  # OMP_NUM_THREADS was not set in the environment
A['max_mem'] = max_memory
A['assembly_time'] = PP.FE.assembly_time
A['solution_time'] = PP.FS.solution_time
A['system_size'] = PP.FE.system_size
A['omp_threads'] = n_threads
A['mpi_procs'] = n_mpi
A['nodes'] = n_nodes
A['edges'] = n_edges
A['faces'] = n_faces
A['cells'] = n_cells
A['dof'] = PP.FE.system_size
A['min_volume'] = np.min(volumes)
A['median_volume'] = np.median(volumes)
A['max_volume'] = np.max(volumes)
A['min_rratio'] = np.min(radius_ratios)
A['median_rratio'] = np.median(radius_ratios)
A['max_rratio'] = np.max(radius_ratios)
A['custem_version'] = ce.__version__
A['fenics_version'] = df.__version__
A['pygimli_version'] = pg.__version__[:6]
A['comet_version'] = comet.__version__
A['python_version'] = platform.python_version()
A['os_type'] = platform.system()
A['os_release'] = platform.release()
A['os_version'] = platform.version()
A['machine'] = platform.machine()
        with open(PP.export_dir + '_resource.json', 'w') as outfile:
json.dump(A, outfile, indent=0)
lp(PP.MP.logger, 20,
'... storing resource file in export directory ...',
pre_dash=False)


def release_memory():
"""
    Release unnecessarily blocked memory; work in progress.
"""
gc.collect()