# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
import os
import numpy as np
import json
from custEM.fem import primary_fields as pf
from custEM.misc import logger_print as lp
from custEM.meshgen import vtk2xml2h5
[docs]
class FunctionSpaces:
"""
FunctionSpaces class called from MOD instance. Used for initializing the
required function spaces for the FE assembly, boundary condtions, and
primary fields for all secondary field formulations.
Methods
-------
- init_mesh()
import mesh and domain information from *.h5* file (default) in
parallel or from *.xml* file in serial
- add_primary_fields()
initialize PrimaryField instance, calculate or import primary fields
- add_dirichlet_bc()
initalize dirichlet boundary conditions on the model boundary
- build_dg_func()
build element-wise scalar parameter function (isotropic parameters)
- build_dg_tensor_func()
build element-wise tensor parameter function (anisotropic parameters)
- build_complex_sigma_func()
build complex-valued sigma functions (only isotropic) for modeling
induced polarization effects
Description of initialization order and FunctionSpaces
------------------------------------------------------
1. import mesh (see *init_mesh* documentation)
2. build FEniCS FunctionSpaces depending on the mesh, polynomial
order **p** and the chosen approach
- S = Nodal (LaGrange) FunctionSpace
order p and dimension 1
- S_dg = element-wise scalar discontinuous FunctionSpace
order 0, 1 value per cell
- S_dgt = element-wise tensor-valued discontinuous FunctionSpace
order 0, 9 values per cell
- V = Nedelec Space
order p and dimension 3
- V_cg = Nodal (LaGrange) VectorFunctionSpace
order p and dimension 3
- W = mixed Nedelec Space
order p and dimension 6 for coupled real and imaginary parts
- W_cg = mixed (LaGrange) VectorFunctionSpace
order p and dimension 6 for coupled real and imaginary parts
- M = mixed solution FunctionSpace
order p and dimension 6 or 8 (dependent on appraoch)
3. initalize empty solution Function **U** on mixed solution space
4. if a secondary field formulation is used, a PrimaryField
instance **PF** is added to the FunctionSpace class later on by
calling the **add_Primary_Field()** method automatically during the
**FE.build_var_form()** call
"""
def __init__(self, MP, p, test_mode, self_mode, ned_kind, dg_interp,
load_existing, reuse_fs=None, overwrite_mesh=False,
mesh_only=False):
"""
Initializes FunctionsSpaces dependent on the chosen approach.
Required arguments
------------------
- see documentation of **MOD** class
"""
if reuse_fs is not None:
lp(MP.logger, 20,
'... reusing mesh and domain functions from specified FS '
'...', pre_dash=False)
self.mesh = reuse_fs.mesh
self.DOM = reuse_fs.DOM
MP.n_domains = self.DOM.n_domains
else:
self.init_mesh(test_mode, MP, self_mode, overwrite_mesh)
if mesh_only:
return
self.approach = MP.approach
self.anom_flag = True
self.p = p
self.test_mode = test_mode
self.solution_time = 0.
self.bc_geometry = None
if ned_kind == '1st':
self.V = df.FunctionSpace(self.mesh, 'N1curl', p)
elif ned_kind == '2nd':
lp(MP.logger, 20,
' - using second kind Nedelec element type - ')
self.V = df.FunctionSpace(self.mesh, 'N2curl', p)
elif ned_kind == 'div': # Not in usage right now, just for tests
lp(MP.logger, 20,
' - using div conforming Nedelec elements - ')
self.V = df.FunctionSpace(self.mesh, 'N1div', p)
else:
lp(MP.logger, 50,
'Error! Nedelec kind must be "1st" or "2nd" '
'(see FEniCS book for definitions). Aborting ...')
raise SystemExit
self.V_cg = df.VectorFunctionSpace(self.mesh, 'CG', p)
if dg_interp:
self.V_dg = df.VectorFunctionSpace(self.mesh, 'DG', p)
if not load_existing:
self.S = df.FunctionSpace(self.mesh, 'CG', p)
self.S_dg = df.FunctionSpace(self.mesh, 'DG', 0)
if ned_kind == '1st':
ele = df.FiniteElement("N1curl", self.mesh.ufl_cell(), p)
if ned_kind == '2nd':
ele = df.FiniteElement("N2curl", self.mesh.ufl_cell(), p)
if ned_kind == 'div':
ele = df.FiniteElement("N1div", self.mesh.ufl_cell(), p)
self.W = df.FunctionSpace(self.mesh, df.MixedElement([ele, ele]))
self.W1 = df.FunctionSpace(self.mesh,"N1curl",p)
if 'E' in self.approach or 'mt' in self.approach.lower():
self.M = self.W
self.M1= self.W1
elif 'H' in self.approach:
self.M = self.W
self.M1= self.W1
elif 'Am' in self.approach or 'Fm' in self.approach:
ele = df.FiniteElement("N1curl", self.mesh.ufl_cell(), p)
sce = df.FiniteElement("CG", self.mesh.ufl_cell(), p)
self.M = df.FunctionSpace(
self.mesh, df.MixedElement([ele, ele, sce, sce]))
elif 'An' in self.approach or 'Fn' in self.approach:
sce = df.FiniteElement("CG", self.mesh.ufl_cell(), p)
self.M = df.FunctionSpace(
self.mesh, df.MixedElement([sce, sce, sce, sce,
sce, sce, sce, sce]))
if 'dc' in self.approach.lower():
self.M = self.S
self.U = df.Function(self.M)
[docs]
def init_mesh(self, test_mode, MP, self_mode, overwrite_mesh):
"""
Import the requested mesh and identify domain markers, if available.
Required arguments
------------------
- see documentation of **MOD** class
"""
path = os.getcwd()
if not test_mode and MP.mpi_rank == 0:
abort = False
if not os.path.isfile(MP.m_dir + '/_' + MP.file_format + '/' +
MP.mesh_name + '.' + MP.file_format):
lp(MP.logger, 20,
'... mesh not available in "' + MP.file_format +
'" format yet, conversion started ...', barrier=False)
vtk2xml2h5(MP.mesh_name, MP.m_dir, MP.post_rotate,
MP.overwrite_markers)
lp(MP.logger, 20,
'... conversion finished ...', pre_dash=False,
post_dash=True, barrier=False)
elif os.path.isfile(MP.m_dir + '/_vtk/' + MP.mesh_name + '.vtk'):
if os.path.getmtime(MP.m_dir + '/_' + MP.file_format + '/' +
MP.mesh_name + '.' + MP.file_format) < \
os.path.getmtime(MP.m_dir + '/_vtk/' +
MP.mesh_name + '.vtk'):
if overwrite_mesh:
lp(MP.logger, 30,
'Warning! Found newer TetGen output file '
'than already converted *h5/xml* file\nfor the '
'mesh with the name: "' + MP.mesh_name + '".\n'
'Since *overwrite_mesh* was set to **True**, '
'automated conversion and\noverwriting the prior '
'mesh is enabled. '
'Continuing ...', barrier=False)
vtk2xml2h5(MP.mesh_name, MP.m_dir, MP.post_rotate,
MP.overwrite_markers)
lp(MP.logger, 20,
'... conversion finished ...', pre_dash=False,
post_dash=True, barrier=False)
else:
lp(MP.logger, 50,
'Error! Found newer TetGen output file '
'than already converted *h5/xml* file\nfor the '
'mesh with the name: "' + MP.mesh_name + '".\n'
'Consider one of the following options:\n'
'a) manually removing the existing *h5/xml* mesh,\n'
'b) re-meshing and re-running with a new mesh '
'name,\n'
'c) enabling automated re-conversion by setting '
'the\n *overwrite_mesh* flag (MOD class) to '
'**True**. Aborting ...', barrier=False)
abort = True
else:
abort = None
abort = MP.mpi_cw.bcast(abort, root=0)
if abort:
raise SystemExit
os.chdir(MP.m_dir + '/_' + MP.file_format + '/')
if test_mode:
lp(MP.logger, 30,
'Warning! Test mode activated. Continuing ...\n'
' - using 48000 cell +- 10 km dimensions UnitCubeMesh - ')
self.mesh = df.UnitCubeMesh(10, 10, 10)
self.mesh.coordinates()[:] = (
self.mesh.coordinates()[:] - 0.5) * 200.
n_domains = 1
self.DOM = Domains(self.mesh, n_domains)
if MP.file_format == 'h5':
f = df.HDF5File(MP.mpi_cw, 'UnitCubeMesh.h5', 'w')
f.write(self.mesh, "/mesh")
domains = df.MeshFunction('size_t', self.mesh,
self.mesh.topology().dim())
f.write(domains, "/domains")
f.close()
elif MP.file_format == 'xml':
lp(MP.logger, 30,
'Warning! TEST_MODE in combination with "xml" file format '
'shows some bugs for interpolation. Continuing ...')
os.chdir(path)
else:
if self_mode:
if MP.file_format == 'h5':
self.mesh = df.Mesh(MP.mpi_cs)
f = df.HDF5File(MP.mpi_cs, MP.mesh_name +
'.' + MP.file_format, 'r')
f.read(self.mesh, '/mesh', False)
domain_func = df.MeshFunction('size_t', self.mesh,
self.mesh.topology().dim())
f.read(domain_func, "/domains")
domain_vals = np.unique(domain_func.array())
f.close()
elif MP.file_format == 'xml':
self.mesh = df.Mesh(MP.mpi_cs,
MP.mesh_name + '.' + MP.file_format)
domain_func = df.MeshFunction(
'size_t', self.mesh, MP.mesh_name +
'_domains.' + MP.file_format)
domain_vals = np.unique(domain_func.array())
else:
lp(MP.logger, 50,
'Error! Specify correct mesh format *h5* or *xml*. '
'Aborting ...')
raise SystemExit
else:
if MP.file_format == 'h5':
self.mesh = df.Mesh()
f = df.HDF5File(MP.mpi_cw, MP.mesh_name +
'.' + MP.file_format, 'r')
f.read(self.mesh, '/mesh', False)
domain_func = df.MeshFunction('size_t', self.mesh,
self.mesh.topology().dim())
f.read(domain_func, "/domains")
dom_markers = MP.mpi_cw.gather(domain_func.array()[:],
root=0)
f.close()
if MP.mpi_cw.Get_rank() == 0:
dm = []
for j in range(MP.mpi_cw.Get_size()):
dm.extend(dom_markers[j])
doms = np.unique(np.array(dm))
else:
doms = None
domain_vals = MP.mpi_cw.bcast(doms, root=0)
elif MP.file_format == 'xml':
self.mesh = df.Mesh(MP.mesh_name + '.' + MP.file_format)
domain_func = df.MeshFunction(
'size_t', self.mesh, MP.mesh_name +
'_domains.' + MP.file_format)
mesh2 = df.Mesh(MP.mpi_cs,
MP.mesh_name + '.' + MP.file_format)
domain_func2 = df.MeshFunction("size_t", mesh2,
mesh2.topology().dim(),
mesh2.domains())
domain_vals = np.unique(domain_func2.array())
else:
lp(MP.logger, 50,
'Error! Specify correct mesh format *h5* or *xml*. '
'Aborting ...')
raise SystemExit
if self_mode:
lp(MP.logger, 20,
'... detected ' + str(len(domain_vals)) + ' domains ...',
pre_dash=False, barrier=False)
else:
lp(MP.logger, 20,
'... detected ' + str(len(domain_vals)) + ' domains ...',
pre_dash=False)
self.DOM = Domains(self.mesh, len(domain_vals),
domain_func, domain_vals)
MP.n_domains = self.DOM.n_domains
os.chdir(path)
[docs]
def add_primary_fields(self, FE, MP, calc=True, **kwargs):
"""
Add a PrimaryField instance to the FunctionSpace instance, called by
**FE** if the 'Secondary' field formulation is used.
Required arguments
------------------
- FE, type class
FE approach instance (e.g., **E_vector**)
- MP, type class
ModelParameters instance
Keyword arguments
-----------------
- calc = True, type bool
enable primary fields calculation or **PF** class initialization
only
- further kwargs
see full list in the description of the **PrimaryField** class
"""
self.PF = pf.PrimaryField(self, FE, MP, **kwargs)
if calc:
self.PF.load_primary_fields()
[docs]
def add_dirichlet_bc(self, MP=None, dof_re=None, dof_im=None, bc_H=False):
"""
Initalize Dichichlet BoundaryConditions on the solution space, only for
internal usage. The keyword arguments are set automatically to handle
different cases.
Keyword arguments
-----------------
- MP = None, type class
only used to apply inhomogeneous Dirichlet conditions in A-V
approach, which requires E-field conversion to A depending on
*MP.omega*
- dof_re = None, type list
list of boundary dof of "real" part of FE system to apply
inhomogeneous Dirichlet conditions
- dof_im = None, type list
list of boundary dof of "imaginary" part of FE system to apply
inhomogeneous Dirichlet conditions
- bc_H = False, type bool
flag for testing inhomogeneous Dirichlet conditions for H-field
conversion, provides no benefits so far but might be re-considered
for other purposes in future
"""
class B(df.SubDomain):
def inside(self, x, on_boundary):
return on_boundary
if MP is not None and dof_re is not None and dof_im is not None:
self.dof_re = dof_re
self.dof_im = dof_im
if MP is not None:
self.PF.load_primary_fields(boundary_fields=True)
if bc_H:
pass
# currently not using ID conditions for H field conversion
# real_B = df.interpolate(self.PF.H_0_r_cg_B[0], self.V)
# imag_B = df.interpolate(self.PF.H_0_i_cg_B[0], self.V)
else:
real_B = df.interpolate(self.PF.E_0_r_cg_B[0], self.V)
imag_B = df.interpolate(self.PF.E_0_i_cg_B[0], self.V)
non_zero = df.Function(self.M)
non_zero.vector()[self.dof_re] = real_B.vector().get_local()
non_zero.vector()[self.dof_im] = imag_B.vector().get_local()
if 'A' in self.approach:
non_zero.vector()[self.dof_re] = (1./MP.omega) *\
imag_B.vector()[:]
non_zero.vector()[self.dof_im] = (-1./MP.omega) *\
real_B.vector()[:]
self.bc = df.DirichletBC(self.M, non_zero, B())
self.bc_type = 'ID'
else:
if 'E' in self.approach or 'H' in self.approach:
zero = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
elif 'dc' in self.approach.lower():
zero = df.Constant(("0.0"))
else:
zero = df.Constant(("0.0", "0.0", "0.0", "0.0",
"0.0", "0.0", "0.0", "0.0"))
self.bc = df.DirichletBC(self.M, zero, B())
self.bc_type = 'ZD'
[docs]
def add_inner_dirichlet_bc(self, logger):
"""
Initalize Dichichlet BoundaryConditions for inner edges of the mesh,
used to specify "perfect conductor" infrastructure.
Required arguments
----------_-------
- inner_dof, type list
list to specify the correct dof corresponding to the
infrastructure implemented as edges in the mesh.
"""
if self.bc_geometry is None:
lp(logger, 50,
'Error! Need geometrical expression for defining locations '
'of infrstructure. Aborting ...')
raise SystemExit
# class B(df.SubDomain):
# def inside(self, x, on_boundary):
# if (x[2] > self.c1[2] and x[2] < self.c0[2] and
# x[1] > self.c0[1] and x[1] < self.c1[1] and
# x[0] > self.c0[0] and x[0] < self.c1[0]):
# print(x)
# return(False)
# # return(x[2] > self.c1[2] and x[2] < self.c0[2] and
# # x[1] > self.c0[1] and x[1] < self.c1[1] and
# # x[0] > self.c0[0] and x[0] < self.c1[0])
# self.bc_geometry = B()
# self.bc_geometry.c0 = [-3325.10911528, 3565.29922087, 267.46104002]
# self.bc_geometry.c1 = [-3312.04260977, 3568.65196992, 267.43269787]
if 'E' in self.approach:
zero = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
else:
lp(logger, 50,
'Error! Infrastructure Conditions only implemented '
'for total E-field approach so far. Aborting ...')
raise SystemExit
self.bc = df.DirichletBC(self.M, zero,
self.bc_geometry,
'pointwise')
self.bc_type = 'IF'
[docs]
def build_dg_func(self, vals, inverse=False):
"""
Assign isotropic conductivity values to discontinous function spaces.
Required arguments
------------------
- vals, type list (of lists)
conductivity values specified in **MP** instance
Keyword arguments
-----------------
- inverse = False, type bool
return either sigma or 1/sigma (=inv(sigma))
"""
dg_func = df.Function(self.S_dg)
np_vals = np.array(vals)
if inverse:
dg_func.vector().set_local(
1./np_vals[self.DOM.domain_func.array()])
else:
dg_func.vector().set_local(np_vals[
self.DOM.domain_func.array()])
dg_func.vector().apply('insert')
return(dg_func)
[docs]
def build_dg_tensor_func(self, vals, inverse=False, logger=None):
"""
Assign anisotropic conductivity values to discontinous tensor function
spaces.
Required arguments
------------------
- vals, type list (of lists)
conductivity values specified in **MP** instance
Keyword arguments
-----------------
- inverse = False, type bool
return either sigma or inv(sigma)
"""
dg_func = df.Function(self.S_dgt)
domain_tensor_func = np.repeat(self.DOM.domain_func.array(), 9) * 9
domain_tensor_func[1::9] += 1
domain_tensor_func[2::9] += 2
domain_tensor_func[3::9] += 3
domain_tensor_func[4::9] += 4
domain_tensor_func[5::9] += 5
domain_tensor_func[6::9] += 6
domain_tensor_func[7::9] += 7
domain_tensor_func[8::9] += 8
rearranged = []
for val in vals:
if type(val) is float or type(val) is int or len(val) == 1:
if len(val) == 1:
isoval = val[0]
else:
isoval = val
if inverse:
rearranged.extend(np.array([1. / isoval, 0., 0.,
0., 1. / isoval, 0.,
0., 0., 1. / isoval]))
else:
rearranged.extend(np.array([isoval, 0., 0.,
0., isoval, 0.,
0., 0., isoval]))
elif len(val) == 3:
if inverse:
rearranged.extend(np.array([1. / val[0], 0., 0.,
0., 1. / val[1], 0.,
0., 0., 1. / val[2]]))
else:
rearranged.extend(np.array([val[0], 0., 0.,
0., val[1], 0.,
0., 0., val[2]]))
elif len(val) == 6:
if inverse:
# according to definitions to invert a symmetric 3x3 matrix
rearranged.extend(np.array([val[3]*val[5] - val[4]*val[4],
val[2]*val[4] - val[1]*val[5],
val[1]*val[4] - val[2]*val[3],
val[2]*val[4] - val[1]*val[5],
val[0]*val[5] - val[2]*val[2],
val[1]*val[2] - val[0]*val[4],
val[1]*val[4] - val[2]*val[3],
val[1]*val[2] - val[0]*val[4],
val[0]*val[3] - val[1]*val[1]]) /
(val[0] * (val[3]*val[5] - val[4]*val[4]) +
val[3] * (val[0]*val[5] - val[2]*val[2]) +
val[5] * (val[0]*val[3] - val[1]*val[1])))
else:
rearranged.extend(np.array([val[0], val[1], val[2],
val[1], val[3], val[4],
val[2], val[4], val[5]]))
else:
lp(logger, 50,
'Error! Anisotropy must be either specified by 3 main '
'diagonal values or 6 upper triangle values of the '
'tensor. Aborting ...')
raise SystemExit
dg_func.vector().set_local(np.array(rearranged)[domain_tensor_func])
dg_func.vector().apply('insert')
return(dg_func)
[docs]
def build_complex_sigma_func(self, omega, sigma, tau, m, c):
"""
Calculate and assign complex-valued conductivities for simulating
induced polarization effects with a Cole-Cole model discontinous function
spaces. This only works for isotropic conductivities for now.
Required arguments
------------------
- omega, type float
angular frequency
- sigma, type list
list of conductivity values
- tau, type list
list of *ip_tau* values, see **MP** class documentation
- m, type list
list of *ip_m* values, see **MP** class documentation
- c, type list
list of *ip_c* values, see **MP** class documentation
"""
sigma_real, sigma_imag = df.Function(self.S_dg), df.Function(self.S_dg)
if type(tau) is float or type(tau) is int:
tau = [tau for j in range(self.DOM.n_domains)]
if type(c) is float or type(c) is int:
c = [c for j in range(self.DOM.n_domains)]
sigma_ip_vals = np.array([sig[0] for sig in sigma]) / (
1. - np.array(m) * (
1. - 1. / (1. +
(1j * omega * np.array(tau))**np.array(c))))
sigma_real.vector().set_local(sigma_ip_vals[
self.DOM.domain_func.array()].real)
sigma_imag.vector().set_local(sigma_ip_vals[
self.DOM.domain_func.array()].imag)
sigma_real.vector().apply('insert')
sigma_imag.vector().apply('insert')
return(sigma_real, sigma_imag)
[docs]
class ModelParameters:
"""
ModelParameter class called internally from MOD instance. This class
contains, holds, and updates all physical and modeling parameters during
the FE simulation. All model parameters can be updated with the
**update_model_parameters()** method. A description of all available
parameters of the **ModelParameters** class is listed below.
Methods
-------
- update_model_parameters()
update any supported physical or modeling parameter
- calc_delta_sigma()
calculate *delta_sigma* as *sigma_ground* - *sigma_0* and take reshape
lists to account for any anisotropic value descriptions
- build_dg_parameter_functions()
build discontinuous Parameter functions for all physical parameters,
single-valued functions for isotropic values and
tensor-valued functions for anisotropic values
- print_model_parameters()
print updated list of model parameters
- load_mesh_parameteres()
update mesh-related model parameters from 'mesh-parameters' *json* file
- check_parameter_dx_conformity()
check conformity of number of domains and number of physical parameter
values
Description of physical parameters
----------------------------------
- f = 10., type float
frequency (Hz)
- omega = 1e1, type float
angular frequency, omega = 2 * pi * f
- frequencies = [], type list
specify multiple frequencies (Hz)
- omegas = [], type list
specify multiple angular frequencies, omegas = 2 * pi * frequencies
- n_freqs = 1, type int
number of frequencies, calculated automatically
- current = 1., type float
source current I (A) in the current carrying wire, variable *J* is
deprecated but still used internally, *current* is the new alias for it
- currents = [], type 1D or 2D array or list of lists
unique source currents I (A) for multiple Tx or multiple frequencies;
In time-domain and for DC modeling, the first dimension of the current
data array corresponds to n_tx; in frequency-domain modeling,
the first dimension of the current data array corresponds to n_freqs
and the second dimension of the current data array correponds to n_tx;
if not specifed, current = 1. is used for all frequencies and Tx
- m_moment = 1., type float
magnetic dipole moment for calculating background fields of a vmd
source in a fullspace analytically
- n_tx = 1, type int
number of transmitters (right-hand sides of the system of linear
equations), calculated automatically
- sigma_air = 1e-8, type float or list
homogeneous air-space conductivity, for modeling a fullspace,
set sigma_air to the same value as sigma_ground
- sigma_ground = 1e-2, type list of float or list of list of floats
subsurface conductivities corresponding to all domain markers
(except 0 which corresponds to *sigma_air*);
anisotropy can be specified by a list of three values (main diagonal)
or a list of six values (upper triangle of 3x3 conductivity tensor)
for each domain; a list of one value is handled identical to providing
this single value as float instead of embedding it into a list
- sigma_0 = [], type list of float or list of list of floats
background condcutivities for all *sigma_ground*, only required
for secondary-field/potential formulations; arbitrary general
anisotropic values are supported for *sigma_ground* and *sigma_0*;
*delta_sigma* is evaluated internally
- sigma = [1e-2, 1e-8], type list of float
list of all conductivities, the first two values in sigma are always
**sigma_ground** (first layer) and **sigma_air**; if these values are
updated later on, sigma will be updated internally; further values are
related to subsurface layers or anomaly-domains in the model
(see **sigma_anom**)
- delta_sigma = [0., 0.], type list of float
conductivity differences, needed for secondary field formulations,
updated internally
- anomaly_layer_markers = [], type list of int
define the reference subsurface layer (from top to depth) for each
anomaly to calculate **delta_sigma**. For instance, if two anomalies
are located in the first and third layer of a three-layered earth,
the correct choice would be *[1, 3]*; used for secondary field
approaches only, this functionality will be updated by implementing an
automated mapping in future
- mu = mu_0 = 4 * pi * 1e-7, type float
magentic permeability (mu_r * mu_0), default is vacuum permeability
- mu_r = 1., type float or list of floats
relative magnetic permeability
- eps = eps_0 = 8.85 * 1e-12, type float
electric permittivity (eps_r * eps_0), default is vacuum permittivity
- eps_r = 1., type float or list of floats
values of relative electric permittivity
- ip_tau = 1., type float or list of floats
*tau* values of Cole-Cole model for modeling induced polarization
effects
- ip_c = 1., type float or list of floats
*c* values of Cole-Cole model for modeling induced polarization
effects
- ip_m = 0., type float or list of floats
*m* values of Cole-Cole model for modeling induced polarization
effects
- system_it = False, type bool
used to change system for iterative solver. Default is false.
Set true if iterative solver is used
Description of mesh parameters
------------------------------
- n_layers = 1, type int
number of subsurface layers, automatically imported from
mesh-parameter file
- layer_depths = [], type list of float
depths of subsurface-layer interfaces, automatically imported from
mesh-parameter file
- layer_thicknesses = [], type list of float
subsurface layer thicknesses, calculated automatically from
**layer_depths** attribute
- topo = None, type str
definition of DEM- or synthetic topography, imported automatically from
mesh-parameter file.
"""
def __init__(self, logger, str_args, test_mode,solvingapproach):
"""
Initializes physical and modeling parameters.
Required arguments
------------------
- str_args, type list of str
list of strings containing internal name and parameter conventions,
see **MOD** class for the documentation
- dc, mt, td, fd, type bool
flags for specifying the kind of electromagnetic modeling
- test_mode, type bool
specify if test_mode should be applied (default = **False**)
Keyword arguments
-----------------
- tensor_flag = False, type bool
only used for internal usage for handling anisotropy
"""
self.logger = logger
self.mod_name = str_args[0]
self.mesh_name = str_args[1]
self.approach = str_args[2]
self.file_format = str_args[3]
self.m_dir = str_args[4]
self.r_dir = str_args[5]
self.para_dir = str_args[6]
self.out_dir = str_args[7]
self.mute = str_args[8]
self.mpi_cw = str_args[9]
self.mpi_cs = str_args[10]
self.mpi_rank = str_args[11]
self.dc = str_args[12]
self.mt = str_args[13]
self.td = str_args[14]
self.fd = str_args[15]
self.mumps_debug = str_args[16]
self.serial_ordering = str_args[17]
self.self_mode = str_args[18]
self.test_mode = test_mode
self.solvingapproach=solvingapproach
self.omega = None
self.f = 10.
self.frequencies = []
self.omegas = []
self.n_freqs = 1
self.current = 1. # I (A)
self.currents = None
self.m_moment = 1.
self.n_tx = 1
self.n_rx = 0
self.mu_0 = 4.0 * np.pi * 1e-7
self.mu = 4.0 * np.pi * 1e-7
self.mu_r = None
self.eps_0 = 8.85 * 1e-12
self.eps = 8.85 * 1e-12
self.eps_r = None
self.ip_tau = 1.
self.ip_c = 1.
self.ip_m = 0.
self.sigma_ground = [1e-2]
self.sigma_air = [1e-8]
self.sigma = [self.sigma_air[0]]
self.sigma_0 = [self.sigma_air[0]]
self.delta_sigma = [[0.]]
if solvingapproach=='direct':
self.system_it=False
elif solvingapproach=='iterative':
self.system_it=True
else:
lp(self.logger, 50, "Unknown flag for solvingapproach[0]. Abort simulation!")
raise SystemExit
self.n_layers = 1
self.layer_depths = []
self.layer_thicknesses = []
self.topo = None
self.n_domains = None
self.sigma_tensor_flag = False
self.eps_tensor_flag = False
self.mu_tensor_flag = False
self.tensor_flag = False
if not test_mode:
self.load_mesh_parameters()
layer_depths = [0.] + self.layer_depths
self.layer_thicknesses = np.abs(np.diff(
np.append(np.array(self.layer_thicknesses),
np.array(layer_depths)))).tolist()
[docs]
def update_model_parameters(self, **mod_kwargs):
"""
Updates all non-default physical parameters if called. A list of
available parameters is given in the documentation of the
**ModelingParameters** class.
"""
for key in mod_kwargs:
if key not in self.__dict__:
lp(self.logger, 50,
'Error! Unknown model parameter set', key)
lp(self.logger, 50,
"... let's stop before something unexpected happens ...",
pre_dash=False)
raise SystemExit
self.__dict__.update(mod_kwargs)
if self.td and ('f' in mod_kwargs or 'frequencies' in mod_kwargs or
'omega' in mod_kwargs or 'omegas' in mod_kwargs):
lp(self.logger, 50,
'Error! To avoid ambiguities, it is not allowed to spefiy any '
'kind of frequency\nusing the time-domain approaches.\n'
'Custom frequencies for the FT (fourier-transform-based) '
'method can be specified\nduring calling the '
'*build_var_form()* method. Aborting ...', pre_dash=False)
raise SystemExit
# handle the different options for specifying the frequencies
if self.omega is None:
if (self.fd or self.mt) and 'f' not in mod_kwargs and \
'frequencies' not in mod_kwargs and 'omegas' not in \
mod_kwargs:
lp(self.logger, 50,
'Error! Specify the frequency(ies) by defining either *'
'f*, *omega*, *frequencies* or *omegas*. Aborting ...')
raise SystemExit
self.omega = self.f * 2. * np.pi
else:
if 'f' in mod_kwargs:
lp(self.logger, 30,
'Warning! Note that *f* is ignored if *omega* is also '
'specified. It is recommended to specify either *f* or'
' *omega* consistently. Continuing ...')
self.f = self.omega / (2. * np.pi)
if 'frequencies' not in mod_kwargs and 'omegas' not in mod_kwargs:
self.frequencies = [self.f]
self.omegas = [self.omega]
self.n_freqs = 1
if 'f' not in mod_kwargs and 'omega' not in mod_kwargs:
if 'frequencies' in mod_kwargs:
self.omegas = (np.array(self.frequencies) *
2. * np.pi).tolist()
elif 'omegas' in mod_kwargs:
self.frequencies = (np.array(self.omegas) /
(2. * np.pi)).tolist()
self.n_freqs = len(self.omegas)
if self.n_freqs > 1 and self.approach[0] in ['A', 'F']:
lp(self.logger, 50,
'Error! Internal multiple-frequency modeling only '
'supported for the electric and magentic field approaches. '
'Either calculate each frequency via looping over several '
'**MOD** instances or contact the developers for support. '
'Aborting ...')
raise SystemExit
if self.fd or self.mt:
lp(self.logger, 10, '... writing frequencies to file in export '
'directory ...', pre_dash=False)
# forcing always two source polarizations for mt approach
if self.mt:
self.n_tx = 2
if (self.fd or self.mt) and df.MPI.rank(self.mpi_cw) == 0:
np.savetxt(self.out_dir + '/' +
self.mod_name + '_frequencies.dat', self.frequencies)
else:
pass
# assign same Tx current for all frequencies and Tx if not specified
if (self.fd or self.mt or 'FT' in self.approach):
if self.currents is not None and \
type(self.currents[0]) is not list:
self.currents = np.tile(self.currents, (self.n_freqs, 1))
elif self.currents is None:
self.currents = np.ones((self.n_freqs,
self.n_tx)) * self.current
elif (self.td or self.dc) and 'FT' not in self.approach:
if self.currents is None:
self.currents = np.ones( self.n_tx) * self.current
# convert floats or arrays to lists for internal handling
if type(self.sigma_ground) is float:
self.sigma_ground = [self.sigma_ground]
elif type(self.sigma_ground) is np.ndarray:
self.sigma_ground = self.sigma_ground.tolist()
if type(self.sigma_air) is float:
self.sigma_air = [self.sigma_air]
if type(self.sigma_0) is np.ndarray:
self.sigma_0 = self.sigma_0.tolist()
self.sigma[0] = self.sigma_air[0]
if 'sigma' not in mod_kwargs:
self.sigma.extend(self.sigma_ground[:])
if not all([type(sig) is float or (type(sig) is list and len(sig) == 1)
for sig in self.sigma]):
self.sigma_tensor_flag = True
for si, sig in enumerate(self.sigma):
if type(sig) is float:
self.sigma[si] = [sig]
if self.mu_r is not None:
self.mu = [[self.mu_0]]
for mu_r in self.mu_r:
if type(mu_r) is not list:
self.mu.append([mu_r * self.mu_0])
else:
self.mu_tensor_flag = True
self.mu.append((np.array(mu_r) * self.mu_0).tolist())
if self.eps_r is not None:
self.eps = [[self.eps_0]]
for eps_r in self.eps_r:
if type(eps_r) is not list:
self.eps.append([eps_r * self.eps_0])
else:
self.eps_tensor_flag = True
self.eps.append((np.array(eps_r) * self.eps_0).tolist())
if '_s' in self.approach:
if 'anomaly_layer_markers' in mod_kwargs or 'sigma_0' not in \
mod_kwargs or len(self.sigma_0) != len(self.sigma_ground):
if len(self.sigma_ground) == self.n_layers and \
'sigma_0' not in mod_kwargs:
lp(self.logger, 20,
'... automatically setting *sigma_0* to sigma ground '
'for layered model ...', pre_dash=False)
self.sigma_0 = [val for val in self.sigma_ground]
else:
lp(self.logger, 50,
'Error! Specify background resistivity values *sigma_0*'
'\nwith the same length as *sigma_ground*.\n'
'The old syntax using *anomaly_layer_markers* is not\n'
'supported anymore. Aborting ...')
raise SystemExit
self.calc_delta_sigma()
if bool(mod_kwargs) is False:
lp(self.logger, 30,
'Warning! No parameters or wrong parameters updated. '
'Continuing ...', post_dash=True)
self.check_parameter_dx_conformity()
self.print_model_parameters()
if self.sigma_tensor_flag or self.mu_tensor_flag or \
self.eps_tensor_flag:
self.tensor_flag = True
[docs]
def calc_delta_sigma(self):
"""
Calculate delta sigma using *sigma_ground* and the corresponding
background conductivities *sigma_0*.
"""
# convert floats or arrays to lists for internal handling
for si, sig in enumerate(self.sigma_ground):
if type(sig) is float:
self.sigma_ground[si] = [sig]
for si, sig in enumerate(self.sigma_0):
if type(sig) is float:
self.sigma_0[si] = [sig]
for j in range(len(self.sigma_0)):
tmp = self.sigma_0[j]
if (type(self.sigma_ground[j]) is not list) or \
(type(tmp) is not list):
# this old way of implementation only required for A-V nodal
# approach, could be updated in future to dg-functions
self.sigma_tensor_flag = True
if type(self.sigma_ground[j][0]) is float:
self.sigma_ground[j] = df.as_matrix(
((self.sigma_ground[j][0], 0., 0.),
(0., self.sigma_ground[j][0], 0.),
(0., 0., self.sigma_ground[j][0])))
if type(tmp[0]) is float:
tmp = df.as_matrix(((tmp[0], 0., 0.),
(0., tmp[0], 0.),
(0., 0., tmp[0])))
self.sigma_0[j] = tmp
self.delta_sigma.extend([self.sigma_ground[j] - tmp])
else:
if len(self.sigma_ground[j]) == 1 and len(tmp) == 1:
self.delta_sigma.append([self.sigma_ground[j][0] - tmp[0]])
else:
# convert automatically len(1/3/6) values for a conformal
# subtraction of either three or six conductivity values
if len(self.sigma_ground[j]) == 1 and len(tmp) != 1:
if len(tmp) == 3:
self.sigma_ground[j] = [self.sigma_ground[j][0],
self.sigma_ground[j][0],
self.sigma_ground[j][0]]
elif len(tmp) == 6:
self.sigma_ground[j] = [self.sigma_ground[j][0],
0.,
0.,
self.sigma_ground[j][0],
0.,
self.sigma_ground[j][0]]
elif len(self.sigma_ground[j]) != 1 and len(tmp) == 1:
if len(self.sigma_ground[j]) == 3:
tmp = [tmp[0], tmp[0], tmp[0]]
elif len(self.sigma_ground[j]) == 6:
tmp = [tmp[0], 0., 0., tmp[0], 0., tmp[0]]
elif len(self.sigma_ground[j]) == 3 and len(tmp) == 6:
self.sigma_ground[j] = [self.sigma_ground[j][0],
0.,
0.,
self.sigma_ground[j][1],
0.,
self.sigma_ground[j][2]]
elif len(self.sigma_ground[j]) == 6 and len(tmp) == 3:
tmp = [tmp[0], 0., 0., tmp[1], 0., tmp[2]]
self.delta_sigma.append(
(np.array(self.sigma_ground[j]) -
np.array(tmp)).tolist())
[docs]
def build_dg_parameter_functions(self, FS, pf_EH=None, eps=False):
"""
Build discontinuous isotropic or anisotropic parameter functions.
"""
if self.tensor_flag:
FS.S_dgt = df.TensorFunctionSpace(FS.mesh, "DG", 0)
# build sigma functions
if not self.sigma_tensor_flag:
self.sigma_func = FS.build_dg_func(self.sigma)
self.sigma_inv_func = FS.build_dg_func(self.sigma,
inverse=True)
else:
self.sigma_func = FS.build_dg_tensor_func(self.sigma,
logger=self.logger)
self.sigma_inv_func = FS.build_dg_tensor_func(
self.sigma, inverse=True, logger=self.logger)
if '_s' in self.approach:
# build further functions for secondary-field formulations
if not self.sigma_tensor_flag:
self.dsigma_func = FS.build_dg_func(self.delta_sigma)
# self.dsigma_inv_func = FS.build_dg_func(self.delta_sigma,
# inverse=True)
else:
self.dsigma_func = FS.build_dg_tensor_func(self.delta_sigma,
logger=self.logger)
# self.dsigma_inv_func = FS.build_dg_tensor_func(
# self.delta_sigma, inverse=True)
# the following functions only if required
if 'H' in self.approach or 'F' in self.approach or \
pf_EH == 'H':
if not self.sigma_tensor_flag:
self.sigma0_func = FS.build_dg_func(
[self.sigma_air] + self.sigma_0)
self.sigma0_inv_func = FS.build_dg_func(
[self.sigma_air] + self.sigma_0, inverse=True)
else:
self.sigma0_func = FS.build_dg_tensor_func(
[self.sigma_air] + self.sigma_0, logger=self.logger)
self.sigma0_inv_func = FS.build_dg_tensor_func(
[self.sigma_air] + self.sigma_0, inverse=True,
logger=self.logger)
# build permeability functions
if type(self.mu) is float:
self.mu_func = df.Constant(self.mu)
self.mu_inv_func = df.Constant(1./self.mu)
else:
if not self.mu_tensor_flag:
self.mu_func = FS.build_dg_func(self.mu)
self.mu_inv_func = FS.build_dg_func(self.mu,
inverse=True)
else:
self.mu_func = FS.build_dg_tensor_func(self.mu)
self.mu_inv_func = FS.build_dg_tensor_func(self.mu,
inverse=True,
logger=self.logger)
# build permittivity functions if required
if eps:
if type(self.eps) is float:
self.eps_func = df.Constant(self.eps)
# self.eps_inv_func = df.Constant(1./self.eps)
else:
if not self.eps_tensor_flag:
self.eps_func = FS.build_dg_func(self.eps)
# self.eps_inv_func = FS.build_dg_func(self.eps,
# inverse=True)
else:
self.eps_func = FS.build_dg_tensor_func(self.eps,
logger=self.logger)
# self.eps_inv_func = FS.build_dg_tensor_func(self.eps,
# inverse=True)
[docs]
def print_model_parameters(self):
"""
Print model parameters with adjusted style, e.g., no lengthly lists.
"""
lp(self.logger, 20, 'MODEL parameters update:')
for k, v in sorted(self.__dict__.items()):
if isinstance(v, (tuple, list)) and not self.tensor_flag:
try:
to_print = np.array(v, dtype='object')
except ValueError:
to_print = v
else:
to_print = v
if k == 'n_tx':
to_print = str(v) + ', updated to FE.n_tx if s_type != auto'
if k == 'tx':
to_print = 'found ' + str(len(v)) + \
' tx, updated to FE.tx if s_type != auto'
if k == 'rx':
to_print = 'found ' + str(len(v)) + ' rx'
if k == 'frequencies':
if len(v) > 1:
to_print = 'calculating ' + str(len(v)) + ' frequencies'
if k == 'omegas':
if len(v) > 1:
to_print = 'calculating ' + str(len(v)) + ' omegas'
if k == 'currents':
if len(v) > 1 and type(v[0]) is not list:
to_print = 'current array has shape (n_tx)'
elif len(v) > 1 and len(v[0] > 1):
to_print = 'current array has shape (n_freqs, n_tx)'
if 'sigma' in k and 'tensor_flag' not in k:
if len(v) < 3:
to_print = np.asarray(v).ravel()
else:
to_print = 'found ' + str(len(v)) + ' conductivity values'
if v is None:
to_print = str(v)
if k not in ['mpi_cw', 'mpi_cs', 'mpi_rank', 'logger']:
lp(self.logger, 20,
'--> {:<22}'.format(str(k)) + ' = ' + str(to_print),
pre_dash=False)
[docs]
def load_mesh_parameters(self):
"""
Import mesh parameters such as the number of domains or topography
information.
"""
if not os.path.isfile(self.m_dir + '/_' + str(self.file_format) +
'/' + self.mesh_name + '.' +
str(self.file_format)):
if not (os.path.isfile(self.m_dir + '/_mesh/' + self.mesh_name +
'.mesh') or
os.path.isfile(self.m_dir + '/_vtk/' + self.mesh_name +
'.vtk')):
lp(self.logger, 50,
'Error! The mesh "' + self.mesh_name + '" could not '
'be found. Aborting ...')
raise SystemExit
if os.path.isfile(self.para_dir + '/' + self.mesh_name +
'_parameters.json'):
self.__dict__.update(json.load(open(
self.para_dir + '/' + self.mesh_name + '_parameters.json')))
if len(self.tx) > 0:
self.n_tx = len(self.tx)
if len(self.rx) > 0:
self.n_rx = len(self.rx)
if not hasattr(self, 'post_rotate'):
self.post_rotate = None
if not hasattr(self, 'overwrite_markers'):
self.overwrite_markers = None
else:
# check deprecated version for compatibility of meshes gener-
# ated with previous custEM versions, will be disabled in future
self.centering = False
self.topo = None
self.easting_shift = None
self.northing_shift = None
self.rotation = None
if os.path.isfile(self.para_dir + '/' +
self.mesh_name + '_parameters.txt'):
params = np.genfromtxt(self.para_dir + '/' + self.mesh_name +
'_parameters.txt', dtype=None,
encoding=None)
self.topo = params[0]
if 'True' in params[1]:
self.centering = True
if 'None' not in params[2]:
self.easting_shift = float(params[2])
if 'None' not in params[3]:
self.northing_shift = float(params[3])
if 'None' not in params[4]:
self.rotation = float(params[4])
self.n_layers = int(float(params[5]))
for j in range(6, len(params)):
self.layer_depths.extend([float(params[j])])
else:
lp(self.logger, 50,
'Warning! Mesh parameter file could not '
'be found for the mesh "' + self.mesh_name + '". '
'Aborting ...')
raise SystemExit
[docs]
class Domains:
"""
Domain class called internally from FunctionSpace instance.
Methods
-------
- add_anomaly()
DEPRECATED - should not be used anymore! Add an anomaly domain in the
'FEniCS-way', e.g. predefined anomaly instances from the file
*misc/anomaly_expressions.py* or user-defined dolfin expressions can
be added to the model, each anomaly gets a new marker
"""
def __init__(self, mesh, n_dom, domain_func=None, domain_vals=None):
"""
Defines Domains as marked by the MeshGenerator (TetGen) in the
imported mesh. If no domain markers are available, the default domains
are *ground* (z < 0 + eps) and *air* (z > 0 - eps).
Required arguments
------------------
- mesh, type dolfin Mesh
imported mesh
- n_dom, type int
number of identified domains; if domain markers are available,
they will be identified by the method *init_mesh()*
Keyword arguments
-----------------
- domain_func = None, type dolfin MeshFunction
initialized by function *init_mesh* if mesh contains domain markers
- domain_vals = None, type list of int
list of domain markers identified by the method *init_mesh()*
"""
uniform_domain_func = df.MeshFunction("size_t", mesh,
mesh.topology().dim())
uniform_domain_func.set_all(0)
uniform_domain_func2 = df.MeshFunction("size_t", mesh,
mesh.topology().dim() - 1)
uniform_domain_func2.set_all(0)
self.dx_0 = df.Measure('dx', subdomain_data=uniform_domain_func)
self.ds_0 = df.Measure('ds', subdomain_data=uniform_domain_func2)
if n_dom == 1:
try:
from custEM.misc import Ground, Air
except ModuleNotFoundError or ImportError:
lp(self.logger, 30,
'Warning! Definitions for "Ground" and "Air" -space could'
' not be imported. Continuing ...')
self.domain_func = df.MeshFunction("size_t", mesh,
mesh.topology().dim())
self.domain_func.set_all(0)
self.ground = Ground()
self.air = Air()
self.ground.mark(self.domain_func, 0)
self.air.mark(self.domain_func, 1)
self.dx = df.Measure('dx', subdomain_data=self.domain_func)
self.n_adomains = 0
self.n_domains = self.n_adomains + 2
else:
self.domain_func = domain_func
self.domain_vals = domain_vals
self.dx = df.Measure('dx', subdomain_data=self.domain_func)
self.n_domains = n_dom
[docs]
def add_anomaly(self, instance):
"""
DEPRECATED! Do not use anymore without contacting the authors.
(from custEM.misc import anomaly_expressions as ae)
(MOD.FS.DOM.add_anomaly(ae.Plate(origin=[0.0, -100.0, -400.0])))
"""
anom = instance
anom.mark(self.domain_func, self.n_domains)
self.n_adomains += 1
self.n_domains += 1