Source code for custEM.fem.assembly

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import numpy as np
from custEM.misc import logger_print as lp
from custEM.core import Solver
import time
from mpi4py import MPI


[docs] class TotalFieldAssembler: """ Class for setting the assembled contribution of the current density on transmitter dof in the right-hand side vector for total field formulations. Methods ------- - assemble() general assembly function of left- and right-hand sides for total-field formulations; calls all the other methods - find_dof_coords() utility function searching for all Nedelec dof and whose coordinates which belong to the source defined on edges - find_cells() for each dof which belongs to the source, search for an element which contains the former - find_directions() evaluate the local direction of the Nedelec dof in the element which contains it and switch the signto match the global direction - find_and_fill_start_stop_dof() required for the A_Phi approach on mixed elements to set divergence term on the start and stop point of grounded wire sources - find_for_hed(), find_for_vmd() utility functions called by **find_dof_coords()** - find_for_line(), find_for_loop_r(), find_for_path() utility functions called by **find_dof_coords()** - append_lists() utility function to collect the transmitter dof - eval_target_dir() utility function to find the global direction of an edge - test_An_total(), method for testing total A-V formulation on nodal elements - add_topo(), modify z-values of coordinates according to specified topography """ def __init__(self, FE, bc, td_dc=False, force_p1=False): """ Assembly of system matrix and right-hand side vector. Required arguments ------------------ - FE, type class FE-approach (e.g., **E_vector**) instance - bc, None or type dolfin DirichletBC see **MOD** class function: **solve_main_problem()** Keyword arguments ----------------- - td_dc = False, type bool only for internal usage; specifiy a dc field computation on the fly for simulating shut-off transients with time-domain approaches - force_p1 = True, type bool required for alternative coordinate search if the *TotalFieldAssembler* class is used to find edge-midpoints as Tx coordinates for the calculation of primary fields instead of specifying the nodes """ self.FE = FE self.bc = bc self.td_dc = td_dc if FE.tx is not None and len(FE.tx) != FE.n_tx and \ FE.n_tx != 1: lp(self.FE.MP.logger, 50, 'Fatal error! "len(tx):" ' + str(len(FE.tx)) + ' must be ' 'equal to "n_tx:" ' + str(FE.n_tx) + '. Aborting ...') raise SystemExit ownership_offset = self.FE.FS.M.dofmap().ownership_range() if 'Am' in self.FE.MP.approach or 'An' in self.FE.MP.approach: self.xyz = self.FE.FS.M.tabulate_dof_coordinates().reshape(-1, 3) elif 'DC' in self.FE.MP.approach or self.td_dc: self.xyz = self.FE.FS.S.tabulate_dof_coordinates().reshape(-1, 3) self.nodal_dof_re = (np.array(self.FE.FS.S.dofmap( ).dofs()) - self.FE.FS.S.dofmap().ownership_range()[0]) else: self.xyz = self.FE.FS.V.tabulate_dof_coordinates().reshape(-1, 3) if self.FE.MP.approach in ['E_IE', 'E_RA'] and not self.td_dc: self.xyz = self.FE.FS.V.tabulate_dof_coordinates().reshape(-1, 3) ownership_offset = self.FE.FS.V.dofmap().ownership_range() self.ned_dof_im = (np.array(self.FE.FS.V.dofmap().dofs()) - ownership_offset[0]) elif 'E' in self.FE.MP.approach or 'H' in self.FE.MP.approach: if force_p1: V = df.FunctionSpace(self.FE.FS.mesh, 'N1curl', 1) self.xyz = V.tabulate_dof_coordinates().reshape(-1, 3) ele = df.FiniteElement("N1curl", self.FE.FS.mesh.ufl_cell(), 1) M = df.FunctionSpace(self.FE.FS.mesh, df.MixedElement([ele, ele])) ownership_offset = M.dofmap().ownership_range() self.ned_dof_re = (np.array(M.sub(0).dofmap( ).dofs()) - ownership_offset[0])/2 self.ned_dof_im = (np.array(M.sub(0).dofmap( ).dofs()) - ownership_offset[0])/2+1768 else: self.ned_dof_re = (np.array(self.FE.FS.M.sub(0).dofmap( ).dofs()) - ownership_offset[0]) self.ned_dof_im = (np.array(self.FE.FS.M.sub(1).dofmap( ).dofs()) - ownership_offset[0]) elif 'Am' in self.FE.MP.approach: self.all_dof = (np.array(self.FE.FS.M.dofmap( ).dofs()) - ownership_offset[0]) self.ned_dof_re = (np.array(self.FE.FS.M.sub(0).dofmap( ).dofs()) - ownership_offset[0]) self.nodal_dof_re = (np.array(self.FE.FS.M.sub(2).dofmap( ).dofs()) - ownership_offset[0]) elif 'An' in self.FE.MP.approach: self.x_dof = (np.array(self.FE.FS.M.sub(0).dofmap().dofs()) - ownership_offset[0]) self.y_dof = (np.array(self.FE.FS.M.sub(1).dofmap().dofs()) - ownership_offset[0]) self.nodal_dof_re = (np.array(self.FE.FS.M.sub(3).dofmap( ).dofs()) - ownership_offset[0]) self.mesh_coords = self.FE.FS.mesh.coordinates() self.source_dof, self.tmp_sd = [], [] self.source_dof_coords, self.tmp_sdc = [], [] self.target_directions, self.tmp_td = [], [] self.component_switches, self.tmp_cs = [], [] self.a_tol = self.FE.a_tol
[docs] def assemble(self, fi=0): """ Conduct right-hand side assembly of source currents manually and let dolfin assemble the system matrix. Keyword arguments ----------------- - fi = 0, type int iteration index over frequencies for specifying the left-hand-side """ self.b = [] if 'An' in self.FE.MP.approach: self.test_An_total(fi) return t0 = time.time() if fi == 0: lp(self.FE.MP.logger, 20, 'Source vector and system matrix assembly:') if self.FE.MP.approach != 'DC': self.find_dof_coords() self.find_cells() self.find_directions() if '_s' in self.FE.MP.approach: return if self.FE.MP.approach in ['E_IE', 'E_RA'] and not self.td_dc: for ti in range(self.FE.n_tx): bb = df.Function(self.FE.FS.V) bb.vector()[self.source_dof[ti]] = (self.sign[ti] * self.FE.MP.currents[ti] / np.float(self.FE.FS.p)) bb.vector().apply('insert') self.b.append(bb) return if (self.FE.MP.system_it==False): self.A, bb = df.PETScMatrix(), df.PETScVector() if (self.FE.MP.system_it==True): self.Asys, bb = df.PETScMatrix(), df.PETScVector() self.Hh = df.PETScMatrix() self.Adiag = df.PETScMatrix() self.Aoffdiag= df.PETScMatrix() lp(self.FE.MP.logger, 20, '... assembling system matrix ...', pre_dash=False) if self.bc is None: if (self.FE.MP.system_it==False): df.assemble_system(self.FE.L[fi], self.FE.R[0], A_tensor=self.A, b_tensor=bb) if (self.FE.MP.system_it==True): df.assemble_system(self.FE.L[fi], self.FE.R[0], A_tensor=self.Asys, b_tensor=bb) df.assemble(self.FE.H[fi],tensor=self.Hh) df.assemble(self.FE.A[fi],tensor=self.Adiag) df.assemble(self.FE.B[fi],tensor=self.Aoffdiag) else: if self.bc in ['ZD', 'ZeroDirichlet']: self.FE.FS.add_dirichlet_bc() lp(self.FE.MP.logger, 20, '... applying zero Dirichlet boundary ' 'conditions ...', pre_dash=False) elif self.bc in ['ID', 'InhomogeneousDirichlet']: self.FE.FS.add_dirichlet_bc(self.FE.MP, self.ned_dof_re, self.ned_dof_im) lp(self.FE.MP.logger, 20, '... applying inhomogeneous Dirichlet boundary ' 'conditions ...', pre_dash=False) if self.FE.MP.approach not in ['E_t', 'Am_t']: lp(self.FE.MP.logger, 50, 'Error! Inhomogeneous Dirichlet conditions only ' 'supported for total electric field or mixed-space ' 'potential approachwa (E_t, Am_t) right now. ' 'Aborting ...') raise SystemExit elif self.bc in ['IF', 'Infrastructure']: self.FE.FS.add_inner_dirichlet_bc(self.FE.MP.logger) xyz = self.FE.FS.M.tabulate_dof_coordinates().reshape(-1, 3) bvals = self.FE.FS.bc.get_boundary_values() lp(self.FE.MP.logger, 10, ' - detected coordinates of inner boundary dof - ' + str(len(xyz[list(bvals.keys())])), pre_dash=False, root_only=False) lp(self.FE.MP.logger, 20, '... applying inner Dirichlet boundary ' 'conditions for infrastructure ...', pre_dash=False) else: lp(self.FE.MP.logger, 50, 'Error! Choose either zero or inhomogeneous or infrastruct' 'ure Dirichlet boundary conditions (bc="ZD", "ID", or "IF"' ' if not Neumann conditions by default. Aborting ...') raise SystemExit df.assemble_system(self.FE.L[fi], self.FE.R[0], self.FE.FS.bc, A_tensor=self.A, b_tensor=bb) if 'Am' in self.FE.MP.approach: for ti in range(self.FE.n_tx): bb = df.Function(self.FE.FS.M) bb.vector()[self.source_dof[ti]] = ( self.sign[ti] * (1./self.FE.MP.omegas[fi]) * self.FE.MP.currents[fi][ti] / np.float(self.FE.FS.p)) bb = self.find_and_fill_start_stop_dof( bb, self.FE.MP.currents[fi, ti], ti, fi) bb.vector().apply('insert') self.b.append(bb) elif 'E' in self.FE.MP.approach and not self.td_dc: for ti in range(self.FE.n_tx): bb = df.Function(self.FE.FS.M) bb.vector()[self.source_dof[ti]] = ( -self.sign[ti] * self.FE.MP.omegas[fi] * self.FE.MP.currents[fi][ti] / np.float(self.FE.FS.p)) bb.vector().apply('insert') self.b.append(bb) elif 'H' in self.FE.MP.approach: for ti in range(self.FE.n_tx): vec = df.Function(self.FE.FS.M) vec.vector()[self.source_dof[ti]] = ( self.sign[ti] * #(1./self.lengths[ti]) * self.FE.MP.currents[fi][ti] / np.float(-self.FE.FS.p)) vec2 = df.interpolate(vec.sub(1), self.FE.FS.V_cg) phi_r, phi_i = df.TestFunctions(self.FE.FS.M) self.ll = df.Function(self.FE.FS.M) self.ll.vector()[self.source_dof[ti]] = 1./self.lengths[ti] bb = df.assemble( df.inner(vec.sub(1), df.curl(self.FE.inv_s_func * phi_r)) * self.FE.FS.DOM.dx_0 + df.inner(vec.sub(0), phi_i) * self.FE.FS.DOM.dx_0, tensor=bb) self.b.append(bb) elif 'DC' in self.FE.MP.approach or self.td_dc: for ti in range(self.FE.n_tx): bb = df.Function(self.FE.FS.S) # hack to avoid crash with nested runs if type(self.FE.MP.currents[ti]) is not list: bb = self.find_and_fill_start_stop_dof( bb, self.FE.MP.currents[ti], ti) else: bb = self.find_and_fill_start_stop_dof( bb, self.FE.MP.currents[0][ti], ti) bb.vector().apply('insert') self.b.append(bb) self.FE.assembly_time += time.time() - t0 lp(self.FE.MP.logger, 20, ' - assembly time (s) --> ' + str(time.time() - t0) + ' - ', pre_dash=False) if not self.td_dc: if (self.FE.MP.system_it==False): self.FE.system_size = str(self.A.mat().getSize()[0]) if (self.FE.MP.system_it==True): self.FE.system_size = str(self.Hh.mat().getSize()[0]) if fi == 0 and not self.td_dc: lp(self.FE.MP.logger, 20, ' - system matrix size --> ' + self.FE.system_size + ' - ', pre_dash=False)
[docs] def find_dof_coords(self): """ Utility function searching for all Nedelec dof on edges and the corresponding coordinates which belong to the transmitter. """ if self.FE.s_type == 'hed': self.find_for_hed() if self.FE.s_type == 'vmd': if 'H' not in self.FE.MP.approach: lp(self.FE.MP.logger, 50, 'Error! VMD source only supported for H_vector ' 'approach right now, but even not working for the latter. ' 'Aborting ...') raise SystemExit self.find_for_vmd() elif self.FE.s_type == 'line': self.find_for_line() elif self.FE.s_type == 'loop_r': self.find_for_loop_r() elif self.FE.s_type == 'loop_c': lp(self.FE.MP.logger, 50, 'Error! RHS assembly needs to be implemented for loop_c ' 'Aborting ...') raise SystemExit elif self.FE.s_type == 'path': self.find_for_path()
[docs] def find_cells(self): """ For each dof which belongs to the source, search for an element which contains this dof. """ all_cells = [cell for cell in df.cells(self.FE.FS.mesh)] self.cell_list = [] bbt = self.FE.FS.mesh.bounding_box_tree() for ti in range(self.FE.n_tx): cell_list, tmp = [], [] for coord in self.source_dof_coords[ti]: p = df.Point(coord[0], coord[1], coord[2]) try: cell_list.append( all_cells[bbt.compute_first_entity_collision(p)]) except IndexError: # this is an mpi distribution problem with ghost cells # try to save simulation with deprecated technique if len(tmp) == 0: midpoints = np.array([cell.midpoint().array() for cell in all_cells]) try: tmp.append(np.argmin(np.linalg.norm( coord.reshape(1, 3) - midpoints, axis=1))) if len(tmp) > 0: cell_list.append(all_cells[tmp[-1]]) except: lp(self.FE.MP.logger, 30, 'Warning! Could not find any cell for this source ' 'dof. This rare issue is related\nto the mesh ' 'distribution by the parallel MPI processes.\n' 'Re-running with a slightly different numbers of ' 'MPI processes should resolve\nthis rare error. ' 'Continuing until next exception ...', barrier=False, root_only=False) self.cell_list.append(cell_list) self.ac = all_cells
[docs] def find_directions(self): """ Evaluate the local direction of the Nedelec dof in the element, which contains it and switch the sign to match the global direction. """ self.sign = [] if '_s' in self.FE.MP.approach: self.lengths = [] self.orientations = [] self.midpoints = [] for ti in range(self.FE.n_tx): if '_s' in self.FE.MP.approach: lengths, midpoints, orientations = [], [], [] nn, sign = [], [] for jj, cell in enumerate(self.cell_list[ti]): coord = self.source_dof_coords[ti][jj].reshape(1, 3) eids = np.array([edge.entities(0) for edge in df.edges(cell)]) dists = np.linalg.norm(self.mesh_coords[eids[:, 0]] - coord, axis=1) + \ np.linalg.norm(self.mesh_coords[eids[:, 1]] - coord, axis=1) idx = np.argwhere(dists - np.linalg.norm( self.mesh_coords[eids[:, 0]] - self.mesh_coords[eids[:, 1]], axis=1) < self.a_tol) n = cell.entities(0) try: nn.append([n[0], n[1], n[2], n[3], eids[idx[0][0]][0], eids[idx[0][0]][1]]) except IndexError: print(dists) lp(self.FE.MP.logger, 50, 'Error! Identified most probably a wrong source dof\n' 'which could be caused by:\n' ' - an unsuited search tolerance *a_tol*, ' 'suggestion: reduce to, e.g., *1e-6*\n' ' - an artifact in the ' 'mesh, e.g., unwanted rx/tx intersection)\n' ' - an unsuited number of mpi-processes\n' 'Aborting ...', root_only=False, barrier=False) #raise SystemExit for j in range(len(nn)): if self.mesh_coords[nn[j][5]][ self.component_switches[ti][j]] - \ self.mesh_coords[nn[j][4]][ self.component_switches[ti][j]] < \ 0.0 and self.target_directions[ti][j] < 0.0: sign.append(-1.) elif (self.mesh_coords[nn[j][5]][ self.component_switches[ti][j]] - self.mesh_coords[nn[j][4]][ self.component_switches[ti][j]] > 0.0 and self.target_directions[ti][j] > 0.0): sign.append(-1.) else: sign.append(1.) try: for local_node in range(4): if nn[j][4] == nn[j][local_node]: tmp1 = local_node elif nn[j][5] == nn[j][local_node]: tmp2 = local_node if tmp2 < tmp1: sign[j] *= -1. except Exception as e: lp(self.FE.MP.logger, 30, e, pre_dash=False, barrier=False) lp(self.FE.MP.logger, 30, 'Warning! dof with potentially wrong sign detected.\n' 'Please check if the fields look consistent! ' 'Continuing ...', pre_dash=False, root_only=False, barrier=False) if '_s' in self.FE.MP.approach: lengths.append(np.linalg.norm(self.mesh_coords[nn[j][4]] - self.mesh_coords[nn[j][5]])) midpoints.append(np.mean( np.array([self.mesh_coords[nn[j][4]], self.mesh_coords[nn[j][5]]]), axis=0)) tmp = np.arctan2( (self.mesh_coords[nn[j][4]][1] - self.mesh_coords[nn[j][5]][1]), (self.mesh_coords[nn[j][4]][0] - self.mesh_coords[nn[j][5]][0])) if sign[j] < 0.: tmp += np.pi orientations.append(tmp) n_local_source_dof = MPI.COMM_WORLD.gather(len(sign), root=0) n_global_sd = MPI.COMM_WORLD.bcast(np.sum(n_local_source_dof), root=0) if n_global_sd != 0: if not self.td_dc and ti == 0: lp(self.FE.MP.logger, 20, ' - ' + str(n_global_sd) + ' dof located for first ' 'transmitter - ', pre_dash=False) if ti == 1: lp(self.FE.MP.logger, 20, '... locating dof for further transmitters ...', pre_dash=False) else: lp(self.FE.MP.logger, 50, 'Error! Not a single source dof could be found ' 'for Tx number ' + str(ti) + '.\n' 'This might be caused by a wrong *s_type* or ' 'source geometry definition. ' 'Aborting ...') raise SystemExit self.sign.append(np.array(sign)) if '_s' in self.FE.MP.approach: tmp_l = MPI.COMM_WORLD.gather(lengths, root=0) all_lengths = MPI.COMM_WORLD.bcast(np.sum(tmp_l), root=0) self.lengths.append(np.array(all_lengths)) tmp_o = MPI.COMM_WORLD.gather(orientations, root=0) all_orientations = MPI.COMM_WORLD.bcast(np.sum(tmp_o), root=0) self.orientations.append(np.array(all_orientations)) tmp_m = MPI.COMM_WORLD.gather(midpoints, root=0) all_midpoints = MPI.COMM_WORLD.bcast(np.sum(tmp_m), root=0) self.midpoints.append(np.array(all_midpoints))
[docs] def find_and_fill_start_stop_dof(self, bb, current, ti=0, fi=0): """ Required for the A-V approach on mixed elements and the DC approach to set divergence term on the start and stop point of grounded wire sources. Required arguements ------------------- - bb, dolfin function assembled right-hand side function - current, type float, source current for the corresponsing tx Keyword arguments ----------------- - ti = 0, type int index of the corresponding tx """ if not self.FE.grounding[ti]: return(bb) if self.FE.s_type == 'hed': lp(self.FE.MP.logger, 50, 'Error, start and stop dof evaluation for total potential' ' approaches not implemented for HED sources.\n' 'Use a "Line" source with start and stop values instead. ' 'Aborting ...') raise SystemExit s1 = np.empty(0).astype('int') s2 = np.empty(0).astype('int') if self.FE.s_type in ['line', 'hed']: for j in range(len(self.xyz)): if df.near(self.xyz[j, 0], self.FE.start[0], self.a_tol) and\ df.near(self.xyz[j, 1], self.FE.start[1], self.a_tol) and\ df.near(self.xyz[j, 2], self.FE.start[2], self.a_tol): if j in self.nodal_dof_re: s1 = np.array([j]).astype('int') elif (df.near(self.xyz[j, 0], self.FE.stop[0], self.a_tol)and df.near(self.xyz[j, 1], self.FE.stop[1], self.a_tol)and df.near(self.xyz[j, 2], self.FE.stop[2], self.a_tol)): if j in self.nodal_dof_re: s2 = np.array([j]).astype('int') elif self.FE.s_type == 'path': if type(self.FE.tx[ti]) is list: tx = np.array(self.FE.tx[ti]) else: tx = self.FE.tx[ti] for j in range(len(self.xyz)): if df.near(self.xyz[j, 0], tx[0, 0], self.a_tol) and \ df.near(self.xyz[j, 1], tx[0, 1], self.a_tol) and \ df.near(self.xyz[j, 2], tx[0, 2], self.a_tol): if j in self.nodal_dof_re: s1 = np.array([j]).astype('int') elif (df.near(self.xyz[j, 0], tx[-1, 0], self.a_tol) and df.near(self.xyz[j, 1], tx[-1, 1], self.a_tol) and df.near(self.xyz[j, 2], tx[-1, 2], self.a_tol)): if j in self.nodal_dof_re: s2 = np.array([j]).astype('int') if 'Am' in self.FE.MP.approach: bb.vector()[s1] = current * (1./self.FE.MP.omegas[fi]) bb.vector()[s2] = -current * (1./self.FE.MP.omegas[fi]) elif 'An' in self.FE.MP.approach: bb[s1] = current * (1./self.FE.MP.omegas[fi]) bb[s2] = -current * (1./self.FE.MP.omegas[fi]) elif 'DC' in self.FE.MP.approach or self.td_dc: bb.vector()[s1] = current bb.vector()[s2] = -current return(bb)
[docs] def find_for_hed(self): """ Find source dof and coordinates for HED source. """ lp(self.FE.MP.logger, 20, ' - using HED source - ', pre_dash=False) if 'Am' in self.FE.MP.approach: lp(self.FE.MP.logger, 50, 'Error! HED source for Am approach on mixed elements not ' 'implemented yet! You may choose a very short line source ' 'instead. Note in that case, the line must start and end ' 'at nodes!') raise SystemExit self.FE.start[:] = self.FE.origin[:] self.FE.stop[:] = self.FE.origin[:] if df.near(self.FE.azimuth, 0.): self.FE.start[0] = self.FE.origin[0] - self.FE.length/2. self.FE.stop[0] = self.FE.origin[0] + self.FE.length/2. elif df.near(self.FE.azimuth, 180.): self.FE.start[0] = self.FE.origin[0] + self.FE.length/2. self.FE.stop[0] = self.FE.origin[0] - self.FE.length/2. elif df.near(self.FE.azimuth, 90.): self.FE.start[1] = self.FE.origin[1] - self.FE.length/2. self.FE.stop[1] = self.FE.origin[1] + self.FE.length/2. elif df.near(self.FE.azimuth, 270.): self.FE.start[1] = self.FE.origin[0] + self.FE.length/2. self.FE.stop[1] = self.FE.origin[0] - self.FE.length/2. else: lp(self.FE.MP.logger, 50, 'Error! arbitrary directected HED for total field ' 'approach not implemented.\nPlease choose azimuth ' 'values of *0*, *90*, *180* or *270* and consider using ' 'the automated Tx handling\nfor grounded or ungrounded sources ' 'described as *path(s)*. Aborting ...') raise SystemExit self.find_for_line()
[docs] def find_for_vmd(self): """ Find source dof and coordinates for VMD source. """ lp(self.FE.MP.logger, 50, 'Error! Manual assembly for total-field VMD sources not supported.' '\nOnly dummy function for testing. Aborting ...') raise SystemExit
[docs] def find_for_line(self): """ Find source dof and coordinates for "line" source. """ if self.FE.start[0] < self.FE.stop[0]: x0, x1 = self.FE.start[0], self.FE.stop[0] else: x1, x0 = self.FE.start[0], self.FE.stop[0] if self.FE.start[1] < self.FE.stop[1]: y0, y1 = self.FE.start[1], self.FE.stop[1] else: y1, y0 = self.FE.start[1], self.FE.stop[1] if self.FE.start[1] == self.FE.stop[1] and \ self.FE.start[2] == self.FE.stop[2] and \ str(self.FE.MP.topo) == 'None': lp(self.FE.MP.logger, 20, ' - using straight x-directed *line* source - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 1], self.FE.start[1], self.a_tol) and \ df.near(self.xyz[j, 2], self.FE.start[2], self.a_tol) and \ self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1: self.append_lists(j) elif (self.FE.start[0] == self.FE.stop[0] and self.FE.start[2] == self.FE.stop[2] and str(self.FE.MP.topo) == 'None'): lp(self.FE.MP.logger, 20, ' - using straight y-directed *line* source - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 0], self.FE.start[0], self.a_tol) and \ df.near(self.xyz[j, 2], self.FE.start[2], self.a_tol) and \ self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1: self.append_lists(j, comp=1) elif self.FE.start[1] == self.FE.stop[1]: lp(self.FE.MP.logger, 20, ' - using x-directed *line* source with topography - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 1], self.FE.start[1], self.a_tol) and \ self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1: self.append_lists(j) elif self.FE.start[0] == self.FE.stop[0]: lp(self.FE.MP.logger, 20, ' - using y-directed *line* source with topography - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 0], self.FE.start[0], self.a_tol) and \ self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1: self.append_lists(j, comp=1) else: lp(self.FE.MP.logger, 50, 'Error! Using *line* sources with arbitrary direction is not ' 'implemented.\n Choose arbtrary *path* sources instead. ' 'Aborting ...') raise SystemExit self.source_dof.append(np.array(self.tmp_sd)) self.source_dof_coords.append(np.array(self.tmp_sdc)) self.target_directions.append(np.array(self.tmp_td)) self.component_switches.append(np.array(self.tmp_cs))
[docs] def find_for_loop_r(self): """ Find source dof and coordinates for "loop_r" source. """ if self.FE.start[0] < self.FE.stop[0]: x0, x1 = self.FE.start[0], self.FE.stop[0] y0, y1 = self.FE.start[1], self.FE.stop[1] else: x1, x0 = self.FE.start[0], self.FE.stop[0] y1, y0 = self.FE.start[1], self.FE.stop[1] z = self.FE.start[2] if df.near(self.FE.start[0], self.FE.stop[0], self.a_tol) or \ df.near(self.FE.start[1], self.FE.stop[1], self.a_tol): lp(self.FE.MP.logger, 50, 'Error! Given start and stop coordinates do not ' 'match to a rectangular loop source: x or y start or ' 'stop coordinates are identical. Aborting ...') raise SystemExit if str(self.FE.MP.topo) == 'None': lp(self.FE.MP.logger, 20, ' - using rectangular *loop* source with constant height' ' - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 1], y0, self.a_tol) and \ df.near(self.xyz[j, 2], z, self.a_tol) and \ self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1: self.append_lists(j) elif (df.near(self.xyz[j, 0], x1, self.a_tol) and df.near(self.xyz[j, 2], z, self.a_tol) and self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1): self.append_lists(j, comp=1) elif (df.near(self.xyz[j, 1], y1, self.a_tol) and df.near(self.xyz[j, 2], z, self.a_tol) and self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1): self.append_lists(j, tdir=-1.) elif (df.near(self.xyz[j, 0], x0, self.a_tol) and df.near(self.xyz[j, 2], z, self.a_tol) and self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1): self.append_lists(j, tdir=-1., comp=1) else: lp(self.FE.MP.logger, 20, ' - using rectangular *loop* source with topography - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 1], y0, self.a_tol) and \ self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1: self.append_lists(j) elif (df.near(self.xyz[j, 0], x1, self.a_tol) and self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1): self.append_lists(j, comp=1) elif (df.near(self.xyz[j, 1], y1, self.a_tol) and self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1): self.append_lists(j, tdir=-1.) elif (df.near(self.xyz[j, 0], x0, self.a_tol) and self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1): self.append_lists(j, tdir=-1., comp=1) self.source_dof.append(np.array(self.tmp_sd)) self.source_dof_coords.append(np.array(self.tmp_sdc)) self.target_directions.append(np.array(self.tmp_td)) self.component_switches.append(np.array(self.tmp_cs))
[docs] def find_for_path(self): """ Find source dof and coordinates for arbitrary grounded or ungrounded transmitters described by a path of coordinates. """ if type(self.FE.grounding) is not list: self.FE.grounding = [self.FE.grounding] for ti, tx in enumerate(self.FE.tx): if type(tx) is list: tx = np.array(tx) if self.FE.grounding[ti]: if ti == 0: lp(self.FE.MP.logger, 20, ' - using arbitrary *line* source from path - ', pre_dash=False) else: if ti == 0: lp(self.FE.MP.logger, 20, ' - using arbitrary *loop* source from path - ', pre_dash=False) tx = np.vstack((tx, tx[0, :])) if ti == 1: lp(self.FE.MP.logger, 20, '... building ' + str(self.FE.n_tx - 1) + ' further ' 'path transmitters ...', pre_dash=False) if str(self.FE.MP.topo) != 'None': tx = self.add_topo(tx, self.FE.tx_topo) for j in range(len(tx) - 1): dists = np.linalg.norm(tx[j].reshape(1, 3) - self.xyz, axis=1) + \ np.linalg.norm(tx[j+1].reshape(1, 3) - self.xyz, axis=1) tmp = np.argwhere(dists - np.linalg.norm( tx[j] - tx[j + 1]) < self.a_tol) tdir, comp = self.eval_target_dir(tx[j], tx[j + 1]) if len(tmp) > 0: for idx in tmp.flatten(): self.append_lists(idx, tdir=tdir, comp=comp) self.source_dof.append(np.array(self.tmp_sd)) self.source_dof_coords.append(np.array(self.tmp_sdc)) self.target_directions.append(np.array(self.tmp_td)) self.component_switches.append(np.array(self.tmp_cs)) self.tmp_sd = [] self.tmp_sdc = [] self.tmp_td = [] self.tmp_cs = []
[docs] def append_lists(self, idx, tdir=1., comp=0): """ Utility function appending dof, coordinates, directions and component switches to lists during the dof-coordinate search. Required arguements ------------------- - idx, type list of int integers specifying the transmitter dof Keyword arguments ----------------- - tdir, type float either positive or negative *1* - comp = 0, type int either 0 or 1 for x or y component """ if 'E' in self.FE.MP.approach or 'H' in self.FE.MP.approach: self.tmp_sd.append(self.ned_dof_im[idx]) self.tmp_sdc.append(self.xyz[idx, :]) self.tmp_td.append(tdir) self.tmp_cs.append(comp) elif 'Am' in self.FE.MP.approach: if idx in self.ned_dof_re: self.tmp_sd.append(self.all_dof[idx]) self.tmp_sdc.append(self.xyz[idx, :]) self.tmp_td.append(tdir) self.tmp_cs.append(comp)
[docs] def eval_target_dir(self, a, b): """ Evaluate global direction of each transmitter segment. Required arguements ------------------- - a, b, type array/list of len 3 start and stop coordinates to find the global positive or negative target direction, counter-clockwise in a right-handed coordinate system """ if np.isclose(np.abs(a[0] - b[0]), np.abs(a[1] - b[1]), atol=self.a_tol): if a[2] < b[2]: return(1., 2) else: return(-1., 2) elif np.abs(a[0] - b[0]) > np.abs(a[1] - b[1]): if a[0] < b[0]: return(1., 0) else: return(-1., 0) elif np.abs(a[0] - b[0]) < np.abs(a[1] - b[1]): if a[1] < b[1]: return(1., 1) else: return(-1., 1)
[docs] def test_An_total(self, fi): """ Experimental total A-Phi nodal approach, use with caution and contact the authors for advise. """ if self.FE.n_tx > 1: lp(self.FE.MP.logger, 50, 'Error! Multiple right-hand sides not supported for\n' 'experimental A-V nodal approach. Aborting ...') raise SystemExit t0 = time.time() A, b = df.PETScMatrix(), df.PETScVector() self.FE.FS.add_dirichlet_bc() lp(self.FE.MP.logger, 20, '... applying zero Dirichlet boundary ' 'conditions ...', pre_dash=False) self.A, bb = df.assemble_system( self.FE.L[0], self.FE.R[0], self.FE.FS.bc, A_tensor=A, b_tensor=b) self.FE.assembly_time += time.time() - t0 if 'hed' in self.FE.s_type or 'line' in self.FE.s_type: dof = [] if self.FE.start[1] == self.FE.stop[1] and \ self.FE.start[2] == self.FE.stop[2] and \ str(self.FE.MP.topo) == 'None': lp(self.FE.MP.logger, 20, ' - using straight x-directed *line* source - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 1], self.FE.start[1], self.a_tol) and \ df.near(self.xyz[j, 2], self.FE.start[2], self.a_tol) and \ self.xyz[j, 0] > self.FE.start[0] - self.a_tol and \ self.xyz[j, 0] < self.FE.stop[0] + self.a_tol: if j in self.x_dof: dof.append(j) tx_len = np.abs(self.FE.start[0] - self.FE.stop[0]) elif (self.FE.start[0] == self.FE.stop[0] and self.FE.start[2] == self.FE.stop[2] and str(self.FE.MP.topo) == 'None'): lp(self.FE.MP.logger, 20, ' - using straight y-directed *line* source - ', pre_dash=False) for j in range(len(self.xyz)): if df.near(self.xyz[j, 0], self.FE.start[0], self.a_tol) and \ df.near(self.xyz[j, 2], self.FE.start[2], self.a_tol) and \ self.xyz[j, 1] > self.FE.start[1] - self.a_tol and \ self.xyz[j, 1] < self.FE.stop[1] + self.a_tol: if j in self.y_dof: dof.append(j) tx_len = np.abs(self.FE.start[1] - self.FE.stop[1]) else: lp(self.FE.MP.logger, 50, 'Start', self.FE.start) lp(self.FE.MP.logger, 50, 'Stop', self.FE.stop) lp(self.FE.MP.logger, 50, 'Topo', self.FE.MP.topo) lp(self.FE.MP.logger, 50, 'Currently, straight lines in x- or y- direction ' 'required. Aborting ...') raise SystemExit n_local_source_dof = MPI.COMM_WORLD.gather(len(dof), root=0) n_global_sd = MPI.COMM_WORLD.bcast(np.sum(n_local_source_dof), root=0) if n_global_sd != 0: lp(self.FE.MP.logger, 20, ' - ' + str(n_global_sd) + ' dof located for first ' 'transmitter - ', pre_dash=False) else: lp(self.FE.MP.logger, 50, 'Fatal Error! Not a single source dof could be found ' 'for Tx number ' + str(0) + '.\n' 'This is most probably caused by a wrong *s_type* or ' 'source geometry definition. Aborting') raise SystemExit for ti in range(self.FE.n_tx): bb[dof] = -tx_len * (1./(n_global_sd)) * \ (1./self.FE.MP.omegas[fi]) * self.FE.MP.current bb = self.find_and_fill_start_stop_dof(bb, self.FE.MP.current) bb.apply('insert') self.b.append(bb) self.FE.system_size = str(A.mat().getSize()[0]) lp(self.FE.MP.logger, 20, ' - system matrix size --> ' + self.FE.system_size + ' - ', pre_dash=False) elif 'loop_r' in self.FE.s_type: dof = [[], [], [], []] if self.FE.start[0] < self.FE.stop[0]: x0, x1 = self.FE.start[0], self.FE.stop[0] y0, y1 = self.FE.start[1], self.FE.stop[1] else: x1, x0 = self.FE.start[0], self.FE.stop[0] y1, y0 = self.FE.start[1], self.FE.stop[1] z = self.FE.start[2] for j in range(len(self.xyz)): if df.near(self.xyz[j, 1], y0, self.a_tol) and \ df.near(self.xyz[j, 2], z, self.a_tol) and \ self.xyz[j, 0] > x0 - self.a_tol and \ self.xyz[j, 0] < x1 + self.a_tol: if j in self.x_dof: dof[0].append(j) elif (df.near(self.xyz[j, 0], x1, self.a_tol) and df.near(self.xyz[j, 2], z, self.a_tol) and self.xyz[j, 1] > y0 - self.a_tol and self.xyz[j, 1] < y1 + self.a_tol): if j in self.y_dof: dof[1].append(j) elif (df.near(self.xyz[j, 1], y1, self.a_tol) and df.near(self.xyz[j, 2], z, self.a_tol) and self.xyz[j, 0] > x0 - self.a_tol and self.xyz[j, 0] < x1 + self.a_tol): if j in self.x_dof: dof[2].append(j) elif (df.near(self.xyz[j, 0], x0, self.a_tol) and df.near(self.xyz[j, 2], z, self.a_tol) and self.xyz[j, 1] > y0 - self.a_tol and self.xyz[j, 1] < y1 + self.a_tol): if j in self.y_dof: dof[3].append(j) tx_len = (2. * (np.abs(self.FE.start[1] - self.FE.stop[1])) + 2. * (np.abs(self.FE.start[0] - self.FE.stop[0]))) all_dof = [] for side in range(4): n_local_source_dof = MPI.COMM_WORLD.gather(len(dof[side]), root=0) all_dof.append(MPI.COMM_WORLD.bcast( np.sum(n_local_source_dof), root=0)) n_global_sd = sum(all_dof) if n_global_sd != 0: lp(self.FE.MP.logger, 20, ' - ' + str(n_global_sd) + ' dof located for first ' 'transmitter - ', pre_dash=False) else: lp(self.FE.MP.logger, 50, 'Fatal Error! Not a single source dof could be found ' 'for Tx number ' + str(0) + '.\n' 'This is most probably caused by a wrong *s_type* or ' 'source geometry definition. Aborting') raise SystemExit for ti in range(self.FE.n_tx): for side in range(2): bb[dof[side]] = -tx_len * (1./(n_global_sd)) *\ (1./self.FE.MP.omegas[fi]) * self.FE.MP.current for side in range(2, 4): bb[dof[side]] = tx_len * (1./(n_global_sd)) *\ (1./self.FE.MP.omegas[fi]) * self.FE.MP.current bb.apply('insert') self.b.append(bb) self.FE.system_size = str(A.mat().getSize()[0]) lp(self.FE.MP.logger, 20, ' - system matrix size --> ' + self.FE.system_size + ' - ', pre_dash=False) else: lp(self.FE.MP.logger, 50, 'Error! Total A-phi nodal approach only test-wise implemented ' 'for *line* or *loop_r* sources for now. Aborting ...') raise SystemExit
[docs] def add_topo(self, tx, tx_topo=None): """ Apply topography information to the z-values of the transmitter coordinates. Required arguements ------------------- - tx, type list of coordinates list of transmitter coordinates Keyword arguments ----------------- - tx_topo = None, type str or function user might specify a custom topography reference here, not recommended, only implemented for tests """ if tx_topo is None: topo = '_surf' else: topo = '_' + tx_topo + '_triangulated' from custEM.meshgen.dem_interpolator import DEM dem = DEM(self.FE.MP.para_dir + '/' + self.FE.MP.mesh_name + topo + '.xyz', centering=False, easting_shift=None, northing_shift=None) tx[:, 2] = dem(tx[:, 0], tx[:, 1], rotation=None) return(tx)
[docs] class SecondaryFieldAssembler: """ Class that assembles the system matrix and the right-hand-sides for secondary-field formulations based on primary fields and delta_sigma. Methods ------- - assemble() assembly function of left- and right-hand sides for secondary-field formulations """ def __init__(self, FE, bc): """ Assembly of system matrix and right-hand side vector. Required arguments ------------------ - FE, type class FE-approach (e.g., **E_vector**) instance - bc, None or type dolfin DirichletBC see **MOD** class function: **solve_main_problem()** """ self.FE = FE self.bc = bc
[docs] def assemble(self, fi=0): """ Let dolfin assemble the system matrix and right-hand side vectors. Keyword arguments ----------------- - fi = 0, type int iteration index over frequencies for specifying the left-hand-side """ lp(self.FE.MP.logger, 20, 'Source vector and system matrix assembly:') t0 = time.time() lp(self.FE.MP.logger, 20, '... assembling system matrix ...', pre_dash=False) self.b = [df.PETScVector() for n in range(self.FE.n_tx)] if (self.FE.MP.system_it==False): self.A= df.PETScMatrix() if (self.FE.MP.system_it==True): self.Asys= df.PETScMatrix() self.Hh = df.PETScMatrix() self.Adiag = df.PETScMatrix() self.Aoffdiag= df.PETScMatrix() if fi != 0: self.FE.FS.PF.load_primary_fields(fi=fi) self.FE.rhs_form_secondary(fi) for ti in range(self.FE.n_tx): if ti == 0: if self.bc is None: if (self.FE.MP.system_it==False): df.assemble_system(self.FE.L[fi], self.FE.R[0], A_tensor=self.A, b_tensor=self.b[ti]) if (self.FE.MP.system_it==True): df.assemble_system(self.FE.L[fi], self.FE.R[0], A_tensor=self.Asys, b_tensor=self.b[ti]) df.assemble(self.FE.H[fi],tensor=self.Hh) df.assemble(self.FE.A[fi],tensor=self.Adiag) df.assemble(self.FE.B[fi],tensor=self.Aoffdiag) elif self.bc in ['ZeroDirichlet', 'ZD']: self.FE.FS.add_dirichlet_bc() lp(self.FE.MP.logger, 20, '... applying zero Dirichlet boundary ' 'conditions ...', pre_dash=False) df.assemble_system( self.FE.L[fi], self.FE.R[ti], self.FE.FS.bc, A_tensor=self.A, b_tensor=self.b[ti]) else: lp(self.FE.MP.logger, 50, 'Error, Choose either None (default) or "ZD" or ' '"ZeroDirichlet" as boundary condition for secondary ' 'field formulations. Aborting ...') raise SystemExit else: df.assemble(self.FE.R[ti], tensor=self.b[ti]) if self.bc in ['ZeroDirichlet', 'ZD']: self.FE.FS.bc.apply(self.b[ti]) self.FE.assembly_time += time.time() - t0 lp(self.FE.MP.logger, 20, ' - assembly time (s) --> ' + str(time.time() - t0) + ' - ', pre_dash=False) if fi == 0: if (self.FE.MP.system_it==False): self.FE.system_size = str(self.A.mat().getSize()[0]) if (self.FE.MP.system_it==True): self.FE.system_size = str(self.Hh.mat().getSize()[0]) lp(self.FE.MP.logger, 20, ' - system matrix size --> ' + self.FE.system_size + ' - ', pre_dash=False)
[docs] class MTAssembler: """ Class that assembles the system matrix and the right-hand-sides (two source polarizations implemented as boundary conditions on the top of the model domain) for natural-source modeling. Methods ------- - assemble() assembly function of left- and right-hand sides for secondary-field formulations """ def __init__(self, FE, bc): """ Assembly of system matrix and right-hand side vector. Required arguments ------------------ - FE, type class FE-approach (e.g., **E_vector**) instance - bc, None or type dolfin DirichletBC see **MOD** class function: **solve_main_problem()** """ self.FE = FE self.bc = bc
[docs] def assemble(self, fi=0): lp(self.FE.MP.logger, 20, 'Source vector and system matrix assembly:') t0 = time.time() lp(self.FE.MP.logger, 20, '... assembling system matrix ...', pre_dash=False) zmax_local = np.max(self.FE.FS.mesh.coordinates()[:].reshape( -1, 3)[:, 2]) tmp = self.FE.MP.mpi_cw.gather(zmax_local, root=0) if self.FE.MP.mpi_cw.Get_rank() == 0: max_tmp = np.max(np.array(tmp).flatten()) else: max_tmp = None zmax = self.FE.MP.mpi_cw.bcast(max_tmp, root=0) def boundary_t(x, on_boundary): return on_boundary and (df.near(x[2], zmax, self.FE.a_tol)) def other_boundary(x, on_boundary): return on_boundary and (x[2] < zmax) t0 = time.time() pol_1 = df.Constant(("1.0", "0.0", "0.0", "1.0", "0.0", "0.0")) pol_2 = df.Constant(("0.0", "1.0", "0.0", "0.0", "1.0", "0.0")) zero_1 = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0")) zero_2 = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0")) if self.bc in ['ZD', 'ZeroDirichlet']: bc_1 = [df.DirichletBC(self.FE.FS.M, pol_1, boundary_t), df.DirichletBC(self.FE.FS.M, zero_1, other_boundary)] bc_2 = [df.DirichletBC(self.FE.FS.M, pol_2, boundary_t), df.DirichletBC(self.FE.FS.M, zero_2, other_boundary)] else: # implicit boundary conditions on non-top boundary bc_1 = df.DirichletBC(self.FE.FS.M, pol_1, boundary_t) bc_2 = df.DirichletBC(self.FE.FS.M, pol_2, boundary_t) bcs = [bc_1, bc_2] self.b = [df.PETScVector(), df.PETScVector()] if (self.FE.MP.system_it==False): self.A= df.PETScMatrix() if (self.FE.MP.system_it==True): self.Asys= df.PETScMatrix() self.Hh = df.PETScMatrix() self.Adiag = df.PETScMatrix() self.Aoffdiag= df.PETScMatrix() if '_s' not in self.FE.MP.approach: for ti in range(self.FE.n_tx): # forced to be 2 #if ti == 0: if (self.FE.MP.system_it==False): df.assemble_system(self.FE.L[fi], self.FE.R[0], bcs[ti], A_tensor=self.A, b_tensor=self.b[ti]) if (self.FE.MP.system_it==True): df.assemble_system(self.FE.L[fi], self.FE.R[0], bcs[ti], A_tensor=self.Asys, b_tensor=self.b[ti]) df.assemble(self.FE.H[fi],tensor=self.Hh) df.assemble(self.FE.A[fi],tensor=self.Adiag) df.assemble(self.FE.B[fi],tensor=self.Aoffdiag) #df.assemble_system( # self.FE.L[fi], self.FE.R[0], bcs[ti], # A_tensor=self.A, b_tensor=self.b[ti]) #else: # df.assemble(self.FE.R[0], tensor=self.b[ti]) # bc_2.apply(self.b[ti]) else: if fi != 0: self.FE.FS.PF.load_primary_fields(fi=fi) self.FE.rhs_form_secondary(fi) for ti in range(self.FE.n_tx): # forced to be 2 if self.bc is None: if (self.FE.MP.system_it==False): df.assemble_system(self.FE.L[fi], self.FE.R[0], A_tensor=self.A, b_tensor=self.b[ti]) if (self.FE.MP.system_it==True): df.assemble_system(self.FE.L[fi], self.FE.R[0], A_tensor=self.Asys, b_tensor=self.b[ti]) df.assemble(self.FE.H[fi],tensor=self.Hh) df.assemble(self.FE.A[fi],tensor=self.Adiag) df.assemble(self.FE.B[fi],tensor=self.Aoffdiag) #df.assemble_system(self.FE.L[fi], self.FE.R[ti], # A_tensor=self.A, # b_tensor=self.b[ti]) elif self.bc in ['ZeroDirichlet', 'ZD']: self.FE.FS.add_dirichlet_bc() lp(self.FE.MP.logger, 20, '... applying zero Dirichlet boundary ' 'conditions ...', pre_dash=False) df.assemble_system( self.FE.L[fi], self.FE.R[ti], self.FE.FS.bc, A_tensor=self.A, b_tensor=self.b[ti]) self.FE.assembly_time += time.time() - t0 lp(self.FE.MP.logger, 20, ' - assembly time (s) --> ' + str(time.time() - t0) + ' - ', pre_dash=False) if fi == 0: if (self.FE.MP.system_it==False): self.FE.system_size = str(self.A.mat().getSize()[0]) if (self.FE.MP.system_it==True): self.FE.system_size = str(self.Hh.mat().getSize()[0]) lp(self.FE.MP.logger, 20, ' - system matrix size --> ' + self.FE.system_size + ' - ', pre_dash=False)