Source code for custEM.fem.fem_utils

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import numpy as np
from custEM.misc import logger_print as lp
from custEM.misc import write_h5
from custEM.misc import export_config_file
import os


"""
Definiton of base classes initializing fem-realted parameters and defining
fem-related utility functions.
"""


[docs] class ApproachBaseFD: """ Initialize FE variables for all frequency-domain approaches. All variables can be updated by specifying the corresponding keyword arguments in the *build_var_form()* method. Methods ------- - update_fem_parameters() update valid parameters of the **FE** and **PF** classes and print updated class attributes - update_tx_parameters() if *s_type='auto'*, update Tx information from imported mesh-parameter file in the **MP** instance Description of FE parameters ---------------------------- - s_type = 'auto', type str alternatives: **hed** (HED source, treated identical to **line** tx in halfspace and as infinitesimal source in fullspace with analytical solution) **vmd** (VMD source, treated as infinitesimal source in fullspace with analytical solution) **heds** (multiple HED sources, treated as infinitesimal sources in fullspace with analytical solution; specify origins with a list of 3D coordinated in the *tx* variable) **vmds** (multiple VMD sources, treated as infinitesimal sources in fullspace with analytical solution; specify origins with a list of 3D coordinated in the *tx* variable) **line** (real finite length dipole source) **loop_c** (circular loop source) **loop_r** (rectangular loop source) **path** (arbitrary sources) - tx = None, type list internal usage only; if *s_type=auto*, this variable will be updated by the information in the mesh-parameter file; otherwise, the geometry parameters will be used to define the tx - n_tx = 1, type int number of transmitters (right-hand sides of system of linear equations) - grounding = False, internal usage only and only relevant if *s_type=auto* (this variable will be updated by the information in the mesh-parameter file) - origin = [0.0, 0.0, 0.0], type list of len (3) origin of *hed* or *vmd* Tx; only valid if *s_type* is not *auto* and *n_tx=1* - length = 1., type float length of *hed* Tx; only valid if *s_type* is not *auto* and *n_tx=1* - self.azimuth = 0., type float azimuth of *hed* or *vmd* Tx in degree; only valid if *s_type* is not *auto* and *n_tx=1* - start = [-100, 0.0, 0.0], type list of len(3) start coordinate of *line* Tx or lower left corner of *loop_r* Tx; only valid if *s_type* is not *auto* and *n_tx=1*; z_value is ignored and always set to zero - stop = [100.0, 0.0, 0.0], type list of len(3) stop coordinate of *line* Tx or upper right corner of *loop_r* Tx; only valid if *s_type* is not *auto* and *n_tx=1*; z_value is ignored and always set to zero - radius = 100., type float radius of *loop_c* Tx; only valid if *s_type* is not *auto* and *n_tx=1* - pf_EH_flag = 'E', type str set to 'H' for using primary magentic fields to calculate the right-hand side source contributions in secondary field formulations - bc = None, type str specify alternative boundary conditions (see **FS** class) - quasi_static = True, type bool use diffusive (quasi static) approximation of the Helmholtz equation without displacement currents, *False* only implemented in the *E_vector* approach - ip = False, type bool set *True* to enable modeling of induced polarization effects by specifying Cole-Cole model parameters (see **MP** class), only implemented in the *E_vector* approach - tx_topo = None, type str or function specify topography (from file or as function z=f(x, y)) for searching the vertical Tx coordinates on a manually chosen topography for the total field approaches, not recommendend to be changed - a_tol = 1e-2, type float find correct source dof locations for total field formulations; might be changed but the default choice is almost always sufficient - assembly_time = 0., type float used internally, adds all assembly times to this variable during the to be stored by the *profiler* later on """ def __init__(self): """ Initialize **FE** class attributes for frequency-domain approaches. """ self.s_type = 'auto' self.tx = None self.grounding = [False] self.n_tx = 1 self.origin = [0.0, 0.0, 0.0] self.radius = 100. self.start = [-100, 0.0, 0.0] self.stop = [100.0, 0.0, 0.0] self.length = 1. self.azimuth = 0. self.pf_EH_flag = 'E' self.bc = None self.quasi_static = True self.ip = False self.tx_topo = None self.a_tol = 1e-2 self.assembly_time = 0.
[docs] def update_fem_parameters(self, fem_kwargs): """ Update and print **FE** and **PF** class attributes for frequency-domain approaches by overwriting the corresponding keyword arguments. Required arguments ------------------ fem_kwargs, type dict dictionary of keyword arguments for *build_var_form()* method """ for key in fem_kwargs: if key not in self.__dict__: if key not in ['pyhed_interp', 'ph_log_level', 'max_length', 'pyhed_drop_tol', 'force_primary', 'Assembler', 'dipole_z', 'pf_p', 'pf_name', 'procs_per_proc', 'pf_type', 'export_pvd', 'eval_tx', 'n_segs']: lp(self.MP.logger, 50, 'Error! Unknown FE parameter set', key) lp(self.MP.logger, 50, "... let's stop before something unexpected happens" " ...", pre_dash=False) raise SystemExit self.__dict__.update(fem_kwargs) if '_t' in self.MP.approach: self.mute.extend(['n_segs']) if '_s' in self.MP.approach: self.mute.extend(['tx_topo', 'a_tol']) self.update_tx_parameters() lp(self.MP.logger, 20, 'FE parameters update:') for k, v in sorted(self.__dict__.items()): if k == 'n_tx': to_print = str(v) + ', equals MP.n_tx if s_type == auto' elif k == 'tx' and v is not None: to_print = 'using ' + str(len(v)) + \ ' tx, equals MP.tx if s_type == auto' else: to_print = v if k not in self.__dict__['mute']: lp(self.MP.logger, 20, '--> {:<22}'.format(str(k)) + ' = ' + str(to_print), pre_dash=False) eps_flag = False if not self.quasi_static: lp(self.MP.logger, 20, ' - using enabled - ', post_dash=False) eps_flag=True if not 'An' in self.MP.approach: self.MP.build_dg_parameter_functions(self.FS, self.pf_EH_flag, eps_flag) if '_s' in self.MP.approach: if self.pf_EH_flag == 'E': lp(self.MP.logger, 20, ' - using primary electric fields - ', pre_dash=False) if self.pf_EH_flag == 'H': lp(self.MP.logger, 20, ' - using primary magnetic fields - ', pre_dash=False)
[docs] def update_tx_parameters(self): """ Update Tx information from imported mesh-parameter file in the **MP** instance if *s_type='auto'*. """ if self.s_type in ['path', 'vmds', 'heds']: self.n_tx = len(self.tx) self.MP.n_tx = self.n_tx if self.s_type == 'auto' and not 'mt' in self.MP.approach.lower(): self.s_type = 'path' self.tx = self.MP.tx self.n_tx = self.MP.n_tx self.grounding = self.MP.grounding if self.s_type in ['auto', 'mt'] and 'mt' in self.MP.approach.lower(): self.n_tx = self.MP.n_tx
[docs] class ApproachBaseTD: """ Initialize FE variables for all time-domain approaches. All variables can be updated by specifying the corresponding keyword arguments in the *build_var_form()* method. Methods ------- - update_fem_parameters() update valid parameters of the **FE** and **PF** classes and print updated class attributes - update_tx_parameters() if *s_type='auto'*, update Tx information from imported mesh-parameter file in the **MP** instance - calc_static_magnetic_field() calculate static magnetic field with the Biot-Savart law - biot_savart() vectorized implementation of Biot-Savart law - interpolate_static() interpolate static magentic field on specified interpolation mesh for independent post-processing purposes - calc_dc_field() calculate direct current electric field with either total or secondary potential formulation of the common potential approach - interpolate_dc() interpolate direct current field on specified interpolation mesh for independent post-processing purposes - export_interpolation_data() export interpolated static magentic field or direct current electric field results as numpy files and, if specified, in *.pvd* file format Description of FE parameters ---------------------------- Here, only parameters relevant for the time-domain approaches are listed. For all other parameters, it is referred to the documentation of the **ApproachBaseFD** class. Note that further relevant parameters that can be updated with the *build_var_form_method()* are initialized by each time-domain approach class individually. It is referred to the corresponding documentation in the *time_domain_approaches.py* file. - log_t_min = -6, type int/float specify start observation time of logarithmically divided interval for computing transients; defines the start time as 10^log_t_min - log_t_max = 0, type int/float specify stop observation time of logarithmically divided interval for computing transients; defines the stop time as 10^log_t_max - n_log_steps = 61, type int number of steps to logarithmically divide the time interval of the transient, equal to the number of results exported at the corresponding times - times = None, type list manually define export times of the transients - shut_off = True, type bool calculate either transient transmitter shut-off of shut-on response """ def __init__(self): """ Initialize **FE** class attributes for time-domain approaches. """ self.s_type = 'auto' self.tx = None self.grounding = False self.n_tx = 1 self.origin = [0.0, 0.0, 0.0] self.radius = 100.0 self.start = [-100, 0.0, 0.0] self.stop = [100.0, 0.0, 0.0] self.length = 1. self.azimuth = 0. self.bc = None self.quasi_static = True self.ip = False self.tx_topo = None self.a_tol = 1e-2 self.assembly_time = 0. # relevant variables for all time-domain approaches self.log_t_min = -6 self.log_t_max = 0 self.n_log_steps = 61 self.times = None self.shut_off = True # only for internal usage to disable prints of the following attributes self.mute = ['DOM', 'v', 'u', 'dx', 'dx_0', 'FS', 'MP', 'mute', 'tx']
[docs] def update_fem_parameters(self, fem_kwargs): """ Update and print **FE** class attributes for time-domain approaches by overwriting the corresponding keyword arguments. Required arguments ------------------ - fem_kwargs, type dict dictionary of keyword arguments for *build_var_form()* method """ for key in fem_kwargs: if key not in self.__dict__: if key not in ['source_type', 'be_order', 'n_lin_steps', 'n_on', 't0_j', 'pulse_width', 'times', 'source_times', 'source_func', 'stepping_times', 'omegas', 'path_td', 'fd_approach', 'fd_mod_name', 'parallel_interpolation', 'interp_mesh', 'n_mult', 'n_poles', 't_tol', 'pole_shift', 'dc_field', 'frequencies']: lp(self.MP.logger, 50, 'Error! Unknown FE parameter set', key) lp(self.MP.logger, 50, "... let's stop before something unexpected happens " "...", pre_dash=False) raise SystemExit self.__dict__.update(fem_kwargs) self.update_tx_parameters() lp(self.MP.logger, 20, 'FE parameters update:') for k, v in sorted(self.__dict__.items()): if k == 'n_tx': to_print = str(v) + ', equals MP.n_tx if s_type == auto' elif k == 'tx' and v is not None: to_print = 'using ' + str(len(v)) + \ ' tx, equals MP.tx if s_type == auto' elif k == 'times': if v is not None: if len(v) > 1: to_print = 'calculating ' + str(len(v)) + ' times' else: to_print = v if k not in self.__dict__['mute']: lp(self.MP.logger, 20, '--> {:<22}'.format(str(k)) + ' = ' + str(to_print), pre_dash=False) self.MP.build_dg_parameter_functions(self.FS)
[docs] def update_tx_parameters(self): """ Update Tx information from imported mesh-parameter file in the **MP** instance if *s_type='auto'*. """ if self.s_type in ['path', 'vmds', 'heds']: self.n_tx = len(self.tx) self.MP.n_tx = self.n_tx if self.s_type == 'auto': self.s_type = 'path' self.tx = self.MP.tx self.n_tx = self.MP.n_tx self.grounding = self.MP.grounding
[docs] def calc_static_magnetic_field(self, store=False): """ Calculate static magentic fields with the Biot-Savart law. Keyword arguments ----------------- - store = False, type bool save static magnetic fields as HDF5 file """ rx = self.FS.V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :] B_static = self.biot_savart(rx, np.array(self.MP.tx[0])) self.B_static = df.Function(self.FS.V_cg) self.B_static.vector()[:] = B_static.ravel() df.File('B_static.pvd') << self.B_static if store: if self.MP.n_tx != 1: lp(self.MP.logger, 30, 'Warning! Storing currently only results for first Tx of ' 'static magnetic field calculation. Continuing ...') self.E_file = write_h5( self.MP.mpi_cw, self.B_static, self.MP.out_dir + '/' + self.MP.mod_name + '_B_static.' + self.MP.file_format)
[docs] def biot_savart(self, rx, tx): """ Vectorized implementation of the Biot-Savart law. Required arguments ------------------ - rx, type list of coordinates receiver coordinates - tx, type list of coordinates list of transmitter coordinates describing the transmitter path """ nrx = len(rx[:, 0]) B = np.zeros((nrx, 3)) dl = np.diff(tx, axis=0) nl = len(dl) rc = tx[:-1, :] + 0.5 * dl B = np.zeros((nrx, 3)) for j in range(nl): r = rx - rc[j, :] norm3 = np.linalg.norm(r, axis=1)**3 db = self.MP.current * 1e-7 * np.cross(dl[j, :], r) /\ norm3[:, np.newaxis] B += db return (B)
[docs] def interpolate_static(self, interp_mesh, interp_p=None, export_pvd=True, interp_dir=None): """ Customized interpolation function to interpolate static magnetic fields on specified interpolation mesh. Required arguments ------------------ - interp_mesh, type str name of target interpolation mesh Keyword arguments ----------------- - interp_p = None, type int default is *interp_p=1*, not reasonable to be changed - export_pvd = True, type bool save fields as *.pvd* file for Paraview or not - interp_dir = None, type str specify custom interpolation directory, not recommended """ self.rank = 0 self.export_name = 'B_static_' + interp_mesh if interp_dir is not None: self.interp_dir = interp_dir if interp_p is None: interp_p = 1 if df.MPI.rank(self.MP.mpi_cw) == 0: if not os.path.isdir(self.interp_dir): os.mkdir(self.interp_dir) else: pass if interp_mesh.find('line') is not -1: self.interp_mesh_dir = self.MP.m_dir + '/lines' elif interp_mesh.find('slice') is not -1: self.interp_mesh_dir = self.MP.m_dir + '/slices' elif interp_mesh.find('path') is not -1: self.interp_mesh_dir = self.MP.m_dir + '/paths' else: lp(self.MP.logger, 50, 'Error! Interpolation target mesh name must ' 'end either with "line", "slice" or "path" so far. ' 'Aborting ...', post_dash=True) raise SystemExit if os.path.isfile(self.interp_mesh_dir + '/' + interp_mesh + '.xml') is False: lp(self.MP.logger, 30, 'Warning! Interpolation mesh "' + str(interp_mesh) + '.xml" ' 'could not be found! Continuing ...') return lp(self.MP.logger, 20, '... interpolating B_static on "' + interp_mesh + '" in serial ...', pre_dash=False, barrier=False) self.rank += 1 if df.MPI.rank(self.MP.mpi_cw) == self.rank: for jj in range(self.MP.n_tx): from custEM.core import MOD M = MOD(self.MP.mod_name, self.MP.mesh_name, self.MP.approach, overwrite_results=False, p=self.FS.p, file_format=self.MP.file_format, self_mode=True, load_existing=str(jj), r_dir=self.MP.r_dir, para_dir=self.MP.para_dir, mute=True, test_mode=False, m_dir=self.MP.m_dir, field_selection='B_static', dg_interpolation=False, fs_type='cg') target_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir + '/' + interp_mesh + '.xml') V_target = df.VectorFunctionSpace( target_mesh, "CG", interp_p) d_interp = df.interpolate(M.PP.B_static[jj], V_target) self.export_interpolation_data(d_interp, V_target, jj, export_pvd) else: pass
[docs] def calc_dc_field(self, dc_mesh=None, secondary_potential=False, store=False, td_dc=True): """ Calculate direct current electric fields with either secondary or total field formulation of potential approach for obtaining shut-off transients. Keyword arguments ----------------- - secondary_potential = False, type bool use either total or secondary potential formulation - store = False, type bool save static magnetic fields as HDF5 file """ from custEM.core import Solver from custEM.fem import TotalFieldAssembler # reduce rather disturbing and duplicated prints from nested DC model if self.MP.logger.level < 30: self.MP.logger.handlers[0].setLevel(30) FE_DC = DC(self.FS, self.MP) FE_DC.build_var_form(s_type=self.s_type, tx=self.tx, n_tx=len(self.tx), bc='ZD', grounding=self.grounding, start=self.start, stop=self.stop, origin=self.origin, length=self.length, azimuth=self.azimuth, radius=self.radius, secondary_potential=secondary_potential) self.L = FE_DC.L self.R = FE_DC.R self.dc_solver = Solver(self.FS, self, mumps_debug=False) if FE_DC.secondary_potential: AA, bb = df.PETScMatrix(), df.PETScVector() b = [] if type(self.R) is not list: self.R = [self.R] for ti in range(self.n_tx): if FE_DC.bc is None: A, b_tmp = df.assemble_system(self.L[0], self.R[ti], A_tensor=AA, b_tensor=bb) elif FE_DC.bc in ['ZeroDirichlet', 'ZD']: FE_DC.FS.add_dirichlet_bc() A, b_tmp = df.assemble_system(self.L[0], self.R[ti], FE_DC.FS.bc, A_tensor=AA, b_tensor=bb) b.append(b_tmp) U = self.dc_solver.solve_system_mumps( A, b, 'scalar', sym=True, tx_selection=self.MP.grounding) del self.dc_solver.solver else: Assembler = TotalFieldAssembler(self, bc=self.bc, td_dc=td_dc) Assembler.assemble() U = self.dc_solver.solve_system_mumps( Assembler.A, Assembler.b, 'scalar', sym=True, tx_selection=self.MP.grounding) del self.dc_solver.solver if FE_DC.secondary_potential: for ti in range(self.MP.n_tx): U[ti].vector()[:] = (U[ti].vector().get_local() + FE_DC.U_0[ti].vector().get_local()) A = df.PETScMatrix() b = [df.PETScVector() for ti in range(self.n_tx)] u = df.TrialFunction(self.FS.V) v = df.TestFunction(self.FS.V) K = df.inner(u, v) * self.dx_0 for ti in range(self.n_tx): # note additional sign change, might be incorporated everythere RHS = -df.inner(-df.grad(U[ti]), v) * self.dx_0 df.assemble_system(K, RHS, A_tensor=A, b_tensor=b[ti]) lp(self.MP.logger, 20, '... deriving electric field from potential ...', pre_dash=False) self.E_dc = self.dc_solver.solve_system_mumps( A, b, self.FS.V, sym=True, tx_selection=self.MP.grounding) if store and self.MP.file_format == 'xml': out_name = self.path_fd + '/' + self.fd_mod_name for ti in range(len(self.E_dc)): if self.n_tx != 1: out_name = self.path_fd + '/' + self.MP.mod_name +\ '_tx_{:d}'.format(ti) df.File(out_name + '_E_dc.xml') << self.E_dc[ti] if store and self.MP.file_format == 'h5': out_name = self.path_fd + '/' + self.fd_mod_name if self.n_tx == 1: self.h5_file = write_h5( self.MP.mpi_cw, self.E_dc[0], out_name + '_E_dc.h5', counter=0, close=True) else: ti = 0 self.h5_file = write_h5( self.MP.mpi_cw, self.E_dc[0], out_name + '_E_dc.h5', counter=0, close=False) for ti in range(1, self.n_tx - 1): write_h5(self.MP.mpi_cw, self.E_dc[ti], out_name + '_E_dc.h5', counter=ti, new=False, f=self.h5_file, close=False) write_h5( self.MP.mpi_cw, self.E_dc[ti+1], out_name + '_E_dc.h5', f=self.h5_file, new=False, counter=ti+1, close=True) # switch back to specified debug level of main model self.MP.logger.handlers[0].setLevel(self.MP.logger.level)
[docs] def interpolate_dc(self, interp_mesh, interp_p=None, export_pvd=True, interp_dir=None): """ Customized interpolation function to interpolate direct current fields on specified interpolation mesh. Required arguments ------------------ - interp_mesh, type str name of target interpolation mesh Keyword arguments ----------------- - interp_p = None, type int default is *interp_p=1*, not reasonable to be changed - export_pvd = True, type bool save fields as *.pvd* file for Paraview or not - interp_dir = None, type str specify custom interpolation directory, not recommended """ if interp_dir is not None: self.interp_dir = interp_dir if interp_p is None: interp_p = 1 if df.MPI.rank(self.MP.mpi_cw) == 0: if not os.path.isdir(self.interp_dir): os.mkdir(self.interp_dir) else: pass if os.path.isfile(self.interp_mesh_dir + '/' + interp_mesh + '.xml') is False: lp(self.MP.logger, 30, 'Warning! Interpolation mesh "' + str(interp_mesh) + '.xml" ' 'could not be found! Continuing ...') return lp(self.MP.logger, 20, '... interpolating E_dc on "' + interp_mesh + '" in serial ...', pre_dash=False, barrier=False) from custEM.core import MOD M = MOD(self.fd_mod_name, self.MP.mesh_name, self.fd_approach, overwrite_results=False, p=self.FS.p, file_format=self.MP.file_format, self_mode=True, load_existing=True, import_freq=0, debug_level=30, r_dir=self.MP.r_dir, para_dir=self.MP.para_dir, mute=True, test_mode=self.FS.test_mode, m_dir=self.MP.m_dir, fs_type='ned', dg_interpolation=False) if df.MPI.rank(self.MP.mpi_cw) == 0: # root only is fine here lp(self.MP.logger, 10, '... process ' + str(int( df.MPI.rank(self.MP.mpi_cw))) + ' starts E_dc interpolation ' '...', pre_dash=False, barrier=False, root_only=False) M.PP.import_selected_results(['E_dc'], self.MP.file_format) target_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir + '/' + interp_mesh + '.xml') V_target = df.VectorFunctionSpace( target_mesh, "CG", interp_p) for ti in range(self.n_tx): d_interp = df.interpolate(M.PP.E_dc[ti], V_target) self.export_interpolation_data(d_interp, V_target, ti, export_pvd, interp_mesh) else: pass df.MPI.barrier(df.MPI.comm_world)
[docs] def export_interpolation_data(self, field, V_serial, ti, export_pvd, interp_mesh): """ Export interpolated results as complex numpy arrays and, if specified, as *.pvd* files to be viewed in Paraview. Required arguments ------------------ - field, type dolfin Function dolfin function containing the field values - V_serial, type dolin FunctionSpace non-distributed solution function space - ti, type int number of the Tx to specify output file name - export_pvd, type bool aside from numpy array, save interpolated fields also as *.pvd* file for Paraview - interp_mesh, type str interpolation mesh name """ if self.n_tx == 1: f_name = 'E_dc_' + interp_mesh else: f_name = 'tx_' + str(ti) + '_E_dc_' + interp_mesh field_npy = np.hstack((field.vector().get_local( ).reshape(-1, 3), V_serial.tabulate_dof_coordinates( ).reshape(-1, 3)[0::3, :])) np.save(self.interp_dir + f_name + '.npy', field_npy) if export_pvd: df.File(self.MP.mpi_cs, self.interp_dir + f_name + '.pvd') << field
[docs] class DC(ApproachBaseFD): """ Implementation of common potential approach for caluclating direct current (DC) electric fields. Methods ------- - build_var_form() build variational formulation with FEniCS - lhs_form() build left-hand side of variational formulation - rhs_form_total() build right-hand side of variational formulation for secondary potential approach - rhs_form_secondary() build right-hand side of variational formulation for total potential approach - calc_primary_potential() calculate primary potential for 1D backround resistivity distribution analytically - calc_dc_field() customoized method for solving the DC FE problem Description of DC parameters ---------------------------- Most parameters are equivalent to the parameters for the frequency-domain approaches in the **ApproachBaseFD**. Please refer to the corresponding documentaion. The only **DC** class relevant keyword argument to be updated with the *build_var_form()* method is listed below. - secondary_potential = False, type bool use either total or secondary potential formulation """ def __init__(self, FS, MP): """ Initialize **DC** class, which also inherits attributes and methods from the base class **ApproachBaseFD**. Required arguments ------------------ - FS, type class FunctionSpaces instance - MP, type class ModelParameters instance """ # inherit default attributes and common methods from ApproachBaseFD super(self.__class__, self).__init__() # initialize trial and test functions and pipe FE parameters self.v = df.TestFunction(FS.S) self.u = df.TrialFunction(FS.S) self.FS = FS self.MP = MP self.p = self.FS.p self.secondary_potential = False # mute print of the following FE class attributes self.mute = ['v', 'u', 'DOM', 'dx', 'FS', 'MP', 'mute', 'tx']
[docs] def build_var_form(self, **fem_kwargs): """ Update FE parameters and build variational formulations for the matrix assembly. For the description of valid keyword arguments, it is referred to the **ApproachBaseFD** and **DC** class documentations. """ self.update_fem_parameters(fem_kwargs) check_source(self.s_type, self.tx) if self.secondary_potential: self.lhs_form() self.calc_primary_potential() self.rhs_form_secondary() else: self.lhs_form() self.rhs_form_total()
[docs] def lhs_form(self): """ Build left-hand side of variational formulation for DC approach. """ self.L = [df.inner(df.grad(self.u), self.MP.sigma_func * df.grad( self.v)) * self.FS.DOM.dx_0]
[docs] def rhs_form_total(self): """ Build right-hand side of variational formulation for total potential approach. """ dx_0 = self.FS.DOM.dx_0 self.R = [df.inner(df.Constant(("0.0")), self.v) * dx_0]
[docs] def rhs_form_secondary(self): """ Build right-hand side of variational formulation for secondary DC potential approach. """ self.R = [] for ti in range(self.MP.n_tx): self.R.append(-df.inner( df.grad(self.U_0[ti]), self.MP.dsigma_func * df.grad(self.v)) * self.FS.DOM.dx_0)
[docs] def calc_primary_potential(self): """ Calculate primary potential of static current analytically. """ dof_coord = self.FS.S.tabulate_dof_coordinates().reshape(-1, 3) self.U_0 = [df.Function(self.FS.S) for ti in range(self.MP.n_tx)] for ti in range(self.MP.n_tx): r1 = np.linalg.norm(dof_coord - self.MP.tx[ti][0], axis=1) r2 = np.linalg.norm(dof_coord - self.MP.tx[ti][-1], axis=1) r1[r1 <= self.a_tol] = self.a_tol r2[r2 <= self.a_tol] = self.a_tol tmp = self.MP.currents[ti] / (self.MP.sigma_ground[0] * 2. * df.pi * r1) -\ self.MP.currents[ti] / (self.MP.sigma_ground[0] * 2. * df.pi * r2) self.U_0[ti].vector()[:] = tmp self.U_0[ti].vector().apply('insert')
[docs] def calc_dc_field(self, PP, profiler, secondary_potential=None, delete_factorization=True): """ Assemble the system matrix and right-hand sides and solve the resulting linear system of equations for either the total or secondary potential approach. The electric DC field is also derived after the solution of the potential FE problem. Keyword arguments ----------------- - secondary_potential = False, type bool use either total or secondary potential formulation, can be updated during calling this method is required, but should be already specified before """ from custEM.core import Solver from custEM.fem import TotalFieldAssembler self.Solver = Solver(self.FS, self, mumps_debug=False) if secondary_potential is not None: self.secondary_potential = secondary_potential if self.secondary_potential: AA, bb = df.PETScMatrix(), df.PETScVector() b = [] if type(self.R) is not list: self.R = [self.R] for ti in range(self.n_tx): if self.bc is None: A, b_tmp = df.assemble_system(self.L[0], self.R[ti], A_tensor=AA, b_tensor=bb) elif self.bc in ['ZeroDirichlet', 'ZD']: self.FS.add_dirichlet_bc() A, b_tmp = df.assemble_system(self.L[0], self.R[ti], self.FS.bc, A_tensor=AA, b_tensor=bb) b.append(b_tmp) lp(self.MP.logger, 20, 'Solution of main FE system(s) of equations:') self.U = self.Solver.solve_system_mumps(A, b, 'scalar', sym=True) del self.Solver.solver else: Assembler = TotalFieldAssembler(self, bc=self.bc) Assembler.assemble() if profiler: export_config_file(PP) lp(self.MP.logger, 20, 'Solution of main FE system(s) of equations:') self.U = self.Solver.solve_system_mumps( Assembler.A, Assembler.b, 'scalar', sym=True, tx_selection=self.MP.grounding) if delete_factorization: del self.Solver.solver if self.secondary_potential: for ti in range(self.MP.n_tx): self.U[ti].vector()[:] = (U[ti].vector().get_local() + self.U_0[ti].vector().get_local())
[docs] def check_sigma_vals(MP): """ Evaulate, if anomalies are included or halfspace or fullspace model is chosen. Required arguments ------------------ - MP, type class ModelParameters instance """ try: if not all(type(x[0]) == float for x in MP.delta_sigma[2:]): return(True) else: if len(MP.delta_sigma) is 2 or ( all(x[0] < 1e-8 for x in MP.delta_sigma[2:]) and all(x[0] > -1e-8 for x in MP.delta_sigma[2:])): return(False) else: return(True) except Exception as e: if not all(type(x) == list for x in MP.delta_sigma[2:]): return(True) else: if len(MP.delta_sigma) is 2 or ( all(x[0] < 1e-8 for x in MP.delta_sigma[2:]) and all(x[0] > -1e-8 for x in MP.delta_sigma[2:])): return(False) else: return(True)
[docs] def add_sigma_vals_to_list(vals, inverse=False): """ Rearrange conductivity values to appropriate list format, e.g., convert isotropic values given as float to list of length 1. Required arguments ------------------ - vals, type list (of lists) conductivity values specified in **MP** instance Keyword arguments ----------------- - inverse = False, type bool return either sigma or 1/sigma (inv(sigma)) """ s_list = [] for val in vals: if type(val) is list and type(val[0]) is float: if inverse: s_list.append(1. / df.Constant((val[0]))) else: s_list.append(df.Constant((val[0]))) else: if inverse: s_list.append(df.inv(val)) else: s_list.append(val) return(s_list)
[docs] def check_source(s_type, tx, pf_flag='E', approach=None): """ Evaulate, which source type is chosen. Required arguments ------------------ - s_type, type int source type (see **ApproachBaseFD** documentation) - tx, type list of arrays list of transmitter coordinates for each Tx Keywprd arguments ----------------- - pf_flag = 'E', type str either *'E'* or *'H'* """ if approach is not None: if 'mt' in approach.lower(): if s_type not in ['auto', 'mt']: lp(self.MP.logger, 50, 'Error! *s_type* must be *auto* (default setting) for MT ' 'modeling. Aborting ...') raise SystemExit if pf_flag not in ['E', 'H']: lp(self.MP.logger, 50, 'Error! "pf_EH_flag must be either "E" or "H"!') raise SystemExit if s_type in ['auto', 'hed', 'vmd', 'line', 'loop_c', 'loop_r', 'mt']: return elif s_type in ['path', 'vmds', 'heds']: if tx is None: lp(self.MP.logger, 50, 'Error! No coordinates ("tx" variable) for "s_type" ' '--> ' + s_type + ' defined!') raise SystemExit else: lp(self.MP.logger, 50, 'Error! Unknown "s_type" --> ' + s_type + '!') raise SystemExit