Source code for custEM.misc.misc

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import numpy as np
import dolfin as df
import os
from mpi4py import MPI
from pathlib import Path
import sys
from glob import glob


"""
Miscellaneous utility functions used in custEM.
"""


[docs] def dump_csr(fname, values, column_indices, row_pointers, size): """ Utility function that dumps a PETSCMatrix as a sparse csr-matrix to the hard disc. Required arguments ------------------ - fname, type str export name of the matrix without the suffix **'.csr'** - values, type array values of matrix elements - column_indices, type array column indices - row_pointers, type array row_pointers - size, type int size (length) of the square matrix """ ofile = fname + ".csr" fhdl = open(ofile, 'w') fhdl.write(str(size)) fhdl.write('\n') fhdl.write(str(column_indices.shape[0])) fhdl.write('\n') for i in row_pointers: fhdl.write(str(i+1)) fhdl.write('\n') for j in column_indices: fhdl.write(str(j+1)) fhdl.write('\n') for m in values: fhdl.write(str(m)) fhdl.write('\n') fhdl.close() print('Written: {}'.format(ofile))
[docs] def logger_print(logger, lvl, string, val=None, pre_dash=True, post_dash=False, barrier=True, root_only=True): """ Utility function that prints only from root process in mpi mode. Required arguments ------------------ - logger, type logger from logging module default *custEM* logger - lvl, type int debug_level for custEM code - string, type str string that should be printed Keyword arguments ----------------- - val = None, various types a value that should be printed after the "message" - pre_dash = True, type bool flag that controls if a dashed line shoul be printed before the "message" - post_dash = False, type bool flag that controls if a dashed line shoul be printed before the "message" - barrier = True, type bool enable or disable multi-processing barrier (synchronize mpi processes) - root_only = True, type bool log "message" only with root process or each single one """ if barrier: df.MPI.barrier(df.MPI.comm_world) mpi_size = 1 if not root_only: mpi_size = df.MPI.size(df.MPI.comm_world) for pi in range(mpi_size): if df.MPI.rank(df.MPI.comm_world) == pi: if val is not None: string += ' --> ' + str(val) if lvl == 50: if pre_dash: logger.critical('=' * 80) logger.critical(string) if post_dash: logger.critical('=' * 80) elif lvl == 40: if pre_dash: logger.error('=' * 80) logger.error(string) if post_dash: logger.error('=' * 80) elif lvl == 30: if pre_dash: logger.warning('=' * 80) logger.warning(string) if post_dash: logger.warning('=' * 80) elif lvl == 20: if pre_dash: logger.info('=' * 80) logger.info(string) if post_dash: logger.info('=' * 80) elif lvl == 10: if pre_dash: logger.debug('=' * 80) logger.debug(string) if post_dash: logger.debug('=' * 80) else: pass if barrier: df.MPI.barrier(df.MPI.comm_world)
[docs] def mpi_print(string, val=None, pre_dash=True, post_dash=False, barrier=True): """ Utility function that prints only from root process in mpi mode. Required arguments ------------------ - string, type str string that should be printed Keyword arguments ----------------- - val = None, various types a value that should be printed after the "message" - pre_dash = True, type bool Flag that controls if a dashed line shoul be printed before the "message" - post_dash = False, type bool Flag that controls if a dashed line shoul be printed before the "message" - barrier = True, type bool Enable or disable multi-processing barrier (synchronize all mpi processes) """ if barrier: df.MPI.barrier(df.MPI.comm_world) if df.MPI.rank(df.MPI.comm_world) == 0: if pre_dash: print('=' * 80) if val is None: print(string) else: print(string, val) if post_dash: print('=' * 80) else: pass if barrier: df.MPI.barrier(df.MPI.comm_world)
[docs] def root_write(writing_function): """ Utility function that stores data to file only from root process in mpi mode. Required arguments ------------------ - writing_function, type python function a function for writing e.g. data or json files """ if MPI.COMM_WORLD.Get_rank() == 0: writing_function() else: pass
[docs] def run_serial(script_name, *serial_args): """ Utility function that calls a script in serial from root process in mpi mode. Required arguments ------------------ - script_name, type str name of the python script that should be called in serial - *serial_args, type python list list of input console arguments for the python script """ mpi_print('... computation in serial started ...') str_list = [script_name] for arg in serial_args: str_list.extend(arg) if MPI.COMM_WORLD.Get_rank() == 0: ic = MPI.COMM_SELF.Spawn(sys.executable, args=str_list, maxprocs=1) ic.Barrier() ic.Disconnect() else: ic = None mpi_print('... time to continue in parallel ...', pre_dash=False)
[docs] def run_multiple_serial(script_name, *serial_args): """ Utility function which is nearly identical to **run_serial**. The only difference is that a communicator object **ic** is returned that can be used for multi-threading of several serial runs. A synchronisation with all communicators can be foreced from the root process in mpi mode by collecting all communicator objects in a list and by using the "Barrier" and "Disconnect" mpi4py methods before continuing. Required arguments ------------------ - script_name, type str name of the python script that should be called in serial - *serial_args, type python list list of input console arguments for the python script Returns ------- - ic, type MPI intercommunicator object communicator object to communicate with spawned process """ string_list = [script_name] for arg in serial_args: string_list.extend(arg) if MPI.COMM_WORLD.Get_rank() == 0: ic = MPI.COMM_SELF.Spawn(sys.executable, args=string_list, maxprocs=1) else: ic = None return(ic)
[docs] def check_approach_and_file_format(approach, file_format, path, logger): """ Utility function that checks if a valid modeling approach is chosen (avoid typos) and if HFD5 format is supported. Required arguments ------------------ - approach, type str approach specification (see **MOD** class) - file_format, type str or None if None, used *h5* if supported, otherwise xml for data export - path, type str path to *custEM.misc* directory, forwarded internally Returns ------- - 'h5 or 'xml', type str file format specification """ dc_flag, mt_flag, td_flag, fd_flag = False, False, False, False # set boolean falgs to simplify handling the different appraoch types if approach in ['E_IE', 'E_RA', 'E_FT']: td_flag = True elif 'mt' in approach.lower(): mt_flag = True elif '_t' in approach or '_s' in approach: fd_flag = True elif 'dc' in approach.lower(): dc_flag = True if '_t' not in approach and \ '_s' not in approach and \ '_IE' not in approach and \ '_RA' not in approach and \ '_FT' not in approach and \ 'MT' not in approach and\ 'DC' not in approach: logger_print(logger, 50, 'Error! Use approach with "_t" or "_s" ' 'subscript for the "approach" argument for either "total"' ' or "secondary" field formulation in frequency-domain ' 'modeling, respectively!\n' 'Alternatively, "E_IE", "E_FT" or "E_RA" are valid ' 'arguments for time-domain simulations and "MT" or "DC" ' 'for natural source EM or direct current field modeling.' ' Aborting ...') raise SystemExit if approach not in ['E_IE', 'E_RA', 'E_FT', 'DC', 'MT']: if 'H' not in approach and 'E' not in approach and 'Am' not in \ approach and 'An' not in approach and 'Fm' not in approach and \ 'MT' not in approach: logger_print(logger, 50, 'Error! Specify a valid frequency-domain approach by ' 'choosing either "E", "H", "Am", "An" or "Fm" as ' 'total ("_t") or secondary ("_s") field formulation. ' 'Aborting ...') raise SystemExit if file_format is None: try: df.HDF5File(df.MPI.comm_world, path + 'dummy_hdf5.h5', 'r') return('h5', dc_flag, mt_flag, td_flag, fd_flag) except RuntimeError: logger_print(logger, 20, ' - using "xml" file format since parallel *h5* ' 'appears to be not supported in your custEM ' 'installation - ') return('xml', dc_flag, mt_flag, td_flag, fd_flag) else: return(file_format, dc_flag, mt_flag, td_flag, fd_flag)
[docs] def check_if_model_exists(out_name, overwrite, load_existing, logger=None): """ Utility function that checks if a finite element model already exists, which can either be overwritten or the calculation can be aborted. Required arguments ------------------ - See **'MOD'** class description """ if glob(out_name + '*'): if overwrite is False: if load_existing is False: if type(logger) is not int: logger_print( logger, 50, 'Error! The specified model:\n' + 'out_name = ' + out_name + ' already exists.\nConsider overwriting existing ' 'models by setting the parameter "overwrite_results" ' 'to **True**. Aborting ...', post_dash=True) else: print('Error! Model exists and *overwrite_results* is ' 'disabled. Aborting ...') print('=' * 80) raise SystemExit else: if type(logger) is not int: logger_print(logger, 20, '... load existing model results ...') else: if logger < 21: print('... load existing model results ...') elif overwrite is True and load_existing is True: if type(logger) is not int: logger_print( logger, 50, 'Error! Overwriting an existing model by setting ' 'the parameter\n*overwrite_results* to **True** and ' 'loading the existing results\nof this model at the same ' 'time by setting\n*load_existing* to **True** makes ' 'no sense. Aborting ...', post_dash=False) # deprecated, could be probably removed but does not hurt else: print('Error! setting both, *overwrite_results* and ' '*load_existing*, to **True** is not allowed. ' 'Aborting ...') raise SystemExit else: if type(logger) is not int: logger_print( logger, 30, 'Warning! Overwriting existing model solutions if ' 'available. Continuing ...', pre_dash=False) else: if logger < 21: print('... overwriting existing model solutions if ' 'available ...') print('=' * 80) else: print(out_name) if overwrite is False and load_existing is True: if type(logger) is not int: logger_print( logger, 50, 'Error! The specified model could not be found. ' 'Check if the correct "mod" name is specified ' 'for the import and if the desired model was calcualted. ' 'Aborting ...', post_dash=True) # deprecated, could be probably removed but does not hurt else: print('Error! The specified model could not be found.') print('=' * 80) raise SystemExit
[docs] def read_paths(file_name): """ Utility function that imports the default **'r_dir'** and **'m_dir'** attributes for the **'MOD'** instance from file. Note, these attributes are always overwritten if the keyword arguments **'r_dir'** or **'m_dir'** are set manually by initializing the **'MOD'** instance with theses keyword arguments. Required arguments ------------------ file_name, type str file name of the file which contains the paths, forwarded internally Returns ------- - results_dir, type str main results directory - mesh_dir, type str main mesh directory """ results_dir = 'results' mesh_dir = 'meshes' # deprecated version which became suddenly bugged (paths.dat suddenly # missing even though existent in gitlab repository - super strange) # with open(file_name) as f: # for line in f: # a = ([k for k in range(len(line)) if line.startswith("'", k) or # line.startswith('"', k)]) # r = line.find('results_dir') # if r is not -1: # results_dir = line[a[0] + 1:a[1]] # m = line.find('mesh_dir') # if m is not -1: # mesh_dir = line[a[0] + 1:a[1]] return(results_dir, mesh_dir)
[docs] def write_h5(mpi_comm, var, fname, counter=None, new=True, append=False, close=True, f=None, ri='data'): """ Utility function to write data files in parallel in 'h5' format. Required arguments ------------------ - mpi_cw, type MPI communicator communicator attributes for internal usage, FEniCS version dependent - var, type dolfin function data which should be exported, e.g., the E-field solution *E_t_r* - fname, type str destination file name, containing also the export path Keyword arguments ----------------- - counter = None, type int specify the index of the data functions if more than one solution (e.g., multiple Tx or times) should be stored in the same *h5* file. - new = True, type bool open new *h5* file, otherwise, continue writing into *f* - close = True, type bool close *h5* file or leave it open for writing further solutions - f = None, type str filename to continue writing in an opened, existing file; this makes only sense if *counter* is not *None* or *0* - ri = 'data', type str specify a tag to name the data function in the *h5* file container; e.g., *'real'* and *'imag'* can be used to store both parts of the complex field in one file, reducing the I/O overhead Returns ------- - f, type str file name of export file if not closed """ if new: f = df.HDF5File(mpi_comm, fname, "w") if append: try: f = df.HDF5File(mpi_comm, fname, "a") except RuntimeError: f = df.HDF5File(mpi_comm, fname, "w") if counter is None: f.write(var, ri) else: f.write(var, ri, counter) if close: f.close() else: if new or append: return(f)
[docs] def read_h5(mpi_comm, var, fname, counter=None, ri='data'): """ Utility function to write data files in parallel in *h5* format. Required arguments ------------------ - mpi_cw, type MPI communicator communicator attributes for internal usage, FEniCS version dependent - var, type dolfin function data which should be imported, e.g., the E-field solution *E_t_r* - fname, type str destination file name, containing also the export path Keyword arguments ----------------- - counter = None, type int specify the index of the data functions if more than one solution (e.g., multiple Tx or times) should be imported from the same *h5* file - ri = 'data', type str specify a tag to name the data function in the *h5* file container; e.g., *'real'* and *'imag'* can be used to access both parts of the complex field in the same file, reducing the I/O overhead """ f = df.HDF5File(mpi_comm, fname, "r") if counter is None: f.read(var, ri) else: f.read(var, ri + "/vector_%d" % counter) f.close()
[docs] def specify_td_export_strings(FE): """ Utility function for specifying time-domain export names. Required arguments ------------------ - FE, type class FE approach instance (e.g., **E_vector**) Returns ------- - E_str, H_str, mode, type str data name suffixes to differ between shut-on and shut-off processes and impulse or step-response modeling with the implicite Euler method """ if FE.shut_off: mode_e = 'shut_off_' else: mode_e = 'shut_on_' mode_h = '' E_str, H_str = 'E', 'dH' if 'IE' in FE.FS.approach: if FE.source_type == 'j': E_str, H_str = 'E_int', 'H' if FE.shut_off: mode_h = 'shut_off_' else: mode_h = 'shut_on_' return(E_str, H_str, mode_e, mode_h)
[docs] def petsc_transfer_function(source_func, target_func, cache=False, q=None): """ Build Petsc transfer matrix to interpolate between different meshes in parallel. Note, this function will crash without error notice if there are more mpi processes used than there are dof in the target mesh. Required arguments ------------------ - source_func, type dolfin function data function on source interpolation mesh - target func, tpye dolfin function data function on target interpolation mesh Keyword arguments ----------------- - fi = 0, type int iteration index over frequencies """ if q is None: source_func.set_allow_extrapolation(True) q = df.PETScDMCollection.create_transfer_matrix( source_func.ufl_function_space(), target_func.ufl_function_space()) target_func.vector()[:] = q * source_func.vector() target_func.vector().apply('insert') if cache: return(q)
[docs] def make_directories(r_dir, m_dir, approach, para_dir=None, m_dir_only=False): """ Initialize the export directories if not existing. Required arguments ------------------ - see **MOD** class description Keyword arguments ----------------- - para_dir = None, type str if *None*, the default para_dir is '*m_dir*/_para'; otherwise, the parameter directory will be initializied according to specified path - m_dir_only = False, type bool initialize only the main mesh directory or also the main results directory """ if MPI.COMM_WORLD.Get_rank() == 0: if not m_dir_only: Path(r_dir).joinpath(approach).mkdir(exist_ok=True, parents=True) Path(r_dir).joinpath('primary_fields/custom').mkdir( exist_ok=True, parents=True) Path(r_dir).joinpath('primary_fields/default').mkdir( exist_ok=True, parents=True) Path(r_dir).joinpath('primary_fields/fullspace').mkdir( exist_ok=True, parents=True) Path(m_dir).joinpath('_h5').mkdir(exist_ok=True, parents=True) Path(m_dir).joinpath('_xml').mkdir(exist_ok=True, parents=True) Path(m_dir).joinpath('_mesh').mkdir(exist_ok=True, parents=True) Path(m_dir).joinpath('_vtk').mkdir(exist_ok=True, parents=True) Path(m_dir).joinpath('lines').mkdir(exist_ok=True, parents=True) Path(m_dir).joinpath('paths').mkdir(exist_ok=True, parents=True) Path(m_dir).joinpath('slices').mkdir(exist_ok=True, parents=True) if para_dir is None: Path(m_dir).joinpath('para').mkdir(exist_ok=True, parents=True) else: pass
[docs] def smape(d1, d2): """ Calculate symmetric normalized """ return(200. * np.abs(d1 - d2)/(np.abs(d2) + np.abs(d1)))
[docs] def resort_coordinates(c1, c2): """ Resort two arrays with overall identical row entries but different order. Required arguments ------------------ - c1/c2, type numpy_arrays two data arrays of shape (n, 3) Returns ------- - c1_sort and c2_sort, type numpy_arrays index arrays which can be used to resort data from array 1 to array 2 and vice versa. """ def indexAndSorting(c_): posview = c_.ravel().view( np.dtype((np.void, c_.dtype.itemsize*c_.shape[1]))) _, unique_idx = np.unique(posview, return_index=True) return unique_idx, unique_idx.argsort() c1_idx, c1_sort = indexAndSorting(c1) c2_idx, c2_sort = indexAndSorting(c2) return c1_idx[c2_sort], c2_idx[c1_sort]
[docs] def get_coordinates(FS): """ Return coordinates of a lagrange VectorFunctionSpace as numpy array. Required arguments ------------------ - FS, type class FunctionSpaces instance Returns ------- - xyz, type numpy array array or dof coordinates with shape (n, 3) """ xyz = FS.V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :] return(xyz)
[docs] def block_print(): """ Supress all prints in the command prompt until *enable_print()* is called. """ sys.stdout = open(os.devnull, 'w')
[docs] def enable_print(): """ Enable prints in the command prompt after *block_print()* was called. """ sys.stdout = sys.__stdout__
""" Utility FEniCS definitions - only required, if TEST_MODE is used; deprecated. """ try:
[docs] class Air(df.SubDomain):
[docs] def inside(self, x, on_boundary): return x[2] > - 1e-2
[docs] class Ground(df.SubDomain):
[docs] def inside(self, x, on_boundary): return x[2] < 1e-2
[docs] class DirichletBoundary(df.SubDomain):
[docs] def inside(self, x, on_boundary): return on_boundary
except: pass