# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
import numpy as np
from custEM.misc import logger_print as lp
from custEM.core import Solver
import time
from mpi4py import MPI
[docs]
class TotalFieldAssembler:
"""
Class for setting the assembled contribution of the current density on
transmitter dof in the right-hand side vector for total field formulations.
Methods
-------
- assemble()
general assembly function of left- and right-hand sides for
total-field formulations; calls all the other methods
- find_dof_coords()
utility function searching for all Nedelec dof and whose
coordinates which belong to the source defined on edges
- find_cells()
for each dof which belongs to the source, search for an element
which contains the former
- find_directions()
evaluate the local direction of the Nedelec dof in the element
which contains it and switch the signto match the global direction
- find_and_fill_start_stop_dof()
required for the A_Phi approach on mixed elements to set divergence
term on the start and stop point of grounded wire sources
- find_for_hed(), find_for_vmd()
utility functions called by **find_dof_coords()**
- find_for_line(), find_for_loop_r(), find_for_path()
utility functions called by **find_dof_coords()**
- append_lists()
utility function to collect the transmitter dof
- eval_target_dir()
utility function to find the global direction of an edge
- test_An_total(),
method for testing total A-V formulation on nodal elements
- add_topo(),
modify z-values of coordinates according to specified topography
"""
def __init__(self, FE, bc, td_dc=False, force_p1=False):
"""
Assembly of system matrix and right-hand side vector.
Required arguments
------------------
- FE, type class
FE-approach (e.g., **E_vector**) instance
- bc, None or type dolfin DirichletBC
see **MOD** class function: **solve_main_problem()**
Keyword arguments
-----------------
- td_dc = False, type bool
only for internal usage; specifiy a dc field computation on the
fly for simulating shut-off transients with time-domain approaches
- force_p1 = True, type bool
required for alternative coordinate search if the
*TotalFieldAssembler* class is used to find edge-midpoints as
Tx coordinates for the calculation of primary fields instead
of specifying the nodes
"""
self.FE = FE
self.bc = bc
self.td_dc = td_dc
if FE.tx is not None and len(FE.tx) != FE.n_tx and \
FE.n_tx != 1:
lp(self.FE.MP.logger, 50,
'Fatal error! "len(tx):" ' + str(len(FE.tx)) + ' must be '
'equal to "n_tx:" ' + str(FE.n_tx) + '. Aborting ...')
raise SystemExit
ownership_offset = self.FE.FS.M.dofmap().ownership_range()
if 'Am' in self.FE.MP.approach or 'An' in self.FE.MP.approach:
self.xyz = self.FE.FS.M.tabulate_dof_coordinates().reshape(-1, 3)
elif 'DC' in self.FE.MP.approach or self.td_dc:
self.xyz = self.FE.FS.S.tabulate_dof_coordinates().reshape(-1, 3)
self.nodal_dof_re = (np.array(self.FE.FS.S.dofmap(
).dofs()) - self.FE.FS.S.dofmap().ownership_range()[0])
else:
self.xyz = self.FE.FS.V.tabulate_dof_coordinates().reshape(-1, 3)
if self.FE.MP.approach in ['E_IE', 'E_RA'] and not self.td_dc:
self.xyz = self.FE.FS.V.tabulate_dof_coordinates().reshape(-1, 3)
ownership_offset = self.FE.FS.V.dofmap().ownership_range()
self.ned_dof_im = (np.array(self.FE.FS.V.dofmap().dofs()) -
ownership_offset[0])
elif 'E' in self.FE.MP.approach or 'H' in self.FE.MP.approach:
if force_p1:
V = df.FunctionSpace(self.FE.FS.mesh, 'N1curl', 1)
self.xyz = V.tabulate_dof_coordinates().reshape(-1, 3)
ele = df.FiniteElement("N1curl", self.FE.FS.mesh.ufl_cell(), 1)
M = df.FunctionSpace(self.FE.FS.mesh,
df.MixedElement([ele, ele]))
ownership_offset = M.dofmap().ownership_range()
self.ned_dof_re = (np.array(M.sub(0).dofmap(
).dofs()) - ownership_offset[0])/2
self.ned_dof_im = (np.array(M.sub(0).dofmap(
).dofs()) - ownership_offset[0])/2+1768
else:
self.ned_dof_re = (np.array(self.FE.FS.M.sub(0).dofmap(
).dofs()) - ownership_offset[0])
self.ned_dof_im = (np.array(self.FE.FS.M.sub(1).dofmap(
).dofs()) - ownership_offset[0])
elif 'Am' in self.FE.MP.approach:
self.all_dof = (np.array(self.FE.FS.M.dofmap(
).dofs()) - ownership_offset[0])
self.ned_dof_re = (np.array(self.FE.FS.M.sub(0).dofmap(
).dofs()) - ownership_offset[0])
self.nodal_dof_re = (np.array(self.FE.FS.M.sub(2).dofmap(
).dofs()) - ownership_offset[0])
elif 'An' in self.FE.MP.approach:
self.x_dof = (np.array(self.FE.FS.M.sub(0).dofmap().dofs()) -
ownership_offset[0])
self.y_dof = (np.array(self.FE.FS.M.sub(1).dofmap().dofs()) -
ownership_offset[0])
self.nodal_dof_re = (np.array(self.FE.FS.M.sub(3).dofmap(
).dofs()) - ownership_offset[0])
self.mesh_coords = self.FE.FS.mesh.coordinates()
self.source_dof, self.tmp_sd = [], []
self.source_dof_coords, self.tmp_sdc = [], []
self.target_directions, self.tmp_td = [], []
self.component_switches, self.tmp_cs = [], []
self.a_tol = self.FE.a_tol
[docs]
def assemble(self, fi=0):
"""
Conduct right-hand side assembly of source currents manually
and let dolfin assemble the system matrix.
Keyword arguments
-----------------
- fi = 0, type int
iteration index over frequencies for specifying the left-hand-side
"""
self.b = []
if 'An' in self.FE.MP.approach:
self.test_An_total(fi)
return
t0 = time.time()
if fi == 0:
lp(self.FE.MP.logger, 20,
'Source vector and system matrix assembly:')
if self.FE.MP.approach != 'DC':
self.find_dof_coords()
self.find_cells()
self.find_directions()
if '_s' in self.FE.MP.approach:
return
if self.FE.MP.approach in ['E_IE', 'E_RA'] and not self.td_dc:
for ti in range(self.FE.n_tx):
bb = df.Function(self.FE.FS.V)
bb.vector()[self.source_dof[ti]] = (self.sign[ti] *
self.FE.MP.currents[ti] /
np.float(self.FE.FS.p))
bb.vector().apply('insert')
self.b.append(bb)
return
if (self.FE.MP.system_it==False):
self.A, bb = df.PETScMatrix(), df.PETScVector()
if (self.FE.MP.system_it==True):
self.Asys, bb = df.PETScMatrix(), df.PETScVector()
self.Hh = df.PETScMatrix()
self.Adiag = df.PETScMatrix()
self.Aoffdiag= df.PETScMatrix()
lp(self.FE.MP.logger, 20,
'... assembling system matrix ...', pre_dash=False)
if self.bc is None:
if (self.FE.MP.system_it==False):
df.assemble_system(self.FE.L[fi], self.FE.R[0],
A_tensor=self.A, b_tensor=bb)
if (self.FE.MP.system_it==True):
df.assemble_system(self.FE.L[fi], self.FE.R[0],
A_tensor=self.Asys, b_tensor=bb)
df.assemble(self.FE.H[fi],tensor=self.Hh)
df.assemble(self.FE.A[fi],tensor=self.Adiag)
df.assemble(self.FE.B[fi],tensor=self.Aoffdiag)
else:
if self.bc in ['ZD', 'ZeroDirichlet']:
self.FE.FS.add_dirichlet_bc()
lp(self.FE.MP.logger, 20,
'... applying zero Dirichlet boundary '
'conditions ...', pre_dash=False)
elif self.bc in ['ID', 'InhomogeneousDirichlet']:
self.FE.FS.add_dirichlet_bc(self.FE.MP, self.ned_dof_re,
self.ned_dof_im)
lp(self.FE.MP.logger, 20,
'... applying inhomogeneous Dirichlet boundary '
'conditions ...', pre_dash=False)
if self.FE.MP.approach not in ['E_t', 'Am_t']:
lp(self.FE.MP.logger, 50,
'Error! Inhomogeneous Dirichlet conditions only '
'supported for total electric field or mixed-space '
'potential approachwa (E_t, Am_t) right now. '
'Aborting ...')
raise SystemExit
elif self.bc in ['IF', 'Infrastructure']:
self.FE.FS.add_inner_dirichlet_bc(self.FE.MP.logger)
xyz = self.FE.FS.M.tabulate_dof_coordinates().reshape(-1, 3)
bvals = self.FE.FS.bc.get_boundary_values()
lp(self.FE.MP.logger, 10,
' - detected coordinates of inner boundary dof - '
+ str(len(xyz[list(bvals.keys())])),
pre_dash=False, root_only=False)
lp(self.FE.MP.logger, 20,
'... applying inner Dirichlet boundary '
'conditions for infrastructure ...', pre_dash=False)
else:
lp(self.FE.MP.logger, 50,
'Error! Choose either zero or inhomogeneous or infrastruct'
'ure Dirichlet boundary conditions (bc="ZD", "ID", or "IF"'
' if not Neumann conditions by default. Aborting ...')
raise SystemExit
df.assemble_system(self.FE.L[fi], self.FE.R[0], self.FE.FS.bc,
A_tensor=self.A, b_tensor=bb)
if 'Am' in self.FE.MP.approach:
for ti in range(self.FE.n_tx):
bb = df.Function(self.FE.FS.M)
bb.vector()[self.source_dof[ti]] = (
self.sign[ti] * (1./self.FE.MP.omegas[fi]) *
self.FE.MP.currents[fi][ti] / np.float(self.FE.FS.p))
bb = self.find_and_fill_start_stop_dof(
bb, self.FE.MP.currents[fi, ti], ti, fi)
bb.vector().apply('insert')
self.b.append(bb)
elif 'E' in self.FE.MP.approach and not self.td_dc:
for ti in range(self.FE.n_tx):
bb = df.Function(self.FE.FS.M)
bb.vector()[self.source_dof[ti]] = (
-self.sign[ti] * self.FE.MP.omegas[fi] *
self.FE.MP.currents[fi][ti] / np.float(self.FE.FS.p))
bb.vector().apply('insert')
self.b.append(bb)
elif 'H' in self.FE.MP.approach:
for ti in range(self.FE.n_tx):
vec = df.Function(self.FE.FS.M)
vec.vector()[self.source_dof[ti]] = (
self.sign[ti] * #(1./self.lengths[ti]) *
self.FE.MP.currents[fi][ti] / np.float(-self.FE.FS.p))
vec2 = df.interpolate(vec.sub(1), self.FE.FS.V_cg)
phi_r, phi_i = df.TestFunctions(self.FE.FS.M)
self.ll = df.Function(self.FE.FS.M)
self.ll.vector()[self.source_dof[ti]] = 1./self.lengths[ti]
bb = df.assemble(
df.inner(vec.sub(1),
df.curl(self.FE.inv_s_func * phi_r)) *
self.FE.FS.DOM.dx_0 +
df.inner(vec.sub(0),
phi_i) * self.FE.FS.DOM.dx_0, tensor=bb)
self.b.append(bb)
elif 'DC' in self.FE.MP.approach or self.td_dc:
for ti in range(self.FE.n_tx):
bb = df.Function(self.FE.FS.S)
# hack to avoid crash with nested runs
if type(self.FE.MP.currents[ti]) is not list:
bb = self.find_and_fill_start_stop_dof(
bb, self.FE.MP.currents[ti], ti)
else:
bb = self.find_and_fill_start_stop_dof(
bb, self.FE.MP.currents[0][ti], ti)
bb.vector().apply('insert')
self.b.append(bb)
self.FE.assembly_time += time.time() - t0
lp(self.FE.MP.logger, 20,
' - assembly time (s) --> ' + str(time.time() - t0) +
' - ', pre_dash=False)
if not self.td_dc:
if (self.FE.MP.system_it==False):
self.FE.system_size = str(self.A.mat().getSize()[0])
if (self.FE.MP.system_it==True):
self.FE.system_size = str(self.Hh.mat().getSize()[0])
if fi == 0 and not self.td_dc:
lp(self.FE.MP.logger, 20,
' - system matrix size --> ' + self.FE.system_size +
' - ', pre_dash=False)
[docs]
def find_dof_coords(self):
"""
Utility function searching for all Nedelec dof on edges and the
corresponding coordinates which belong to the transmitter.
"""
if self.FE.s_type == 'hed':
self.find_for_hed()
if self.FE.s_type == 'vmd':
if 'H' not in self.FE.MP.approach:
lp(self.FE.MP.logger, 50,
'Error! VMD source only supported for H_vector '
'approach right now, but even not working for the latter. '
'Aborting ...')
raise SystemExit
self.find_for_vmd()
elif self.FE.s_type == 'line':
self.find_for_line()
elif self.FE.s_type == 'loop_r':
self.find_for_loop_r()
elif self.FE.s_type == 'loop_c':
lp(self.FE.MP.logger, 50,
'Error! RHS assembly needs to be implemented for loop_c '
'Aborting ...')
raise SystemExit
elif self.FE.s_type == 'path':
self.find_for_path()
[docs]
def find_cells(self):
"""
For each dof which belongs to the source, search for an element
which contains this dof.
"""
all_cells = [cell for cell in df.cells(self.FE.FS.mesh)]
self.cell_list = []
bbt = self.FE.FS.mesh.bounding_box_tree()
for ti in range(self.FE.n_tx):
cell_list, tmp = [], []
for coord in self.source_dof_coords[ti]:
p = df.Point(coord[0], coord[1], coord[2])
try:
cell_list.append(
all_cells[bbt.compute_first_entity_collision(p)])
except IndexError:
# this is an mpi distribution problem with ghost cells
# try to save simulation with deprecated technique
if len(tmp) == 0:
midpoints = np.array([cell.midpoint().array() for
cell in all_cells])
try:
tmp.append(np.argmin(np.linalg.norm(
coord.reshape(1, 3) - midpoints, axis=1)))
if len(tmp) > 0:
cell_list.append(all_cells[tmp[-1]])
except:
lp(self.FE.MP.logger, 30,
'Warning! Could not find any cell for this source '
'dof. This rare issue is related\nto the mesh '
'distribution by the parallel MPI processes.\n'
'Re-running with a slightly different numbers of '
'MPI processes should resolve\nthis rare error. '
'Continuing until next exception ...',
barrier=False, root_only=False)
self.cell_list.append(cell_list)
self.ac = all_cells
[docs]
def find_directions(self):
"""
Evaluate the local direction of the Nedelec dof in the element,
which contains it and switch the sign to match the global direction.
"""
self.sign = []
if '_s' in self.FE.MP.approach:
self.lengths = []
self.orientations = []
self.midpoints = []
for ti in range(self.FE.n_tx):
if '_s' in self.FE.MP.approach:
lengths, midpoints, orientations = [], [], []
nn, sign = [], []
for jj, cell in enumerate(self.cell_list[ti]):
coord = self.source_dof_coords[ti][jj].reshape(1, 3)
eids = np.array([edge.entities(0) for edge in df.edges(cell)])
dists = np.linalg.norm(self.mesh_coords[eids[:, 0]] -
coord, axis=1) + \
np.linalg.norm(self.mesh_coords[eids[:, 1]] -
coord, axis=1)
idx = np.argwhere(dists - np.linalg.norm(
self.mesh_coords[eids[:, 0]] -
self.mesh_coords[eids[:, 1]],
axis=1) < self.a_tol)
n = cell.entities(0)
try:
nn.append([n[0], n[1], n[2], n[3],
eids[idx[0][0]][0], eids[idx[0][0]][1]])
except IndexError:
print(dists)
lp(self.FE.MP.logger, 50,
'Error! Identified most probably a wrong source dof\n'
'which could be caused by:\n'
' - an unsuited search tolerance *a_tol*, '
'suggestion: reduce to, e.g., *1e-6*\n'
' - an artifact in the '
'mesh, e.g., unwanted rx/tx intersection)\n'
' - an unsuited number of mpi-processes\n'
'Aborting ...',
root_only=False, barrier=False)
#raise SystemExit
for j in range(len(nn)):
if self.mesh_coords[nn[j][5]][
self.component_switches[ti][j]] - \
self.mesh_coords[nn[j][4]][
self.component_switches[ti][j]] < \
0.0 and self.target_directions[ti][j] < 0.0:
sign.append(-1.)
elif (self.mesh_coords[nn[j][5]][
self.component_switches[ti][j]] -
self.mesh_coords[nn[j][4]][
self.component_switches[ti][j]] >
0.0 and self.target_directions[ti][j] > 0.0):
sign.append(-1.)
else:
sign.append(1.)
try:
for local_node in range(4):
if nn[j][4] == nn[j][local_node]:
tmp1 = local_node
elif nn[j][5] == nn[j][local_node]:
tmp2 = local_node
if tmp2 < tmp1:
sign[j] *= -1.
except Exception as e:
lp(self.FE.MP.logger, 30, e, pre_dash=False, barrier=False)
lp(self.FE.MP.logger, 30,
'Warning! dof with potentially wrong sign detected.\n'
'Please check if the fields look consistent! '
'Continuing ...', pre_dash=False, root_only=False,
barrier=False)
if '_s' in self.FE.MP.approach:
lengths.append(np.linalg.norm(self.mesh_coords[nn[j][4]] -
self.mesh_coords[nn[j][5]]))
midpoints.append(np.mean(
np.array([self.mesh_coords[nn[j][4]],
self.mesh_coords[nn[j][5]]]), axis=0))
tmp = np.arctan2(
(self.mesh_coords[nn[j][4]][1] -
self.mesh_coords[nn[j][5]][1]),
(self.mesh_coords[nn[j][4]][0] -
self.mesh_coords[nn[j][5]][0]))
if sign[j] < 0.:
tmp += np.pi
orientations.append(tmp)
n_local_source_dof = MPI.COMM_WORLD.gather(len(sign),
root=0)
n_global_sd = MPI.COMM_WORLD.bcast(np.sum(n_local_source_dof),
root=0)
if n_global_sd != 0:
if not self.td_dc and ti == 0:
lp(self.FE.MP.logger, 20,
' - ' + str(n_global_sd) + ' dof located for first '
'transmitter - ', pre_dash=False)
if ti == 1:
lp(self.FE.MP.logger, 20,
'... locating dof for further transmitters ...',
pre_dash=False)
else:
lp(self.FE.MP.logger, 50,
'Error! Not a single source dof could be found '
'for Tx number ' + str(ti) + '.\n'
'This might be caused by a wrong *s_type* or '
'source geometry definition. '
'Aborting ...')
raise SystemExit
self.sign.append(np.array(sign))
if '_s' in self.FE.MP.approach:
tmp_l = MPI.COMM_WORLD.gather(lengths, root=0)
all_lengths = MPI.COMM_WORLD.bcast(np.sum(tmp_l), root=0)
self.lengths.append(np.array(all_lengths))
tmp_o = MPI.COMM_WORLD.gather(orientations, root=0)
all_orientations = MPI.COMM_WORLD.bcast(np.sum(tmp_o), root=0)
self.orientations.append(np.array(all_orientations))
tmp_m = MPI.COMM_WORLD.gather(midpoints, root=0)
all_midpoints = MPI.COMM_WORLD.bcast(np.sum(tmp_m), root=0)
self.midpoints.append(np.array(all_midpoints))
[docs]
def find_and_fill_start_stop_dof(self, bb, current, ti=0, fi=0):
"""
Required for the A-V approach on mixed elements and the DC approach
to set divergence term on the start and stop point of grounded
wire sources.
Required arguements
-------------------
- bb, dolfin function
assembled right-hand side function
- current, type float,
source current for the corresponsing tx
Keyword arguments
-----------------
- ti = 0, type int
index of the corresponding tx
"""
if not self.FE.grounding[ti]:
return(bb)
if self.FE.s_type == 'hed':
lp(self.FE.MP.logger, 50,
'Error, start and stop dof evaluation for total potential'
' approaches not implemented for HED sources.\n'
'Use a "Line" source with start and stop values instead. '
'Aborting ...')
raise SystemExit
s1 = np.empty(0).astype('int')
s2 = np.empty(0).astype('int')
if self.FE.s_type in ['line', 'hed']:
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 0], self.FE.start[0], self.a_tol) and\
df.near(self.xyz[j, 1], self.FE.start[1], self.a_tol) and\
df.near(self.xyz[j, 2], self.FE.start[2], self.a_tol):
if j in self.nodal_dof_re:
s1 = np.array([j]).astype('int')
elif (df.near(self.xyz[j, 0], self.FE.stop[0], self.a_tol)and
df.near(self.xyz[j, 1], self.FE.stop[1], self.a_tol)and
df.near(self.xyz[j, 2], self.FE.stop[2], self.a_tol)):
if j in self.nodal_dof_re:
s2 = np.array([j]).astype('int')
elif self.FE.s_type == 'path':
if type(self.FE.tx[ti]) is list:
tx = np.array(self.FE.tx[ti])
else:
tx = self.FE.tx[ti]
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 0], tx[0, 0], self.a_tol) and \
df.near(self.xyz[j, 1], tx[0, 1], self.a_tol) and \
df.near(self.xyz[j, 2], tx[0, 2], self.a_tol):
if j in self.nodal_dof_re:
s1 = np.array([j]).astype('int')
elif (df.near(self.xyz[j, 0], tx[-1, 0], self.a_tol) and
df.near(self.xyz[j, 1], tx[-1, 1], self.a_tol) and
df.near(self.xyz[j, 2], tx[-1, 2], self.a_tol)):
if j in self.nodal_dof_re:
s2 = np.array([j]).astype('int')
if 'Am' in self.FE.MP.approach:
bb.vector()[s1] = current * (1./self.FE.MP.omegas[fi])
bb.vector()[s2] = -current * (1./self.FE.MP.omegas[fi])
elif 'An' in self.FE.MP.approach:
bb[s1] = current * (1./self.FE.MP.omegas[fi])
bb[s2] = -current * (1./self.FE.MP.omegas[fi])
elif 'DC' in self.FE.MP.approach or self.td_dc:
bb.vector()[s1] = current
bb.vector()[s2] = -current
return(bb)
[docs]
def find_for_hed(self):
"""
Find source dof and coordinates for HED source.
"""
lp(self.FE.MP.logger, 20,
' - using HED source - ', pre_dash=False)
if 'Am' in self.FE.MP.approach:
lp(self.FE.MP.logger, 50,
'Error! HED source for Am approach on mixed elements not '
'implemented yet! You may choose a very short line source '
'instead. Note in that case, the line must start and end '
'at nodes!')
raise SystemExit
self.FE.start[:] = self.FE.origin[:]
self.FE.stop[:] = self.FE.origin[:]
if df.near(self.FE.azimuth, 0.):
self.FE.start[0] = self.FE.origin[0] - self.FE.length/2.
self.FE.stop[0] = self.FE.origin[0] + self.FE.length/2.
elif df.near(self.FE.azimuth, 180.):
self.FE.start[0] = self.FE.origin[0] + self.FE.length/2.
self.FE.stop[0] = self.FE.origin[0] - self.FE.length/2.
elif df.near(self.FE.azimuth, 90.):
self.FE.start[1] = self.FE.origin[1] - self.FE.length/2.
self.FE.stop[1] = self.FE.origin[1] + self.FE.length/2.
elif df.near(self.FE.azimuth, 270.):
self.FE.start[1] = self.FE.origin[0] + self.FE.length/2.
self.FE.stop[1] = self.FE.origin[0] - self.FE.length/2.
else:
lp(self.FE.MP.logger, 50,
'Error! arbitrary directected HED for total field '
'approach not implemented.\nPlease choose azimuth '
'values of *0*, *90*, *180* or *270* and consider using '
'the automated Tx handling\nfor grounded or ungrounded sources '
'described as *path(s)*. Aborting ...')
raise SystemExit
self.find_for_line()
[docs]
def find_for_vmd(self):
"""
Find source dof and coordinates for VMD source.
"""
lp(self.FE.MP.logger, 50,
'Error! Manual assembly for total-field VMD sources not supported.'
'\nOnly dummy function for testing. Aborting ...')
raise SystemExit
[docs]
def find_for_line(self):
"""
Find source dof and coordinates for "line" source.
"""
if self.FE.start[0] < self.FE.stop[0]:
x0, x1 = self.FE.start[0], self.FE.stop[0]
else:
x1, x0 = self.FE.start[0], self.FE.stop[0]
if self.FE.start[1] < self.FE.stop[1]:
y0, y1 = self.FE.start[1], self.FE.stop[1]
else:
y1, y0 = self.FE.start[1], self.FE.stop[1]
if self.FE.start[1] == self.FE.stop[1] and \
self.FE.start[2] == self.FE.stop[2] and \
str(self.FE.MP.topo) == 'None':
lp(self.FE.MP.logger, 20,
' - using straight x-directed *line* source - ',
pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 1],
self.FE.start[1], self.a_tol) and \
df.near(self.xyz[j, 2],
self.FE.start[2], self.a_tol) and \
self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1:
self.append_lists(j)
elif (self.FE.start[0] == self.FE.stop[0] and
self.FE.start[2] == self.FE.stop[2] and
str(self.FE.MP.topo) == 'None'):
lp(self.FE.MP.logger, 20,
' - using straight y-directed *line* source - ',
pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 0],
self.FE.start[0], self.a_tol) and \
df.near(self.xyz[j, 2],
self.FE.start[2], self.a_tol) and \
self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1:
self.append_lists(j, comp=1)
elif self.FE.start[1] == self.FE.stop[1]:
lp(self.FE.MP.logger, 20,
' - using x-directed *line* source with topography - ',
pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 1],
self.FE.start[1], self.a_tol) and \
self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1:
self.append_lists(j)
elif self.FE.start[0] == self.FE.stop[0]:
lp(self.FE.MP.logger, 20,
' - using y-directed *line* source with topography - ',
pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 0],
self.FE.start[0], self.a_tol) and \
self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1:
self.append_lists(j, comp=1)
else:
lp(self.FE.MP.logger, 50,
'Error! Using *line* sources with arbitrary direction is not '
'implemented.\n Choose arbtrary *path* sources instead. '
'Aborting ...')
raise SystemExit
self.source_dof.append(np.array(self.tmp_sd))
self.source_dof_coords.append(np.array(self.tmp_sdc))
self.target_directions.append(np.array(self.tmp_td))
self.component_switches.append(np.array(self.tmp_cs))
[docs]
def find_for_loop_r(self):
"""
Find source dof and coordinates for "loop_r" source.
"""
if self.FE.start[0] < self.FE.stop[0]:
x0, x1 = self.FE.start[0], self.FE.stop[0]
y0, y1 = self.FE.start[1], self.FE.stop[1]
else:
x1, x0 = self.FE.start[0], self.FE.stop[0]
y1, y0 = self.FE.start[1], self.FE.stop[1]
z = self.FE.start[2]
if df.near(self.FE.start[0], self.FE.stop[0], self.a_tol) or \
df.near(self.FE.start[1], self.FE.stop[1], self.a_tol):
lp(self.FE.MP.logger, 50,
'Error! Given start and stop coordinates do not '
'match to a rectangular loop source: x or y start or '
'stop coordinates are identical. Aborting ...')
raise SystemExit
if str(self.FE.MP.topo) == 'None':
lp(self.FE.MP.logger, 20,
' - using rectangular *loop* source with constant height'
' - ', pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 1], y0, self.a_tol) and \
df.near(self.xyz[j, 2], z, self.a_tol) and \
self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1:
self.append_lists(j)
elif (df.near(self.xyz[j, 0], x1, self.a_tol) and
df.near(self.xyz[j, 2], z, self.a_tol) and
self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1):
self.append_lists(j, comp=1)
elif (df.near(self.xyz[j, 1], y1, self.a_tol) and
df.near(self.xyz[j, 2], z, self.a_tol) and
self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1):
self.append_lists(j, tdir=-1.)
elif (df.near(self.xyz[j, 0], x0, self.a_tol) and
df.near(self.xyz[j, 2], z, self.a_tol) and
self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1):
self.append_lists(j, tdir=-1., comp=1)
else:
lp(self.FE.MP.logger, 20,
' - using rectangular *loop* source with topography - ',
pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 1], y0, self.a_tol) and \
self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1:
self.append_lists(j)
elif (df.near(self.xyz[j, 0], x1, self.a_tol) and
self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1):
self.append_lists(j, comp=1)
elif (df.near(self.xyz[j, 1], y1, self.a_tol) and
self.xyz[j, 0] > x0 and self.xyz[j, 0] < x1):
self.append_lists(j, tdir=-1.)
elif (df.near(self.xyz[j, 0], x0, self.a_tol) and
self.xyz[j, 1] > y0 and self.xyz[j, 1] < y1):
self.append_lists(j, tdir=-1., comp=1)
self.source_dof.append(np.array(self.tmp_sd))
self.source_dof_coords.append(np.array(self.tmp_sdc))
self.target_directions.append(np.array(self.tmp_td))
self.component_switches.append(np.array(self.tmp_cs))
[docs]
def find_for_path(self):
"""
Find source dof and coordinates for arbitrary grounded or ungrounded
transmitters described by a path of coordinates.
"""
if type(self.FE.grounding) is not list:
self.FE.grounding = [self.FE.grounding]
for ti, tx in enumerate(self.FE.tx):
if type(tx) is list:
tx = np.array(tx)
if self.FE.grounding[ti]:
if ti == 0:
lp(self.FE.MP.logger, 20,
' - using arbitrary *line* source from path - ',
pre_dash=False)
else:
if ti == 0:
lp(self.FE.MP.logger, 20,
' - using arbitrary *loop* source from path - ',
pre_dash=False)
tx = np.vstack((tx, tx[0, :]))
if ti == 1:
lp(self.FE.MP.logger, 20,
'... building ' + str(self.FE.n_tx - 1) + ' further '
'path transmitters ...',
pre_dash=False)
if str(self.FE.MP.topo) != 'None':
tx = self.add_topo(tx, self.FE.tx_topo)
for j in range(len(tx) - 1):
dists = np.linalg.norm(tx[j].reshape(1, 3) -
self.xyz, axis=1) + \
np.linalg.norm(tx[j+1].reshape(1, 3) -
self.xyz, axis=1)
tmp = np.argwhere(dists - np.linalg.norm(
tx[j] - tx[j + 1]) < self.a_tol)
tdir, comp = self.eval_target_dir(tx[j], tx[j + 1])
if len(tmp) > 0:
for idx in tmp.flatten():
self.append_lists(idx, tdir=tdir, comp=comp)
self.source_dof.append(np.array(self.tmp_sd))
self.source_dof_coords.append(np.array(self.tmp_sdc))
self.target_directions.append(np.array(self.tmp_td))
self.component_switches.append(np.array(self.tmp_cs))
self.tmp_sd = []
self.tmp_sdc = []
self.tmp_td = []
self.tmp_cs = []
[docs]
def append_lists(self, idx, tdir=1., comp=0):
"""
Utility function appending dof, coordinates, directions and component
switches to lists during the dof-coordinate search.
Required arguements
-------------------
- idx, type list of int
integers specifying the transmitter dof
Keyword arguments
-----------------
- tdir, type float
either positive or negative *1*
- comp = 0, type int
either 0 or 1 for x or y component
"""
if 'E' in self.FE.MP.approach or 'H' in self.FE.MP.approach:
self.tmp_sd.append(self.ned_dof_im[idx])
self.tmp_sdc.append(self.xyz[idx, :])
self.tmp_td.append(tdir)
self.tmp_cs.append(comp)
elif 'Am' in self.FE.MP.approach:
if idx in self.ned_dof_re:
self.tmp_sd.append(self.all_dof[idx])
self.tmp_sdc.append(self.xyz[idx, :])
self.tmp_td.append(tdir)
self.tmp_cs.append(comp)
[docs]
def eval_target_dir(self, a, b):
"""
Evaluate global direction of each transmitter segment.
Required arguements
-------------------
- a, b, type array/list of len 3
start and stop coordinates to find the global positive or negative
target direction, counter-clockwise in a right-handed coordinate
system
"""
if np.isclose(np.abs(a[0] - b[0]), np.abs(a[1] - b[1]),
atol=self.a_tol):
if a[2] < b[2]:
return(1., 2)
else:
return(-1., 2)
elif np.abs(a[0] - b[0]) > np.abs(a[1] - b[1]):
if a[0] < b[0]:
return(1., 0)
else:
return(-1., 0)
elif np.abs(a[0] - b[0]) < np.abs(a[1] - b[1]):
if a[1] < b[1]:
return(1., 1)
else:
return(-1., 1)
[docs]
def test_An_total(self, fi):
"""
Experimental total A-Phi nodal approach, use with caution and contact
the authors for advise.
"""
if self.FE.n_tx > 1:
lp(self.FE.MP.logger, 50,
'Error! Multiple right-hand sides not supported for\n'
'experimental A-V nodal approach. Aborting ...')
raise SystemExit
t0 = time.time()
A, b = df.PETScMatrix(), df.PETScVector()
self.FE.FS.add_dirichlet_bc()
lp(self.FE.MP.logger, 20,
'... applying zero Dirichlet boundary '
'conditions ...', pre_dash=False)
self.A, bb = df.assemble_system(
self.FE.L[0], self.FE.R[0], self.FE.FS.bc,
A_tensor=A, b_tensor=b)
self.FE.assembly_time += time.time() - t0
if 'hed' in self.FE.s_type or 'line' in self.FE.s_type:
dof = []
if self.FE.start[1] == self.FE.stop[1] and \
self.FE.start[2] == self.FE.stop[2] and \
str(self.FE.MP.topo) == 'None':
lp(self.FE.MP.logger, 20,
' - using straight x-directed *line* source - ',
pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 1],
self.FE.start[1], self.a_tol) and \
df.near(self.xyz[j, 2],
self.FE.start[2], self.a_tol) and \
self.xyz[j, 0] > self.FE.start[0] - self.a_tol and \
self.xyz[j, 0] < self.FE.stop[0] + self.a_tol:
if j in self.x_dof:
dof.append(j)
tx_len = np.abs(self.FE.start[0] - self.FE.stop[0])
elif (self.FE.start[0] == self.FE.stop[0] and
self.FE.start[2] == self.FE.stop[2] and
str(self.FE.MP.topo) == 'None'):
lp(self.FE.MP.logger, 20,
' - using straight y-directed *line* source - ',
pre_dash=False)
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 0],
self.FE.start[0], self.a_tol) and \
df.near(self.xyz[j, 2],
self.FE.start[2], self.a_tol) and \
self.xyz[j, 1] > self.FE.start[1] - self.a_tol and \
self.xyz[j, 1] < self.FE.stop[1] + self.a_tol:
if j in self.y_dof:
dof.append(j)
tx_len = np.abs(self.FE.start[1] - self.FE.stop[1])
else:
lp(self.FE.MP.logger, 50, 'Start', self.FE.start)
lp(self.FE.MP.logger, 50, 'Stop', self.FE.stop)
lp(self.FE.MP.logger, 50, 'Topo', self.FE.MP.topo)
lp(self.FE.MP.logger, 50,
'Currently, straight lines in x- or y- direction '
'required. Aborting ...')
raise SystemExit
n_local_source_dof = MPI.COMM_WORLD.gather(len(dof), root=0)
n_global_sd = MPI.COMM_WORLD.bcast(np.sum(n_local_source_dof),
root=0)
if n_global_sd != 0:
lp(self.FE.MP.logger, 20,
' - ' + str(n_global_sd) + ' dof located for first '
'transmitter - ', pre_dash=False)
else:
lp(self.FE.MP.logger, 50,
'Fatal Error! Not a single source dof could be found '
'for Tx number ' + str(0) + '.\n'
'This is most probably caused by a wrong *s_type* or '
'source geometry definition. Aborting')
raise SystemExit
for ti in range(self.FE.n_tx):
bb[dof] = -tx_len * (1./(n_global_sd)) * \
(1./self.FE.MP.omegas[fi]) * self.FE.MP.current
bb = self.find_and_fill_start_stop_dof(bb, self.FE.MP.current)
bb.apply('insert')
self.b.append(bb)
self.FE.system_size = str(A.mat().getSize()[0])
lp(self.FE.MP.logger, 20,
' - system matrix size --> ' + self.FE.system_size +
' - ', pre_dash=False)
elif 'loop_r' in self.FE.s_type:
dof = [[], [], [], []]
if self.FE.start[0] < self.FE.stop[0]:
x0, x1 = self.FE.start[0], self.FE.stop[0]
y0, y1 = self.FE.start[1], self.FE.stop[1]
else:
x1, x0 = self.FE.start[0], self.FE.stop[0]
y1, y0 = self.FE.start[1], self.FE.stop[1]
z = self.FE.start[2]
for j in range(len(self.xyz)):
if df.near(self.xyz[j, 1], y0, self.a_tol) and \
df.near(self.xyz[j, 2], z, self.a_tol) and \
self.xyz[j, 0] > x0 - self.a_tol and \
self.xyz[j, 0] < x1 + self.a_tol:
if j in self.x_dof:
dof[0].append(j)
elif (df.near(self.xyz[j, 0], x1, self.a_tol) and
df.near(self.xyz[j, 2], z, self.a_tol) and
self.xyz[j, 1] > y0 - self.a_tol and
self.xyz[j, 1] < y1 + self.a_tol):
if j in self.y_dof:
dof[1].append(j)
elif (df.near(self.xyz[j, 1], y1, self.a_tol) and
df.near(self.xyz[j, 2], z, self.a_tol) and
self.xyz[j, 0] > x0 - self.a_tol and
self.xyz[j, 0] < x1 + self.a_tol):
if j in self.x_dof:
dof[2].append(j)
elif (df.near(self.xyz[j, 0], x0, self.a_tol) and
df.near(self.xyz[j, 2], z, self.a_tol) and
self.xyz[j, 1] > y0 - self.a_tol and
self.xyz[j, 1] < y1 + self.a_tol):
if j in self.y_dof:
dof[3].append(j)
tx_len = (2. * (np.abs(self.FE.start[1] - self.FE.stop[1])) +
2. * (np.abs(self.FE.start[0] - self.FE.stop[0])))
all_dof = []
for side in range(4):
n_local_source_dof = MPI.COMM_WORLD.gather(len(dof[side]),
root=0)
all_dof.append(MPI.COMM_WORLD.bcast(
np.sum(n_local_source_dof), root=0))
n_global_sd = sum(all_dof)
if n_global_sd != 0:
lp(self.FE.MP.logger, 20,
' - ' + str(n_global_sd) + ' dof located for first '
'transmitter - ', pre_dash=False)
else:
lp(self.FE.MP.logger, 50,
'Fatal Error! Not a single source dof could be found '
'for Tx number ' + str(0) + '.\n'
'This is most probably caused by a wrong *s_type* or '
'source geometry definition. Aborting')
raise SystemExit
for ti in range(self.FE.n_tx):
for side in range(2):
bb[dof[side]] = -tx_len * (1./(n_global_sd)) *\
(1./self.FE.MP.omegas[fi]) * self.FE.MP.current
for side in range(2, 4):
bb[dof[side]] = tx_len * (1./(n_global_sd)) *\
(1./self.FE.MP.omegas[fi]) * self.FE.MP.current
bb.apply('insert')
self.b.append(bb)
self.FE.system_size = str(A.mat().getSize()[0])
lp(self.FE.MP.logger, 20,
' - system matrix size --> ' + self.FE.system_size +
' - ', pre_dash=False)
else:
lp(self.FE.MP.logger, 50,
'Error! Total A-phi nodal approach only test-wise implemented '
'for *line* or *loop_r* sources for now. Aborting ...')
raise SystemExit
[docs]
def add_topo(self, tx, tx_topo=None):
"""
Apply topography information to the z-values of the transmitter
coordinates.
Required arguements
-------------------
- tx, type list of coordinates
list of transmitter coordinates
Keyword arguments
-----------------
- tx_topo = None, type str or function
user might specify a custom topography reference here,
not recommended, only implemented for tests
"""
if tx_topo is None:
topo = '_surf'
else:
topo = '_' + tx_topo + '_triangulated'
from custEM.meshgen.dem_interpolator import DEM
dem = DEM(self.FE.MP.para_dir + '/' +
self.FE.MP.mesh_name + topo + '.xyz',
centering=False,
easting_shift=None, northing_shift=None)
tx[:, 2] = dem(tx[:, 0], tx[:, 1], rotation=None)
return(tx)
[docs]
class SecondaryFieldAssembler:
"""
Class that assembles the system matrix and the right-hand-sides for
secondary-field formulations based on primary fields and delta_sigma.
Methods
-------
- assemble()
assembly function of left- and right-hand sides for secondary-field
formulations
"""
def __init__(self, FE, bc):
"""
Assembly of system matrix and right-hand side vector.
Required arguments
------------------
- FE, type class
FE-approach (e.g., **E_vector**) instance
- bc, None or type dolfin DirichletBC
see **MOD** class function: **solve_main_problem()**
"""
self.FE = FE
self.bc = bc
[docs]
def assemble(self, fi=0):
"""
Let dolfin assemble the system matrix and right-hand side vectors.
Keyword arguments
-----------------
- fi = 0, type int
iteration index over frequencies for specifying the left-hand-side
"""
lp(self.FE.MP.logger, 20,
'Source vector and system matrix assembly:')
t0 = time.time()
lp(self.FE.MP.logger, 20,
'... assembling system matrix ...', pre_dash=False)
self.b = [df.PETScVector() for n in range(self.FE.n_tx)]
if (self.FE.MP.system_it==False):
self.A= df.PETScMatrix()
if (self.FE.MP.system_it==True):
self.Asys= df.PETScMatrix()
self.Hh = df.PETScMatrix()
self.Adiag = df.PETScMatrix()
self.Aoffdiag= df.PETScMatrix()
if fi != 0:
self.FE.FS.PF.load_primary_fields(fi=fi)
self.FE.rhs_form_secondary(fi)
for ti in range(self.FE.n_tx):
if ti == 0:
if self.bc is None:
if (self.FE.MP.system_it==False):
df.assemble_system(self.FE.L[fi], self.FE.R[0],
A_tensor=self.A, b_tensor=self.b[ti])
if (self.FE.MP.system_it==True):
df.assemble_system(self.FE.L[fi], self.FE.R[0],
A_tensor=self.Asys, b_tensor=self.b[ti])
df.assemble(self.FE.H[fi],tensor=self.Hh)
df.assemble(self.FE.A[fi],tensor=self.Adiag)
df.assemble(self.FE.B[fi],tensor=self.Aoffdiag)
elif self.bc in ['ZeroDirichlet', 'ZD']:
self.FE.FS.add_dirichlet_bc()
lp(self.FE.MP.logger, 20,
'... applying zero Dirichlet boundary '
'conditions ...', pre_dash=False)
df.assemble_system(
self.FE.L[fi], self.FE.R[ti], self.FE.FS.bc,
A_tensor=self.A, b_tensor=self.b[ti])
else:
lp(self.FE.MP.logger, 50,
'Error, Choose either None (default) or "ZD" or '
'"ZeroDirichlet" as boundary condition for secondary '
'field formulations. Aborting ...')
raise SystemExit
else:
df.assemble(self.FE.R[ti], tensor=self.b[ti])
if self.bc in ['ZeroDirichlet', 'ZD']:
self.FE.FS.bc.apply(self.b[ti])
self.FE.assembly_time += time.time() - t0
lp(self.FE.MP.logger, 20,
' - assembly time (s) --> ' + str(time.time() - t0) +
' - ', pre_dash=False)
if fi == 0:
if (self.FE.MP.system_it==False):
self.FE.system_size = str(self.A.mat().getSize()[0])
if (self.FE.MP.system_it==True):
self.FE.system_size = str(self.Hh.mat().getSize()[0])
lp(self.FE.MP.logger, 20,
' - system matrix size --> ' + self.FE.system_size +
' - ', pre_dash=False)
[docs]
class MTAssembler:
"""
Class that assembles the system matrix and the right-hand-sides
(two source polarizations implemented as boundary conditions on the top
of the model domain) for natural-source modeling.
Methods
-------
- assemble()
assembly function of left- and right-hand sides for secondary-field
formulations
"""
def __init__(self, FE, bc):
"""
Assembly of system matrix and right-hand side vector.
Required arguments
------------------
- FE, type class
FE-approach (e.g., **E_vector**) instance
- bc, None or type dolfin DirichletBC
see **MOD** class function: **solve_main_problem()**
"""
self.FE = FE
self.bc = bc
[docs]
def assemble(self, fi=0):
lp(self.FE.MP.logger, 20,
'Source vector and system matrix assembly:')
t0 = time.time()
lp(self.FE.MP.logger, 20,
'... assembling system matrix ...', pre_dash=False)
zmax_local = np.max(self.FE.FS.mesh.coordinates()[:].reshape(
-1, 3)[:, 2])
tmp = self.FE.MP.mpi_cw.gather(zmax_local, root=0)
if self.FE.MP.mpi_cw.Get_rank() == 0:
max_tmp = np.max(np.array(tmp).flatten())
else:
max_tmp = None
zmax = self.FE.MP.mpi_cw.bcast(max_tmp, root=0)
def boundary_t(x, on_boundary):
return on_boundary and (df.near(x[2], zmax, self.FE.a_tol))
def other_boundary(x, on_boundary):
return on_boundary and (x[2] < zmax)
t0 = time.time()
pol_1 = df.Constant(("1.0", "0.0", "0.0", "1.0", "0.0", "0.0"))
pol_2 = df.Constant(("0.0", "1.0", "0.0", "0.0", "1.0", "0.0"))
zero_1 = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
zero_2 = df.Constant(("0.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
if self.bc in ['ZD', 'ZeroDirichlet']:
bc_1 = [df.DirichletBC(self.FE.FS.M, pol_1, boundary_t),
df.DirichletBC(self.FE.FS.M, zero_1, other_boundary)]
bc_2 = [df.DirichletBC(self.FE.FS.M, pol_2, boundary_t),
df.DirichletBC(self.FE.FS.M, zero_2, other_boundary)]
else: # implicit boundary conditions on non-top boundary
bc_1 = df.DirichletBC(self.FE.FS.M, pol_1, boundary_t)
bc_2 = df.DirichletBC(self.FE.FS.M, pol_2, boundary_t)
bcs = [bc_1, bc_2]
self.b = [df.PETScVector(), df.PETScVector()]
if (self.FE.MP.system_it==False):
self.A= df.PETScMatrix()
if (self.FE.MP.system_it==True):
self.Asys= df.PETScMatrix()
self.Hh = df.PETScMatrix()
self.Adiag = df.PETScMatrix()
self.Aoffdiag= df.PETScMatrix()
if '_s' not in self.FE.MP.approach:
for ti in range(self.FE.n_tx): # forced to be 2
#if ti == 0:
if (self.FE.MP.system_it==False):
df.assemble_system(self.FE.L[fi], self.FE.R[0], bcs[ti],
A_tensor=self.A, b_tensor=self.b[ti])
if (self.FE.MP.system_it==True):
df.assemble_system(self.FE.L[fi], self.FE.R[0], bcs[ti],
A_tensor=self.Asys, b_tensor=self.b[ti])
df.assemble(self.FE.H[fi],tensor=self.Hh)
df.assemble(self.FE.A[fi],tensor=self.Adiag)
df.assemble(self.FE.B[fi],tensor=self.Aoffdiag)
#df.assemble_system(
# self.FE.L[fi], self.FE.R[0], bcs[ti],
# A_tensor=self.A, b_tensor=self.b[ti])
#else:
# df.assemble(self.FE.R[0], tensor=self.b[ti])
# bc_2.apply(self.b[ti])
else:
if fi != 0:
self.FE.FS.PF.load_primary_fields(fi=fi)
self.FE.rhs_form_secondary(fi)
for ti in range(self.FE.n_tx): # forced to be 2
if self.bc is None:
if (self.FE.MP.system_it==False):
df.assemble_system(self.FE.L[fi], self.FE.R[0],
A_tensor=self.A, b_tensor=self.b[ti])
if (self.FE.MP.system_it==True):
df.assemble_system(self.FE.L[fi], self.FE.R[0],
A_tensor=self.Asys, b_tensor=self.b[ti])
df.assemble(self.FE.H[fi],tensor=self.Hh)
df.assemble(self.FE.A[fi],tensor=self.Adiag)
df.assemble(self.FE.B[fi],tensor=self.Aoffdiag)
#df.assemble_system(self.FE.L[fi], self.FE.R[ti],
# A_tensor=self.A,
# b_tensor=self.b[ti])
elif self.bc in ['ZeroDirichlet', 'ZD']:
self.FE.FS.add_dirichlet_bc()
lp(self.FE.MP.logger, 20,
'... applying zero Dirichlet boundary '
'conditions ...', pre_dash=False)
df.assemble_system(
self.FE.L[fi], self.FE.R[ti], self.FE.FS.bc,
A_tensor=self.A, b_tensor=self.b[ti])
self.FE.assembly_time += time.time() - t0
lp(self.FE.MP.logger, 20,
' - assembly time (s) --> ' + str(time.time() - t0) +
' - ', pre_dash=False)
if fi == 0:
if (self.FE.MP.system_it==False):
self.FE.system_size = str(self.A.mat().getSize()[0])
if (self.FE.MP.system_it==True):
self.FE.system_size = str(self.Hh.mat().getSize()[0])
lp(self.FE.MP.logger, 20,
' - system matrix size --> ' + self.FE.system_size +
' - ', pre_dash=False)