Source code for custEM.post.interpolation_base

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import os
import scipy
import scipy.io
import json
import numpy as np
import custEM as ce
from custEM.misc import logger_print as lp
from custEM.misc import run_serial
from custEM.post import interpolation_utils as iu
from mpi4py import MPI


[docs] class InterpolationBase: """ Interpolator class for interpolating 3D modeling results on arbitrary meshes, which can be generated using this class as well. Notice ------ FEniCS does not support interpolation between different meshes in parallel. If not using *auto_interpolate* during *solve_main_problem()*, interpolation is conducted in serial and might take some time. Multi-threading is used to run several interpolation tasks simultaneously. Class internal functions ------------------------ - update_interpolation_parameters() update interpolation parameters such as line or slice information for corresponding mesh generation, if necessary, and interpolation. - create_line_mesh() create straight line mesh(es) in serial; horizontal lines following topography are supported, which is controlled by the *on_topo* or *on_water_surface* flags - create_slice_mesh() create straight slice mesh(es) in serial, horizontal slices following topography are supported, which is controlled by the *on_topo* or *on_water_surface* flags - create_path_mesh() create path mesh based on list or array of given coordinates in serial; paths following topography are supported, which is controlled by the *on_topo* or *on_water_surface* flags - create_path_meshes() create multiple path meshes at once - find_quantity() utility function to access field data via strings - check_interpolation_meshes() check if all interpolation meshes exist before interpolation - create_interpolation_matrices() create interpolation matrices for *auto_interpolate* and jacobian computations - auto_interpolate_parallel() interpolate in parallel on the fly using interpolation matrices Example: -------- >>> M = MOD('Test_1', 'halfspace_mesh', 'E_t', p=1, >>> owerwrite=False, load_existing=False) >>> >>> M.IB.create_line_mesh('x', -5e3, 5e3, 400, z=50.) >>> M.IB.interpolate( >>> 'E_t', 'x0_-5000.0_x1_5000.0_y_0.0_z_0.0_n_400_topo_line_x') """ def __init__(self, PP, dg_interpolation, self_mode): """ The Interpolator class is initialized via the **MOD** class. Required Arguments: ------------------- - PP; type class **Pre**- or **PostProcessing** instance. """ self.PP = PP self.MP = PP.MP self.Q1, self.Q2 = None, None self.Q = [] self.x, self.y, self.z = 0., 0., 0. self.a_tol = 1e-2 self.line_name = None self.slice_name = None self.path_name = None self.update_print_flag = False self.rank = -1 self.icomms = [] self.max_procs = df.MPI.size(self.MP.mpi_cw) self.on_topo = True self.on_water_surface = False self.dg_interpolation = dg_interpolation self.interp_p = 1 self.force_create = False self.rotate_surface_dem = False if str(self.MP.topo) == 'None': self.on_topo = False self.t_dir = self.MP.para_dir self.interp_dir = (self.MP.r_dir + '/' + self.MP.approach + '/' + self.MP.mesh_name + '/' + self.MP.mod_name + '_interpolated')
[docs] def update_interpolation_parameters(self, **interp_kwargs): """ update and check given keyword arguments for the Interpolator class. Notice: ------- - All interpolation parameters can be specified individually when calling the line - or slice-mesh generation functions. These values are just specified as class attributes to have reasonable default parameters available. Keyword arguments: ------------------ - x, y, z = 0., type float either offset for line interpolation meshes in not-the-axis direction or offset of slice interpolation meshes in slice-normal direction. - a_tol = 0.01, type float numerical tolerance for cutting the line- or slice interpolation meshes. Does not need to be changed in general. - update_print_flag = False, type bool define if updated interpolation keyword arguments should be printed or not. By defualt, the latter are not printed when initialzing the class for the first time but afterwards, this flag is set **True**. - self.on_topo = True, type bool if the mesh has topography in z-direction, horizontal line or slice interpolation meshes at the surface are automatically adjusted to match the crooked surface or follow the surface with a parallel offset. - max_procs = "MPI_SIZE", type df.MPI.size(df.mpi_comm_world()) number of parallel processes used during the *mpirun* call - force_create = False, if *True*, overwrite existing interpolation meshes with the same name - on_topo = True, automatically apply the topography used for the mesh-generation to interpolation meshes following the surface directly or with a constant offset - on_water_surface = False, if True, ignore bathymetry values (z < 0) when creating interpolation meshes which include a sea domain and use *z=0* for corresponding positions instead - line_name = None, type str specify a custom name for line interpolation meshes - slice_name = None, type str specify a custom name for slice interpolation meshes - path_name = None, type str specify a custom name for path interpolation meshes """ for key in interp_kwargs: if key not in self.__dict__: lp(self.MP.logger, 30, 'Warning! Unknown interpolation parameter set', key) self.__dict__.update(interp_kwargs) if self.update_print_flag is False: lp(self.MP.logger, 20, 'Interpolation parameters update:') for k, v in sorted(self.__dict__.items()): if k not in ['MP', 'PP', 'Q', 'Q1', 'Q2']: lp(self.MP.logger, 20, '--> {:<22}'.format(str(k)) + ' = ', v, pre_dash=False) self.update_print_flag = True
[docs] def create_line_mesh(self, axis, start, stop, n_segs, **interp_kwargs): """ Create line interpolation meshes. Only lines parallel to the coordinate axes are supported. Topography for horizontal lines is also supported. For arbitrary lines, use the *create_path_mesh* method. Required arguments ------------------ - axis, type str a string which specifies along which axis a line mesh should be generated, valid values are *'x'*, *'y'* and '*z*' - start, stop, type float start and stop cooridinates of the line - n_segs, type int number of equally distributed segments along a line Keyword arguments ----------------- - **interp_kwargs, see **update_interpolation_parameters** docs """ if 'line_name' not in interp_kwargs: self.line_name = None # little hack to switch to previous *on_topo* option after a single # mesh generation call dummy = None if 'on_topo' in interp_kwargs.keys(): if interp_kwargs['on_topo'] != self.on_topo and self.on_topo: dummy = True elif interp_kwargs['on_topo'] != self.on_topo and not self.on_topo: dummy = False self.update_interpolation_parameters(**interp_kwargs) if df.MPI.rank(self.MP.mpi_cw) == 0: iu.build_line_mesh(self, axis, start, stop, n_segs, self.line_name, self.on_topo) else: pass # reset default parameters self.x, self.y, self.z = 0., 0., 0. self.force_create, self.on_water_surface = False, False df.MPI.barrier(self.MP.mpi_cw) if dummy is not None: self.on_topo = dummy
[docs] def create_slice_mesh(self, axis, dim, n_segs, **interp_kwargs): """ Create slice interpolation meshes. So far, only slices parallel to the coordinate axes are supported. Topography for z-normal slices is supported as well. Alternatively, use the flexible *create_path_mesh* method. Required arguments ------------------ - axis, type str a string which specifies along which normal axis a slice mesh should be generated, valid values are *'x'*, *'y'* and '*z*' - dim, type float or list of two float one float specifies constant dimensions in both slice-parallel directions; a list of two floats specifies different dimensions along the two y/z, x/z or x/y axes for axis=*'x'*, *'y'* or *'z'*, respectively. - n_segs, type int or list of two int one integer specifies an equal number of segments in both slice-parallel directions; a list of two integers specifies a different number of segments along the two y/z, x/z or x/y axes for axis=*'x'*, *'y'* or *'z'*, respectively. Keyword arguments ----------------- - **interp_kwargs, see **update_interpolation_parameters** docs """ if 'slice_name' not in interp_kwargs: self.slice_name = None # little hack to switch to previous *on_topo* option after a single # mesh generation call dummy = None if 'on_topo' in interp_kwargs.keys(): if interp_kwargs['on_topo'] != self.on_topo and self.on_topo: dummy = True elif interp_kwargs['on_topo'] != self.on_topo and not self.on_topo: dummy = False self.update_interpolation_parameters(**interp_kwargs) if df.MPI.rank(self.MP.mpi_cw) == 0: iu.build_slice_mesh(self, axis, dim, n_segs, self.slice_name, self.on_topo) else: pass # reset default parameters self.x, self.y, self.z = 0., 0., 0. self.force_create, self.on_water_surface = False, False df.MPI.barrier(self.MP.mpi_cw) if dummy is not None: self.on_topo = dummy
[docs] def create_path_mesh(self, path, suffix=None, rank=None, **interp_kwargs): """ Create path interpolation mesh to interpolate on arbitraily distributed points. Required arguments ------------------ - path, type array of shape (n, 3) array containing the 3D points to build the path mesh Keyword arguments ----------------- - suffix = '', type str exchange the export name ending "path" with something else, e.g., it is useful to name a path mesh like a line mesh for plotting along a coordinate axis later on - rank = 0, type int use other than root process for multi-threaded serial path mesh generation, e.g., if called from the *create_path_meshes* method - **interp_kwargs, see **update_interpolation_parameters** docs. """ if 'path_name' not in interp_kwargs: path_name = self.path_name # little hack to switch to previous *on_topo* option after a single # mesh generation call dummy = None if 'on_topo' in interp_kwargs.keys(): if interp_kwargs['on_topo'] != self.on_topo and self.on_topo: dummy = True elif interp_kwargs['on_topo'] != self.on_topo and not self.on_topo: dummy = False self.update_interpolation_parameters(**interp_kwargs) barrier = False if rank is None: rank = 0 barrier = True if df.MPI.rank(self.MP.mpi_cw) == rank: iu.build_path_mesh(self, path, self.path_name, self.on_topo, suffix) else: pass # reset default parameters self.x, self.y, self.z = 0., 0., 0. self.force_create, self.on_water_surface = False, False if barrier: df.MPI.barrier(self.MP.mpi_cw) if dummy is not None: self.on_topo = dummy
[docs] def create_path_meshes(self, paths, name=None, names=None, suffix=None, **interp_kwargs): """ Create path interpolation mesh to interpolate on arbitraily distributed points. Required arguments ------------------ - paths, type list list containing coordinate arrays to build the path meshes Keyword arguments ----------------- - name = None, type str specify a single name and use increasing integers ids to name the different path meshes; must be *None* if *names != None* - names = None, type str specify a specific name for each path in the list; must be *None* if *name != None* - suffix = '', type str exchange the export name ending "path" with something else, e.g., it is useful to name a path mesh like a line mesh for plotting along a coordinate axis later on - **interp_kwargs, see **update_interpolation_parameters** docs """ if (name is None and names is None) or \ (name is not None and names is not None): lp(self.MP.logger, 50, 'Error! Specify either the *name* or the *names* keyword ' 'argument\n for building multiple path meshes at once. ' 'Aborting ...', pre_dash=False) raise SystemExit lp(self.MP.logger, 20, '... building multiple path meshes ...', pre_dash=False, barrier=False) self.rank = -1 for pi in range(len(paths)): self.rank += 1 if df.MPI.rank(self.MP.mpi_cw) == self.rank: if name is not None: p_name = name + '_#' + str(pi) if names is not None: p_name = names[pi] self.create_path_mesh(paths[pi], suffix=suffix, rank=self.rank, path_name=p_name, **interp_kwargs) else: pass if self.rank == self.max_procs - 1: self.rank = -1 lp(self.MP.logger, 10, '... waiting for all mpi processes to finish ' 'serial interpolation tasks ...', pre_dash=False) # synchronize after one interpolation task has finished lp(self.MP.logger, 10, '... synchronization finished ...', pre_dash=False, post_dash=False) self.rank = -1
[docs] def find_quantity(self, string, fs_type): """ Utility function to access *dolfin* solution functions via a string. Required arguments: ------------------- - string, type str Either **'E_t, 'E_s', 'H_t'** or **'H_s'**. """ if fs_type == 'None' or 'ned' in self.PP.fs_type.lower(): if string == 'E_t': self.q1 = self.PP.E_t_r self.q2 = self.PP.E_t_i elif string == 'E_s': self.q1 = self.PP.E_s_r self.q2 = self.PP.E_s_i elif string == 'H_t': self.q1 = self.PP.H_t_r self.q2 = self.PP.H_t_i elif string == 'H_s': self.q1 = self.PP.H_s_r self.q2 = self.PP.H_s_i else: print('Wrong variable name chosen, use either: "E_t", ' '"E_s", "H_t", "H_s"') raise SystemExit return if fs_type == 'None' or 'cg' in self.PP.fs_type.lower(): if string == 'E_t': self.q1 = self.PP.E_t_r_cg self.q2 = self.PP.E_t_i_cg elif string == 'E_s': self.q1 = self.PP.E_s_r_cg self.q2 = self.PP.E_s_i_cg elif string == 'H_t': self.q1 = self.PP.H_t_r_cg self.q2 = self.PP.H_t_i_cg elif string == 'H_s': self.q1 = self.PP.H_s_r_cg self.q2 = self.PP.H_s_i_cg else: print('Wrong variable name chosen, use either: "E_t", ' '"E_s", "H_t", "H_s"') raise SystemExit
[docs] def check_interpolation_meshes(self, interp_meshes, m_dir, logger): """ Utility function to specify interpolation mesh directories and check if every interpolation mesh in the list is valid and exists. """ interp_mesh_dir = [] for interp_mesh in interp_meshes: if interp_mesh.find('line') != -1: interp_mesh_dir.append(m_dir + '/lines') elif interp_mesh.find('slice') != -1: interp_mesh_dir.append(m_dir + '/slices') elif interp_mesh.find('path') != -1: interp_mesh_dir.append(m_dir + '/paths') else: lp(logger, 50, 'Error! Interpolation target mesh name must ' 'end either with "line", "slice" or "path" so far. ' 'Aborting ...', post_dash=True) raise SystemExit if os.path.isfile(interp_mesh_dir[-1] + '/' + interp_mesh + '.xml') is False: lp(logger, 50, 'Error! Interpolation mesh "' + str(interp_mesh) + '.xml" ' 'could not be found. Aborting ...') raise SystemExit return(interp_mesh_dir)
[docs] def create_interpolation_matrices(self, xs, return_mixed=False): nx, dim = xs.shape if self.PP.FS.p == 1: n_element_dof = 6 elif self.PP.FS.p == 2: n_element_dof = 20 mesh = self.PP.FS.V.mesh() coords = mesh.coordinates() dolfin_element = self.PP.FS.V.dolfin_element() dofmap = self.PP.FS.V.dofmap() bbt = mesh.bounding_box_tree() sdim = dolfin_element.space_dimension() Qv = [[df.Function(self.PP.FS.V) for d in range(dim)] for n in range(nx)] rank = df.MPI.rank(df.MPI.comm_world) id_offset = dofmap.ownership_range()[0] n_vals = 0 to_bcast= [] v_length = len(Qv[0][0].vector()) if return_mixed: Qr = [[df.Function(self.PP.FS.M) for d in range(dim)] for n in range(nx)] Qi = [[df.Function(self.PP.FS.M) for d in range(dim)] for n in range(nx)] for ni in range(nx): # Loop over all interpolation points x = xs[ni, :] p = df.Point(x[0], x[1], x[2]) # Find cell for the point cell_id = bbt.compute_first_entity_collision(p) # Vertex coordinates for the cell if cell_id < len(mesh.cells()): xvert = coords[mesh.cells()[cell_id, :], :] cell = df.Cell(self.PP.FS.mesh, cell_id) # cell orientation?? # Evaluate the basis functions for the cell at x # dolfin_element.evaluate_basis_all(v, x, xvert, cell_id) v = dolfin_element.evaluate_basis_all(x, xvert, cell.orientation()) # Vector Lagrange space # vx = np.array([v[0], v[3], v[6], v[9], # 0., 0., 0., 0., # 0., 0., 0., 0.]) # vy = np.array([0., 0., 0., 0., # v[13], v[16], v[19], v[22], # 0., 0., 0., 0.]) # vz = np.array([0., 0., 0., 0., # 0., 0., 0., 0., # v[26], v[29], v[32], v[35]]) # Nedelec space vx = v[::3] vy = v[1::3] vz = v[2::3] ids = dofmap.cell_dofs(cell_id) else: ids = [None] * n_element_dof vx, vy, vz = None, None, None for j, idx in enumerate(ids): # all other processes set a local value as well, because # global array-based value manipulation seems to be faster # than the "set-local" alternatives if idx is None: Qv[ni][0].vector()[0] = 0. Qv[ni][1].vector()[0] = 0. Qv[ni][2].vector()[0] = 0. else: # these are the relevant values on one of the processes if idx < v_length: Qv[ni][0].vector()[idx] = vx[j] Qv[ni][1].vector()[idx] = vy[j] Qv[ni][2].vector()[idx] = vz[j] if return_mixed: Qr[ni][0].vector()[2*idx] = vx[j] Qr[ni][1].vector()[2*idx] = vy[j] Qr[ni][2].vector()[2*idx] = vz[j] Qi[ni][0].vector()[2*idx+1] = vx[j] Qi[ni][1].vector()[2*idx+1] = vy[j] Qi[ni][2].vector()[2*idx+1] = vz[j] else: if return_mixed: print('Error! "Return_mixed" for unowned index ' 'special ' 'case not taken care of as this option is ' 'outdated anyway. Aborting ...') raise SystemExit # set a dummy value if idx is not owned by this process for i in range(n_element_dof): if i not in ids: Qv[ni][0].vector()[i] = 0. Qv[ni][1].vector()[i] = 0. Qv[ni][2].vector()[i] = 0. break # get global index and prepare relevant data for # point-to-point communication afterwards to fill the # missing dof g_idx = dofmap.local_to_global_index(idx) to_bcast.append([ni, g_idx, [vx[j], vy[j], vz[j]], dofmap.off_process_owner()[np.argwhere( dofmap.local_to_global_unowned() == g_idx)[0][0]]]) # if len(to_bcast) > 1: # print('Critical warning! Found more than one DOF for this element.' # '\nThis happens VERY rarely during unowned dof sharing for ' # 'interpolation matrix\ngeneration. The results could contain' # ' an artifact at coordinate\n--> **' + str(x) + '**.' # '\nRemesh or ' # 'use another amount of MPI procs to circumvent\nthis issue, ' # 'we have not found the reason yet. Continuing ...') for pid in range(df.MPI.size(df.MPI.comm_world)): n_vals = df.MPI.comm_world.bcast(len(to_bcast), root=pid) if rank != pid: vals = [[0, 0, [0., 0., 0.], -1] for i in range(n_vals)] else: if len(to_bcast) > 0: vals = to_bcast else: vals = [[0, 0, [0., 0., 0.], -1] for i in range(n_vals)] for i in range(n_vals): for j in range(4): vals[i][j] = df.MPI.comm_world.bcast(vals[i][j], root=pid) if rank == vals[i][3]: Qv[vals[i][0]][0].vector()[vals[i][1] - id_offset] = \ vals[i][2][0] Qv[vals[i][0]][1].vector()[vals[i][1] - id_offset] = \ vals[i][2][1] Qv[vals[i][0]][2].vector()[vals[i][1] - id_offset] = \ vals[i][2][2] else: # dummy operations Qv[vals[i][0]][0].vector()[0] = \ Qv[vals[i][0]][0].vector()[0] Qv[vals[i][0]][1].vector()[0] = \ Qv[vals[i][0]][1].vector()[0] Qv[vals[i][0]][2].vector()[0] = \ Qv[vals[i][0]][2].vector()[0] # don't know if this is necessary, but could not hurt df.as_backend_type( Qv[vals[i][0]][0].vector()).update_ghost_values() df.as_backend_type( Qv[vals[i][0]][1].vector()).update_ghost_values() df.as_backend_type( Qv[vals[i][0]][2].vector()).update_ghost_values() if return_mixed: return(Qv, Qr, Qi) else: return(Qv)
# def single_interpolation_matrix(self, coord, cmp): # print('single interpolation matrix function is bugged. Aborting ...') # raise SystemExit # if not hasattr(self, 'Qv'): # self.Qv = [df.Function(self.PP.FS.V) for ci in range(3)] # self.v_length = len(self.Qv[0].vector()) # else: # [self.Qv[ci].vector()[:] == 0. for ci in range(3)] # if self.PP.FS.p == 1: # n_element_dof = 6 # elif self.PP.FS.p == 2: # n_element_dof = 20 # mesh = self.PP.FS.V.mesh() # coords = mesh.coordinates() # dolfin_element = self.PP.FS.V.dolfin_element() # dofmap = self.PP.FS.V.dofmap() # bbt = mesh.bounding_box_tree() # rank = df.MPI.rank(df.MPI.comm_world) # id_offset = dofmap.ownership_range()[0] # to_bcast = [] # # single point # p = df.Point(coord[0], coord[1], coord[2]) # # Find cell for the point # cell_id = bbt.compute_first_entity_collision(p) # # Vertex coordinates for the cell # if cell_id < len(mesh.cells()): # xvert = coords[mesh.cells()[cell_id, :], :] # cell = df.Cell(self.PP.FS.mesh, cell_id) # cell orientation?? # # Evaluate the basis functions for the cell at x # # dolfin_element.evaluate_basis_all(v, x, xvert, cell_id) # v = dolfin_element.evaluate_basis_all(coord, xvert, # cell.orientation()) # # Nedelec space # vx = v[::3] # vy = v[1::3] # vz = v[2::3] # ids = dofmap.cell_dofs(cell_id) # else: # ids = [None] * n_element_dof # vx, vy, vz = None, None, None # for j, idx in enumerate(ids): # # all other processes set a local value as well, because # # global array-based value manipulation seems to be faster # # than the "set-local" alternatives # if idx is None: # self.Qv[0].vector()[0] = 0. # self.Qv[1].vector()[0] = 0. # self.Qv[2].vector()[0] = 0. # else: # # these are the relevant values on one of the processes # if idx < self.v_length: # self.Qv[0].vector()[idx] = vx[j] # self.Qv[1].vector()[idx] = vy[j] # self.Qv[2].vector()[idx] = vz[j] # else: # # set a dummy value if idx is not owned by this process # for i in range(n_element_dof): # if i not in ids: # self.Qv[0].vector()[i] = 0. # self.Qv[1].vector()[i] = 0. # self.Qv[2].vector()[i] = 0. # break # # get global index and prepare relevant data for # # point-to-point communication afterwards to fill the # # missing dof # g_idx = dofmap.local_to_global_index(idx) # to_bcast.append([0, g_idx, [vx[j], vy[j], vz[j]], # dofmap.off_process_owner()[np.argwhere( # dofmap.local_to_global_unowned() == # g_idx)[0][0]]]) # for pid in range(df.MPI.size(df.MPI.comm_world)): # if rank != pid: # vals = [[0, 0, [0., 0., 0.], -1]] # else: # if len(to_bcast) > 0: # vals = to_bcast # else: # vals = [[0, 0, [0., 0., 0.], -1]] # for j in range(4): # vals[0][j] = df.MPI.comm_world.bcast(vals[0][j], root=pid) # if rank == vals[0][3]: # self.Qv[0].vector()[vals[0][1] - id_offset] = vals[0][2][0] # self.Qv[1].vector()[vals[0][1] - id_offset] = vals[0][2][1] # self.Qv[2].vector()[vals[0][1] - id_offset] = vals[0][2][2] # else: # # dummy operations # self.Qv[0].vector()[0] = self.Qv[0].vector()[0] # self.Qv[1].vector()[0] = self.Qv[1].vector()[0] # self.Qv[2].vector()[0] = self.Qv[2].vector()[0] # # don't know if this is necessary, but could not hurt # df.as_backend_type(self.Qv[0].vector()).update_ghost_values() # df.as_backend_type(self.Qv[1].vector()).update_ghost_values() # df.as_backend_type(self.Qv[2].vector()).update_ghost_values() # return(self.Qv[cmp].vector())
[docs] def auto_interpolate_parallel(self, EH, fi=0): if df.MPI.rank(self.MP.mpi_cw) == 0: if not os.path.isdir(self.interp_dir): os.mkdir(self.interp_dir) else: pass if len(self.Q) < 1: lp(self.MP.logger, 20, '... generating interpolation matrices for automated ' 'parallel interpolation ...') # for rx_path in self.MP.rx: # self.Q.append(self.create_interpolation_matrices( # np.array(rx_path))) else: lp(self.MP.logger, 10, '... auto-interpolating fields for frequency ' + str(fi) + ' ...', pre_dash=False) for pi in range(self.MP.n_rx): if df.MPI.rank(self.MP.mpi_cw) == 0: np.save(self.interp_dir + '/rx_path_' + str(pi) + '.npy', self.MP.rx[pi]) else: pass for ri in range(len(self.MP.rx[pi])): qvec = self.create_interpolation_matrices( np.array([self.MP.rx[pi][ri]])) if ri == 0 and 'E_t' in EH: Et = np.zeros((self.MP.n_tx, len(self.MP.rx[pi]), 3), dtype=complex) if fi == 0 and pi == 0: lp(self.MP.logger, 20, '... interpolating E-fields on automatically ' 'imported Rx paths ...', pre_dash=False) if ri == 0 and 'E_s' in EH: Es = np.zeros((self.MP.n_tx, len(self.MP.rx[pi]), 3), dtype=complex) if fi == 0 and pi == 0: lp(self.MP.logger, 20, '... interpolating secondary E-fields on ' 'automatically imported Rx paths ...', pre_dash=False) if ri == 0 and 'H_t' in EH: Ht = np.zeros((self.MP.n_tx, len(self.MP.rx[pi]), 3), dtype=complex) if fi == 0 and pi == 0: lp(self.MP.logger, 20, '... interpolating H-fields on automatically ' 'imported Rx paths ...', pre_dash=False) if ri == 0 and 'H_s' in EH: Hs = np.zeros((self.MP.n_tx, len(self.MP.rx[pi]), 3), dtype=complex) if fi == 0 and pi == 0: lp(self.MP.logger, 20, '... interpolating secondary H-fields on ' 'automatically imported Rx paths ...', pre_dash=False) if 'E_t' in EH: for ti in range(self.MP.n_tx): for c in range(3): Et.real[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.E_t_r[ti].vector()) Et.imag[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.E_t_i[ti].vector()) if 'E_s' in EH: for ti in range(self.MP.n_tx): for c in range(3): Es.real[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.E_s_r[ti].vector()) Es.imag[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.E_s_i[ti].vector()) if 'H_t' in EH: for ti in range(self.MP.n_tx): for c in range(3): Ht.real[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.H_t_r[ti].vector()) Ht.imag[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.H_t_i[ti].vector()) if 'H_s' in EH: for ti in range(self.MP.n_tx): for c in range(3): Hs.real[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.H_s_r[ti].vector()) Hs.imag[ti, ri, c] = qvec[0][c].vector( ).inner(self.PP.H_s_i[ti].vector()) if 'E_t' in EH: self.export_npy(Et, fi, pi, 'E_t') if 'E_s' in EH: self.export_npy(Es, fi, pi, 'E_s') if 'H_t' in EH: self.export_npy(Ht, fi, pi, 'H_t') if 'H_s' in EH: self.export_npy(Hs, fi, pi, 'H_s')
[docs] def export_npy(self, complex_data, fi, pi, EH, rank=0): if df.MPI.rank(self.MP.mpi_cw) == rank: export_name = EH + '_all_tx_rx_path_' + str(pi) export_name = 'f_' + str(fi) + '_' + export_name np.save(self.interp_dir + '/' + export_name + '.npy', complex_data) else: pass
[docs] def merge_tx_results(self, interp_meshes, EH=['E_t', 'H_t'], freqs=None): lp(self.MP.logger, 20, '... merging interpolated single transmitter results ...') rank = -1 self.MP.n_rx = len(interp_meshes) if freqs is None: freqs = [f for f in range(self.MP.n_freqs)] for fi in freqs: for ii in range(self.MP.n_rx): rank += 1 e_name0 = 'E_t_' + interp_meshes[ii] + '.npy' h_name0 = 'H_t_' + interp_meshes[ii] + '.npy' E = [] H = [] if df.MPI.rank(self.MP.mpi_cw) == rank: for ti in range(self.MP.n_tx): if self.MP.n_tx != 1: e_name = 'tx_' + str(ti) + '_' + e_name0 h_name = 'tx_' + str(ti) + '_' + h_name0 if self.MP.n_freqs != 1: e_name = 'f_' + str(fi) + '_' + e_name h_name = 'f_' + str(fi) + '_' + h_name if 'E_t' in EH: E.append(np.load(self.interp_dir + '/' + e_name)) if 'H_t' in EH: H.append(np.load(self.interp_dir + '/' + h_name)) if self.MP.n_rx == 1: e_outname = 'E_t_all_tx_rx_path.npy' h_outname = 'H_t_all_tx_rx_path.npy' else: e_outname = 'E_t_all_tx_rx_path_' + str(ii) + '.npy' h_outname = 'H_t_all_tx_rx_path_' + str(ii) + '.npy' if self.MP.n_freqs != 1: e_name = 'f_' + str(fi) + '_' + e_outname h_name = 'f_' + str(fi) + '_' + h_outname if 'E_t' in EH: self.export_npy(np.array(E)[:, :, :3], fi, ii, 'E_t', rank=rank) if fi == 0: np.save(self.interp_dir + '/rx_path_' + str(ii) + '.npy', np.array(E)[:, ::-1, 3:]) if 'H_t' in EH: self.export_npy(np.array(H)[:, :, :3], fi, ii, 'H_t', rank=rank) if fi == 0: np.save(self.interp_dir + '/rx_path_' + str(ii) + '.npy', np.array(H)[:, :, 3:]) else: pass if rank == self.max_procs - 1: rank = -1 lp(self.MP.logger, 10, '... waiting for all mpi processes to finish ' 'the merging task ...') # synchronize after merging is done lp(self.MP.logger, 10, '... synchronization finished ...', pre_dash=False, post_dash=False)
[docs] def convert_mt_data(self, impedances=True, rhoa_phase=True, tipper=True, remote_station=None, npy_files=True, mat_files=False, derivatives=False, ned_coord_system=False, freqs=None): lp(self.MP.logger, 20, '... converting electric and magentic fields to MT data ...', pre_dash=False) rank = -1 if freqs is None: freqs = [f for f in range(self.MP.n_freqs)] for fi in freqs: for pi in range(self.MP.n_rx): rank += 1 e_name = 'f_' + str(fi) + '_' + 'E_t_all_tx_rx_path_' +\ str(pi) + '.npy' h_name = 'f_' + str(fi) + '_' + 'H_t_all_tx_rx_path_' +\ str(pi) + '.npy' if not os.path.isfile(self.interp_dir + '/' + e_name): print(e_name) lp(self.MP.logger, 50, 'Error! Interpolation data do not exist for frequency ' + str(fi) + ' on rx_path '+ str(pi) + ' as\nconformal ' 'file with all tx included\nwhich is only the case if ' '*auto_interpolation* is enabled.\n' 'In case you have interpolated with another method on ' 'particular meshes,\ncall the *merge_tx_results()* ' 'method first. Aborting ...', barrier=False) raise SystemExit if remote_station is not None: qvec = self.create_interpolation_matrices( np.array(remote_station)) Href = np.zeros((2, 3), dtype=complex) for ti in range(self.MP.n_tx): for c in range(3): Href.real[ti, c] = qvec[0][c].vector( ).inner(self.PP.H_t_r[ti].vector()) Href.imag[ti, c] = qvec[0][c].vector( ).inner(self.PP.H_t_i[ti].vector()) else: Href = None if df.MPI.rank(self.MP.mpi_cw) == 0: E = np.load(self.interp_dir + '/' + e_name) H = np.load(self.interp_dir + '/' + h_name) if impedances or rhoa_phase or tipper: self.convert_imp_rhoa_tipper( E, H, fi, pi, impedances, rhoa_phase, tipper, Href, npy_files, mat_files, derivatives, ned_coord_system) else: pass if rank == self.max_procs - 1: rank = -1 lp(self.MP.logger, 10, '... waiting for all mpi processes to finish ' 'the mt conversion task ...', pre_dash=False) # synchronize after conversion is done lp(self.MP.logger, 10, '... synchronization finished ...', pre_dash=False, post_dash=False)
[docs] def convert_imp_rhoa_tipper(self, E, H, fi, pi, impedances, rhoa_phase, tipper, Href, npy_files, mat_files, derivatives, ned_coord_system): """ Convert E- and H-field to MT quantities. Required arguments ------------------ - E, H, type numpy array ((2, n_pos, 3)) Electromagnetic fields for two source polarizations on all receiver positions on the current rx_path - fi, type int frequency counter - pi, type int path counter - impedances, rhoa_phase, tipper, type bool flags to enable or disable corresponding output - remote station, type vector of len 3 specify surface reference station coordinates for the horizontal components to calculate tippers - npy_files, mat_files, type bool flags controling output for Python and/or Matlab """ Z, rhoa, phase, T = [], [], [], [] if ned_coord_system: rotmat = np.array([[0,1,0],[1,0,0],[0,0,-1]]) else: rotmat = np.array([[1,0,0],[0,1,0],[0,0,1]]) if derivatives: dZE, dZH, dTH = [], [], [] if mat_files: mat_dict = dict() if Href is not None: Href=np.matmul(Href,rotmat) for stn in range(len(H[0])): H[0][stn,:]=np.matmul(H[0][stn,:],rotmat) H[1][stn,:]=np.matmul(H[1][stn,:],rotmat) E[0][stn,:]=np.matmul(E[0][stn,:],rotmat) E[1][stn,:]=np.matmul(E[1][stn,:],rotmat) Z.append(np.zeros((2, 2), dtype=complex)) if derivatives: dZH.append(np.zeros((2, 2, 2, 2), dtype=complex)) dZE.append(np.zeros((2, 2, 2, 2), dtype=complex)) if Href is None: quot = (H[0][stn, 0] * H[1][stn, 1] - H[1][stn, 0] * H[0][stn, 1]) else: quot = (Href[0, 0] * Href[1, 1] - Href[1, 0] * Href[0, 1]) # Zxx Z[-1][0, 0] = (E[0][stn, 0] * H[1][stn, 1] - E[1][stn, 0] * H[0][stn, 1]) / quot # Zxy Z[-1][0, 1] = (E[1][stn, 0] * H[0][stn, 0] - E[0][stn, 0] * H[1][stn, 0]) / quot # Zyx Z[-1][1, 0] = (E[0][stn, 1] * H[1][stn, 1] - E[1][stn, 1] * H[0][stn, 1]) / quot # Zyy Z[-1][1, 1] = (E[1][stn, 1] * H[0][stn, 0] - E[0][stn, 1] * H[1][stn, 0]) / quot if derivatives: # d Zxx dZE[-1][0, 0, 0, 0] = H[1][stn,1] / quot dZE[-1][1, 0, 0, 0] = -H[0][stn,1] / quot dZH[-1][0, 0, 0, 0] = -H[1][stn,1] / quot * Z[-1][0,0] dZH[-1][1, 0, 0, 0] = H[0][stn,1] / quot * Z[-1][0,0] dZH[-1][0, 1, 0, 0] = -H[1][stn,1] / quot * Z[-1][0,1] dZH[-1][1, 1, 0, 0] = H[0][stn,1] / quot * Z[-1][0,1] # d Zxy dZE[-1][0, 0, 0, 1] = -H[1][stn,0] / quot dZE[-1][1, 0, 0, 1] = H[0][stn,0] / quot dZH[-1][0, 0, 0, 1] = H[1][stn,0] / quot * Z[-1][0,0] dZH[-1][1, 0, 0, 1] = -H[0][stn,0] / quot * Z[-1][0,0] dZH[-1][0, 1, 0, 1] = H[1][stn,0] / quot * Z[-1][0,1] dZH[-1][1, 1, 0, 1] = -H[0][stn,0] / quot * Z[-1][0,1] # d Zyx dZE[-1][0, 1, 1, 0] = H[1][stn,1] / quot dZE[-1][1, 1, 1, 0] = -H[0][stn,1] / quot dZH[-1][0, 0, 1, 0] = -H[1][stn,1] / quot * Z[-1][1,0] dZH[-1][1, 0, 1, 0] = H[0][stn,1] / quot * Z[-1][1,0] dZH[-1][0, 1, 1, 0] = -H[1][stn,1] / quot * Z[-1][1,1] dZH[-1][1, 1, 1, 0] = H[0][stn,1] / quot * Z[-1][1,1] # d Zyy dZE[-1][0, 1, 1, 1] = -H[1][stn,0] / quot dZE[-1][1, 1, 1, 1] = H[0][stn,0] / quot dZH[-1][0, 0, 1, 1] = H[1][stn,0] / quot * Z[-1][1,0] dZH[-1][1, 0, 1, 1] = -H[0][stn,0] / quot * Z[-1][1,0] dZH[-1][0, 1, 1, 1] = H[1][stn,0] / quot * Z[-1][1,1] dZH[-1][1, 1, 1, 1] = -H[0][stn,0] / quot * Z[-1][1,1] if rhoa_phase: rhoa.append(np.abs(Z[stn])**2 / (4 * np.pi * 1e-7 * self.MP.omegas[fi])) phase.append(np.rad2deg(np.arctan2(Z[stn].imag, Z[stn].real))) if tipper: T.append(np.zeros((2), dtype=complex)) dTH.append(np.zeros((2, 3, 2), dtype=complex)) if Href is None: # Tzx T[-1][0] = (H[0][stn, 2] * H[1][stn, 1] - H[1][stn, 2] * H[0][stn, 1]) / quot # Tzy T[-1][1] = (H[1][stn, 2] * H[0][stn, 0] - H[0][stn, 2] * H[1][stn, 0]) / quot else: # Tzx T[-1][0] = (H[0][stn, 2] * Href[1, 1] - H[1][stn, 2] * Href[0, 1]) / quot # Tzy T[-1][1] = (H[1][stn, 2] * Href[0, 0] - H[0][stn, 2] * Href[1, 0]) / quot if derivatives: if Href is None: H0x = H[0][stn, 0] H0y = H[0][stn, 1] H1x = H[1][stn, 0] H1y = H[1][stn, 1] else: H0x = Href[0, 0] H0y = Href[0, 1] H1x = Href[1, 0] H1y = Href[1, 1] # d Tx dTH[-1][0, 2, 0] = H[1][stn, 1] / quot dTH[-1][1, 2, 0] = -H[0][stn, 1] / quot dTH[-1][0, 0, 0] = -H1y / quot * T[-1][0] dTH[-1][1, 0, 0] = H0y / quot * T[-1][0] dTH[-1][0, 1, 0] = -H1y/ quot * T[-1][1] dTH[-1][1, 1, 0] = H0y / quot * T[-1][1] # d Ty dTH[-1][0, 2, 1] = -H[1][stn, 0] / quot dTH[-1][1, 2, 1] = H[0][stn, 0] / quot dTH[-1][0, 0, 1] = H1x / quot * T[-1][0] dTH[-1][1, 0, 1] = -H0x / quot * T[-1][0] dTH[-1][0, 1, 1] = H1x / quot * T[-1][1] dTH[-1][1, 1, 1] = -H0x/ quot * T[-1][1] # if M_tensor: # M, dMH = [], [] # M.append(np.zeros((2, 2), dtype=complex)) # if derivatives: # dMH.append(np.zeros((2, 2, 2, 2), dtype=complex)) # # quot = (H[0][stn, 0] * H[1][stn, 1] - H[1][stn, 0] * H[0][stn, 1]) # # Mxx # M[-1][0, 0] = H[0][stn, 0] * dTHz[0, 0] + H[1][stn, 0] * dTHz[1, 0] # # Mxy # M[-1][0, 1] = H[1][stn, 0] * dTHz[1, 1] + H[0][stn, 0] * dTHz[0, 1] # # Myx # M[-1][1, 0] = H[0][stn, 1] * dTHz[0, 0] + H[1][stn, 1] * dTHz[1, 0] # # Myy # M[-1][1, 1] = H[1][stn, 1] * dTHz[1, 1] + H[0][stn, 1] * dTHz[0, 1] # if derivatives: # # d Mxx # # dME[-1][0, 0, 0, 0] = dTHz[0, 0] # # dME[-1][1, 0, 0, 0] = dTHz[1, 0] # dMH[-1][0, 0, 0, 0] = -dTHz[0, 0] * M[-1][0,0] # dMH[-1][1, 0, 0, 0] = -dTHz[1, 0] * M[-1][0,0] # dMH[-1][0, 1, 0, 0] = -dTHz[0, 0] * M[-1][0,1] # dMH[-1][1, 1, 0, 0] = -dTHz[1, 0] * M[-1][0,1] # # d Mxy # # dME[-1][0, 0, 0, 1] = dTHz[0, 1] # # dME[-1][1, 0, 0, 1] = dTHz[1, 1] # dMH[-1][0, 0, 0, 1] = -dTHz[0, 1] * M[-1][0,0] # dMH[-1][1, 0, 0, 1] = -dTHz[1, 1] * M[-1][0,0] # dMH[-1][0, 1, 0, 1] = -dTHz[0, 1] * M[-1][0,1] # dMH[-1][1, 1, 0, 1] = -dTHz[1, 1] * M[-1][0,1] # # d Myx # # dME[-1][0, 1, 1, 0] = dTHz[0, 0] # # dME[-1][1, 1, 1, 0] = dTHz[1, 0] # dMH[-1][0, 0, 1, 0] = -dTHz[0, 0] * M[-1][1,0] # dMH[-1][1, 0, 1, 0] = -dTHz[1, 0] * M[-1][1,0] # dMH[-1][0, 1, 1, 0] = -dTHz[0, 0] * M[-1][1,1] # dMH[-1][1, 1, 1, 0] = -dTHz[1, 0] * M[-1][1,1] # # d Myy # # dME[-1][0, 1, 1, 1] = dTHz[0, 1] # # dME[-1][1, 1, 1, 1] = dTHz[1, 1] # dMH[-1][0, 0, 1, 1] = -dTHz[0, 1] * M[-1][1,0] # dMH[-1][1, 0, 1, 1] = -dTHz[1, 1] * M[-1][1,0] # dMH[-1][0, 1, 1, 1] = -dTHz[0, 1] * M[-1][1,1] # dMH[-1][1, 1, 1, 1] = -dTHz[1, 1] * M[-1][1,1] outname = '_rx_path_' + str(pi) if mat_files: mat_dict['H'] = H mat_dict['E'] = E if impedances: f_name = 'f_' + str(fi) + '_' + 'Z' + outname if npy_files: np.save(self.interp_dir + '/' + f_name + '.npy', np.array(Z)) if mat_files: mat_dict['impedance'] = np.array(Z) if rhoa_phase: f_name1 = 'f_' + str(fi) + '_' + 'R' + outname f_name2 = 'f_' + str(fi) + '_' + 'P' + outname if npy_files: np.save(self.interp_dir + '/' + f_name1 + '.npy', np.array(rhoa)) np.save(self.interp_dir + '/' + f_name2 + '.npy', np.array(phase)) if mat_files: mat_dict['rhoa'] = np.array(rhoa) if mat_files: mat_dict['phase'] = np.array(phase) if tipper: f_name = 'f_' + str(fi) + '_' + 'T' + outname if npy_files: np.save(self.interp_dir + '/' + f_name + '.npy', np.array(T)) if mat_files: mat_dict['tipper'] = np.array(T) if derivatives: if npy_files: f_name = 'f_' + str(fi) + '_dZH' + outname np.save(self.interp_dir + '/' + f_name + '.npy', np.array(dZH)) f_name = 'f_' + str(fi) + '_dZE' + outname np.save(self.interp_dir + '/' + f_name + '.npy', np.array(dZE)) f_name = 'f_' + str(fi) + '_dTH' + outname np.save(self.interp_dir + '/' + f_name + '.npy', np.array(dTH)) if mat_files: mat_dict['dZH'] = np.array(dZH) mat_dict['dZE'] = np.array(dZE) mat_dict['dTH'] = np.array(dTH) if mat_files: f_name = 'MT_data' + outname if self.MP.n_freqs != 1: f_name = 'f_' + str(fi) + '_' + f_name scipy.io.savemat(self.interp_dir + '/' + f_name + '.mat', mat_dict)