# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import numpy as np
import dolfin as df
import os
from mpi4py import MPI
from pathlib import Path
import sys
from glob import glob
"""
Miscellaneous utility functions used in custEM.
"""
[docs]
def dump_csr(fname, values, column_indices, row_pointers, size):
"""
Utility function that dumps a PETSCMatrix as a sparse csr-matrix to
the hard disc.
Required arguments
------------------
- fname, type str
export name of the matrix without the suffix **'.csr'**
- values, type array
values of matrix elements
- column_indices, type array
column indices
- row_pointers, type array
row_pointers
- size, type int
size (length) of the square matrix
"""
ofile = fname + ".csr"
fhdl = open(ofile, 'w')
fhdl.write(str(size))
fhdl.write('\n')
fhdl.write(str(column_indices.shape[0]))
fhdl.write('\n')
for i in row_pointers:
fhdl.write(str(i+1))
fhdl.write('\n')
for j in column_indices:
fhdl.write(str(j+1))
fhdl.write('\n')
for m in values:
fhdl.write(str(m))
fhdl.write('\n')
fhdl.close()
print('Written: {}'.format(ofile))
[docs]
def logger_print(logger, lvl, string, val=None, pre_dash=True,
post_dash=False, barrier=True, root_only=True):
"""
Utility function that prints only from root process in mpi mode.
Required arguments
------------------
- logger, type logger from logging module
default *custEM* logger
- lvl, type int
debug_level for custEM code
- string, type str
string that should be printed
Keyword arguments
-----------------
- val = None, various types
a value that should be printed after the "message"
- pre_dash = True, type bool
flag that controls if a dashed line shoul be printed before the
"message"
- post_dash = False, type bool
flag that controls if a dashed line shoul be printed before the
"message"
- barrier = True, type bool
enable or disable multi-processing barrier (synchronize mpi processes)
- root_only = True, type bool
log "message" only with root process or each single one
"""
if barrier:
df.MPI.barrier(df.MPI.comm_world)
mpi_size = 1
if not root_only:
mpi_size = df.MPI.size(df.MPI.comm_world)
for pi in range(mpi_size):
if df.MPI.rank(df.MPI.comm_world) == pi:
if val is not None:
string += ' --> ' + str(val)
if lvl == 50:
if pre_dash:
logger.critical('=' * 80)
logger.critical(string)
if post_dash:
logger.critical('=' * 80)
elif lvl == 40:
if pre_dash:
logger.error('=' * 80)
logger.error(string)
if post_dash:
logger.error('=' * 80)
elif lvl == 30:
if pre_dash:
logger.warning('=' * 80)
logger.warning(string)
if post_dash:
logger.warning('=' * 80)
elif lvl == 20:
if pre_dash:
logger.info('=' * 80)
logger.info(string)
if post_dash:
logger.info('=' * 80)
elif lvl == 10:
if pre_dash:
logger.debug('=' * 80)
logger.debug(string)
if post_dash:
logger.debug('=' * 80)
else:
pass
if barrier:
df.MPI.barrier(df.MPI.comm_world)
[docs]
def mpi_print(string, val=None, pre_dash=True, post_dash=False, barrier=True):
"""
Utility function that prints only from root process in mpi mode.
Required arguments
------------------
- string, type str
string that should be printed
Keyword arguments
-----------------
- val = None, various types
a value that should be printed after the "message"
- pre_dash = True, type bool
Flag that controls if a dashed line shoul be printed before the
"message"
- post_dash = False, type bool
Flag that controls if a dashed line shoul be printed before the
"message"
- barrier = True, type bool
Enable or disable multi-processing barrier (synchronize all mpi
processes)
"""
if barrier:
df.MPI.barrier(df.MPI.comm_world)
if df.MPI.rank(df.MPI.comm_world) == 0:
if pre_dash:
print('=' * 80)
if val is None:
print(string)
else:
print(string, val)
if post_dash:
print('=' * 80)
else:
pass
if barrier:
df.MPI.barrier(df.MPI.comm_world)
[docs]
def root_write(writing_function):
"""
Utility function that stores data to file only from root process
in mpi mode.
Required arguments
------------------
- writing_function, type python function
a function for writing e.g. data or json files
"""
if MPI.COMM_WORLD.Get_rank() == 0:
writing_function()
else:
pass
[docs]
def run_serial(script_name, *serial_args):
"""
Utility function that calls a script in serial from root process in
mpi mode.
Required arguments
------------------
- script_name, type str
name of the python script that should be called in serial
- *serial_args, type python list
list of input console arguments for the python script
"""
mpi_print('... computation in serial started ...')
str_list = [script_name]
for arg in serial_args:
str_list.extend(arg)
if MPI.COMM_WORLD.Get_rank() == 0:
ic = MPI.COMM_SELF.Spawn(sys.executable, args=str_list, maxprocs=1)
ic.Barrier()
ic.Disconnect()
else:
ic = None
mpi_print('... time to continue in parallel ...', pre_dash=False)
[docs]
def run_multiple_serial(script_name, *serial_args):
"""
Utility function which is nearly identical to **run_serial**.
The only difference is that a communicator object **ic** is returned
that can be used for multi-threading of several serial runs.
A synchronisation with all communicators can be foreced from the root
process in mpi mode by collecting all communicator objects in a list and
by using the "Barrier" and "Disconnect" mpi4py methods before continuing.
Required arguments
------------------
- script_name, type str
name of the python script that should be called in serial
- *serial_args, type python list
list of input console arguments for the python script
Returns
-------
- ic, type MPI intercommunicator object
communicator object to communicate with spawned process
"""
string_list = [script_name]
for arg in serial_args:
string_list.extend(arg)
if MPI.COMM_WORLD.Get_rank() == 0:
ic = MPI.COMM_SELF.Spawn(sys.executable, args=string_list, maxprocs=1)
else:
ic = None
return(ic)
[docs]
def check_if_model_exists(out_name, overwrite, load_existing, logger=None):
"""
Utility function that checks if a finite element model already exists,
which can either be overwritten or the calculation can be aborted.
Required arguments
------------------
- See **'MOD'** class description
"""
if glob(out_name + '*'):
if overwrite is False:
if load_existing is False:
if type(logger) is not int:
logger_print(
logger, 50,
'Error! The specified model:\n' + 'out_name = ' +
out_name +
' already exists.\nConsider overwriting existing '
'models by setting the parameter "overwrite_results" '
'to **True**. Aborting ...', post_dash=True)
else:
print('Error! Model exists and *overwrite_results* is '
'disabled. Aborting ...')
print('=' * 80)
raise SystemExit
else:
if type(logger) is not int:
logger_print(logger, 20,
'... load existing model results ...')
else:
if logger < 21:
print('... load existing model results ...')
elif overwrite is True and load_existing is True:
if type(logger) is not int:
logger_print(
logger, 50,
'Error! Overwriting an existing model by setting '
'the parameter\n*overwrite_results* to **True** and '
'loading the existing results\nof this model at the same '
'time by setting\n*load_existing* to **True** makes '
'no sense. Aborting ...', post_dash=False)
# deprecated, could be probably removed but does not hurt
else:
print('Error! setting both, *overwrite_results* and '
'*load_existing*, to **True** is not allowed. '
'Aborting ...')
raise SystemExit
else:
if type(logger) is not int:
logger_print(
logger, 30,
'Warning! Overwriting existing model solutions if '
'available. Continuing ...', pre_dash=False)
else:
if logger < 21:
print('... overwriting existing model solutions if '
'available ...')
print('=' * 80)
else:
print(out_name)
if overwrite is False and load_existing is True:
if type(logger) is not int:
logger_print(
logger, 50,
'Error! The specified model could not be found. '
'Check if the correct "mod" name is specified '
'for the import and if the desired model was calcualted. '
'Aborting ...', post_dash=True)
# deprecated, could be probably removed but does not hurt
else:
print('Error! The specified model could not be found.')
print('=' * 80)
raise SystemExit
[docs]
def read_paths(file_name):
"""
Utility function that imports the default **'r_dir'** and **'m_dir'**
attributes for the **'MOD'** instance from file.
Note, these attributes are always overwritten if the keyword arguments
**'r_dir'** or **'m_dir'** are set manually by initializing the **'MOD'**
instance with theses keyword arguments.
Required arguments
------------------
file_name, type str
file name of the file which contains the paths, forwarded internally
Returns
-------
- results_dir, type str
main results directory
- mesh_dir, type str
main mesh directory
"""
results_dir = 'results'
mesh_dir = 'meshes'
# deprecated version which became suddenly bugged (paths.dat suddenly
# missing even though existent in gitlab repository - super strange)
# with open(file_name) as f:
# for line in f:
# a = ([k for k in range(len(line)) if line.startswith("'", k) or
# line.startswith('"', k)])
# r = line.find('results_dir')
# if r is not -1:
# results_dir = line[a[0] + 1:a[1]]
# m = line.find('mesh_dir')
# if m is not -1:
# mesh_dir = line[a[0] + 1:a[1]]
return(results_dir, mesh_dir)
[docs]
def write_h5(mpi_comm, var, fname, counter=None,
new=True, append=False, close=True, f=None, ri='data'):
"""
Utility function to write data files in parallel in 'h5' format.
Required arguments
------------------
- mpi_cw, type MPI communicator
communicator attributes for internal usage, FEniCS version dependent
- var, type dolfin function
data which should be exported, e.g., the E-field solution *E_t_r*
- fname, type str
destination file name, containing also the export path
Keyword arguments
-----------------
- counter = None, type int
specify the index of the data functions if more than one solution
(e.g., multiple Tx or times) should be stored in the same *h5* file.
- new = True, type bool
open new *h5* file, otherwise, continue writing into *f*
- close = True, type bool
close *h5* file or leave it open for writing further solutions
- f = None, type str
filename to continue writing in an opened, existing file; this makes
only sense if *counter* is not *None* or *0*
- ri = 'data', type str
specify a tag to name the data function in the *h5* file container;
e.g., *'real'* and *'imag'* can be used to store both parts of the
complex field in one file, reducing the I/O overhead
Returns
-------
- f, type str
file name of export file if not closed
"""
if new:
f = df.HDF5File(mpi_comm, fname, "w")
if append:
try:
f = df.HDF5File(mpi_comm, fname, "a")
except RuntimeError:
f = df.HDF5File(mpi_comm, fname, "w")
if counter is None:
f.write(var, ri)
else:
f.write(var, ri, counter)
if close:
f.close()
else:
if new or append:
return(f)
[docs]
def read_h5(mpi_comm, var, fname, counter=None, ri='data'):
"""
Utility function to write data files in parallel in *h5* format.
Required arguments
------------------
- mpi_cw, type MPI communicator
communicator attributes for internal usage, FEniCS version dependent
- var, type dolfin function
data which should be imported, e.g., the E-field solution *E_t_r*
- fname, type str
destination file name, containing also the export path
Keyword arguments
-----------------
- counter = None, type int
specify the index of the data functions if more than one solution
(e.g., multiple Tx or times) should be imported from the same *h5*
file
- ri = 'data', type str
specify a tag to name the data function in the *h5* file container;
e.g., *'real'* and *'imag'* can be used to access both parts of
the complex field in the same file, reducing the I/O overhead
"""
f = df.HDF5File(mpi_comm, fname, "r")
if counter is None:
f.read(var, ri)
else:
f.read(var, ri + "/vector_%d" % counter)
f.close()
[docs]
def specify_td_export_strings(FE):
"""
Utility function for specifying time-domain export names.
Required arguments
------------------
- FE, type class
FE approach instance (e.g., **E_vector**)
Returns
-------
- E_str, H_str, mode, type str
data name suffixes to differ between shut-on and shut-off processes
and impulse or step-response modeling with the implicite Euler method
"""
if FE.shut_off:
mode_e = 'shut_off_'
else:
mode_e = 'shut_on_'
mode_h = ''
E_str, H_str = 'E', 'dH'
if 'IE' in FE.FS.approach:
if FE.source_type == 'j':
E_str, H_str = 'E_int', 'H'
if FE.shut_off:
mode_h = 'shut_off_'
else:
mode_h = 'shut_on_'
return(E_str, H_str, mode_e, mode_h)
[docs]
def petsc_transfer_function(source_func, target_func, cache=False, q=None):
"""
Build Petsc transfer matrix to interpolate between different meshes
in parallel.
Note, this function will crash without error notice if there are more
mpi processes used than there are dof in the target mesh.
Required arguments
------------------
- source_func, type dolfin function
data function on source interpolation mesh
- target func, tpye dolfin function
data function on target interpolation mesh
Keyword arguments
-----------------
- fi = 0, type int
iteration index over frequencies
"""
if q is None:
source_func.set_allow_extrapolation(True)
q = df.PETScDMCollection.create_transfer_matrix(
source_func.ufl_function_space(),
target_func.ufl_function_space())
target_func.vector()[:] = q * source_func.vector()
target_func.vector().apply('insert')
if cache:
return(q)
[docs]
def make_directories(r_dir, m_dir, approach, para_dir=None, m_dir_only=False):
"""
Initialize the export directories if not existing.
Required arguments
------------------
- see **MOD** class description
Keyword arguments
-----------------
- para_dir = None, type str
if *None*, the default para_dir is '*m_dir*/_para'; otherwise, the
parameter directory will be initializied according to specified path
- m_dir_only = False, type bool
initialize only the main mesh directory or also the main results
directory
"""
if MPI.COMM_WORLD.Get_rank() == 0:
if not m_dir_only:
Path(r_dir).joinpath(approach).mkdir(exist_ok=True, parents=True)
Path(r_dir).joinpath('primary_fields/custom').mkdir(
exist_ok=True, parents=True)
Path(r_dir).joinpath('primary_fields/default').mkdir(
exist_ok=True, parents=True)
Path(r_dir).joinpath('primary_fields/fullspace').mkdir(
exist_ok=True, parents=True)
Path(m_dir).joinpath('_h5').mkdir(exist_ok=True, parents=True)
Path(m_dir).joinpath('_xml').mkdir(exist_ok=True, parents=True)
Path(m_dir).joinpath('_mesh').mkdir(exist_ok=True, parents=True)
Path(m_dir).joinpath('_vtk').mkdir(exist_ok=True, parents=True)
Path(m_dir).joinpath('lines').mkdir(exist_ok=True, parents=True)
Path(m_dir).joinpath('paths').mkdir(exist_ok=True, parents=True)
Path(m_dir).joinpath('slices').mkdir(exist_ok=True, parents=True)
if para_dir is None:
Path(m_dir).joinpath('para').mkdir(exist_ok=True, parents=True)
else:
pass
[docs]
def smape(d1, d2):
"""
Calculate symmetric normalized
"""
return(200. * np.abs(d1 - d2)/(np.abs(d2) + np.abs(d1)))
[docs]
def resort_coordinates(c1, c2):
"""
Resort two arrays with overall identical row entries but different order.
Required arguments
------------------
- c1/c2, type numpy_arrays
two data arrays of shape (n, 3)
Returns
-------
- c1_sort and c2_sort, type numpy_arrays
index arrays which can be used to resort data from array 1 to array 2
and vice versa.
"""
def indexAndSorting(c_):
posview = c_.ravel().view(
np.dtype((np.void, c_.dtype.itemsize*c_.shape[1])))
_, unique_idx = np.unique(posview, return_index=True)
return unique_idx, unique_idx.argsort()
c1_idx, c1_sort = indexAndSorting(c1)
c2_idx, c2_sort = indexAndSorting(c2)
return c1_idx[c2_sort], c2_idx[c1_sort]
[docs]
def get_coordinates(FS):
"""
Return coordinates of a lagrange VectorFunctionSpace as numpy array.
Required arguments
------------------
- FS, type class
FunctionSpaces instance
Returns
-------
- xyz, type numpy array
array or dof coordinates with shape (n, 3)
"""
xyz = FS.V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :]
return(xyz)
[docs]
def block_print():
"""
Supress all prints in the command prompt until *enable_print()* is called.
"""
sys.stdout = open(os.devnull, 'w')
[docs]
def enable_print():
"""
Enable prints in the command prompt after *block_print()* was called.
"""
sys.stdout = sys.__stdout__
"""
Utility FEniCS definitions - only required, if TEST_MODE is used; deprecated.
"""
try:
[docs]
class Air(df.SubDomain):
[docs]
def inside(self, x, on_boundary):
return x[2] > - 1e-2
[docs]
class Ground(df.SubDomain):
[docs]
def inside(self, x, on_boundary):
return x[2] < 1e-2
[docs]
class DirichletBoundary(df.SubDomain):
[docs]
def inside(self, x, on_boundary):
return on_boundary
except:
pass