# Source code for custEM.misc.profiler

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import pygimli as pg
import comet
import custEM as ce

import numpy as np
import json
import platform
import os
import gc
import sys
import resource
#import psutil
import logging
from mpi4py import MPI
from custEM.misc import logger_print as lp


"""
Utility functions used for analyzing model and computational parameters in
custEM.
"""


def get_logger(log_level, profiler, out_path):

    """
    Initialize logger for all custEM prints during simulations.

    The information in the command prompt will be printed corresponding to
    the specified *debug_level* level in the MOD class. Furthermore, a *log*
    file is stored in the export directory of the results if
    *profiler=True*; the file handler is always set to the *debug* level.

    Required arguments
    ------------------

    - log_level, type str or int
        either one of the integers 10, 20, 30, 40, 50 or a string containing
        'debug', 'info', 'warning', 'error' or 'critical' (case-insensitive),
        see MOD class description

    - profiler, type bool
        flag to set if *log*-file should be created or not

    - out_path, type str
        path prefix of the *log*-file, the suffix '_debug.log' is appended

    Returns
    -------

    - logger, type logging.Logger
        the shared logger named 'custEM'

    Raises
    ------

    - SystemExit
        if *log_level* is not a valid level
    """

    logger = logging.getLogger('custEM')
    logger.propagate = False
    formatter = logging.Formatter('%(message)s')
    log_file = out_path + '_debug.log'

    # on the very first call (no handlers attached yet), the root process
    # removes a log file left over from a previous run
    if df.MPI.rank(df.MPI.comm_world) == 0 and len(logger.handlers) == 0:
        if os.path.exists(log_file):
            os.remove(log_file)
    df.MPI.barrier(df.MPI.comm_world)

    # translate *log_level* to a numeric level; the order of the names
    # matters if a string contains more than one of them
    named_levels = (('debug', 10), ('info', 20), ('warning', 30),
                    ('error', 40), ('critical', 50))
    ll = None
    if type(log_level) is int:
        if log_level in (10, 20, 30, 40, 50):
            ll = log_level
    elif type(log_level) is str:
        lowered = log_level.lower()
        for name, value in named_levels:
            if name in lowered:
                ll = value
                break

    if ll is None:
        # only names available in this module are used here, the message is
        # printed once by the root process
        if df.MPI.rank(df.MPI.comm_world) == 0:
            print('Fatal Error! Specify a valid log level. Aborting ...')
        raise SystemExit

    # NOTE(review): the logger level also filters records before they reach
    # the file handler, so the log file only receives records >= *ll* even
    # though its handler is set to DEBUG - confirm if this is intended
    logger.setLevel(ll)

    # look for handlers attached by a previous call instead of counting
    # them, which avoids duplicated console output on repeated calls and
    # allows enabling the log file at a later call
    console_handler = None
    file_handler = None
    for handler in logger.handlers:
        # FileHandler is a subclass of StreamHandler, test it first
        if isinstance(handler, logging.FileHandler):
            file_handler = handler
        elif isinstance(handler, logging.StreamHandler):
            console_handler = handler

    if console_handler is None:
        console_handler = logging.StreamHandler(stream=sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    console_handler.setLevel(ll)

    # create the file handler only if it is going to be attached, as
    # constructing it already opens the log file; an already attached file
    # handler is kept as it is
    if profiler and file_handler is None:
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(level=logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return(logger)
def max_mem(total=True, to_store=False, logger=None, fmt='GiB'):

    """
    Print the maximum memory requirements, either for each MPI process or
    in total.

    Keyword arguments
    -----------------

    - total = True, type bool
        Flag if memory consumption should be printed in total or per process

    - to_store = False, type bool
        set **True** if value should be returned instead of printed

    - logger = None, type logging.Logger
        if specified, the message is passed to the custEM logger print
        function, otherwise the built-in print function is used

    - fmt = 'GiB', type str
        unit of the reported value, 'GiB' or 'MiB'; for any other string, the
        raw value reported by the operating system is used and *fmt* only
        serves as label

    Returns
    -------

    - the memory value if *to_store=True*, otherwise **None**; with
      *total=True*, all processes except the root process obtain 0
    """

    # NOTE(review): the conversion assumes that ru_maxrss is reported in KiB
    # as on Linux - confirm for other platforms
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if fmt == 'GiB':
        mem = peak / 1024**2
    elif fmt == 'MiB':
        mem = peak / 1024
    else:
        mem = peak

    if total:
        # collective call, *gathered* is None on all non-root processes
        gathered = MPI.COMM_WORLD.gather(mem, root=0)
        if df.MPI.rank(df.MPI.comm_world) > 0:
            all_mem = 0
        else:
            all_mem = np.sum(np.array(gathered, dtype=float))
            if fmt == 'GiB':
                all_mem = int(all_mem) + 1  # round up to next full GiB

        if to_store:
            return(all_mem)

        if logger is None:
            if df.MPI.rank(df.MPI.comm_world) == 0:
                print(' - current maximum memory usage (' + fmt +
                      ') --> ' + str(all_mem) + ' - ')
        else:
            # label the value with the requested unit instead of a fixed
            # 'GiB'
            lp(logger, 20,
               ' - current maximum memory usage (' + fmt + ') --> ' +
               str(all_mem) + ' - ', pre_dash=False)
    else:
        mem = int(mem) + 1  # round up to next full unit of *fmt*
        if to_store:
            return(mem)

        message = (' - current maximum memory usage of process ' +
                   str(df.MPI.rank(df.MPI.comm_world)) +
                   ' (' + fmt + ') --> ' + str(mem) + ' - ')
        if logger is None:
            print(message)
        else:
            lp(logger, 20, message, pre_dash=False, root_only=False)
# def current_mem(total=True, fmt='MiB'): # if fmt != 'MiB': # print('Error! Only MiB supported as format for current_mem() function') # raise SystemExit # # return the memory usage in MB # df.MPI.barrier(df.MPI.comm_world) # process = psutil.Process(os.getpid()) # mem = process.memory_info()[0] / float(2 ** 20) # all_mem = np.sum(np.array(MPI.COMM_WORLD.gather(mem, root=0), # dtype=float)) # if total: # if df.MPI.rank(df.MPI.comm_world) == 0: # print(' - currently using {0:12.3f} MiB of memory - '.format( # all_mem)) # else: # print(' - process ' + str(df.MPI.rank(df.MPI.comm_world)) + # ' is currently using {0:12.3f} MiB of memory - '.format( # mem))
def export_config_file(PP):

    """
    Dump the model configuration as JSON file to the export directory.

    The attributes of the MP instance are copied, entries listed for removal
    are dropped and selected entries of the FE instance are added. The file
    is written by the root process only to *PP.export_dir* +
    '_config.json'.

    Required arguments
    ------------------

    - PP, type class
        PostProcessing instance
    """

    if PP.MP.mpi_rank == 0:
        config = PP.MP.__dict__.copy()

        # drop the entries which are not exported
        for key in ('sigma_func', 'sigma_inv_func', 'mu_func', 'mu_inv_func',
                    'eps_func', 'eps_inv_func', 'dsigma_func',
                    'dsigma_inv_func', 'sigma0_func', 'sigma0_inv_func',
                    'path', 'mpi_cw', 'mpi_cs'):
            if key in config:
                del config[key]

        # the conductivity entries are removed if the tensor flag is set
        if config['sigma_tensor_flag']:
            for key in ('sigma', 'sigma_0', 'sigma_air', 'sigma_ground',
                        'delta_sigma'):
                del config[key]

        # store the numeric level instead of the logger object
        config['logger'] = PP.MP.logger.level
        config['topo'] = str(config['topo'])

        fe_params = PP.FE.__dict__
        for key in ('bc', 's_type', 'grounding'):
            config[key] = fe_params[key]

        # transmitter entries which are no lists are converted via tolist()
        if fe_params['tx'] is None:
            config['tx'] = None
        else:
            config['tx'] = [tx if type(tx) is list else tx.tolist()
                            for tx in fe_params['tx']]

        if (type(config['currents']) is not list and
                PP.MP.currents is not None):
            config['currents'] = PP.MP.currents.tolist()

        for key in ('n_tx', 'origin', 'radius', 'start', 'stop', 'length',
                    'azimuth'):
            config[key] = fe_params[key]

        # optional FE entries
        for key in ('max_length', 'n_segs'):
            if key in fe_params:
                config[key] = fe_params[key]

        for key in ('omegas', 'frequencies'):
            if type(config[key]) is not list:
                config[key] = config[key].tolist()

        with open(PP.export_dir + "_config.json", "w") as outfile:
            json.dump(config, outfile, indent=0)

    lp(PP.MP.logger, 20,
       '... storing parameter file in export directory ...', pre_dash=False)
def export_resource_file(PP):

    """
    Dump consumed computational resources, mesh statistics and version
    information as JSON file to the export directory.

    Memory consumption and mesh quantities are evaluated by all processes,
    the file is written by the root process only to *PP.export_dir* +
    '_resource.json'.

    Required arguments
    ------------------

    - PP, type class
        PostProcessing instance
    """

    peak_memory = max_mem(to_store=True)
    n_procs = MPI.COMM_WORLD.size

    mesh = PP.FS.mesh
    for dim in (1, 2):
        mesh.init_global(dim)
    n_nodes, n_edges, n_faces, n_cells = (
        mesh.num_entities_global(dim) for dim in range(4))

    # NOTE(review): the statistics are based on the cells iterated by this
    # process, presumably only the local mesh partition in MPI runs - confirm
    cells = list(df.cells(mesh))
    volumes = np.array([cell.volume() for cell in cells])
    radius_ratios = np.array([cell.radius_ratio() for cell in cells])

    if PP.MP.mpi_rank == 0:
        # 0 indicates that OMP_NUM_THREADS is not set
        n_threads = int(os.environ.get('OMP_NUM_THREADS', 0))

        resources = {
            'max_mem': peak_memory,
            'assembly_time': PP.FE.assembly_time,
            'solution_time': PP.FS.solution_time,
            'system_size': PP.FE.system_size,
            'omp_threads': n_threads,
            'mpi_procs': n_procs,
            'nodes': n_nodes,
            'edges': n_edges,
            'faces': n_faces,
            'cells': n_cells,
            'dof': PP.FE.system_size,
            'min_volume': np.min(volumes),
            'median_volume': np.median(volumes),
            'max_volume': np.max(volumes),
            'min_rratio': np.min(radius_ratios),
            'median_rratio': np.median(radius_ratios),
            'max_rratio': np.max(radius_ratios),
            'custem_version': ce.__version__,
            'fenics_version': df.__version__,
            'pygimli_version': pg.__version__[:6],
            'comet_version': comet.__version__,
            'python_version': platform.python_version(),
            'os_type': platform.system(),
            'os_release': platform.release(),
            'os_version': platform.version(),
            'machine': platform.machine(),
        }

        with open(PP.export_dir + '_resource.json', "w") as outfile:
            json.dump(resources, outfile, indent=0)

    lp(PP.MP.logger, 20,
       '... storing resource file in export directory ...', pre_dash=False)
def release_memory():

    """
    Trigger a run of the Python garbage collector to free memory which is
    not referenced anymore, work in progress.
    """

    gc.collect()