# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
import os
import numpy as np
import hashlib
import custEM as ce
from custEM.fem.fem_utils import ApproachBaseFD
from custEM.misc import logger_print as lp
from custEM.misc import run_serial as rs
from custEM.misc import write_h5
from custEM.misc import read_h5
from mpi4py import MPI
import time
[docs]
class PrimaryField(ApproachBaseFD):
"""
PrimaryField class for computing or loading existing primary fields for
secondary field formulations. Called by *FE.build_var_form()* via
the **FunctionSpace** class method *add_Primary_Field()*.
For the documentation of the the ***comet** toolbox (**pyhed** module), it
is referred to the documentation of this software.
Methods
-------
- export_primary_fields()
save calcualted primary fields
- load_primary_fields()
import existing primary fields; if not existing, start calculation
- calc_hed_fullspace_field()
calculate analytic primary-field solution for a HED in a fullspace
- calc_vmd_fullspace_field()
calculate analytic primary-field solution for a VMD in a fullspace
- calc_layered_earth_field()
calculate semi-analytic primary-field solutions for layered earth
geometries with *pyhed*
- init_primary_field_name()
generate an encrypted primary field data file name based on the Tx
configuration to avoid recomputation of exisiting indentical setups
(same mesh, same Tx geometry, same physical parameters)
- write_serial_calculation_parameters()
store **PF** class parameters in a file for the reimport of serial
calculation scripts
Description of PF parameters
----------------------------
Note that further relevant parameters are initialized
by the **ApproachBaseFD** class. They are also valid keyword arguments of
the *build_var_form()* method and can be changed accordingly.
- max_length = None, type float
maximum length of the single HED segments used to approximate a
finite-length bipole or loop transmitter; overwrites *n_segs* if both
parameters are specified
- n_segs = None, type int
total number of segments (single HED transmitters) for approximating
finite-length bipole or loop transmitters; overwritten by *max_length*
if both parameters are specified
- pf_type = 'default', type str
primary field type, alternatives: **fullspace** or **custom**
- pf_name = None, type str
an alternative name for the primary fields of the model can be
defined; it is not recommended to change the default name
- export_pvd = False, type bool
flag controlling if 'pvd' - files are exported (**'True'**) or not
- eval_tx = False, type bool
evaulate Tx coordinates on corresponding edge-midpoints between
nodes and use them for the primary field calculation instead of
the node coordinates, also provides edge lengths and orientations
- force_primary = False, type bool
force re-caculation of exisiting primary field with an identical
parametrization
- pyhed_interp = False, type bool
set True, if the alternative interpolation-based primary field
calculation technique of the *pyhed*-module should be used
- pf_EH_flag = 'E', type str
either **E** or **H** to specify, which type of primary fields
should be used for the FE caluclations later on
- procs_per_proc = 1, type int
change, if more processes should be used to calculate primary fields
with 'pyhed' than specified via the mpirun command (e.g., -n 12) for
sloving the main FEm problem. For instance, a value of **3** would lead
to 36 processes used for the primary field calculation.
- pf_p = 1, type int
polynomials order of Lagrange VectorFunctionSpace which is used to
calculate the primary fields; could be changes to *2*, but this only
leads to significantly increased primary field computation times and
barely more accurate results
- ph_log_level = 0, type int
log level of the pyhed software
- pyhed_drop_tol = 1., type bool
internal drop tolerance for approximating the semi-analytical results
at positions with singular fields values
- dipole_z = 0., type float
shift z-value of source to the subsurface to enable calculations of
primary fields for marine setups; use with caution as this option was
not tested sufficiently
- Assembler = None, type class
*Assembler* class that might be passed to the **PF** class for
custom purposes
"""
def __init__(self, FS, FE, MP, **pf_kwargs):
"""
Initialization of PrimaryField parameters.
Required arguments
------------------
- FS, type class
FunctionSpaces instance
- FE, type class
finite element approach instance
- MP, type class
ModelParameters instance
Keyword arguments
-----------------
The PF class parameters can be updated to custom values by setting
the same keyword arguments in the method **build_var_form()** of
the **MOD.FE** instance. For the documentation, it is referred to the
description on the top.
"""
super(self.__class__, self).__init__()
self.FS = FS
self.FE = FE
self.MP = MP
self.Assembler = None
self.max_length = None
self.n_segs = 10
self.pyhed_interp = False
self.ph_log_level = 0
self.pyhed_drop_tol = 1.
self.eval_tx = False
self.force_primary = False
self.export_pvd = False
self.dipole_z = 0.
self.pf_p = 1
self.pf_EH_flag = 'E'
self.pf_type = 'default'
self.pf_name = None
self.nedelec_pf = False
self.procs_per_proc = 1
for key in pf_kwargs:
if key not in self.__dict__:
if key not in ['bc', 'sigma_from_func', 'tx_topo',
'quasi_static', 'ip', 'a_tol']:
lp(self.MP.logger, 50,
'Error! Unknown PF parameter set:', key)
lp(self.MP.logger, 50,
"... let's stop before something unexpected happens"
" ...", pre_dash=False)
raise SystemExit
self.__dict__.update(pf_kwargs)
if self.s_type in ['path', 'vmds', 'heds']:
self.n_tx = len(self.tx)
self.MP.n_tx = self.n_tx
if self.s_type == 'auto':
self.s_type = 'path'
self.tx = self.MP.tx
self.n_tx = len(self.MP.tx)
self.grounding = self.MP.grounding
if self.s_type == 'mt':
self.n_tx = self.MP.n_tx
if self.eval_tx and '_s' in self.FS.approach:
self.Assembler = ce.fem.TotalFieldAssembler(self.FE, self.FE.bc,
force_p1=True)
self.Assembler.assemble()
self.s_dir = self.MP.r_dir + '/primary_fields/' + self.pf_type + '/'
self.layer_thicknesses = self.MP.layer_thicknesses
self.sigma = []
self.sigma = self.MP.sigma[1:1+self.MP.n_layers]
self.f = self.MP.frequencies[0]
self.omega = self.MP.omegas[0]
self.n_procs = int(MPI.COMM_WORLD.Get_size() *
self.procs_per_proc)
self.E_0 = []
self.H_0 = []
if self.pf_type not in ('default', 'fullspace', 'total', 'custom'):
lp(self.MP.logger, 50,
'Error! Choose either "default, fullspace, total" or "custom" '
'as "pf_type" parameter. Aborting ...')
raise SystemExit
[docs]
def export_primary_fields(self):
"""
Saves calculated primary fields in dedicated HDF5 or *.pvd* files.
"""
V_cg = df.VectorFunctionSpace(self.FS.mesh, 'CG', self.pf_p)
lp(self.MP.logger, 10,
'... saving primary fields in "default" '
'"primary_fields" directory ...', pre_dash=False)
for ti in range(self.n_tx):
E_imag = df.Function(V_cg)
E_real = df.Function(V_cg)
H_imag = df.Function(V_cg)
H_real = df.Function(V_cg)
E_real.vector().set_local(self.E_0[ti].real.ravel())
E_imag.vector().set_local(self.E_0[ti].imag.ravel())
H_real.vector().set_local(self.H_0[ti].real.ravel())
H_imag.vector().set_local(self.H_0[ti].imag.ravel())
if self.MP.file_format == 'h5':
if self.n_tx == 1:
write_h5(self.MP.mpi_cw, E_real, self.s_dir +
self.pf_name + '_E_0_real_cg.h5', counter=0)
write_h5(self.MP.mpi_cw, E_imag, self.s_dir +
self.pf_name + '_E_0_imag_cg.h5', counter=0)
write_h5(self.MP.mpi_cw, H_real, self.s_dir +
self.pf_name + '_H_0_real_cg.h5', counter=0)
write_h5(self.MP.mpi_cw, H_imag, self.s_dir +
self.pf_name + '_H_0_imag_cg.h5', counter=0)
else:
if ti == 0:
self.E_r_file = write_h5(
self.MP.mpi_cw, E_real, self.s_dir +
self.pf_name + '_E_0_real_cg.h5',
counter=ti, close=False)
self.E_i_file = write_h5(
self.MP.mpi_cw, E_imag, self.s_dir +
self.pf_name + '_E_0_imag_cg.h5',
counter=ti, close=False)
self.H_r_file = write_h5(
self.MP.mpi_cw, H_real, self.s_dir +
self.pf_name + '_H_0_real_cg.h5',
counter=ti, close=False)
self.H_i_file = write_h5(
self.MP.mpi_cw, H_imag, self.s_dir +
self.pf_name + '_H_0_imag_cg.h5',
counter=ti, close=False)
elif ti < self.n_tx - 1:
write_h5(self.MP.mpi_cw, E_real, self.s_dir +
self.pf_name + '_E_0_real_cg.h5',
counter=ti,
new=False, close=False, f=self.E_r_file)
write_h5(self.MP.mpi_cw, E_imag, self.s_dir +
self.pf_name + '_E_0_imag_cg.h5',
counter=ti,
new=False, close=False, f=self.E_i_file)
write_h5(self.MP.mpi_cw, H_real, self.s_dir +
self.pf_name + '_H_0_real_cg.h5',
counter=ti,
new=False, close=False, f=self.H_r_file)
write_h5(self.MP.mpi_cw, H_imag, self.s_dir +
self.pf_name + '_H_0_imag_cg.h5',
counter=ti,
new=False, close=False, f=self.H_i_file)
elif ti == self.n_tx - 1:
write_h5(self.MP.mpi_cw, E_real, self.s_dir +
self.pf_name + '_E_0_real_cg.h5',
counter=ti,
new=False, close=True, f=self.E_r_file)
write_h5(self.MP.mpi_cw, E_imag, self.s_dir +
self.pf_name + '_E_0_imag_cg.h5',
counter=ti,
new=False, close=True, f=self.E_i_file)
write_h5(self.MP.mpi_cw, H_real, self.s_dir +
self.pf_name + '_H_0_real_cg.h5',
counter=ti,
new=False, close=True, f=self.H_r_file)
write_h5(self.MP.mpi_cw, H_imag, self.s_dir +
self.pf_name + '_H_0_imag_cg.h5',
counter=ti,
new=False, close=True, f=self.H_i_file)
else:
lp(self.MP.logger, 50,
'Error! Cannot properly export primary fields '
'for multiple tx to one HD5 file. Aborting ...')
raise SystemExit
elif self.MP.file_format == 'xml':
if self.n_tx != 1:
lp(self.MP.logger, 50,
'Error! *xml* format only supported for *n_tx = 1*. '
'Aborting ...')
raise SystemExit
df.File(self.s_dir + self.pf_name + '_E_0_real_cg' +
'.xml') << E_real
df.File(self.s_dir + self.pf_name + '_E_0_imag_cg' +
'.xml') << E_imag
df.File(self.s_dir + self.pf_name + '_H_0_real_cg' +
'.xml') << H_real
df.File(self.s_dir + self.pf_name + '_H_0_imag_cg' +
'.xml') << H_imag
if self.export_pvd:
tx_str = ''
if self.n_tx != 1:
tx_str = '_tx_#' + str(ti)
df.File(self.s_dir + self.pf_name + '_E_0_real_cg' +
tx_str + '.pvd') << E_real
df.File(self.s_dir + self.pf_name + '_E_0_imag_cg' +
tx_str + '.pvd') << E_imag
df.File(self.s_dir + self.pf_name + '_H_0_real_cg' +
tx_str + '.pvd') << H_real
df.File(self.s_dir + self.pf_name + '_H_0_imag_cg' +
tx_str + '.pvd') << H_imag
[docs]
def load_primary_fields(self, boundary_fields=False, fi=0):
"""
Import existing or start calculation of primary fields.
Keyword arguments
-----------------
- boundary_fields = False, type bool
only for internal usage, used to calculate primary fields only on
the domain boundaries to apply inhomogeneous Dirichlet BC
- fi = 0, type int
iteration index over frequencies
"""
lp(self.MP.logger, 20,
'Primary field calculation or import:')
# use always correct frequency for n_freq modelings
self.f = self.MP.frequencies[fi]
self.omega = self.MP.omegas[fi]
name_dummy = False
if self.pf_name is None:
name_dummy = True
self.pf_name = self.init_primary_field_name(boundary_fields)
if self.pf_p != self.FS.p:
lp(self.MP.logger, 10,
'... using specified polynomial order: ' + str(self.pf_p) +
' for primary fields ...', pre_dash=False)
V_cg = df.VectorFunctionSpace(self.FS.mesh, 'CG', self.pf_p)
if not os.path.isfile(self.s_dir + self.pf_name + '_E_0_real_cg.' +
self.MP.file_format) or self.force_primary:
if self.pf_type is 'fullspace':
if self.s_type not in ['hed', 'heds', 'vmd', 'vmds', 'mt']:
lp(self.MP.logger, 50,
'Error! Only "hed(s)", "vmd(s)", "mt" are supported '
'as *s_type* for "fullspace" primary fields. '
'Aborting ...')
raise SystemExit
if self.s_type in ['hed', 'heds']:
lp(self.MP.logger, 20,
' - calculating primary HED fullspace fields '
'for ' + str(self.MP.n_tx) + ' Tx positions - ',
pre_dash=False)
self.calc_hed_fullspace_field(V_cg)
elif self.s_type in ['vmd', 'vmds']:
lp(self.MP.logger, 20,
' - calculating primary VMD fullspace fields '
'for ' + str(self.MP.n_tx) + ' Tx positions - ',
pre_dash=False)
self.calc_vmd_fullspace_field(V_cg)
elif self.s_type in ['mt']:
self.calc_mt_fullspace_field(V_cg)
elif self.pf_type is 'default':
for ti in range(self.n_tx):
self.calc_layered_earth_field(V_cg, boundary_fields, ti)
elif self.pf_type is 'total':
lp(self.MP.logger, 20,
'... importing primary fields calculated with '
'total electric field approach ...', pre_dash=False)
full_name = self.MP.r_dir + '/E_t/' + self.MP.mesh_name + \
'/' + self.pf_name + '_'
r_str = 'f{:d}'.format(fi) + '_real'
i_str = 'f{:d}'.format(fi) + '_imag'
V = df.FunctionSpace(self.FS.mesh, 'N1curl', 1)
self.E_0_r = [df.Function(V) for n_tx in range(self.n_tx)]
self.E_0_i = [df.Function(V) for n_tx in range(self.n_tx)]
self.H_0_r = [df.Function(V) for n_tx in range(self.n_tx)]
self.H_0_i = [df.Function(V) for n_tx in range(self.n_tx)]
df.MPI.barrier(df.MPI.comm_world)
f = df.HDF5File(self.MP.mpi_cw, full_name + 'E_t.h5', "r")
for ti in range(self.MP.n_tx):
f.read(self.E_0_r[ti], r_str + "/vector_%d" % ti)
f.close()
df.MPI.barrier(df.MPI.comm_world)
f = df.HDF5File(self.MP.mpi_cw, full_name + 'E_t.h5', "r")
for ti in range(self.MP.n_tx):
f.read(self.E_0_i[ti], i_str + "/vector_%d" % ti)
f.close()
df.MPI.barrier(df.MPI.comm_world)
f = df.HDF5File(self.MP.mpi_cw, full_name + 'H_t.h5', "r")
for ti in range(self.MP.n_tx):
f.read(self.H_0_r[ti], r_str + "/vector_%d" % ti)
f.close()
df.MPI.barrier(df.MPI.comm_world)
f = df.HDF5File(self.MP.mpi_cw, full_name + 'H_t.h5', "r")
for ti in range(self.MP.n_tx):
f.read(self.H_0_i[ti], i_str + "/vector_%d" % ti)
f.close()
df.MPI.barrier(df.MPI.comm_world)
self.nedelec_pf = True
return
else:
# here one can implement usage of individual primary fields
self.nedelec_pf = True
return
self.export_primary_fields()
lp(self.MP.logger, 20,
'... loading ' + self.pf_type + ' primary fields ...',
pre_dash=False)
target_str = self.s_dir + self.pf_name
if self.MP.file_format == 'h5':
E_0_r = [df.Function(V_cg) for n_tx in range(self.n_tx)]
E_0_i = [df.Function(V_cg) for n_tx in range(self.n_tx)]
H_0_r = [df.Function(V_cg) for n_tx in range(self.n_tx)]
H_0_i = [df.Function(V_cg) for n_tx in range(self.n_tx)]
for ti in range(self.n_tx):
try:
read_h5(self.MP.mpi_cw, E_0_r[ti], target_str +
'_E_0_real_cg.h5', counter=ti)
read_h5(self.MP.mpi_cw, E_0_i[ti], target_str +
'_E_0_imag_cg.h5', counter=ti)
read_h5(self.MP.mpi_cw, H_0_r[ti], target_str +
'_H_0_real_cg.h5', counter=ti)
read_h5(self.MP.mpi_cw, H_0_i[ti], target_str +
'_H_0_imag_cg.h5', counter=ti)
except RuntimeError:
lp(self.MP.logger, 50,
'Error! Cannot read previously computed primary field '
'data.\nMost likely, the underlying mesh with the same '
'name was updated in between.\n'
'Either delete the "results/primary_fields" directory '
'manually or\n'
'use the *force_primary=True* option. Aborting ...')
raise SystemExit
elif self.MP.file_format == 'xml':
if self.n_tx != 1:
lp(self.MP.logger, 50,
'Error! *xml* format only supported for *n_tx = 1*. '
'Aborting ...')
raise SystemExit
E_0_r = [df.Function(V_cg, target_str + '_E_0_real_cg.xml')]
E_0_i = [df.Function(V_cg, target_str + '_E_0_imag_cg.xml')]
H_0_r = [df.Function(V_cg, target_str + '_H_0_real_cg.xml')]
H_0_i = [df.Function(V_cg, target_str + '_H_0_imag_cg.xml')]
if boundary_fields:
self.E_0_r_cg_B = E_0_r
self.E_0_i_cg_B = E_0_i
self.H_0_r_cg_B = H_0_r
self.H_0_i_cg_B = H_0_i
else:
self.E_0_r_cg = E_0_r
self.E_0_i_cg = E_0_i
self.H_0_r_cg = H_0_r
self.H_0_i_cg = H_0_i
if name_dummy:
self.pf_name = None
[docs]
def calc_hed_fullspace_field(self, V_cg):
"""
Calculate analytic primary field solution for a HED in a fullspace.
Required arguments
------------------
- V_cg, type dolfin VectorFunctionSpace
dolfin Lagrange vector function space
"""
for ti in range(self.n_tx):
if self.s_type == 'hed':
origin = self.origin
elif self.s_type == 'heds':
origin = self.tx[ti]
lp(self.MP.logger, 20,
'--> {:<22}'.format('origin') + ' = ', origin,
pre_dash=False)
if ti == 0:
xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :])
n_xyz = len(xyz)
ids = self.MP.current * self.length
kk = np.sqrt((self.MP.sigma_air[0] *
self.omega * self.MP.mu_0) / 2)
k = kk - kk * 1j
E_0 = np.zeros((n_xyz, 3), dtype=complex)
H_0 = np.zeros((n_xyz, 3), dtype=complex)
for i in range(n_xyz):
x, y, z = xyz[i, :] - self.origin
r = np.linalg.norm((x, y, z))
if r < 1e-2:
r = 1.0
const_E = ((-ids / (4 * np.pi *
self.MP.sigma_air[0] * r**3)) *
np.exp(-k * r * 1j))
const_H = ((ids / (4 * np.pi * r**2)) *
(1j * k * r + 1) * np.exp(-k * r * 1j))
if self.azimuth == 0.:
E_0[i, 0] = (const_E * (x * x / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3) +
const_E * (k**2 * r**2 - k * r * 1j - 1))
E_0[i, 1] = (const_E * (x * y / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3))
E_0[i, 2] = (const_E * (x * z / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3))
H_0[i, 0] = const_H * 0.0
H_0[i, 1] = const_H * (-z / r)
H_0[i, 2] = const_H * (y / r)
if self.azimuth == 90.:
E_0[i, 1] = (const_E * (y * y / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3) +
const_E * (k**2 * r**2 - k * r * 1j - 1))
E_0[i, 0] = (const_E * (y * x / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3))
E_0[i, 2] = (const_E * (y * z / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3))
H_0[i, 1] = const_H * 0.0
H_0[i, 0] = const_H * (-z / r)
H_0[i, 2] = const_H * (x / r)
self.E_0.append(E_0)
self.H_0.append(H_0)
[docs]
def calc_vmd_fullspace_field(self, V_cg):
"""
Calculate analytic primary field solution for a VMD in a fullspace.
Required arguments
------------------
- V_cg, type dolfin VectorFunctionSpace
dolfin Lagrange vector function space
"""
for ti in range(self.n_tx):
if self.s_type == 'vmd':
origin = self.origin
elif self.s_type == 'vmds':
origin = self.tx[ti]
lp(self.MP.logger, 20,
'--> {:<22}'.format('origin') + ' = ', origin,
pre_dash=False)
if ti == 0:
xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :])
n_xyz = len(xyz)
kk = np.sqrt((self.MP.sigma_air[0] *
self.omega * self.MP.mu_0) / 2)
k = kk - kk * 1j
E_0 = np.zeros((n_xyz, 3), dtype=complex)
H_0 = np.zeros((n_xyz, 3), dtype=complex)
for i in range(n_xyz):
x, y, z = xyz[i, :] - self.origin
r = np.linalg.norm((x, y, z))
if r < 1e-2:
r = 1.0
const_E = ((self.MP.m_moment * self.MP.mu_0 *
self.omega * 1j) / (4 * np.pi * r**2)) * \
(k * r * 1j + 1) * np.exp(-k * r * 1j)
const_H = ((self.MP.m_moment / (4 * np.pi * r**3)) *
np.exp(-k * r * 1j))
E_0[i, 0] = const_E * (y / r)
E_0[i, 1] = const_E * (-x / r)
E_0[i, 2] = 0.
H_0[i, 0] = const_H * ((x * z / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3))
H_0[i, 1] = const_H * ((y * z / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3))
H_0[i, 2] = const_H * ((z**2 / r**2) *
(-k**2 * r**2 + 3 * k * r * 1j + 3) +
(k**2 * r**2 - k * r * 1j - 1))
self.E_0.append(E_0)
self.H_0.append(H_0)
[docs]
def calc_mt_fullspace_field(self, V_cg):
"""
Calculate analytic primary field solution for a HED in a fullspace.
Required arguments
------------------
- V_cg, type dolfin VectorFunctionSpace
dolfin Lagrange vector function space
"""
xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :])
zmax_local = np.max(self.FS.mesh.coordinates()[:].reshape(
-1, 3)[:, 2])
tmp = self.FE.MP.mpi_cw.gather(zmax_local, root=0)
if self.FE.MP.mpi_cw.Get_rank() == 0:
max_tmp = np.max(np.array(tmp).flatten())
else:
max_tmp = None
zmax = self.FE.MP.mpi_cw.bcast(max_tmp, root=0)
self.origin = [0., 0., zmax]
n_xyz = len(xyz)
kk = np.sqrt((self.MP.sigma_air[0] *
self.omega * self.MP.mu_0) / 2)
k = kk - kk * 1j
E_0 = np.zeros((n_xyz, 3), dtype=complex)
H_0 = np.zeros((n_xyz, 3), dtype=complex)
for i in range(n_xyz):
x, y, z = xyz[i, :] - self.origin
r = np.linalg.norm((x, y, z))
if r < 1e-2:
r = 1.0
const_E = ((-1./ (4 * np.pi * self.MP.sigma_air[0] * r**3)) *
np.exp(-k * r * 1j))
const_H = ((1. / (4 * np.pi * r**2)) *
(1j * k * r + 1) * np.exp(-k * r * 1j))
E_0[i, 0] = const_E * (k**2 * r**2 - k * r * 1j - 1)
H_0[i, 1] = const_H * (-z / r)
self.E_0.append(E_0)
self.H_0.append(H_0)
# second polarization
E_1 = np.zeros((n_xyz, 3), dtype=complex)
H_1 = np.zeros((n_xyz, 3), dtype=complex)
E_1[:, 1] = E_0[:, 0]
H_1[:, 0] = -H_0[:, 1]
self.E_0.append(E_1)
self.H_0.append(H_1)
[docs]
def calc_layered_earth_field(self, V_cg, boundary_fields, ti):
"""
Calculate semi-analytic primary field solution with the *pyhed*
software for layered-earth geometries.
Required arguments
------------------
- V_cg, type dolfin VectorFunctionSpace
dolfin Lagrange vector function space
- boundary_fields, type bool
only for internal usage, used to calculate primary fields only on
the domain boundaries to apply inhomogeneous Dirichlet BC
- ti, type int
number of the Tx if multiple Tx are specified
"""
if 'loop' in self.s_type:
self.grounding = [False]
elif 'hed' in self.s_type or 'line' in self.s_type:
self.grounding = [True]
# use uneven number of dipoles to avoid singularities / artifacts in
# on symmetry plane of dipole source
if self.grounding[ti] and (self.n_segs % 2) == 0:
self.n_segs += 1
lp(self.MP.logger, 20,
' - raised "n_segs" by 1 to get an odd number of segments\n'
'for avoiding singularities in symmetry plane - ')
if boundary_fields:
self.write_serial_calculation_parameters()
rs(os.path.dirname(ce.__file__) +
'/fem/calc_primary_boundary_fields.py',
['-m'], [self.MP.mesh_name],
['-d'], [self.MP.r_dir],
['-t'], [self.pf_type],
['-f'], [self.MP.file_format])
return
from comet import pyhed as ph
ph.log.setLevel(self.ph_log_level)
t1 = time.time()
xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :]).T
if ti == 0:
lp(self.MP.logger, 20,
' - calculating primary "' + self.s_type + '" field - ',
pre_dash=False)
if self.s_type is 'hed':
source = ph.loop.buildDipole(self.origin, length=self.length,
angle=np.deg2rad(self.azimuth))
lp(self.MP.logger, 20,
'--> {:<22}'.format('origin') + ' = ', self.origin,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('length') + ' = ', self.length,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('segments') + ' = ', self.n_segs,
pre_dash=False)
elif self.s_type is 'line':
source = ph.loop.buildLine(self.start[:2], self.stop[:2],
num_segs=self.n_segs,
max_length=self.max_length)
lp(self.MP.logger, 20,
'--> {:<22}'.format('start') + ' = ', self.start,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('stop') + ' = ', self.stop,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('segments') + ' = ', self.n_segs,
pre_dash=False)
elif self.s_type is 'loop_c':
source = ph.loop.buildCircle(self.radius, P=self.origin,
num_segs=self.n_segs,
max_length=self.max_length)
lp(self.MP.logger, 20,
'--> {:<22}'.format('origin') + ' = ', self.origin,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('radius') + ' = ', self.radius,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('segments') + ' = ', self.n_segs,
pre_dash=False)
elif self.s_type is 'loop_r':
if df.near(self.start[0], self.stop[0], eps=1e-4) or \
df.near(self.start[1], self.stop[1], eps=1e-4):
lp(self.MP.logger, 50,
'Error! Given start and stop coordinates do not '
'match to a rectangular loop source: x or y start or stop '
'coordinates are identical')
raise SystemExit
c_1 = [self.start[0], self.start[1], 0.0]
c_2 = [self.stop[0], self.start[1], 0.0]
c_3 = [self.stop[0], self.stop[1], 0.0]
c_4 = [self.start[0], self.stop[1], 0.0]
source = ph.loop.buildRectangle(np.array((c_1, c_2, c_3, c_4)),
num_segs=self.n_segs,
max_length=self.max_length)
lp(self.MP.logger, 20,
'--> {:<22}'.format('corner 1') + ' = ', c_1,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('corner 2') + ' = ', c_2,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('corner 3') + ' = ', c_3,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('corner 4') + ' = ', c_4,
pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('segments') + ' = ', self.n_segs,
pre_dash=False)
elif self.s_type is 'path':
if self.Assembler is None:
source = ph.loop.buildLoop(self.tx[ti],
num_segs=self.n_segs,
max_length=self.max_length,
grounded=self.grounding[ti])
else:
source = ph.loop.Loop([self.Assembler.midpoints[ti],
self.Assembler.orientations[ti],
self.Assembler.lengths[ti],
self.grounding[ti], 'custEM'])
lp(self.MP.logger, 20,
'... tx number ' + str(ti) + ' ...', pre_dash=False)
lp(self.MP.logger, 20,
'--> {:<22}'.format('grounding') + ' = ',
self.grounding[ti], pre_dash=False)
else:
lp(self.MP.logger, 50,
'Error!, supported "s_type" parameters for '
'default" primary fields are: "hed, "line", "loop_c"'
' or "loop_r"')
raise SystemExit
source.setLoopMesh(xyz)
source.config.rho = 1. / np.array(self.MP.sigma[
1:1+self.MP.n_layers]).ravel()
source.config.d = self.MP.layer_thicknesses
source.config.f = self.f
source.config.current = self.MP.current
# ################### calc E-Field ##################################
source.config.ftype = 'E'
E_0 = source.calculate(interpolate=self.pyhed_interp,
src_z=self.dipole_z,
num_cpu=self.procs_per_proc,
drop_tol=self.pyhed_drop_tol)
# ################### calc H-Field ##################################
source.config.ftype = 'H'
if self.dipole_z != 0.:
lp(self.MP.logger, 30,
'Warning! Shifting Tx to depth is only considered for '
'primary E fields, not H fields! Continuning ...')
H_0 = source.calculate(interpolate=self.pyhed_interp,
src_z=self.dipole_z, # ignored !
num_cpu=self.procs_per_proc,
drop_tol=self.pyhed_drop_tol)
if np.shape(E_0)[0] == 3 and np.shape(E_0)[1] != 3:
E_0 = E_0.T # always (n, 3) to ensure
H_0 = H_0.T # .ravel() is working correctly later on
# avoid numerical issues:
E_0.real[np.abs(E_0.real) < 1e-32] = 1e-32
E_0.imag[np.abs(E_0.imag) < 1e-32] = 1e-32
H_0.real[np.abs(H_0.real) < 1e-32] = 1e-32
H_0.imag[np.abs(H_0.imag) < 1e-32] = 1e-32
self.E_0.append(E_0)
self.H_0.append(H_0)
df.MPI.barrier(self.MP.mpi_cw)
if ti == self.n_tx - 1:
t2 = time.time() - t1
lp(self.MP.logger, 20,
' - computation time (s) for primary fields [s]: --> ', t2,
pre_dash=False)
[docs]
def init_primary_field_name(self, boundary_fields):
"""
Initialize an encoded, unique name for the files to store primary
fields based on the mesh, transmitter characteristics and others.
Required arguments
------------------
- boundary_fields = False, type bool
only for internal usage, used to calculate primary fields only on
the domain boundaries to apply inhomogeneous Dirichlet BC
"""
name = (self.MP.mesh_name + '_' + self.pf_type + '_' + self.s_type +
'_nL_' + str(self.MP.n_layers) + '_sg_' +
'_format_' + self.MP.file_format +
'_f_' + str(self.f) + '_current_' +
str(self.MP.current) +
'_Sg_' + str(self.MP.sigma_0[:self.MP.n_layers]) +
'_n_segs_' + str(self.n_segs) + '_max_length' +
str(self.max_length) + '_x0_' + str(self.origin[0]) +
'_y0_' + str(self.origin[1]) + '_z0_' + str(self.origin[2]) +
'_R_' + str(self.radius) + '_sx0_' + str(self.start[0]) +
'_sy0_' + str(self.start[1]) + '_sy0_' + str(self.start[2]) +
'_sx1_' + str(self.stop[0]) + '_sz1_' + str(self.stop[1]) +
'_sz1_' + str(self.stop[2]) + '_pf_p_' + str(self.pf_p) +
str(self.tx))
if boundary_fields:
name += '_BF'
return(hashlib.sha256(name.encode('ascii')).hexdigest())
[docs]
def write_serial_calculation_parameters(self):
"""
Export geometry and transmitter setup for seperate primary
field calculations in serial.
Required arguments
------------------
- path, type str
current working directory, switch to this directory after the
export of the parameter file
"""
import json
if self.tx is not None:
np.save(self.MP.r_dir + '/primary_fields/' + self.pf_type +
'/source_path.npy', self.tx)
self.tx = 'import'
A = self.__dict__.copy()
del A['FS']
del A['MP']
del A['FE']
del A['H_0']
del A['E_0']
del A['E_0_r_cg']
del A['E_0_i_cg']
del A['H_0_r_cg']
del A['H_0_i_cg']
if df.MPI.rank(df.MPI.comm_world) == 0:
for key, value in A.items():
print(key, value)
json = json.dumps(A)
f = open(self.MP.r_dir + '/primary_fields/' + self.pf_type +
'/pf_dict.json', 'w')
f.write(json)
f.close()