# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
import numpy as np
from custEM.misc import logger_print as lp
from custEM.misc import write_h5
from custEM.misc import export_config_file
import os
"""
Definiton of base classes initializing fem-realted parameters and defining
fem-related utility functions.
"""
[docs]
class ApproachBaseFD:
"""
Initialize FE variables for all frequency-domain approaches. All variables
can be updated by specifying the corresponding keyword arguments in the
*build_var_form()* method.
Methods
-------
- update_fem_parameters()
update valid parameters of the **FE** and **PF** classes and print
updated class attributes
- update_tx_parameters()
if *s_type='auto'*, update Tx information from imported mesh-parameter
file in the **MP** instance
Description of FE parameters
----------------------------
- s_type = 'auto', type str
alternatives:
**hed** (HED source, treated identical to **line** tx in halfspace
and as infinitesimal source in fullspace
with analytical solution)
**vmd** (VMD source, treated as infinitesimal source in fullspace
with analytical solution)
**heds** (multiple HED sources, treated as infinitesimal sources
in fullspace with analytical solution; specify origins with
a list of 3D coordinated in the *tx* variable)
**vmds** (multiple VMD sources, treated as infinitesimal sources
in fullspace with analytical solution; specify origins with
a list of 3D coordinated in the *tx* variable)
**line** (real finite length dipole source)
**loop_c** (circular loop source)
**loop_r** (rectangular loop source)
**path** (arbitrary sources)
- tx = None, type list
internal usage only; if *s_type=auto*, this variable will be updated
by the information in the mesh-parameter file; otherwise, the geometry
parameters will be used to define the tx
- n_tx = 1, type int
number of transmitters (right-hand sides of system of linear equations)
- grounding = False,
internal usage only and only relevant if *s_type=auto* (this variable
will be updated by the information in the mesh-parameter file)
- origin = [0.0, 0.0, 0.0], type list of len (3)
origin of *hed* or *vmd* Tx; only valid if *s_type* is not *auto*
and *n_tx=1*
- length = 1., type float
length of *hed* Tx; only valid if *s_type* is not *auto* and *n_tx=1*
- self.azimuth = 0., type float
azimuth of *hed* or *vmd* Tx in degree; only valid if *s_type* is
not *auto* and *n_tx=1*
- start = [-100, 0.0, 0.0], type list of len(3)
start coordinate of *line* Tx or lower left corner of *loop_r* Tx;
only valid if *s_type* is not *auto* and *n_tx=1*;
z_value is ignored and always set to zero
- stop = [100.0, 0.0, 0.0], type list of len(3)
stop coordinate of *line* Tx or upper right corner of *loop_r* Tx;
only valid if *s_type* is not *auto* and *n_tx=1*;
z_value is ignored and always set to zero
- radius = 100., type float
radius of *loop_c* Tx; only valid if *s_type* is not *auto* and
*n_tx=1*
- pf_EH_flag = 'E', type str
set to 'H' for using primary magentic fields to calculate the
right-hand side source contributions in secondary field formulations
- bc = None, type str
specify alternative boundary conditions (see **FS** class)
- quasi_static = True, type bool
use diffusive (quasi static) approximation of the Helmholtz equation
without displacement currents, *False* only implemented in the
*E_vector* approach
- ip = False, type bool
set *True* to enable modeling of induced polarization effects by
specifying Cole-Cole model parameters (see **MP** class), only
implemented in the *E_vector* approach
- tx_topo = None, type str or function
specify topography (from file or as function z=f(x, y)) for searching
the vertical Tx coordinates on a manually chosen topography for the
total field approaches, not recommendend to be changed
- a_tol = 1e-2, type float
find correct source dof locations for total field formulations;
might be changed but the default choice is almost always sufficient
- assembly_time = 0., type float
used internally, adds all assembly times to this variable during the
to be stored by the *profiler* later on
"""
def __init__(self):
"""
Initialize **FE** class attributes for frequency-domain approaches.
"""
self.s_type = 'auto'
self.tx = None
self.grounding = [False]
self.n_tx = 1
self.origin = [0.0, 0.0, 0.0]
self.radius = 100.
self.start = [-100, 0.0, 0.0]
self.stop = [100.0, 0.0, 0.0]
self.length = 1.
self.azimuth = 0.
self.pf_EH_flag = 'E'
self.bc = None
self.quasi_static = True
self.ip = False
self.tx_topo = None
self.a_tol = 1e-2
self.assembly_time = 0.
[docs]
def update_fem_parameters(self, fem_kwargs):
"""
Update and print **FE** and **PF** class attributes for
frequency-domain approaches by overwriting the corresponding
keyword arguments.
Required arguments
------------------
fem_kwargs, type dict
dictionary of keyword arguments for *build_var_form()* method
"""
for key in fem_kwargs:
if key not in self.__dict__:
if key not in ['pyhed_interp', 'ph_log_level', 'max_length',
'pyhed_drop_tol', 'force_primary', 'Assembler',
'dipole_z', 'pf_p', 'pf_name', 'procs_per_proc',
'pf_type', 'export_pvd', 'eval_tx', 'n_segs']:
lp(self.MP.logger, 50,
'Error! Unknown FE parameter set', key)
lp(self.MP.logger, 50,
"... let's stop before something unexpected happens"
" ...", pre_dash=False)
raise SystemExit
self.__dict__.update(fem_kwargs)
if '_t' in self.MP.approach:
self.mute.extend(['n_segs'])
if '_s' in self.MP.approach:
self.mute.extend(['tx_topo', 'a_tol'])
self.update_tx_parameters()
lp(self.MP.logger, 20, 'FE parameters update:')
for k, v in sorted(self.__dict__.items()):
if k == 'n_tx':
to_print = str(v) + ', equals MP.n_tx if s_type == auto'
elif k == 'tx' and v is not None:
to_print = 'using ' + str(len(v)) + \
' tx, equals MP.tx if s_type == auto'
else:
to_print = v
if k not in self.__dict__['mute']:
lp(self.MP.logger, 20,
'--> {:<22}'.format(str(k)) + ' = ' + str(to_print),
pre_dash=False)
eps_flag = False
if not self.quasi_static:
lp(self.MP.logger, 20,
' - using enabled - ',
post_dash=False)
eps_flag=True
if not 'An' in self.MP.approach:
self.MP.build_dg_parameter_functions(self.FS, self.pf_EH_flag,
eps_flag)
if '_s' in self.MP.approach:
if self.pf_EH_flag == 'E':
lp(self.MP.logger, 20,
' - using primary electric fields - ', pre_dash=False)
if self.pf_EH_flag == 'H':
lp(self.MP.logger, 20,
' - using primary magnetic fields - ', pre_dash=False)
[docs]
def update_tx_parameters(self):
"""
Update Tx information from imported mesh-parameter file in the **MP**
instance if *s_type='auto'*.
"""
if self.s_type in ['path', 'vmds', 'heds']:
self.n_tx = len(self.tx)
self.MP.n_tx = self.n_tx
if self.s_type == 'auto' and not 'mt' in self.MP.approach.lower():
self.s_type = 'path'
self.tx = self.MP.tx
self.n_tx = self.MP.n_tx
self.grounding = self.MP.grounding
if self.s_type in ['auto', 'mt'] and 'mt' in self.MP.approach.lower():
self.n_tx = self.MP.n_tx
[docs]
class ApproachBaseTD:
"""
Initialize FE variables for all time-domain approaches. All variables
can be updated by specifying the corresponding keyword arguments in the
*build_var_form()* method.
Methods
-------
- update_fem_parameters()
update valid parameters of the **FE** and **PF** classes and print
updated class attributes
- update_tx_parameters()
if *s_type='auto'*, update Tx information from imported mesh-parameter
file in the **MP** instance
- calc_static_magnetic_field()
calculate static magnetic field with the Biot-Savart law
- biot_savart()
vectorized implementation of Biot-Savart law
- interpolate_static()
interpolate static magentic field on specified interpolation mesh for
independent post-processing purposes
- calc_dc_field()
calculate direct current electric field with either total or
secondary potential formulation of the common potential approach
- interpolate_dc()
interpolate direct current field on specified interpolation mesh for
independent post-processing purposes
- export_interpolation_data()
export interpolated static magentic field or direct current electric
field results as numpy files and, if specified, in *.pvd* file format
Description of FE parameters
----------------------------
Here, only parameters relevant for the time-domain approaches are listed.
For all other parameters, it is referred to the documentation of the
**ApproachBaseFD** class.
Note that further relevant parameters that can be updated with the
*build_var_form_method()* are initialized by each time-domain approach
class individually. It is referred to the corresponding documentation
in the *time_domain_approaches.py* file.
- log_t_min = -6, type int/float
specify start observation time of logarithmically divided interval
for computing transients; defines the start time as 10^log_t_min
- log_t_max = 0, type int/float
specify stop observation time of logarithmically divided interval
for computing transients; defines the stop time as 10^log_t_max
- n_log_steps = 61, type int
number of steps to logarithmically divide the time interval of the
transient, equal to the number of results exported at the
corresponding times
- times = None, type list
manually define export times of the transients
- shut_off = True, type bool
calculate either transient transmitter shut-off of shut-on response
"""
def __init__(self):
"""
Initialize **FE** class attributes for time-domain approaches.
"""
self.s_type = 'auto'
self.tx = None
self.grounding = False
self.n_tx = 1
self.origin = [0.0, 0.0, 0.0]
self.radius = 100.0
self.start = [-100, 0.0, 0.0]
self.stop = [100.0, 0.0, 0.0]
self.length = 1.
self.azimuth = 0.
self.bc = None
self.quasi_static = True
self.ip = False
self.tx_topo = None
self.a_tol = 1e-2
self.assembly_time = 0.
# relevant variables for all time-domain approaches
self.log_t_min = -6
self.log_t_max = 0
self.n_log_steps = 61
self.times = None
self.shut_off = True
# only for internal usage to disable prints of the following attributes
self.mute = ['DOM', 'v', 'u', 'dx', 'dx_0', 'FS', 'MP', 'mute', 'tx']
[docs]
def update_fem_parameters(self, fem_kwargs):
"""
Update and print **FE** class attributes for time-domain approaches
by overwriting the corresponding keyword arguments.
Required arguments
------------------
- fem_kwargs, type dict
dictionary of keyword arguments for *build_var_form()* method
"""
for key in fem_kwargs:
if key not in self.__dict__:
if key not in ['source_type', 'be_order', 'n_lin_steps',
'n_on', 't0_j', 'pulse_width', 'times',
'source_times', 'source_func', 'stepping_times',
'omegas', 'path_td', 'fd_approach',
'fd_mod_name', 'parallel_interpolation',
'interp_mesh', 'n_mult', 'n_poles', 't_tol',
'pole_shift', 'dc_field', 'frequencies']:
lp(self.MP.logger, 50,
'Error! Unknown FE parameter set', key)
lp(self.MP.logger, 50,
"... let's stop before something unexpected happens "
"...", pre_dash=False)
raise SystemExit
self.__dict__.update(fem_kwargs)
self.update_tx_parameters()
lp(self.MP.logger, 20, 'FE parameters update:')
for k, v in sorted(self.__dict__.items()):
if k == 'n_tx':
to_print = str(v) + ', equals MP.n_tx if s_type == auto'
elif k == 'tx' and v is not None:
to_print = 'using ' + str(len(v)) + \
' tx, equals MP.tx if s_type == auto'
elif k == 'times':
if v is not None:
if len(v) > 1:
to_print = 'calculating ' + str(len(v)) + ' times'
else:
to_print = v
if k not in self.__dict__['mute']:
lp(self.MP.logger, 20,
'--> {:<22}'.format(str(k)) + ' = ' + str(to_print),
pre_dash=False)
self.MP.build_dg_parameter_functions(self.FS)
[docs]
def update_tx_parameters(self):
"""
Update Tx information from imported mesh-parameter file in the **MP**
instance if *s_type='auto'*.
"""
if self.s_type in ['path', 'vmds', 'heds']:
self.n_tx = len(self.tx)
self.MP.n_tx = self.n_tx
if self.s_type == 'auto':
self.s_type = 'path'
self.tx = self.MP.tx
self.n_tx = self.MP.n_tx
self.grounding = self.MP.grounding
[docs]
def calc_static_magnetic_field(self, store=False):
"""
Calculate static magentic fields with the Biot-Savart law.
Keyword arguments
-----------------
- store = False, type bool
save static magnetic fields as HDF5 file
"""
rx = self.FS.V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :]
B_static = self.biot_savart(rx, np.array(self.MP.tx[0]))
self.B_static = df.Function(self.FS.V_cg)
self.B_static.vector()[:] = B_static.ravel()
df.File('B_static.pvd') << self.B_static
if store:
if self.MP.n_tx != 1:
lp(self.MP.logger, 30,
'Warning! Storing currently only results for first Tx of '
'static magnetic field calculation. Continuing ...')
self.E_file = write_h5(
self.MP.mpi_cw, self.B_static, self.MP.out_dir + '/' +
self.MP.mod_name + '_B_static.' + self.MP.file_format)
[docs]
def biot_savart(self, rx, tx):
"""
Vectorized implementation of the Biot-Savart law.
Required arguments
------------------
- rx, type list of coordinates
receiver coordinates
- tx, type list of coordinates
list of transmitter coordinates describing the transmitter path
"""
nrx = len(rx[:, 0])
B = np.zeros((nrx, 3))
dl = np.diff(tx, axis=0)
nl = len(dl)
rc = tx[:-1, :] + 0.5 * dl
B = np.zeros((nrx, 3))
for j in range(nl):
r = rx - rc[j, :]
norm3 = np.linalg.norm(r, axis=1)**3
db = self.MP.current * 1e-7 * np.cross(dl[j, :], r) /\
norm3[:, np.newaxis]
B += db
return (B)
[docs]
def interpolate_static(self, interp_mesh,
interp_p=None,
export_pvd=True, interp_dir=None):
"""
Customized interpolation function to interpolate static magnetic fields
on specified interpolation mesh.
Required arguments
------------------
- interp_mesh, type str
name of target interpolation mesh
Keyword arguments
-----------------
- interp_p = None, type int
default is *interp_p=1*, not reasonable to be changed
- export_pvd = True, type bool
save fields as *.pvd* file for Paraview or not
- interp_dir = None, type str
specify custom interpolation directory, not recommended
"""
self.rank = 0
self.export_name = 'B_static_' + interp_mesh
if interp_dir is not None:
self.interp_dir = interp_dir
if interp_p is None:
interp_p = 1
if df.MPI.rank(self.MP.mpi_cw) == 0:
if not os.path.isdir(self.interp_dir):
os.mkdir(self.interp_dir)
else:
pass
if interp_mesh.find('line') is not -1:
self.interp_mesh_dir = self.MP.m_dir + '/lines'
elif interp_mesh.find('slice') is not -1:
self.interp_mesh_dir = self.MP.m_dir + '/slices'
elif interp_mesh.find('path') is not -1:
self.interp_mesh_dir = self.MP.m_dir + '/paths'
else:
lp(self.MP.logger, 50,
'Error! Interpolation target mesh name must '
'end either with "line", "slice" or "path" so far. '
'Aborting ...', post_dash=True)
raise SystemExit
if os.path.isfile(self.interp_mesh_dir + '/' +
interp_mesh + '.xml') is False:
lp(self.MP.logger, 30,
'Warning! Interpolation mesh "' + str(interp_mesh) + '.xml" '
'could not be found! Continuing ...')
return
lp(self.MP.logger, 20,
'... interpolating B_static on "' + interp_mesh +
'" in serial ...', pre_dash=False, barrier=False)
self.rank += 1
if df.MPI.rank(self.MP.mpi_cw) == self.rank:
for jj in range(self.MP.n_tx):
from custEM.core import MOD
M = MOD(self.MP.mod_name, self.MP.mesh_name,
self.MP.approach, overwrite_results=False, p=self.FS.p,
file_format=self.MP.file_format, self_mode=True,
load_existing=str(jj), r_dir=self.MP.r_dir,
para_dir=self.MP.para_dir, mute=True, test_mode=False,
m_dir=self.MP.m_dir, field_selection='B_static',
dg_interpolation=False,
fs_type='cg')
target_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir +
'/' + interp_mesh + '.xml')
V_target = df.VectorFunctionSpace(
target_mesh, "CG", interp_p)
d_interp = df.interpolate(M.PP.B_static[jj], V_target)
self.export_interpolation_data(d_interp, V_target,
jj, export_pvd)
else:
pass
[docs]
def calc_dc_field(self, dc_mesh=None, secondary_potential=False,
store=False, td_dc=True):
"""
Calculate direct current electric fields with either secondary or
total field formulation of potential approach for obtaining shut-off
transients.
Keyword arguments
-----------------
- secondary_potential = False, type bool
use either total or secondary potential formulation
- store = False, type bool
save static magnetic fields as HDF5 file
"""
from custEM.core import Solver
from custEM.fem import TotalFieldAssembler
# reduce rather disturbing and duplicated prints from nested DC model
if self.MP.logger.level < 30:
self.MP.logger.handlers[0].setLevel(30)
FE_DC = DC(self.FS, self.MP)
FE_DC.build_var_form(s_type=self.s_type,
tx=self.tx,
n_tx=len(self.tx),
bc='ZD',
grounding=self.grounding,
start=self.start,
stop=self.stop,
origin=self.origin,
length=self.length,
azimuth=self.azimuth,
radius=self.radius,
secondary_potential=secondary_potential)
self.L = FE_DC.L
self.R = FE_DC.R
self.dc_solver = Solver(self.FS, self, mumps_debug=False)
if FE_DC.secondary_potential:
AA, bb = df.PETScMatrix(), df.PETScVector()
b = []
if type(self.R) is not list:
self.R = [self.R]
for ti in range(self.n_tx):
if FE_DC.bc is None:
A, b_tmp = df.assemble_system(self.L[0], self.R[ti],
A_tensor=AA, b_tensor=bb)
elif FE_DC.bc in ['ZeroDirichlet', 'ZD']:
FE_DC.FS.add_dirichlet_bc()
A, b_tmp = df.assemble_system(self.L[0], self.R[ti],
FE_DC.FS.bc,
A_tensor=AA, b_tensor=bb)
b.append(b_tmp)
U = self.dc_solver.solve_system_mumps(
A, b, 'scalar', sym=True, tx_selection=self.MP.grounding)
del self.dc_solver.solver
else:
Assembler = TotalFieldAssembler(self, bc=self.bc,
td_dc=td_dc)
Assembler.assemble()
U = self.dc_solver.solve_system_mumps(
Assembler.A, Assembler.b, 'scalar', sym=True,
tx_selection=self.MP.grounding)
del self.dc_solver.solver
if FE_DC.secondary_potential:
for ti in range(self.MP.n_tx):
U[ti].vector()[:] = (U[ti].vector().get_local() +
FE_DC.U_0[ti].vector().get_local())
A = df.PETScMatrix()
b = [df.PETScVector() for ti in range(self.n_tx)]
u = df.TrialFunction(self.FS.V)
v = df.TestFunction(self.FS.V)
K = df.inner(u, v) * self.dx_0
for ti in range(self.n_tx):
# note additional sign change, might be incorporated everythere
RHS = -df.inner(-df.grad(U[ti]), v) * self.dx_0
df.assemble_system(K, RHS, A_tensor=A, b_tensor=b[ti])
lp(self.MP.logger, 20,
'... deriving electric field from potential ...', pre_dash=False)
self.E_dc = self.dc_solver.solve_system_mumps(
A, b, self.FS.V, sym=True, tx_selection=self.MP.grounding)
if store and self.MP.file_format == 'xml':
out_name = self.path_fd + '/' + self.fd_mod_name
for ti in range(len(self.E_dc)):
if self.n_tx != 1:
out_name = self.path_fd + '/' + self.MP.mod_name +\
'_tx_{:d}'.format(ti)
df.File(out_name + '_E_dc.xml') << self.E_dc[ti]
if store and self.MP.file_format == 'h5':
out_name = self.path_fd + '/' + self.fd_mod_name
if self.n_tx == 1:
self.h5_file = write_h5(
self.MP.mpi_cw, self.E_dc[0], out_name + '_E_dc.h5',
counter=0, close=True)
else:
ti = 0
self.h5_file = write_h5(
self.MP.mpi_cw, self.E_dc[0], out_name + '_E_dc.h5',
counter=0, close=False)
for ti in range(1, self.n_tx - 1):
write_h5(self.MP.mpi_cw, self.E_dc[ti], out_name +
'_E_dc.h5', counter=ti, new=False, f=self.h5_file,
close=False)
write_h5(
self.MP.mpi_cw, self.E_dc[ti+1], out_name + '_E_dc.h5',
f=self.h5_file, new=False, counter=ti+1, close=True)
# switch back to specified debug level of main model
self.MP.logger.handlers[0].setLevel(self.MP.logger.level)
[docs]
def interpolate_dc(self, interp_mesh,
interp_p=None,
export_pvd=True, interp_dir=None):
"""
Customized interpolation function to interpolate direct current fields
on specified interpolation mesh.
Required arguments
------------------
- interp_mesh, type str
name of target interpolation mesh
Keyword arguments
-----------------
- interp_p = None, type int
default is *interp_p=1*, not reasonable to be changed
- export_pvd = True, type bool
save fields as *.pvd* file for Paraview or not
- interp_dir = None, type str
specify custom interpolation directory, not recommended
"""
if interp_dir is not None:
self.interp_dir = interp_dir
if interp_p is None:
interp_p = 1
if df.MPI.rank(self.MP.mpi_cw) == 0:
if not os.path.isdir(self.interp_dir):
os.mkdir(self.interp_dir)
else:
pass
if os.path.isfile(self.interp_mesh_dir + '/' +
interp_mesh + '.xml') is False:
lp(self.MP.logger, 30,
'Warning! Interpolation mesh "' + str(interp_mesh) + '.xml" '
'could not be found! Continuing ...')
return
lp(self.MP.logger, 20,
'... interpolating E_dc on "' + interp_mesh +
'" in serial ...', pre_dash=False, barrier=False)
from custEM.core import MOD
M = MOD(self.fd_mod_name, self.MP.mesh_name,
self.fd_approach, overwrite_results=False, p=self.FS.p,
file_format=self.MP.file_format, self_mode=True,
load_existing=True, import_freq=0, debug_level=30,
r_dir=self.MP.r_dir, para_dir=self.MP.para_dir,
mute=True, test_mode=self.FS.test_mode,
m_dir=self.MP.m_dir, fs_type='ned', dg_interpolation=False)
if df.MPI.rank(self.MP.mpi_cw) == 0: # root only is fine here
lp(self.MP.logger, 10, '... process ' + str(int(
df.MPI.rank(self.MP.mpi_cw))) + ' starts E_dc interpolation '
'...', pre_dash=False, barrier=False, root_only=False)
M.PP.import_selected_results(['E_dc'], self.MP.file_format)
target_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir +
'/' + interp_mesh + '.xml')
V_target = df.VectorFunctionSpace(
target_mesh, "CG", interp_p)
for ti in range(self.n_tx):
d_interp = df.interpolate(M.PP.E_dc[ti], V_target)
self.export_interpolation_data(d_interp, V_target,
ti, export_pvd, interp_mesh)
else:
pass
df.MPI.barrier(df.MPI.comm_world)
[docs]
def export_interpolation_data(self, field, V_serial, ti,
export_pvd, interp_mesh):
"""
Export interpolated results as complex numpy arrays and, if specified,
as *.pvd* files to be viewed in Paraview.
Required arguments
------------------
- field, type dolfin Function
dolfin function containing the field values
- V_serial, type dolin FunctionSpace
non-distributed solution function space
- ti, type int
number of the Tx to specify output file name
- export_pvd, type bool
aside from numpy array, save interpolated fields also as *.pvd*
file for Paraview
- interp_mesh, type str
interpolation mesh name
"""
if self.n_tx == 1:
f_name = 'E_dc_' + interp_mesh
else:
f_name = 'tx_' + str(ti) + '_E_dc_' + interp_mesh
field_npy = np.hstack((field.vector().get_local(
).reshape(-1, 3), V_serial.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
np.save(self.interp_dir + f_name + '.npy', field_npy)
if export_pvd:
df.File(self.MP.mpi_cs, self.interp_dir + f_name + '.pvd') << field
[docs]
class DC(ApproachBaseFD):
"""
Implementation of common potential approach for caluclating
direct current (DC) electric fields.
Methods
-------
- build_var_form()
build variational formulation with FEniCS
- lhs_form()
build left-hand side of variational formulation
- rhs_form_total()
build right-hand side of variational formulation for secondary
potential approach
- rhs_form_secondary()
build right-hand side of variational formulation for total potential
approach
- calc_primary_potential()
calculate primary potential for 1D backround resistivity distribution
analytically
- calc_dc_field()
customoized method for solving the DC FE problem
Description of DC parameters
----------------------------
Most parameters are equivalent to the parameters for the frequency-domain
approaches in the **ApproachBaseFD**. Please refer to the corresponding
documentaion. The only **DC** class relevant keyword argument to be
updated with the *build_var_form()* method is listed below.
- secondary_potential = False, type bool
use either total or secondary potential formulation
"""
def __init__(self, FS, MP):
"""
Initialize **DC** class, which also inherits attributes and methods
from the base class **ApproachBaseFD**.
Required arguments
------------------
- FS, type class
FunctionSpaces instance
- MP, type class
ModelParameters instance
"""
# inherit default attributes and common methods from ApproachBaseFD
super(self.__class__, self).__init__()
# initialize trial and test functions and pipe FE parameters
self.v = df.TestFunction(FS.S)
self.u = df.TrialFunction(FS.S)
self.FS = FS
self.MP = MP
self.p = self.FS.p
self.secondary_potential = False
# mute print of the following FE class attributes
self.mute = ['v', 'u', 'DOM',
'dx', 'FS', 'MP', 'mute', 'tx']
[docs]
def calc_primary_potential(self):
"""
Calculate primary potential of static current analytically.
"""
dof_coord = self.FS.S.tabulate_dof_coordinates().reshape(-1, 3)
self.U_0 = [df.Function(self.FS.S) for ti in range(self.MP.n_tx)]
for ti in range(self.MP.n_tx):
r1 = np.linalg.norm(dof_coord - self.MP.tx[ti][0], axis=1)
r2 = np.linalg.norm(dof_coord - self.MP.tx[ti][-1], axis=1)
r1[r1 <= self.a_tol] = self.a_tol
r2[r2 <= self.a_tol] = self.a_tol
tmp = self.MP.currents[ti] / (self.MP.sigma_ground[0] *
2. * df.pi * r1) -\
self.MP.currents[ti] / (self.MP.sigma_ground[0] *
2. * df.pi * r2)
self.U_0[ti].vector()[:] = tmp
self.U_0[ti].vector().apply('insert')
[docs]
def calc_dc_field(self, PP, profiler, secondary_potential=None,
delete_factorization=True):
"""
Assemble the system matrix and right-hand sides and solve the
resulting linear system of equations for either the total or
secondary potential approach. The electric DC field is also derived
after the solution of the potential FE problem.
Keyword arguments
-----------------
- secondary_potential = False, type bool
use either total or secondary potential formulation, can be updated
during calling this method is required, but should be already
specified before
"""
from custEM.core import Solver
from custEM.fem import TotalFieldAssembler
self.Solver = Solver(self.FS, self, mumps_debug=False)
if secondary_potential is not None:
self.secondary_potential = secondary_potential
if self.secondary_potential:
AA, bb = df.PETScMatrix(), df.PETScVector()
b = []
if type(self.R) is not list:
self.R = [self.R]
for ti in range(self.n_tx):
if self.bc is None:
A, b_tmp = df.assemble_system(self.L[0], self.R[ti],
A_tensor=AA, b_tensor=bb)
elif self.bc in ['ZeroDirichlet', 'ZD']:
self.FS.add_dirichlet_bc()
A, b_tmp = df.assemble_system(self.L[0], self.R[ti],
self.FS.bc,
A_tensor=AA, b_tensor=bb)
b.append(b_tmp)
lp(self.MP.logger, 20,
'Solution of main FE system(s) of equations:')
self.U = self.Solver.solve_system_mumps(A, b, 'scalar',
sym=True)
del self.Solver.solver
else:
Assembler = TotalFieldAssembler(self, bc=self.bc)
Assembler.assemble()
if profiler:
export_config_file(PP)
lp(self.MP.logger, 20,
'Solution of main FE system(s) of equations:')
self.U = self.Solver.solve_system_mumps(
Assembler.A, Assembler.b, 'scalar', sym=True,
tx_selection=self.MP.grounding)
if delete_factorization:
del self.Solver.solver
if self.secondary_potential:
for ti in range(self.MP.n_tx):
self.U[ti].vector()[:] = (U[ti].vector().get_local() +
self.U_0[ti].vector().get_local())
[docs]
def check_sigma_vals(MP):
"""
Evaulate, if anomalies are included or halfspace or fullspace model is
chosen.
Required arguments
------------------
- MP, type class
ModelParameters instance
"""
try:
if not all(type(x[0]) == float for x in MP.delta_sigma[2:]):
return(True)
else:
if len(MP.delta_sigma) is 2 or (
all(x[0] < 1e-8 for x in MP.delta_sigma[2:]) and
all(x[0] > -1e-8 for x in MP.delta_sigma[2:])):
return(False)
else:
return(True)
except Exception as e:
if not all(type(x) == list for x in MP.delta_sigma[2:]):
return(True)
else:
if len(MP.delta_sigma) is 2 or (
all(x[0] < 1e-8 for x in MP.delta_sigma[2:]) and
all(x[0] > -1e-8 for x in MP.delta_sigma[2:])):
return(False)
else:
return(True)
[docs]
def add_sigma_vals_to_list(vals, inverse=False):
"""
Rearrange conductivity values to appropriate list format, e.g., convert
isotropic values given as float to list of length 1.
Required arguments
------------------
- vals, type list (of lists)
conductivity values specified in **MP** instance
Keyword arguments
-----------------
- inverse = False, type bool
return either sigma or 1/sigma (inv(sigma))
"""
s_list = []
for val in vals:
if type(val) is list and type(val[0]) is float:
if inverse:
s_list.append(1. / df.Constant((val[0])))
else:
s_list.append(df.Constant((val[0])))
else:
if inverse:
s_list.append(df.inv(val))
else:
s_list.append(val)
return(s_list)
[docs]
def check_source(s_type, tx, pf_flag='E', approach=None):
"""
Evaulate, which source type is chosen.
Required arguments
------------------
- s_type, type int
source type (see **ApproachBaseFD** documentation)
- tx, type list of arrays
list of transmitter coordinates for each Tx
Keywprd arguments
-----------------
- pf_flag = 'E', type str
either *'E'* or *'H'*
"""
if approach is not None:
if 'mt' in approach.lower():
if s_type not in ['auto', 'mt']:
lp(self.MP.logger, 50,
'Error! *s_type* must be *auto* (default setting) for MT '
'modeling. Aborting ...')
raise SystemExit
if pf_flag not in ['E', 'H']:
lp(self.MP.logger, 50, 'Error! "pf_EH_flag must be either "E" or "H"!')
raise SystemExit
if s_type in ['auto', 'hed', 'vmd', 'line', 'loop_c', 'loop_r', 'mt']:
return
elif s_type in ['path', 'vmds', 'heds']:
if tx is None:
lp(self.MP.logger, 50,
'Error! No coordinates ("tx" variable) for "s_type" '
'--> ' + s_type + ' defined!')
raise SystemExit
else:
lp(self.MP.logger, 50, 'Error! Unknown "s_type" --> ' + s_type + '!')
raise SystemExit