Source code for custEM.fem.fem_base

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import os
import numpy as np
import json
from custEM.fem import primary_fields as pf
from custEM.misc import logger_print as lp
from custEM.meshgen import vtk2xml2h5


[docs] class FunctionSpaces: """ FunctionSpaces class called from MOD instance. Used for initializing the required function spaces for the FE assembly, boundary condtions, and primary fields for all secondary field formulations. Methods ------- - init_mesh() import mesh and domain information from *.h5* file (default) in parallel or from *.xml* file in serial - add_primary_fields() initialize PrimaryField instance, calculate or import primary fields - add_dirichlet_bc() initalize dirichlet boundary conditions on the model boundary - build_dg_func() build element-wise scalar parameter function (isotropic parameters) - build_dg_tensor_func() build element-wise tensor parameter function (anisotropic parameters) - build_complex_sigma_func() build complex-valued sigma functions (only isotropic) for modeling induced polarization effects Description of initialization order and FunctionSpaces ------------------------------------------------------ 1. import mesh (see *init_mesh* documentation) 2. build FEniCS FunctionSpaces depending on the mesh, polynomial order **p** and the chosen approach - S = Nodal (LaGrange) FunctionSpace order p and dimension 1 - S_dg = element-wise scalar discontinuous FunctionSpace order 0, 1 value per cell - S_dgt = element-wise tensor-valued discontinuous FunctionSpace order 0, 9 values per cell - V = Nedelec Space order p and dimension 3 - V_cg = Nodal (LaGrange) VectorFunctionSpace order p and dimension 3 - W = mixed Nedelec Space order p and dimension 6 for coupled real and imaginary parts - W_cg = mixed (LaGrange) VectorFunctionSpace order p and dimension 6 for coupled real and imaginary parts - M = mixed solution FunctionSpace order p and dimension 6 or 8 (dependent on appraoch) 3. initalize empty solution Function **U** on mixed solution space 4. if a secondary field formulation is used, a PrimaryField instance **PF** is added to the FunctionSpace class later on by calling the **add_Primary_Field()** method automatically during the **FE.build_var_form()** call """ def __init__(self, MP, p, test_mode, self_mode, ned_kind, dg_interp, load_existing, reuse_fs=None, overwrite_mesh=False, mesh_only=False): """ Initializes FunctionsSpaces dependent on the chosen approach. Required arguments ------------------ - see documentation of **MOD** class """ if reuse_fs is not None: lp(MP.logger, 20, '... reusing mesh and domain functions from specified FS ' '...', pre_dash=False) self.mesh = reuse_fs.mesh self.DOM = reuse_fs.DOM MP.n_domains = self.DOM.n_domains else: self.init_mesh(test_mode, MP, self_mode, overwrite_mesh) if mesh_only: return self.approach = MP.approach self.anom_flag = True self.p = p self.test_mode = test_mode self.solution_time = 0. self.bc_geometry = None if ned_kind == '1st': self.V = df.FunctionSpace(self.mesh, 'N1curl', p) elif ned_kind == '2nd': lp(MP.logger, 20, ' - using second kind Nedelec element type - ') self.V = df.FunctionSpace(self.mesh, 'N2curl', p) elif ned_kind == 'div': # Not in usage right now, just for tests lp(MP.logger, 20, ' - using div conforming Nedelec elements - ') self.V = df.FunctionSpace(self.mesh, 'N1div', p) else: lp(MP.logger, 50, 'Error! Nedelec kind must be "1st" or "2nd" ' '(see FEniCS book for definitions). Aborting ...') raise SystemExit self.V_cg = df.VectorFunctionSpace(self.mesh, 'CG', p) if dg_interp: self.V_dg = df.VectorFunctionSpace(self.mesh, 'DG', p) if not load_existing: self.S = df.FunctionSpace(self.mesh, 'CG', p) self.S_dg = df.FunctionSpace(self.mesh, 'DG', 0) if ned_kind == '1st': ele = df.FiniteElement("N1curl", self.mesh.ufl_cell(), p) if ned_kind == '2nd': ele = df.FiniteElement("N2curl", self.mesh.ufl_cell(), p) if ned_kind == 'div': ele = df.FiniteElement("N1div", self.mesh.ufl_cell(), p) self.W = df.FunctionSpace(self.mesh, df.MixedElement([ele, ele])) self.W1 = df.FunctionSpace(self.mesh,"N1curl",p) if 'E' in self.approach or 'mt' in self.approach.lower(): self.M = self.W self.M1= self.W1 elif 'H' in self.approach: self.M = self.W self.M1= self.W1 elif 'Am' in self.approach or 'Fm' in self.approach: ele = df.FiniteElement("N1curl", self.mesh.ufl_cell(), p) sce = df.FiniteElement("CG", self.mesh.ufl_cell(), p) self.M = df.FunctionSpace( self.mesh, df.MixedElement([ele, ele, sce, sce])) elif 'An' in self.approach or 'Fn' in self.approach: sce = df.FiniteElement("CG", self.mesh.ufl_cell(), p) self.M = df.FunctionSpace( self.mesh, df.MixedElement([sce, sce, sce, sce, sce, sce, sce, sce])) if 'dc' in self.approach.lower(): self.M = self.S self.U = df.Function(self.M)
[docs] def init_mesh(self, test_mode, MP, self_mode, overwrite_mesh): """ Import the requested mesh and identify domain markers, if available. Required arguments ------------------ - see documentation of **MOD** class """ path = os.getcwd() if not test_mode and MP.mpi_rank == 0: abort = False if not os.path.isfile(MP.m_dir + '/_' + MP.file_format + '/' + MP.mesh_name + '.' + MP.file_format): lp(MP.logger, 20, '... mesh not available in "' + MP.file_format + '" format yet, conversion started ...', barrier=False) vtk2xml2h5(MP.mesh_name, MP.m_dir, MP.post_rotate, MP.overwrite_markers) lp(MP.logger, 20, '... conversion finished ...', pre_dash=False, post_dash=True, barrier=False) elif os.path.isfile(MP.m_dir + '/_vtk/' + MP.mesh_name + '.vtk'): if os.path.getmtime(MP.m_dir + '/_' + MP.file_format + '/' + MP.mesh_name + '.' + MP.file_format) < \ os.path.getmtime(MP.m_dir + '/_vtk/' + MP.mesh_name + '.vtk'): if overwrite_mesh: lp(MP.logger, 30, 'Warning! Found newer TetGen output file ' 'than already converted *h5/xml* file\nfor the ' 'mesh with the name: "' + MP.mesh_name + '".\n' 'Since *overwrite_mesh* was set to **True**, ' 'automated conversion and\noverwriting the prior ' 'mesh is enabled. ' 'Continuing ...', barrier=False) vtk2xml2h5(MP.mesh_name, MP.m_dir, MP.post_rotate, MP.overwrite_markers) lp(MP.logger, 20, '... conversion finished ...', pre_dash=False, post_dash=True, barrier=False) else: lp(MP.logger, 50, 'Error! Found newer TetGen output file ' 'than already converted *h5/xml* file\nfor the ' 'mesh with the name: "' + MP.mesh_name + '".\n' 'Consider one of the following options:\n' 'a) manually removing the existing *h5/xml* mesh,\n' 'b) re-meshing and re-running with a new mesh ' 'name,\n' 'c) enabling automated re-conversion by setting ' 'the\n *overwrite_mesh* flag (MOD class) to ' '**True**. Aborting ...', barrier=False) abort = True else: abort = None abort = MP.mpi_cw.bcast(abort, root=0) if abort: raise SystemExit os.chdir(MP.m_dir + '/_' + MP.file_format + '/') if test_mode: lp(MP.logger, 30, 'Warning! Test mode activated. Continuing ...\n' ' - using 48000 cell +- 10 km dimensions UnitCubeMesh - ') self.mesh = df.UnitCubeMesh(10, 10, 10) self.mesh.coordinates()[:] = ( self.mesh.coordinates()[:] - 0.5) * 200. n_domains = 1 self.DOM = Domains(self.mesh, n_domains) if MP.file_format == 'h5': f = df.HDF5File(MP.mpi_cw, 'UnitCubeMesh.h5', 'w') f.write(self.mesh, "/mesh") domains = df.MeshFunction('size_t', self.mesh, self.mesh.topology().dim()) f.write(domains, "/domains") f.close() elif MP.file_format == 'xml': lp(MP.logger, 30, 'Warning! TEST_MODE in combination with "xml" file format ' 'shows some bugs for interpolation. Continuing ...') os.chdir(path) else: if self_mode: if MP.file_format == 'h5': self.mesh = df.Mesh(MP.mpi_cs) f = df.HDF5File(MP.mpi_cs, MP.mesh_name + '.' + MP.file_format, 'r') f.read(self.mesh, '/mesh', False) domain_func = df.MeshFunction('size_t', self.mesh, self.mesh.topology().dim()) f.read(domain_func, "/domains") domain_vals = np.unique(domain_func.array()) f.close() elif MP.file_format == 'xml': self.mesh = df.Mesh(MP.mpi_cs, MP.mesh_name + '.' + MP.file_format) domain_func = df.MeshFunction( 'size_t', self.mesh, MP.mesh_name + '_domains.' + MP.file_format) domain_vals = np.unique(domain_func.array()) else: lp(MP.logger, 50, 'Error! Specify correct mesh format *h5* or *xml*. ' 'Aborting ...') raise SystemExit else: if MP.file_format == 'h5': self.mesh = df.Mesh() f = df.HDF5File(MP.mpi_cw, MP.mesh_name + '.' + MP.file_format, 'r') f.read(self.mesh, '/mesh', False) domain_func = df.MeshFunction('size_t', self.mesh, self.mesh.topology().dim()) f.read(domain_func, "/domains") dom_markers = MP.mpi_cw.gather(domain_func.array()[:], root=0) f.close() if MP.mpi_cw.Get_rank() == 0: dm = [] for j in range(MP.mpi_cw.Get_size()): dm.extend(dom_markers[j]) doms = np.unique(np.array(dm)) else: doms = None domain_vals = MP.mpi_cw.bcast(doms, root=0) elif MP.file_format == 'xml': self.mesh = df.Mesh(MP.mesh_name + '.' + MP.file_format) domain_func = df.MeshFunction( 'size_t', self.mesh, MP.mesh_name + '_domains.' + MP.file_format) mesh2 = df.Mesh(MP.mpi_cs, MP.mesh_name + '.' + MP.file_format) domain_func2 = df.MeshFunction("size_t", mesh2, mesh2.topology().dim(), mesh2.domains()) domain_vals = np.unique(domain_func2.array()) else: lp(MP.logger, 50, 'Error! Specify correct mesh format *h5* or *xml*. ' 'Aborting ...') raise SystemExit if self_mode: lp(MP.logger, 20, '... detected ' + str(len(domain_vals)) + ' domains ...', pre_dash=False, barrier=False) else: lp(MP.logger, 20, '... detected ' + str(len(domain_vals)) + ' domains ...', pre_dash=False) self.DOM = Domains(self.mesh, len(domain_vals), domain_func, domain_vals) MP.n_domains = self.DOM.n_domains os.chdir(path)
[docs] def add_primary_fields(self, FE, MP, calc=True, **kwargs): """ Add a PrimaryField instance to the FunctionSpace instance, called by **FE** if the 'Secondary' field formulation is used. Required arguments ------------------ - FE, type class FE approach instance (e.g., **E_vector**) - MP, type class ModelParameters instance Keyword arguments ----------------- - calc = True, type bool enable primary fields calculation or **PF** class initialization only - further kwargs see full list in the description of the **PrimaryField** class """ self.PF = pf.PrimaryField(self, FE, MP, **kwargs) if calc: self.PF.load_primary_fields()
[docs] def add_dirichlet_bc(self, MP=None, dof_re=None, dof_im=None, bc_H=False): """ Initalize Dichichlet BoundaryConditions on the solution space, only for internal usage. The keyword arguments are set automatically to handle different cases. Keyword arguments ----------------- - MP = None, type class only used to apply inhomogeneous Dirichlet conditions in A-V approach, which requires E-field conversion to A depending on *MP.omega* - dof_re = None, type list list of boundary dof of "real" part of FE system to apply inhomogeneous Dirichlet conditions - dof_im = None, type list list of boundary dof of "imaginary" part of FE system to apply inhomogeneous Dirichlet conditions - bc_H = False, type bool flag for testing inhomogeneous Dirichlet conditions for H-field conversion, provides no benefits so far but might be re-considered for other purposes in future """ class B(df.SubDomain): def inside(self, x, on_boundary): return on_boundary if MP is not None and dof_re is not None and dof_im is not None: self.dof_re = dof_re self.dof_im = dof_im if MP is not None: self.PF.load_primary_fields(boundary_fields=True) if bc_H: pass # currently not using ID conditions for H field conversion # real_B = df.interpolate(self.PF.H_0_r_cg_B[0], self.V) # imag_B = df.interpolate(self.PF.H_0_i_cg_B[0], self.V) else: real_B = df.interpolate(self.PF.E_0_r_cg_B[0], self.V) imag_B = df.interpolate(self.PF.E_0_i_cg_B[0], self.V) non_zero = df.Function(self.M) non_zero.vector()[self.dof_re] = real_B.vector().get_local() non_zero.vector()[self.dof_im] = imag_B.vector().get_local() if 'A' in self.approach: non_zero.vector()[self.dof_re] = (1./MP.omega) *\ imag_B.vector()[:] non_zero.vector()[self.dof_im] = (-1./MP.omega) *\ real_B.vector()[:] self.bc = df.DirichletBC(self.M, non_zero, B()) self.bc_type = 'ID' else: if 'E' in self.approach or 'H' in self.approach: zero = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0")) elif 'dc' in self.approach.lower(): zero = df.Constant(("0.0")) else: zero = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0", "0.0", "0.0")) self.bc = df.DirichletBC(self.M, zero, B()) self.bc_type = 'ZD'
[docs] def add_inner_dirichlet_bc(self, logger): """ Initalize Dichichlet BoundaryConditions for inner edges of the mesh, used to specify "perfect conductor" infrastructure. Required arguments ----------_------- - inner_dof, type list list to specify the correct dof corresponding to the infrastructure implemented as edges in the mesh. """ if self.bc_geometry is None: lp(logger, 50, 'Error! Need geometrical expression for defining locations ' 'of infrstructure. Aborting ...') raise SystemExit # class B(df.SubDomain): # def inside(self, x, on_boundary): # if (x[2] > self.c1[2] and x[2] < self.c0[2] and # x[1] > self.c0[1] and x[1] < self.c1[1] and # x[0] > self.c0[0] and x[0] < self.c1[0]): # print(x) # return(False) # # return(x[2] > self.c1[2] and x[2] < self.c0[2] and # # x[1] > self.c0[1] and x[1] < self.c1[1] and # # x[0] > self.c0[0] and x[0] < self.c1[0]) # self.bc_geometry = B() # self.bc_geometry.c0 = [-3325.10911528, 3565.29922087, 267.46104002] # self.bc_geometry.c1 = [-3312.04260977, 3568.65196992, 267.43269787] if 'E' in self.approach: zero = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0")) else: lp(logger, 50, 'Error! Infrastructure Conditions only implemented ' 'for total E-field approach so far. Aborting ...') raise SystemExit self.bc = df.DirichletBC(self.M, zero, self.bc_geometry, 'pointwise') self.bc_type = 'IF'
[docs] def build_dg_func(self, vals, inverse=False): """ Assign isotropic conductivity values to discontinous function spaces. Required arguments ------------------ - vals, type list (of lists) conductivity values specified in **MP** instance Keyword arguments ----------------- - inverse = False, type bool return either sigma or 1/sigma (=inv(sigma)) """ dg_func = df.Function(self.S_dg) np_vals = np.array(vals) if inverse: dg_func.vector().set_local( 1./np_vals[self.DOM.domain_func.array()]) else: dg_func.vector().set_local(np_vals[ self.DOM.domain_func.array()]) dg_func.vector().apply('insert') return(dg_func)
[docs] def build_dg_tensor_func(self, vals, inverse=False, logger=None): """ Assign anisotropic conductivity values to discontinous tensor function spaces. Required arguments ------------------ - vals, type list (of lists) conductivity values specified in **MP** instance Keyword arguments ----------------- - inverse = False, type bool return either sigma or inv(sigma) """ dg_func = df.Function(self.S_dgt) domain_tensor_func = np.repeat(self.DOM.domain_func.array(), 9) * 9 domain_tensor_func[1::9] += 1 domain_tensor_func[2::9] += 2 domain_tensor_func[3::9] += 3 domain_tensor_func[4::9] += 4 domain_tensor_func[5::9] += 5 domain_tensor_func[6::9] += 6 domain_tensor_func[7::9] += 7 domain_tensor_func[8::9] += 8 rearranged = [] for val in vals: if type(val) is float or type(val) is int or len(val) == 1: if len(val) == 1: isoval = val[0] else: isoval = val if inverse: rearranged.extend(np.array([1. / isoval, 0., 0., 0., 1. / isoval, 0., 0., 0., 1. / isoval])) else: rearranged.extend(np.array([isoval, 0., 0., 0., isoval, 0., 0., 0., isoval])) elif len(val) == 3: if inverse: rearranged.extend(np.array([1. / val[0], 0., 0., 0., 1. / val[1], 0., 0., 0., 1. / val[2]])) else: rearranged.extend(np.array([val[0], 0., 0., 0., val[1], 0., 0., 0., val[2]])) elif len(val) == 6: if inverse: # according to definitions to invert a symmetric 3x3 matrix rearranged.extend(np.array([val[3]*val[5] - val[4]*val[4], val[2]*val[4] - val[1]*val[5], val[1]*val[4] - val[2]*val[3], val[2]*val[4] - val[1]*val[5], val[0]*val[5] - val[2]*val[2], val[1]*val[2] - val[0]*val[4], val[1]*val[4] - val[2]*val[3], val[1]*val[2] - val[0]*val[4], val[0]*val[3] - val[1]*val[1]]) / (val[0] * (val[3]*val[5] - val[4]*val[4]) + val[3] * (val[0]*val[5] - val[2]*val[2]) + val[5] * (val[0]*val[3] - val[1]*val[1]))) else: rearranged.extend(np.array([val[0], val[1], val[2], val[1], val[3], val[4], val[2], val[4], val[5]])) else: lp(logger, 50, 'Error! Anisotropy must be either specified by 3 main ' 'diagonal values or 6 upper triangle values of the ' 'tensor. Aborting ...') raise SystemExit dg_func.vector().set_local(np.array(rearranged)[domain_tensor_func]) dg_func.vector().apply('insert') return(dg_func)
[docs] def build_complex_sigma_func(self, omega, sigma, tau, m, c): """ Calculate and assign complex-valued conductivities for simulating induced polarization effects with a Cole-Cole model discontinous function spaces. This only works for isotropic conductivities for now. Required arguments ------------------ - omega, type float angular frequency - sigma, type list list of conductivity values - tau, type list list of *ip_tau* values, see **MP** class documentation - m, type list list of *ip_m* values, see **MP** class documentation - c, type list list of *ip_c* values, see **MP** class documentation """ sigma_real, sigma_imag = df.Function(self.S_dg), df.Function(self.S_dg) if type(tau) is float or type(tau) is int: tau = [tau for j in range(self.DOM.n_domains)] if type(c) is float or type(c) is int: c = [c for j in range(self.DOM.n_domains)] sigma_ip_vals = np.array([sig[0] for sig in sigma]) / ( 1. - np.array(m) * ( 1. - 1. / (1. + (1j * omega * np.array(tau))**np.array(c)))) sigma_real.vector().set_local(sigma_ip_vals[ self.DOM.domain_func.array()].real) sigma_imag.vector().set_local(sigma_ip_vals[ self.DOM.domain_func.array()].imag) sigma_real.vector().apply('insert') sigma_imag.vector().apply('insert') return(sigma_real, sigma_imag)
[docs] class ModelParameters: """ ModelParameter class called internally from MOD instance. This class contains, holds, and updates all physical and modeling parameters during the FE simulation. All model parameters can be updated with the **update_model_parameters()** method. A description of all available parameters of the **ModelParameters** class is listed below. Methods ------- - update_model_parameters() update any supported physical or modeling parameter - calc_delta_sigma() calculate *delta_sigma* as *sigma_ground* - *sigma_0* and take reshape lists to account for any anisotropic value descriptions - build_dg_parameter_functions() build discontinuous Parameter functions for all physical parameters, single-valued functions for isotropic values and tensor-valued functions for anisotropic values - print_model_parameters() print updated list of model parameters - load_mesh_parameteres() update mesh-related model parameters from 'mesh-parameters' *json* file - check_parameter_dx_conformity() check conformity of number of domains and number of physical parameter values Description of physical parameters ---------------------------------- - f = 10., type float frequency (Hz) - omega = 1e1, type float angular frequency, omega = 2 * pi * f - frequencies = [], type list specify multiple frequencies (Hz) - omegas = [], type list specify multiple angular frequencies, omegas = 2 * pi * frequencies - n_freqs = 1, type int number of frequencies, calculated automatically - current = 1., type float source current I (A) in the current carrying wire, variable *J* is deprecated but still used internally, *current* is the new alias for it - currents = [], type 1D or 2D array or list of lists unique source currents I (A) for multiple Tx or multiple frequencies; In time-domain and for DC modeling, the first dimension of the current data array corresponds to n_tx; in frequency-domain modeling, the first dimension of the current data array corresponds to n_freqs and the second dimension of the current data array correponds to n_tx; if not specifed, current = 1. is used for all frequencies and Tx - m_moment = 1., type float magnetic dipole moment for calculating background fields of a vmd source in a fullspace analytically - n_tx = 1, type int number of transmitters (right-hand sides of the system of linear equations), calculated automatically - sigma_air = 1e-8, type float or list homogeneous air-space conductivity, for modeling a fullspace, set sigma_air to the same value as sigma_ground - sigma_ground = 1e-2, type list of float or list of list of floats subsurface conductivities corresponding to all domain markers (except 0 which corresponds to *sigma_air*); anisotropy can be specified by a list of three values (main diagonal) or a list of six values (upper triangle of 3x3 conductivity tensor) for each domain; a list of one value is handled identical to providing this single value as float instead of embedding it into a list - sigma_0 = [], type list of float or list of list of floats background condcutivities for all *sigma_ground*, only required for secondary-field/potential formulations; arbitrary general anisotropic values are supported for *sigma_ground* and *sigma_0*; *delta_sigma* is evaluated internally - sigma = [1e-2, 1e-8], type list of float list of all conductivities, the first two values in sigma are always **sigma_ground** (first layer) and **sigma_air**; if these values are updated later on, sigma will be updated internally; further values are related to subsurface layers or anomaly-domains in the model (see **sigma_anom**) - delta_sigma = [0., 0.], type list of float conductivity differences, needed for secondary field formulations, updated internally - anomaly_layer_markers = [], type list of int define the reference subsurface layer (from top to depth) for each anomaly to calculate **delta_sigma**. For instance, if two anomalies are located in the first and third layer of a three-layered earth, the correct choice would be *[1, 3]*; used for secondary field approaches only, this functionality will be updated by implementing an automated mapping in future - mu = mu_0 = 4 * pi * 1e-7, type float magentic permeability (mu_r * mu_0), default is vacuum permeability - mu_r = 1., type float or list of floats relative magnetic permeability - eps = eps_0 = 8.85 * 1e-12, type float electric permittivity (eps_r * eps_0), default is vacuum permittivity - eps_r = 1., type float or list of floats values of relative electric permittivity - ip_tau = 1., type float or list of floats *tau* values of Cole-Cole model for modeling induced polarization effects - ip_c = 1., type float or list of floats *c* values of Cole-Cole model for modeling induced polarization effects - ip_m = 0., type float or list of floats *m* values of Cole-Cole model for modeling induced polarization effects - system_it = False, type bool used to change system for iterative solver. Default is false. Set true if iterative solver is used Description of mesh parameters ------------------------------ - n_layers = 1, type int number of subsurface layers, automatically imported from mesh-parameter file - layer_depths = [], type list of float depths of subsurface-layer interfaces, automatically imported from mesh-parameter file - layer_thicknesses = [], type list of float subsurface layer thicknesses, calculated automatically from **layer_depths** attribute - topo = None, type str definition of DEM- or synthetic topography, imported automatically from mesh-parameter file. """ def __init__(self, logger, str_args, test_mode,solvingapproach): """ Initializes physical and modeling parameters. Required arguments ------------------ - str_args, type list of str list of strings containing internal name and parameter conventions, see **MOD** class for the documentation - dc, mt, td, fd, type bool flags for specifying the kind of electromagnetic modeling - test_mode, type bool specify if test_mode should be applied (default = **False**) Keyword arguments ----------------- - tensor_flag = False, type bool only used for internal usage for handling anisotropy """ self.logger = logger self.mod_name = str_args[0] self.mesh_name = str_args[1] self.approach = str_args[2] self.file_format = str_args[3] self.m_dir = str_args[4] self.r_dir = str_args[5] self.para_dir = str_args[6] self.out_dir = str_args[7] self.mute = str_args[8] self.mpi_cw = str_args[9] self.mpi_cs = str_args[10] self.mpi_rank = str_args[11] self.dc = str_args[12] self.mt = str_args[13] self.td = str_args[14] self.fd = str_args[15] self.mumps_debug = str_args[16] self.serial_ordering = str_args[17] self.self_mode = str_args[18] self.test_mode = test_mode self.solvingapproach=solvingapproach self.omega = None self.f = 10. self.frequencies = [] self.omegas = [] self.n_freqs = 1 self.current = 1. # I (A) self.currents = None self.m_moment = 1. self.n_tx = 1 self.n_rx = 0 self.mu_0 = 4.0 * np.pi * 1e-7 self.mu = 4.0 * np.pi * 1e-7 self.mu_r = None self.eps_0 = 8.85 * 1e-12 self.eps = 8.85 * 1e-12 self.eps_r = None self.ip_tau = 1. self.ip_c = 1. self.ip_m = 0. self.sigma_ground = [1e-2] self.sigma_air = [1e-8] self.sigma = [self.sigma_air[0]] self.sigma_0 = [self.sigma_air[0]] self.delta_sigma = [[0.]] if solvingapproach=='direct': self.system_it=False elif solvingapproach=='iterative': self.system_it=True else: lp(self.logger, 50, "Unknown flag for solvingapproach[0]. Abort simulation!") raise SystemExit self.n_layers = 1 self.layer_depths = [] self.layer_thicknesses = [] self.topo = None self.n_domains = None self.sigma_tensor_flag = False self.eps_tensor_flag = False self.mu_tensor_flag = False self.tensor_flag = False if not test_mode: self.load_mesh_parameters() layer_depths = [0.] + self.layer_depths self.layer_thicknesses = np.abs(np.diff( np.append(np.array(self.layer_thicknesses), np.array(layer_depths)))).tolist()
[docs] def update_model_parameters(self, **mod_kwargs): """ Updates all non-default physical parameters if called. A list of available parameters is given in the documentation of the **ModelingParameters** class. """ for key in mod_kwargs: if key not in self.__dict__: lp(self.logger, 50, 'Error! Unknown model parameter set', key) lp(self.logger, 50, "... let's stop before something unexpected happens ...", pre_dash=False) raise SystemExit self.__dict__.update(mod_kwargs) if self.td and ('f' in mod_kwargs or 'frequencies' in mod_kwargs or 'omega' in mod_kwargs or 'omegas' in mod_kwargs): lp(self.logger, 50, 'Error! To avoid ambiguities, it is not allowed to spefiy any ' 'kind of frequency\nusing the time-domain approaches.\n' 'Custom frequencies for the FT (fourier-transform-based) ' 'method can be specified\nduring calling the ' '*build_var_form()* method. Aborting ...', pre_dash=False) raise SystemExit # handle the different options for specifying the frequencies if self.omega is None: if (self.fd or self.mt) and 'f' not in mod_kwargs and \ 'frequencies' not in mod_kwargs and 'omegas' not in \ mod_kwargs: lp(self.logger, 50, 'Error! Specify the frequency(ies) by defining either *' 'f*, *omega*, *frequencies* or *omegas*. Aborting ...') raise SystemExit self.omega = self.f * 2. * np.pi else: if 'f' in mod_kwargs: lp(self.logger, 30, 'Warning! Note that *f* is ignored if *omega* is also ' 'specified. It is recommended to specify either *f* or' ' *omega* consistently. Continuing ...') self.f = self.omega / (2. * np.pi) if 'frequencies' not in mod_kwargs and 'omegas' not in mod_kwargs: self.frequencies = [self.f] self.omegas = [self.omega] self.n_freqs = 1 if 'f' not in mod_kwargs and 'omega' not in mod_kwargs: if 'frequencies' in mod_kwargs: self.omegas = (np.array(self.frequencies) * 2. * np.pi).tolist() elif 'omegas' in mod_kwargs: self.frequencies = (np.array(self.omegas) / (2. * np.pi)).tolist() self.n_freqs = len(self.omegas) if self.n_freqs > 1 and self.approach[0] in ['A', 'F']: lp(self.logger, 50, 'Error! Internal multiple-frequency modeling only ' 'supported for the electric and magentic field approaches. ' 'Either calculate each frequency via looping over several ' '**MOD** instances or contact the developers for support. ' 'Aborting ...') raise SystemExit if self.fd or self.mt: lp(self.logger, 10, '... writing frequencies to file in export ' 'directory ...', pre_dash=False) # forcing always two source polarizations for mt approach if self.mt: self.n_tx = 2 if (self.fd or self.mt) and df.MPI.rank(self.mpi_cw) == 0: np.savetxt(self.out_dir + '/' + self.mod_name + '_frequencies.dat', self.frequencies) else: pass # assign same Tx current for all frequencies and Tx if not specified if (self.fd or self.mt or 'FT' in self.approach): if self.currents is not None and \ type(self.currents[0]) is not list: self.currents = np.tile(self.currents, (self.n_freqs, 1)) elif self.currents is None: self.currents = np.ones((self.n_freqs, self.n_tx)) * self.current elif (self.td or self.dc) and 'FT' not in self.approach: if self.currents is None: self.currents = np.ones( self.n_tx) * self.current # convert floats or arrays to lists for internal handling if type(self.sigma_ground) is float: self.sigma_ground = [self.sigma_ground] elif type(self.sigma_ground) is np.ndarray: self.sigma_ground = self.sigma_ground.tolist() if type(self.sigma_air) is float: self.sigma_air = [self.sigma_air] if type(self.sigma_0) is np.ndarray: self.sigma_0 = self.sigma_0.tolist() self.sigma[0] = self.sigma_air[0] if 'sigma' not in mod_kwargs: self.sigma.extend(self.sigma_ground[:]) if not all([type(sig) is float or (type(sig) is list and len(sig) == 1) for sig in self.sigma]): self.sigma_tensor_flag = True for si, sig in enumerate(self.sigma): if type(sig) is float: self.sigma[si] = [sig] if self.mu_r is not None: self.mu = [[self.mu_0]] for mu_r in self.mu_r: if type(mu_r) is not list: self.mu.append([mu_r * self.mu_0]) else: self.mu_tensor_flag = True self.mu.append((np.array(mu_r) * self.mu_0).tolist()) if self.eps_r is not None: self.eps = [[self.eps_0]] for eps_r in self.eps_r: if type(eps_r) is not list: self.eps.append([eps_r * self.eps_0]) else: self.eps_tensor_flag = True self.eps.append((np.array(eps_r) * self.eps_0).tolist()) if '_s' in self.approach: if 'anomaly_layer_markers' in mod_kwargs or 'sigma_0' not in \ mod_kwargs or len(self.sigma_0) != len(self.sigma_ground): if len(self.sigma_ground) == self.n_layers and \ 'sigma_0' not in mod_kwargs: lp(self.logger, 20, '... automatically setting *sigma_0* to sigma ground ' 'for layered model ...', pre_dash=False) self.sigma_0 = [val for val in self.sigma_ground] else: lp(self.logger, 50, 'Error! Specify background resistivity values *sigma_0*' '\nwith the same length as *sigma_ground*.\n' 'The old syntax using *anomaly_layer_markers* is not\n' 'supported anymore. Aborting ...') raise SystemExit self.calc_delta_sigma() if bool(mod_kwargs) is False: lp(self.logger, 30, 'Warning! No parameters or wrong parameters updated. ' 'Continuing ...', post_dash=True) self.check_parameter_dx_conformity() self.print_model_parameters() if self.sigma_tensor_flag or self.mu_tensor_flag or \ self.eps_tensor_flag: self.tensor_flag = True
[docs] def calc_delta_sigma(self): """ Calculate delta sigma using *sigma_ground* and the corresponding background conductivities *sigma_0*. """ # convert floats or arrays to lists for internal handling for si, sig in enumerate(self.sigma_ground): if type(sig) is float: self.sigma_ground[si] = [sig] for si, sig in enumerate(self.sigma_0): if type(sig) is float: self.sigma_0[si] = [sig] for j in range(len(self.sigma_0)): tmp = self.sigma_0[j] if (type(self.sigma_ground[j]) is not list) or \ (type(tmp) is not list): # this old way of implementation only required for A-V nodal # approach, could be updated in future to dg-functions self.sigma_tensor_flag = True if type(self.sigma_ground[j][0]) is float: self.sigma_ground[j] = df.as_matrix( ((self.sigma_ground[j][0], 0., 0.), (0., self.sigma_ground[j][0], 0.), (0., 0., self.sigma_ground[j][0]))) if type(tmp[0]) is float: tmp = df.as_matrix(((tmp[0], 0., 0.), (0., tmp[0], 0.), (0., 0., tmp[0]))) self.sigma_0[j] = tmp self.delta_sigma.extend([self.sigma_ground[j] - tmp]) else: if len(self.sigma_ground[j]) == 1 and len(tmp) == 1: self.delta_sigma.append([self.sigma_ground[j][0] - tmp[0]]) else: # convert automatically len(1/3/6) values for a conformal # subtraction of either three or six conductivity values if len(self.sigma_ground[j]) == 1 and len(tmp) != 1: if len(tmp) == 3: self.sigma_ground[j] = [self.sigma_ground[j][0], self.sigma_ground[j][0], self.sigma_ground[j][0]] elif len(tmp) == 6: self.sigma_ground[j] = [self.sigma_ground[j][0], 0., 0., self.sigma_ground[j][0], 0., self.sigma_ground[j][0]] elif len(self.sigma_ground[j]) != 1 and len(tmp) == 1: if len(self.sigma_ground[j]) == 3: tmp = [tmp[0], tmp[0], tmp[0]] elif len(self.sigma_ground[j]) == 6: tmp = [tmp[0], 0., 0., tmp[0], 0., tmp[0]] elif len(self.sigma_ground[j]) == 3 and len(tmp) == 6: self.sigma_ground[j] = [self.sigma_ground[j][0], 0., 0., self.sigma_ground[j][1], 0., self.sigma_ground[j][2]] elif len(self.sigma_ground[j]) == 6 and len(tmp) == 3: tmp = [tmp[0], 0., 0., tmp[1], 0., tmp[2]] self.delta_sigma.append( (np.array(self.sigma_ground[j]) - np.array(tmp)).tolist())
[docs] def build_dg_parameter_functions(self, FS, pf_EH=None, eps=False): """ Build discontinuous isotropic or anisotropic parameter functions. """ if self.tensor_flag: FS.S_dgt = df.TensorFunctionSpace(FS.mesh, "DG", 0) # build sigma functions if not self.sigma_tensor_flag: self.sigma_func = FS.build_dg_func(self.sigma) self.sigma_inv_func = FS.build_dg_func(self.sigma, inverse=True) else: self.sigma_func = FS.build_dg_tensor_func(self.sigma, logger=self.logger) self.sigma_inv_func = FS.build_dg_tensor_func( self.sigma, inverse=True, logger=self.logger) if '_s' in self.approach: # build further functions for secondary-field formulations if not self.sigma_tensor_flag: self.dsigma_func = FS.build_dg_func(self.delta_sigma) # self.dsigma_inv_func = FS.build_dg_func(self.delta_sigma, # inverse=True) else: self.dsigma_func = FS.build_dg_tensor_func(self.delta_sigma, logger=self.logger) # self.dsigma_inv_func = FS.build_dg_tensor_func( # self.delta_sigma, inverse=True) # the following functions only if required if 'H' in self.approach or 'F' in self.approach or \ pf_EH == 'H': if not self.sigma_tensor_flag: self.sigma0_func = FS.build_dg_func( [self.sigma_air] + self.sigma_0) self.sigma0_inv_func = FS.build_dg_func( [self.sigma_air] + self.sigma_0, inverse=True) else: self.sigma0_func = FS.build_dg_tensor_func( [self.sigma_air] + self.sigma_0, logger=self.logger) self.sigma0_inv_func = FS.build_dg_tensor_func( [self.sigma_air] + self.sigma_0, inverse=True, logger=self.logger) # build permeability functions if type(self.mu) is float: self.mu_func = df.Constant(self.mu) self.mu_inv_func = df.Constant(1./self.mu) else: if not self.mu_tensor_flag: self.mu_func = FS.build_dg_func(self.mu) self.mu_inv_func = FS.build_dg_func(self.mu, inverse=True) else: self.mu_func = FS.build_dg_tensor_func(self.mu) self.mu_inv_func = FS.build_dg_tensor_func(self.mu, inverse=True, logger=self.logger) # build permittivity functions if required if eps: if type(self.eps) is float: self.eps_func = df.Constant(self.eps) # self.eps_inv_func = df.Constant(1./self.eps) else: if not self.eps_tensor_flag: self.eps_func = FS.build_dg_func(self.eps) # self.eps_inv_func = FS.build_dg_func(self.eps, # inverse=True) else: self.eps_func = FS.build_dg_tensor_func(self.eps, logger=self.logger)
# self.eps_inv_func = FS.build_dg_tensor_func(self.eps, # inverse=True)
[docs] def print_model_parameters(self): """ Print model parameters with adjusted style, e.g., no lengthly lists. """ lp(self.logger, 20, 'MODEL parameters update:') for k, v in sorted(self.__dict__.items()): if isinstance(v, (tuple, list)) and not self.tensor_flag: try: to_print = np.array(v, dtype='object') except ValueError: to_print = v else: to_print = v if k == 'n_tx': to_print = str(v) + ', updated to FE.n_tx if s_type != auto' if k == 'tx': to_print = 'found ' + str(len(v)) + \ ' tx, updated to FE.tx if s_type != auto' if k == 'rx': to_print = 'found ' + str(len(v)) + ' rx' if k == 'frequencies': if len(v) > 1: to_print = 'calculating ' + str(len(v)) + ' frequencies' if k == 'omegas': if len(v) > 1: to_print = 'calculating ' + str(len(v)) + ' omegas' if k == 'currents': if len(v) > 1 and type(v[0]) is not list: to_print = 'current array has shape (n_tx)' elif len(v) > 1 and len(v[0] > 1): to_print = 'current array has shape (n_freqs, n_tx)' if 'sigma' in k and 'tensor_flag' not in k: if len(v) < 3: to_print = np.asarray(v).ravel() else: to_print = 'found ' + str(len(v)) + ' conductivity values' if v is None: to_print = str(v) if k not in ['mpi_cw', 'mpi_cs', 'mpi_rank', 'logger']: lp(self.logger, 20, '--> {:<22}'.format(str(k)) + ' = ' + str(to_print), pre_dash=False)
[docs] def load_mesh_parameters(self): """ Import mesh parameters such as the number of domains or topography information. """ if not os.path.isfile(self.m_dir + '/_' + str(self.file_format) + '/' + self.mesh_name + '.' + str(self.file_format)): if not (os.path.isfile(self.m_dir + '/_mesh/' + self.mesh_name + '.mesh') or os.path.isfile(self.m_dir + '/_vtk/' + self.mesh_name + '.vtk')): lp(self.logger, 50, 'Error! The mesh "' + self.mesh_name + '" could not ' 'be found. Aborting ...') raise SystemExit if os.path.isfile(self.para_dir + '/' + self.mesh_name + '_parameters.json'): self.__dict__.update(json.load(open( self.para_dir + '/' + self.mesh_name + '_parameters.json'))) if len(self.tx) > 0: self.n_tx = len(self.tx) if len(self.rx) > 0: self.n_rx = len(self.rx) if not hasattr(self, 'post_rotate'): self.post_rotate = None if not hasattr(self, 'overwrite_markers'): self.overwrite_markers = None else: # check deprecated version for compatibility of meshes gener- # ated with previous custEM versions, will be disabled in future self.centering = False self.topo = None self.easting_shift = None self.northing_shift = None self.rotation = None if os.path.isfile(self.para_dir + '/' + self.mesh_name + '_parameters.txt'): params = np.genfromtxt(self.para_dir + '/' + self.mesh_name + '_parameters.txt', dtype=None, encoding=None) self.topo = params[0] if 'True' in params[1]: self.centering = True if 'None' not in params[2]: self.easting_shift = float(params[2]) if 'None' not in params[3]: self.northing_shift = float(params[3]) if 'None' not in params[4]: self.rotation = float(params[4]) self.n_layers = int(float(params[5])) for j in range(6, len(params)): self.layer_depths.extend([float(params[j])]) else: lp(self.logger, 50, 'Warning! Mesh parameter file could not ' 'be found for the mesh "' + self.mesh_name + '". ' 'Aborting ...') raise SystemExit
[docs] def check_parameter_dx_conformity(self): """ Evaulate, if number of domains matches number of given physical parameter values. """ if len(self.sigma) != self.n_domains and not self.test_mode: lp(self.logger, 50, 'len sigma: ' + str(len(self.sigma)) + '\n' + 'number of domains: ' + str(self.n_domains) + '\n' + 'Error! The numbers of sigma values and given domains must be ' 'the same. Aborting ...') raise SystemExit if '_s' in self.approach and not self.test_mode and \ len(self.delta_sigma) != self.n_domains: lp(self.logger, 50, 'len delta_sigma: ' + str(len(self.delta_sigma)) + '\n' + 'number of domains: ' + str(self.n_domains) + '\n' + 'Error! The numbers of delta sigma values (user must provide\n' '*n_domains - 1* values for *sigma_0* and *sigma_ground*)\n' 'and given domains must be the same. Aborting ...') raise SystemExit if type(self.mu) is list and len(self.mu) != self.n_domains: lp(self.logger, 50, 'len mu: ' + str(len(self.mu)) + '\n' + 'number of domains: ' + str(self.n_domains) + '\n' + 'Error! The numbers of non-constant *mu* parameters ' '(user must provide\n*n_domains - 1* values) ' 'must match the number of domains. Aborting ...') raise SystemExit if type(self.eps) is list and len(self.eps) != self.n_domains: lp(self.logger, 50, 'len eps: ' + str(len(self.eps)) + '\n' + 'number of domains: ' + str(self.n_domains) + '\n' + 'Error! The numbers of non-constant *eps* parameters ' '(user must provide\n*n_domains - 1* values) ' 'must match the number of domains. Aborting ...') raise SystemExit
[docs] class Domains: """ Domain class called internally from FunctionSpace instance. Methods ------- - add_anomaly() DEPRECATED - should not be used anymore! Add an anomaly domain in the 'FEniCS-way', e.g. predefined anomaly instances from the file *misc/anomaly_expressions.py* or user-defined dolfin expressions can be added to the model, each anomaly gets a new marker """ def __init__(self, mesh, n_dom, domain_func=None, domain_vals=None): """ Defines Domains as marked by the MeshGenerator (TetGen) in the imported mesh. If no domain markers are available, the default domains are *ground* (z < 0 + eps) and *air* (z > 0 - eps). Required arguments ------------------ - mesh, type dolfin Mesh imported mesh - n_dom, type int number of identified domains; if domain markers are available, they will be identified by the method *init_mesh()* Keyword arguments ----------------- - domain_func = None, type dolfin MeshFunction initialized by function *init_mesh* if mesh contains domain markers - domain_vals = None, type list of int list of domain markers identified by the method *init_mesh()* """ uniform_domain_func = df.MeshFunction("size_t", mesh, mesh.topology().dim()) uniform_domain_func.set_all(0) uniform_domain_func2 = df.MeshFunction("size_t", mesh, mesh.topology().dim() - 1) uniform_domain_func2.set_all(0) self.dx_0 = df.Measure('dx', subdomain_data=uniform_domain_func) self.ds_0 = df.Measure('ds', subdomain_data=uniform_domain_func2) if n_dom == 1: try: from custEM.misc import Ground, Air except ModuleNotFoundError or ImportError: lp(self.logger, 30, 'Warning! Definitions for "Ground" and "Air" -space could' ' not be imported. Continuing ...') self.domain_func = df.MeshFunction("size_t", mesh, mesh.topology().dim()) self.domain_func.set_all(0) self.ground = Ground() self.air = Air() self.ground.mark(self.domain_func, 0) self.air.mark(self.domain_func, 1) self.dx = df.Measure('dx', subdomain_data=self.domain_func) self.n_adomains = 0 self.n_domains = self.n_adomains + 2 else: self.domain_func = domain_func self.domain_vals = domain_vals self.dx = df.Measure('dx', subdomain_data=self.domain_func) self.n_domains = n_dom
[docs] def add_anomaly(self, instance): """ DEPRECATED! Do not use anymore without contacting the authors. (from custEM.misc import anomaly_expressions as ae) (MOD.FS.DOM.add_anomaly(ae.Plate(origin=[0.0, -100.0, -400.0]))) """ anom = instance anom.mark(self.domain_func, self.n_domains) self.n_adomains += 1 self.n_domains += 1