Source code for custEM.fem.primary_fields

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import os
import numpy as np
import hashlib
import custEM as ce
from custEM.fem.fem_utils import ApproachBaseFD
from custEM.misc import logger_print as lp
from custEM.misc import run_serial as rs
from custEM.misc import write_h5
from custEM.misc import read_h5
from mpi4py import MPI
import time


[docs] class PrimaryField(ApproachBaseFD): """ PrimaryField class for computing or loading existing primary fields for secondary field formulations. Called by *FE.build_var_form()* via the **FunctionSpace** class method *add_Primary_Field()*. For the documentation of the the ***comet** toolbox (**pyhed** module), it is referred to the documentation of this software. Methods ------- - export_primary_fields() save calcualted primary fields - load_primary_fields() import existing primary fields; if not existing, start calculation - calc_hed_fullspace_field() calculate analytic primary-field solution for a HED in a fullspace - calc_vmd_fullspace_field() calculate analytic primary-field solution for a VMD in a fullspace - calc_layered_earth_field() calculate semi-analytic primary-field solutions for layered earth geometries with *pyhed* - init_primary_field_name() generate an encrypted primary field data file name based on the Tx configuration to avoid recomputation of exisiting indentical setups (same mesh, same Tx geometry, same physical parameters) - write_serial_calculation_parameters() store **PF** class parameters in a file for the reimport of serial calculation scripts Description of PF parameters ---------------------------- Note that further relevant parameters are initialized by the **ApproachBaseFD** class. They are also valid keyword arguments of the *build_var_form()* method and can be changed accordingly. - max_length = None, type float maximum length of the single HED segments used to approximate a finite-length bipole or loop transmitter; overwrites *n_segs* if both parameters are specified - n_segs = None, type int total number of segments (single HED transmitters) for approximating finite-length bipole or loop transmitters; overwritten by *max_length* if both parameters are specified - pf_type = 'default', type str primary field type, alternatives: **fullspace** or **custom** - pf_name = None, type str an alternative name for the primary fields of the model can be defined; it is not recommended to change the default name - export_pvd = False, type bool flag controlling if 'pvd' - files are exported (**'True'**) or not - eval_tx = False, type bool evaulate Tx coordinates on corresponding edge-midpoints between nodes and use them for the primary field calculation instead of the node coordinates, also provides edge lengths and orientations - force_primary = False, type bool force re-caculation of exisiting primary field with an identical parametrization - pyhed_interp = False, type bool set True, if the alternative interpolation-based primary field calculation technique of the *pyhed*-module should be used - pf_EH_flag = 'E', type str either **E** or **H** to specify, which type of primary fields should be used for the FE caluclations later on - procs_per_proc = 1, type int change, if more processes should be used to calculate primary fields with 'pyhed' than specified via the mpirun command (e.g., -n 12) for sloving the main FEm problem. For instance, a value of **3** would lead to 36 processes used for the primary field calculation. - pf_p = 1, type int polynomials order of Lagrange VectorFunctionSpace which is used to calculate the primary fields; could be changes to *2*, but this only leads to significantly increased primary field computation times and barely more accurate results - ph_log_level = 0, type int log level of the pyhed software - pyhed_drop_tol = 1., type bool internal drop tolerance for approximating the semi-analytical results at positions with singular fields values - dipole_z = 0., type float shift z-value of source to the subsurface to enable calculations of primary fields for marine setups; use with caution as this option was not tested sufficiently - Assembler = None, type class *Assembler* class that might be passed to the **PF** class for custom purposes """ def __init__(self, FS, FE, MP, **pf_kwargs): """ Initialization of PrimaryField parameters. Required arguments ------------------ - FS, type class FunctionSpaces instance - FE, type class finite element approach instance - MP, type class ModelParameters instance Keyword arguments ----------------- The PF class parameters can be updated to custom values by setting the same keyword arguments in the method **build_var_form()** of the **MOD.FE** instance. For the documentation, it is referred to the description on the top. """ super(self.__class__, self).__init__() self.FS = FS self.FE = FE self.MP = MP self.Assembler = None self.max_length = None self.n_segs = 10 self.pyhed_interp = False self.ph_log_level = 0 self.pyhed_drop_tol = 1. self.eval_tx = False self.force_primary = False self.export_pvd = False self.dipole_z = 0. self.pf_p = 1 self.pf_EH_flag = 'E' self.pf_type = 'default' self.pf_name = None self.nedelec_pf = False self.procs_per_proc = 1 for key in pf_kwargs: if key not in self.__dict__: if key not in ['bc', 'sigma_from_func', 'tx_topo', 'quasi_static', 'ip', 'a_tol']: lp(self.MP.logger, 50, 'Error! Unknown PF parameter set:', key) lp(self.MP.logger, 50, "... let's stop before something unexpected happens" " ...", pre_dash=False) raise SystemExit self.__dict__.update(pf_kwargs) if self.s_type in ['path', 'vmds', 'heds']: self.n_tx = len(self.tx) self.MP.n_tx = self.n_tx if self.s_type == 'auto': self.s_type = 'path' self.tx = self.MP.tx self.n_tx = len(self.MP.tx) self.grounding = self.MP.grounding if self.s_type == 'mt': self.n_tx = self.MP.n_tx if self.eval_tx and '_s' in self.FS.approach: self.Assembler = ce.fem.TotalFieldAssembler(self.FE, self.FE.bc, force_p1=True) self.Assembler.assemble() self.s_dir = self.MP.r_dir + '/primary_fields/' + self.pf_type + '/' self.layer_thicknesses = self.MP.layer_thicknesses self.sigma = [] self.sigma = self.MP.sigma[1:1+self.MP.n_layers] self.f = self.MP.frequencies[0] self.omega = self.MP.omegas[0] self.n_procs = int(MPI.COMM_WORLD.Get_size() * self.procs_per_proc) self.E_0 = [] self.H_0 = [] if self.pf_type not in ('default', 'fullspace', 'total', 'custom'): lp(self.MP.logger, 50, 'Error! Choose either "default, fullspace, total" or "custom" ' 'as "pf_type" parameter. Aborting ...') raise SystemExit
[docs] def export_primary_fields(self): """ Saves calculated primary fields in dedicated HDF5 or *.pvd* files. """ V_cg = df.VectorFunctionSpace(self.FS.mesh, 'CG', self.pf_p) lp(self.MP.logger, 10, '... saving primary fields in "default" ' '"primary_fields" directory ...', pre_dash=False) for ti in range(self.n_tx): E_imag = df.Function(V_cg) E_real = df.Function(V_cg) H_imag = df.Function(V_cg) H_real = df.Function(V_cg) E_real.vector().set_local(self.E_0[ti].real.ravel()) E_imag.vector().set_local(self.E_0[ti].imag.ravel()) H_real.vector().set_local(self.H_0[ti].real.ravel()) H_imag.vector().set_local(self.H_0[ti].imag.ravel()) if self.MP.file_format == 'h5': if self.n_tx == 1: write_h5(self.MP.mpi_cw, E_real, self.s_dir + self.pf_name + '_E_0_real_cg.h5', counter=0) write_h5(self.MP.mpi_cw, E_imag, self.s_dir + self.pf_name + '_E_0_imag_cg.h5', counter=0) write_h5(self.MP.mpi_cw, H_real, self.s_dir + self.pf_name + '_H_0_real_cg.h5', counter=0) write_h5(self.MP.mpi_cw, H_imag, self.s_dir + self.pf_name + '_H_0_imag_cg.h5', counter=0) else: if ti == 0: self.E_r_file = write_h5( self.MP.mpi_cw, E_real, self.s_dir + self.pf_name + '_E_0_real_cg.h5', counter=ti, close=False) self.E_i_file = write_h5( self.MP.mpi_cw, E_imag, self.s_dir + self.pf_name + '_E_0_imag_cg.h5', counter=ti, close=False) self.H_r_file = write_h5( self.MP.mpi_cw, H_real, self.s_dir + self.pf_name + '_H_0_real_cg.h5', counter=ti, close=False) self.H_i_file = write_h5( self.MP.mpi_cw, H_imag, self.s_dir + self.pf_name + '_H_0_imag_cg.h5', counter=ti, close=False) elif ti < self.n_tx - 1: write_h5(self.MP.mpi_cw, E_real, self.s_dir + self.pf_name + '_E_0_real_cg.h5', counter=ti, new=False, close=False, f=self.E_r_file) write_h5(self.MP.mpi_cw, E_imag, self.s_dir + self.pf_name + '_E_0_imag_cg.h5', counter=ti, new=False, close=False, f=self.E_i_file) write_h5(self.MP.mpi_cw, H_real, self.s_dir + self.pf_name + '_H_0_real_cg.h5', counter=ti, new=False, close=False, f=self.H_r_file) write_h5(self.MP.mpi_cw, H_imag, self.s_dir + self.pf_name + '_H_0_imag_cg.h5', counter=ti, new=False, close=False, f=self.H_i_file) elif ti == self.n_tx - 1: write_h5(self.MP.mpi_cw, E_real, self.s_dir + self.pf_name + '_E_0_real_cg.h5', counter=ti, new=False, close=True, f=self.E_r_file) write_h5(self.MP.mpi_cw, E_imag, self.s_dir + self.pf_name + '_E_0_imag_cg.h5', counter=ti, new=False, close=True, f=self.E_i_file) write_h5(self.MP.mpi_cw, H_real, self.s_dir + self.pf_name + '_H_0_real_cg.h5', counter=ti, new=False, close=True, f=self.H_r_file) write_h5(self.MP.mpi_cw, H_imag, self.s_dir + self.pf_name + '_H_0_imag_cg.h5', counter=ti, new=False, close=True, f=self.H_i_file) else: lp(self.MP.logger, 50, 'Error! Cannot properly export primary fields ' 'for multiple tx to one HD5 file. Aborting ...') raise SystemExit elif self.MP.file_format == 'xml': if self.n_tx != 1: lp(self.MP.logger, 50, 'Error! *xml* format only supported for *n_tx = 1*. ' 'Aborting ...') raise SystemExit df.File(self.s_dir + self.pf_name + '_E_0_real_cg' + '.xml') << E_real df.File(self.s_dir + self.pf_name + '_E_0_imag_cg' + '.xml') << E_imag df.File(self.s_dir + self.pf_name + '_H_0_real_cg' + '.xml') << H_real df.File(self.s_dir + self.pf_name + '_H_0_imag_cg' + '.xml') << H_imag if self.export_pvd: tx_str = '' if self.n_tx != 1: tx_str = '_tx_#' + str(ti) df.File(self.s_dir + self.pf_name + '_E_0_real_cg' + tx_str + '.pvd') << E_real df.File(self.s_dir + self.pf_name + '_E_0_imag_cg' + tx_str + '.pvd') << E_imag df.File(self.s_dir + self.pf_name + '_H_0_real_cg' + tx_str + '.pvd') << H_real df.File(self.s_dir + self.pf_name + '_H_0_imag_cg' + tx_str + '.pvd') << H_imag
[docs] def load_primary_fields(self, boundary_fields=False, fi=0): """ Import existing or start calculation of primary fields. Keyword arguments ----------------- - boundary_fields = False, type bool only for internal usage, used to calculate primary fields only on the domain boundaries to apply inhomogeneous Dirichlet BC - fi = 0, type int iteration index over frequencies """ lp(self.MP.logger, 20, 'Primary field calculation or import:') # use always correct frequency for n_freq modelings self.f = self.MP.frequencies[fi] self.omega = self.MP.omegas[fi] name_dummy = False if self.pf_name is None: name_dummy = True self.pf_name = self.init_primary_field_name(boundary_fields) if self.pf_p != self.FS.p: lp(self.MP.logger, 10, '... using specified polynomial order: ' + str(self.pf_p) + ' for primary fields ...', pre_dash=False) V_cg = df.VectorFunctionSpace(self.FS.mesh, 'CG', self.pf_p) if not os.path.isfile(self.s_dir + self.pf_name + '_E_0_real_cg.' + self.MP.file_format) or self.force_primary: if self.pf_type is 'fullspace': if self.s_type not in ['hed', 'heds', 'vmd', 'vmds', 'mt']: lp(self.MP.logger, 50, 'Error! Only "hed(s)", "vmd(s)", "mt" are supported ' 'as *s_type* for "fullspace" primary fields. ' 'Aborting ...') raise SystemExit if self.s_type in ['hed', 'heds']: lp(self.MP.logger, 20, ' - calculating primary HED fullspace fields ' 'for ' + str(self.MP.n_tx) + ' Tx positions - ', pre_dash=False) self.calc_hed_fullspace_field(V_cg) elif self.s_type in ['vmd', 'vmds']: lp(self.MP.logger, 20, ' - calculating primary VMD fullspace fields ' 'for ' + str(self.MP.n_tx) + ' Tx positions - ', pre_dash=False) self.calc_vmd_fullspace_field(V_cg) elif self.s_type in ['mt']: self.calc_mt_fullspace_field(V_cg) elif self.pf_type is 'default': for ti in range(self.n_tx): self.calc_layered_earth_field(V_cg, boundary_fields, ti) elif self.pf_type is 'total': lp(self.MP.logger, 20, '... importing primary fields calculated with ' 'total electric field approach ...', pre_dash=False) full_name = self.MP.r_dir + '/E_t/' + self.MP.mesh_name + \ '/' + self.pf_name + '_' r_str = 'f{:d}'.format(fi) + '_real' i_str = 'f{:d}'.format(fi) + '_imag' V = df.FunctionSpace(self.FS.mesh, 'N1curl', 1) self.E_0_r = [df.Function(V) for n_tx in range(self.n_tx)] self.E_0_i = [df.Function(V) for n_tx in range(self.n_tx)] self.H_0_r = [df.Function(V) for n_tx in range(self.n_tx)] self.H_0_i = [df.Function(V) for n_tx in range(self.n_tx)] df.MPI.barrier(df.MPI.comm_world) f = df.HDF5File(self.MP.mpi_cw, full_name + 'E_t.h5', "r") for ti in range(self.MP.n_tx): f.read(self.E_0_r[ti], r_str + "/vector_%d" % ti) f.close() df.MPI.barrier(df.MPI.comm_world) f = df.HDF5File(self.MP.mpi_cw, full_name + 'E_t.h5', "r") for ti in range(self.MP.n_tx): f.read(self.E_0_i[ti], i_str + "/vector_%d" % ti) f.close() df.MPI.barrier(df.MPI.comm_world) f = df.HDF5File(self.MP.mpi_cw, full_name + 'H_t.h5', "r") for ti in range(self.MP.n_tx): f.read(self.H_0_r[ti], r_str + "/vector_%d" % ti) f.close() df.MPI.barrier(df.MPI.comm_world) f = df.HDF5File(self.MP.mpi_cw, full_name + 'H_t.h5', "r") for ti in range(self.MP.n_tx): f.read(self.H_0_i[ti], i_str + "/vector_%d" % ti) f.close() df.MPI.barrier(df.MPI.comm_world) self.nedelec_pf = True return else: # here one can implement usage of individual primary fields self.nedelec_pf = True return self.export_primary_fields() lp(self.MP.logger, 20, '... loading ' + self.pf_type + ' primary fields ...', pre_dash=False) target_str = self.s_dir + self.pf_name if self.MP.file_format == 'h5': E_0_r = [df.Function(V_cg) for n_tx in range(self.n_tx)] E_0_i = [df.Function(V_cg) for n_tx in range(self.n_tx)] H_0_r = [df.Function(V_cg) for n_tx in range(self.n_tx)] H_0_i = [df.Function(V_cg) for n_tx in range(self.n_tx)] for ti in range(self.n_tx): try: read_h5(self.MP.mpi_cw, E_0_r[ti], target_str + '_E_0_real_cg.h5', counter=ti) read_h5(self.MP.mpi_cw, E_0_i[ti], target_str + '_E_0_imag_cg.h5', counter=ti) read_h5(self.MP.mpi_cw, H_0_r[ti], target_str + '_H_0_real_cg.h5', counter=ti) read_h5(self.MP.mpi_cw, H_0_i[ti], target_str + '_H_0_imag_cg.h5', counter=ti) except RuntimeError: lp(self.MP.logger, 50, 'Error! Cannot read previously computed primary field ' 'data.\nMost likely, the underlying mesh with the same ' 'name was updated in between.\n' 'Either delete the "results/primary_fields" directory ' 'manually or\n' 'use the *force_primary=True* option. Aborting ...') raise SystemExit elif self.MP.file_format == 'xml': if self.n_tx != 1: lp(self.MP.logger, 50, 'Error! *xml* format only supported for *n_tx = 1*. ' 'Aborting ...') raise SystemExit E_0_r = [df.Function(V_cg, target_str + '_E_0_real_cg.xml')] E_0_i = [df.Function(V_cg, target_str + '_E_0_imag_cg.xml')] H_0_r = [df.Function(V_cg, target_str + '_H_0_real_cg.xml')] H_0_i = [df.Function(V_cg, target_str + '_H_0_imag_cg.xml')] if boundary_fields: self.E_0_r_cg_B = E_0_r self.E_0_i_cg_B = E_0_i self.H_0_r_cg_B = H_0_r self.H_0_i_cg_B = H_0_i else: self.E_0_r_cg = E_0_r self.E_0_i_cg = E_0_i self.H_0_r_cg = H_0_r self.H_0_i_cg = H_0_i if name_dummy: self.pf_name = None
[docs] def calc_hed_fullspace_field(self, V_cg): """ Calculate analytic primary field solution for a HED in a fullspace. Required arguments ------------------ - V_cg, type dolfin VectorFunctionSpace dolfin Lagrange vector function space """ for ti in range(self.n_tx): if self.s_type == 'hed': origin = self.origin elif self.s_type == 'heds': origin = self.tx[ti] lp(self.MP.logger, 20, '--> {:<22}'.format('origin') + ' = ', origin, pre_dash=False) if ti == 0: xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :]) n_xyz = len(xyz) ids = self.MP.current * self.length kk = np.sqrt((self.MP.sigma_air[0] * self.omega * self.MP.mu_0) / 2) k = kk - kk * 1j E_0 = np.zeros((n_xyz, 3), dtype=complex) H_0 = np.zeros((n_xyz, 3), dtype=complex) for i in range(n_xyz): x, y, z = xyz[i, :] - self.origin r = np.linalg.norm((x, y, z)) if r < 1e-2: r = 1.0 const_E = ((-ids / (4 * np.pi * self.MP.sigma_air[0] * r**3)) * np.exp(-k * r * 1j)) const_H = ((ids / (4 * np.pi * r**2)) * (1j * k * r + 1) * np.exp(-k * r * 1j)) if self.azimuth == 0.: E_0[i, 0] = (const_E * (x * x / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3) + const_E * (k**2 * r**2 - k * r * 1j - 1)) E_0[i, 1] = (const_E * (x * y / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3)) E_0[i, 2] = (const_E * (x * z / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3)) H_0[i, 0] = const_H * 0.0 H_0[i, 1] = const_H * (-z / r) H_0[i, 2] = const_H * (y / r) if self.azimuth == 90.: E_0[i, 1] = (const_E * (y * y / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3) + const_E * (k**2 * r**2 - k * r * 1j - 1)) E_0[i, 0] = (const_E * (y * x / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3)) E_0[i, 2] = (const_E * (y * z / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3)) H_0[i, 1] = const_H * 0.0 H_0[i, 0] = const_H * (-z / r) H_0[i, 2] = const_H * (x / r) self.E_0.append(E_0) self.H_0.append(H_0)
[docs] def calc_vmd_fullspace_field(self, V_cg): """ Calculate analytic primary field solution for a VMD in a fullspace. Required arguments ------------------ - V_cg, type dolfin VectorFunctionSpace dolfin Lagrange vector function space """ for ti in range(self.n_tx): if self.s_type == 'vmd': origin = self.origin elif self.s_type == 'vmds': origin = self.tx[ti] lp(self.MP.logger, 20, '--> {:<22}'.format('origin') + ' = ', origin, pre_dash=False) if ti == 0: xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :]) n_xyz = len(xyz) kk = np.sqrt((self.MP.sigma_air[0] * self.omega * self.MP.mu_0) / 2) k = kk - kk * 1j E_0 = np.zeros((n_xyz, 3), dtype=complex) H_0 = np.zeros((n_xyz, 3), dtype=complex) for i in range(n_xyz): x, y, z = xyz[i, :] - self.origin r = np.linalg.norm((x, y, z)) if r < 1e-2: r = 1.0 const_E = ((self.MP.m_moment * self.MP.mu_0 * self.omega * 1j) / (4 * np.pi * r**2)) * \ (k * r * 1j + 1) * np.exp(-k * r * 1j) const_H = ((self.MP.m_moment / (4 * np.pi * r**3)) * np.exp(-k * r * 1j)) E_0[i, 0] = const_E * (y / r) E_0[i, 1] = const_E * (-x / r) E_0[i, 2] = 0. H_0[i, 0] = const_H * ((x * z / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3)) H_0[i, 1] = const_H * ((y * z / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3)) H_0[i, 2] = const_H * ((z**2 / r**2) * (-k**2 * r**2 + 3 * k * r * 1j + 3) + (k**2 * r**2 - k * r * 1j - 1)) self.E_0.append(E_0) self.H_0.append(H_0)
[docs] def calc_mt_fullspace_field(self, V_cg): """ Calculate analytic primary field solution for a HED in a fullspace. Required arguments ------------------ - V_cg, type dolfin VectorFunctionSpace dolfin Lagrange vector function space """ xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :]) zmax_local = np.max(self.FS.mesh.coordinates()[:].reshape( -1, 3)[:, 2]) tmp = self.FE.MP.mpi_cw.gather(zmax_local, root=0) if self.FE.MP.mpi_cw.Get_rank() == 0: max_tmp = np.max(np.array(tmp).flatten()) else: max_tmp = None zmax = self.FE.MP.mpi_cw.bcast(max_tmp, root=0) self.origin = [0., 0., zmax] n_xyz = len(xyz) kk = np.sqrt((self.MP.sigma_air[0] * self.omega * self.MP.mu_0) / 2) k = kk - kk * 1j E_0 = np.zeros((n_xyz, 3), dtype=complex) H_0 = np.zeros((n_xyz, 3), dtype=complex) for i in range(n_xyz): x, y, z = xyz[i, :] - self.origin r = np.linalg.norm((x, y, z)) if r < 1e-2: r = 1.0 const_E = ((-1./ (4 * np.pi * self.MP.sigma_air[0] * r**3)) * np.exp(-k * r * 1j)) const_H = ((1. / (4 * np.pi * r**2)) * (1j * k * r + 1) * np.exp(-k * r * 1j)) E_0[i, 0] = const_E * (k**2 * r**2 - k * r * 1j - 1) H_0[i, 1] = const_H * (-z / r) self.E_0.append(E_0) self.H_0.append(H_0) # second polarization E_1 = np.zeros((n_xyz, 3), dtype=complex) H_1 = np.zeros((n_xyz, 3), dtype=complex) E_1[:, 1] = E_0[:, 0] H_1[:, 0] = -H_0[:, 1] self.E_0.append(E_1) self.H_0.append(H_1)
[docs] def calc_layered_earth_field(self, V_cg, boundary_fields, ti): """ Calculate semi-analytic primary field solution with the *pyhed* software for layered-earth geometries. Required arguments ------------------ - V_cg, type dolfin VectorFunctionSpace dolfin Lagrange vector function space - boundary_fields, type bool only for internal usage, used to calculate primary fields only on the domain boundaries to apply inhomogeneous Dirichlet BC - ti, type int number of the Tx if multiple Tx are specified """ if 'loop' in self.s_type: self.grounding = [False] elif 'hed' in self.s_type or 'line' in self.s_type: self.grounding = [True] # use uneven number of dipoles to avoid singularities / artifacts in # on symmetry plane of dipole source if self.grounding[ti] and (self.n_segs % 2) == 0: self.n_segs += 1 lp(self.MP.logger, 20, ' - raised "n_segs" by 1 to get an odd number of segments\n' 'for avoiding singularities in symmetry plane - ') if boundary_fields: self.write_serial_calculation_parameters() rs(os.path.dirname(ce.__file__) + '/fem/calc_primary_boundary_fields.py', ['-m'], [self.MP.mesh_name], ['-d'], [self.MP.r_dir], ['-t'], [self.pf_type], ['-f'], [self.MP.file_format]) return from comet import pyhed as ph ph.log.setLevel(self.ph_log_level) t1 = time.time() xyz = (V_cg.tabulate_dof_coordinates().reshape(-1, 3)[0::3, :]).T if ti == 0: lp(self.MP.logger, 20, ' - calculating primary "' + self.s_type + '" field - ', pre_dash=False) if self.s_type is 'hed': source = ph.loop.buildDipole(self.origin, length=self.length, angle=np.deg2rad(self.azimuth)) lp(self.MP.logger, 20, '--> {:<22}'.format('origin') + ' = ', self.origin, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('length') + ' = ', self.length, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('segments') + ' = ', self.n_segs, pre_dash=False) elif self.s_type is 'line': source = ph.loop.buildLine(self.start[:2], self.stop[:2], num_segs=self.n_segs, max_length=self.max_length) lp(self.MP.logger, 20, '--> {:<22}'.format('start') + ' = ', self.start, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('stop') + ' = ', self.stop, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('segments') + ' = ', self.n_segs, pre_dash=False) elif self.s_type is 'loop_c': source = ph.loop.buildCircle(self.radius, P=self.origin, num_segs=self.n_segs, max_length=self.max_length) lp(self.MP.logger, 20, '--> {:<22}'.format('origin') + ' = ', self.origin, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('radius') + ' = ', self.radius, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('segments') + ' = ', self.n_segs, pre_dash=False) elif self.s_type is 'loop_r': if df.near(self.start[0], self.stop[0], eps=1e-4) or \ df.near(self.start[1], self.stop[1], eps=1e-4): lp(self.MP.logger, 50, 'Error! Given start and stop coordinates do not ' 'match to a rectangular loop source: x or y start or stop ' 'coordinates are identical') raise SystemExit c_1 = [self.start[0], self.start[1], 0.0] c_2 = [self.stop[0], self.start[1], 0.0] c_3 = [self.stop[0], self.stop[1], 0.0] c_4 = [self.start[0], self.stop[1], 0.0] source = ph.loop.buildRectangle(np.array((c_1, c_2, c_3, c_4)), num_segs=self.n_segs, max_length=self.max_length) lp(self.MP.logger, 20, '--> {:<22}'.format('corner 1') + ' = ', c_1, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('corner 2') + ' = ', c_2, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('corner 3') + ' = ', c_3, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('corner 4') + ' = ', c_4, pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('segments') + ' = ', self.n_segs, pre_dash=False) elif self.s_type is 'path': if self.Assembler is None: source = ph.loop.buildLoop(self.tx[ti], num_segs=self.n_segs, max_length=self.max_length, grounded=self.grounding[ti]) else: source = ph.loop.Loop([self.Assembler.midpoints[ti], self.Assembler.orientations[ti], self.Assembler.lengths[ti], self.grounding[ti], 'custEM']) lp(self.MP.logger, 20, '... tx number ' + str(ti) + ' ...', pre_dash=False) lp(self.MP.logger, 20, '--> {:<22}'.format('grounding') + ' = ', self.grounding[ti], pre_dash=False) else: lp(self.MP.logger, 50, 'Error!, supported "s_type" parameters for ' 'default" primary fields are: "hed, "line", "loop_c"' ' or "loop_r"') raise SystemExit source.setLoopMesh(xyz) source.config.rho = 1. / np.array(self.MP.sigma[ 1:1+self.MP.n_layers]).ravel() source.config.d = self.MP.layer_thicknesses source.config.f = self.f source.config.current = self.MP.current # ################### calc E-Field ################################## source.config.ftype = 'E' E_0 = source.calculate(interpolate=self.pyhed_interp, src_z=self.dipole_z, num_cpu=self.procs_per_proc, drop_tol=self.pyhed_drop_tol) # ################### calc H-Field ################################## source.config.ftype = 'H' if self.dipole_z != 0.: lp(self.MP.logger, 30, 'Warning! Shifting Tx to depth is only considered for ' 'primary E fields, not H fields! Continuning ...') H_0 = source.calculate(interpolate=self.pyhed_interp, src_z=self.dipole_z, # ignored ! num_cpu=self.procs_per_proc, drop_tol=self.pyhed_drop_tol) if np.shape(E_0)[0] == 3 and np.shape(E_0)[1] != 3: E_0 = E_0.T # always (n, 3) to ensure H_0 = H_0.T # .ravel() is working correctly later on # avoid numerical issues: E_0.real[np.abs(E_0.real) < 1e-32] = 1e-32 E_0.imag[np.abs(E_0.imag) < 1e-32] = 1e-32 H_0.real[np.abs(H_0.real) < 1e-32] = 1e-32 H_0.imag[np.abs(H_0.imag) < 1e-32] = 1e-32 self.E_0.append(E_0) self.H_0.append(H_0) df.MPI.barrier(self.MP.mpi_cw) if ti == self.n_tx - 1: t2 = time.time() - t1 lp(self.MP.logger, 20, ' - computation time (s) for primary fields [s]: --> ', t2, pre_dash=False)
[docs] def init_primary_field_name(self, boundary_fields): """ Initialize an encoded, unique name for the files to store primary fields based on the mesh, transmitter characteristics and others. Required arguments ------------------ - boundary_fields = False, type bool only for internal usage, used to calculate primary fields only on the domain boundaries to apply inhomogeneous Dirichlet BC """ name = (self.MP.mesh_name + '_' + self.pf_type + '_' + self.s_type + '_nL_' + str(self.MP.n_layers) + '_sg_' + '_format_' + self.MP.file_format + '_f_' + str(self.f) + '_current_' + str(self.MP.current) + '_Sg_' + str(self.MP.sigma_0[:self.MP.n_layers]) + '_n_segs_' + str(self.n_segs) + '_max_length' + str(self.max_length) + '_x0_' + str(self.origin[0]) + '_y0_' + str(self.origin[1]) + '_z0_' + str(self.origin[2]) + '_R_' + str(self.radius) + '_sx0_' + str(self.start[0]) + '_sy0_' + str(self.start[1]) + '_sy0_' + str(self.start[2]) + '_sx1_' + str(self.stop[0]) + '_sz1_' + str(self.stop[1]) + '_sz1_' + str(self.stop[2]) + '_pf_p_' + str(self.pf_p) + str(self.tx)) if boundary_fields: name += '_BF' return(hashlib.sha256(name.encode('ascii')).hexdigest())
[docs] def write_serial_calculation_parameters(self): """ Export geometry and transmitter setup for seperate primary field calculations in serial. Required arguments ------------------ - path, type str current working directory, switch to this directory after the export of the parameter file """ import json if self.tx is not None: np.save(self.MP.r_dir + '/primary_fields/' + self.pf_type + '/source_path.npy', self.tx) self.tx = 'import' A = self.__dict__.copy() del A['FS'] del A['MP'] del A['FE'] del A['H_0'] del A['E_0'] del A['E_0_r_cg'] del A['E_0_i_cg'] del A['H_0_r_cg'] del A['H_0_i_cg'] if df.MPI.rank(df.MPI.comm_world) == 0: for key, value in A.items(): print(key, value) json = json.dumps(A) f = open(self.MP.r_dir + '/primary_fields/' + self.pf_type + '/pf_dict.json', 'w') f.write(json) f.close()