Source code for custEM.fem.time_domain_approaches
# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
import numpy as np
import scipy.linalg as la
from pathlib import Path
import time
from custEM.misc import logger_print as lp
from custEM.misc import read_h5
from custEM.core import MOD
from custEM.core import Solver
from custEM.fem import ApproachBaseTD
from custEM.fem import check_source
from custEM.fem import add_sigma_vals_to_list
from custEM.fem import TotalFieldAssembler
try:
from custEM.misc import DirichletBoundary
except Exception as e:
print(e)
print('Warning! DirichletBoundary not imported. Continuing ...')
[docs]
class ImplicitEuler(ApproachBaseTD):
"""
FE implementation of Implicit Euler time-stepping method, e.g.,
(Um, 2010). Users can use either a first- or second-order discretization
in time and, with cautioan, a formulation considering displacement
currents which needs some more tests.
Methods
-------
- time_stepping()
runs the complete time-stepping solution process as well as H-field
conversion and export methods, FE modeling and time stepping related
keyword arguments can be passed to this method.
- build_var_form()
set up variational formulation and assemble mass and curl-curl matrices
as well right-hand side (source) vector
- init_initial_condition()
initialize initial condition for time-stepping approach
- first_order_IE()
perform first-order implicit Euler time-stepping approach
- second_order_IE()
perform second-order implicit Euler time-stepping approach
- first_order_IE_full()
perform first-order implicit Euler time-stepping approach including
displacement currents
- init_times()
initalize internally used time vector for the time stepping
- gaussian()
define gaussian source pulse for impulse response modeling
- print_stepping_info()
print some information before starting the time-stepping process
IE approach specific parameters
-------------------------------
- source_type = 'djdt', type str
alternatively, use *'j'* to calculate B-fields instead of dB/dt
- source_func = None, type numpy array
specify custom source (transmitter) function over time; needs to
correspond to suitable *source_times*
- source_times = None, type numpy array
specify custom *source_times* if no default ramp-free source
functions are used
- stepping_times = None, type numpy array
specify custom calculation times for the time-stepping after the
source pulse
- be_order = 1, type int
order of discretization in time for backward Euler method,
alterntively set to *2*
- n_lin_steps = 30, type int
number of linear steps between each logarithmic time step,
each linear step requires only an inexpensive back-substitution
for solving the system of linear equations
- n_on = 101, type int
number of linear time steps for creating source signal, not
recommended to be changed
- t0_j = -10., type float
specify start time (negative) for specifying the duration of the
source signal ramp if the source type is *j*
- pulse_width = 1e-2, type float
relative width of dirac function, not recommended to be changed
- t_tol = 1e-2, type float
tolerance to check if a calculated time step is linear or not
(if not linear, a new matrix factorization will be computed)
"""
def __init__(self, FS, MP):
"""
Initialize FE class for *E_IE* approach. See description of
**ApproachBaseTD** class for further information about parameters of
the time-domain modeling approaches.
Required arguments
------------------
- FS, type class
FunctionSpaces instance
- MP, type class
ModelParameters instance
"""
# inherit default attributes and common methods from ApproachBaseTD
super(self.__class__, self).__init__()
self.FS = FS
self.MP = MP
self.dx_0 = self.FS.DOM.dx_0
# initialize trial and test functions
self.v = df.TestFunction(FS.V)
self.u = df.TrialFunction(FS.V)
# initialize IE approach specific parameters
self.source_type = 'djdt'
self.be_order = 1
self.n_lin_steps = 30
self.n_on = 101
self.t0_j = -10.
self.pulse_width = 1e-2
self.t_tol = 1e-2
self.source_func = None
self.source_times = None
self.stepping_times = None
# mute print of the following FE class attributes and attributes only
# relevant for the other time-domain modeling approaches
self.mute = ['FS', 'MP', 'dx_0', 'u', 'v', 'mute', 'omegas', 'path_td',
'fd_mod_name', 'parallel_interpolation', 'interp_mesh',
'n_mult', 'n_poles', 'pole_shift', 'dc_field',
'frequencies', 'fd_approach']
[docs]
def time_stepping(self):
"""
Initialize time-discfretization and variational formulations for
assembly.
"""
self.U = []
t0 = time.time()
lp(self.MP.logger, 20, 'Solution of main FE system(s) of equations:')
if self.be_order == 1 and self.quasi_static:
u_0 = self.init_initial_condition(None)
self.first_order_IE(u_0)
elif self.be_order == 2:
u_0, u_1 = self.init_initial_condition(None, order=2)
self.second_order_IE(u_0, u_1)
elif self.be_order == 1 and not self.quasi_static:
u_0, u_1 = self.init_initial_condition(None, order=2)
self.first_order_IE_full(u_0, u_1)
else:
lp(self.MP.logger, 50,
'Error!, Only order *1* or *2* for implicit Euler method'
' are supported. Aborting ...')
raise SystemExit
self.FS.solution_time = time.time() - t0
lp(self.MP.logger, 50,
' - solution time for time stepping (s) --> ' +
str(time.time() - t0) + ' - ', pre_dash=False, post_dash=True)
[docs]
def build_var_form(self, **fem_kwargs):
"""
Build variational formulation for E-field time-stepping approach and
print FE parameters. A description of the keyword arguments is given
in the **ApproachBaseTD** and **ImplicitEuler**classes.
"""
self.update_fem_parameters(fem_kwargs)
if self.source_type not in ['j', 'djdt']:
lp(self.MP.logger, 50,
'Error! *source_type* must be either "j" or "djdt."')
raise SystemExit
if 'times' not in fem_kwargs:
self.times = np.logspace(
self.log_t_min, self.log_t_max, self.n_log_steps)
self.n_times = len(self.times)
if self.source_func is None:
self.source_func = self.gaussian(
np.linspace(0., 10**(self.log_t_min-1), 101),
0.5 * 10**(self.log_t_min-1),
self.pulse_width * 10**(self.log_t_min-1))
if self.shut_off and self.MP.grounding[0]:
self.source_func = np.append(self.source_func,
np.zeros(100))
self.source_func = np.append(self.source_func,
-self.source_func[:101])
check_source(self.s_type, self.tx)
mu = df.Constant(self.MP.mu)
self.mu1 = df.Constant(1./self.MP.mu)
if self.bc == 'ZD' or self.bc == 'ZeroDirichlet':
boundary = DirichletBoundary()
bc = df.DirichletBC(self.FS.V, df.Constant((0, 0, 0)), boundary)
else:
bc = None
t0 = time.time()
self.Assembler = TotalFieldAssembler(self, bc)
self.Assembler.assemble()
lp(self.MP.logger, 20,
'... assembling system matrix ...', pre_dash=False)
C_form = df.inner(self.MP.mu_inv_func * df.curl(self.u),
df.curl(self.v)) * self.dx_0
B_form = df.inner(self.MP.sigma_func * self.u, self.v) * self.dx_0
if not self.quasi_static:
A_form = df.inner(self.MP.eps_func * self.u, self.v) * self.dx_0
self.A, self.B, self.C = df.PETScMatrix(), df.PETScMatrix(), \
df.PETScMatrix()
a, b, c = df.PETScVector(), df.PETScVector(), df.PETScVector()
RHS = df.inner(df.Constant(("0.0", "0.0", "0.0")), self.v) * self.dx_0
df.assemble_system(B_form, RHS, bc, A_tensor=self.B, b_tensor=b)
df.assemble_system(C_form, RHS, bc, A_tensor=self.C, b_tensor=c)
self.system_size = str(self.B.mat().getSize()[0])
if not self.quasi_static:
df.assemble_system(A_form, RHS, bc, A_tensor=self.A, b_tensor=a)
self.assembly_time = time.time() - t0
lp(self.MP.logger, 20,
' - assembly time (s) --> ', self.assembly_time,
pre_dash=False)
[docs]
def init_initial_condition(self, ini_con, order=1):
"""
Initialize the initial fields u_0 and u1 (if second order scheme or
full formulation) for the time-stepping.
Required arguments
------------------
- ini_con, type dolfin function
use customized initial fields, not supported yet
Keyword arguments
-----------------
- order = 1, type int
specify first or second order time-stepping scheme
"""
# further choices for *ini_con* might be added for incorporating
# custom inital conditions
if ini_con is None:
if order == 1:
return([df.Function(self.FS.V) for ti in range(self.n_tx)])
else:
return([df.Function(self.FS.V) for ti in range(self.n_tx)],
[df.Function(self.FS.V) for ti in range(self.n_tx)])
[docs]
def first_order_IE(self, u_0):
"""
First-order backward Euler time-stepping scheme using diffusive
approximation (no displacement currents).
Required arguments
------------------
- u_0, type dolfin function
initial solution at first time step
"""
counter = 0
times, self.export_flags = self.init_times()
self.times = []
dt = np.diff(times, axis=0).ravel()
lp(self.MP.logger, 20,
'... starting 1st order time stepping ...', pre_dash=False)
self.print_stepping_info(dt)
if self.export_flags[0]:
lp(self.MP.logger, 20,
' - first observation time is equal to zeroth calculation '
'time - \n'
'... setting solution of first observation time to '
'zero-function ...', pre_dash=False)
self.U.append([df.Function(self.FS.V) for ti in range(self.n_tx)])
# first-order time-stepping after step 0
while counter < len(dt):
lp(self.MP.logger, 10,
'starting step: %d, t =%8.3e, dt = %8.3e' % (
counter, times[counter + 1], dt[counter]))
b = []
u_1 = [df.Function(self.FS.V) for ti in range(self.n_tx)]
for ti in range(self.n_tx):
if self.source_type == 'j' and self.shut_off:
if counter < self.n_on - 1:
b.append(self.B*u_0[ti].vector() - dt[counter] *
self.Assembler.b[ti].vector())
if ti == 0:
lp(self.MP.logger, 10, 'Tx on, counter',
counter, pre_dash=False)
else:
b.append(self.B*u_0[ti].vector())
if ti == 0:
lp(self.MP.logger, 10, 'Tx off, counter',
counter, pre_dash=False)
elif self.source_type == 'j' and not self.shut_off:
if counter < self.n_on - 1:
b.append(self.B*u_0[ti].vector())
if ti == 0:
lp(self.MP.logger, 10, 'Tx off, counter',
counter, pre_dash=False)
else:
b.append(self.B*u_0[ti].vector() - dt[counter] *
self.Assembler.b[ti].vector())
if ti == 0:
lp(self.MP.logger, 10, 'Tx on, counter',
counter, pre_dash=False)
elif self.source_type == 'djdt':
if counter < self.n_on:
b.append(self.B*u_0[ti].vector() + dt[counter] *
self.Assembler.b[ti].vector() *
self.source_func[counter])
if ti == 0:
lp(self.MP.logger, 10, 'Tx on, counter',
counter, pre_dash=False)
else:
b.append(self.B*u_0[ti].vector())
if ti == 0:
lp(self.MP.logger, 10, 'Tx off, counter',
counter, pre_dash=False)
if ti == 0:
if not np.isclose(dt[counter], dt[counter - 1],
atol=dt[counter] * self.t_tol):
lp(self.MP.logger, 10, '... factorizing new '
'system matrix ...', pre_dash=False)
D = self.B + dt[counter]*self.C
solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(),
D, "mumps")
solver.solve(u_1[ti].vector(), b[ti])
# update previous solution
u_0[ti].assign(u_1[ti])
# store E-field for export times
if self.export_flags[counter + 1]:
self.U.append(u_1)
self.times.append(times[counter + 1])
counter += 1
[docs]
def first_order_IE_full(self, u_0, u_1):
"""
First-order backward Euler time-stepping scheme for full formulation
considering displacement currents.
Required arguments
------------------
- u_0, type dolfin function
initial solution at first time step
- u_1, type dolfin function
intital solution at second time step
"""
lp(self.MP.logger, 50,
'Error! Updates to new class design need to be implemented for '
'full formulation of first order IE method. If required.\nPlease '
'contact the authors and we will update this method accordingly. '
'Aborting ...')
raise SystemExit
counter = 0
times, self.export_flags = self.init_times()
self.times = []
dt = np.diff(times, axis=0).ravel()
lp(self.MP.logger, 20,
'... starting 1st order time stepping including '
'displacement currents ...', pre_dash=False)
self.print_stepping_info(dt)
gauss = self.gaussian(np.linspace(0., 10**self.log_t_min, 101),
0.5 * 10**self.log_t_min,
0.1 * 10**self.log_t_min)
if self.export_flags[0]:
lp(self.MP.logger, 20,
' - first observation time is equal to zeroth calculation '
'time - \n'
'... setting solution of first observation time to '
'zero-function ...', pre_dash=False)
self.U.append([df.Function(self.FS.V) for ti in range(self.n_tx)])
while counter < len(dt):
lp(self.MP.logger, 10,
'starting step: %d, t =%8.3e, dt = %8.3e' % (
counter, times[counter + 1], dt[counter]))
u_2 = df.Function(self.FS.V)
if self.source_type == 'j':
if counter < self.n_on - 1:
b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\
(1./dt[counter]) * self.A * u_0.vector() +\
dt[counter] * self.Assembler.b[ti].vector()
lp(self.MP.logger, 10, 'on', counter)
else:
b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\
(1./dt[counter]) * self.A * u_0.vector()
lp(self.MP.logger, 10, 'off', counter)
elif self.source_type == 'djdt':
if counter < self.n_on - 1:
b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\
(1./dt[counter]) * self.A * u_0.vector() -\
dt[counter] * self.Assembler.b[ti].vector() *\
gauss[counter]
lp(self.MP.logger, 10, 'on', counter)
else:
b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\
(1./dt[counter]) * self.A * u_0.vector()
lp(self.MP.logger, 10, 'off', counter)
if not np.isclose(dt[counter], dt[counter - 1],
atol=dt[counter] * self.t_tol):
lp(self.MP.logger, 10, '... factorizing new '
'system matrix ...', pre_dash=False)
D = (1./dt[counter]) * self.A + self.B + dt[counter] * self.C
solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(), D, "mumps")
solver.solve(u_2.vector(), b)
# update previous solution and save E-field
if self.export_flags[counter + 1]:
self.U.append(u_1)
self.times.append(times[counter + 1])
counter += 1
u_0.assign(u_1)
[docs]
def second_order_IE(self, u_0, u_1):
"""
Second-order backward Euler time-stepping scheme.
Required arguments
------------------
- u_0, type dolfin function
initial solution at first time step
- u_1, type dolfin function
intital solution at second time step
"""
counter = 0
times, self.export_flags = self.init_times()
self.times = []
dt = np.diff(times, axis=0).ravel()
lp(self.MP.logger, 20,
'... starting 2nd order time stepping ...', pre_dash=False)
self.print_stepping_info(dt)
if self.export_flags[0]:
lp(self.MP.logger, 20,
' - first observation time is equal to zeroth calculation '
'time - \n'
'... setting solution of first observation time to '
'zero-function ...', pre_dash=False)
self.U.append([df.Function(self.FS.V) for ti in range(self.n_tx)])
# second-order time-stepping after step 1
while counter < len(dt):
lp(self.MP.logger, 10,
'starting step: %d, t =%8.3e, dt = %8.3e' % (
counter, times[counter + 1], dt[counter]))
b = []
u_2 = [df.Function(self.FS.V) for ti in range(self.n_tx)]
for ti in range(self.n_tx):
if self.source_type == 'j' and self.shut_off:
if counter < self.n_on - 1:
b.append(
self.B * (4. * u_1[ti].vector() -
u_0[ti].vector()) -
2 * dt[counter] * self.Assembler.b[ti].vector())
if ti == 0:
lp(self.MP.logger, 10, 'Tx on, counter',
counter, pre_dash=False)
else:
b.append(self.B*(4.*u_1[ti].vector() -
u_0[ti].vector()))
if ti == 0:
lp(self.MP.logger, 10, 'Tx off, counter',
counter, pre_dash=False)
elif self.source_type == 'j' and not self.shut_off:
if counter < self.n_on - 1:
b.append(self.B*(4.*u_1[ti].vector() -
u_0[ti].vector()))
if ti == 0:
lp(self.MP.logger, 10, 'Tx off, counter',
counter, pre_dash=False)
else:
b.append(
self.B * (4. * u_1[ti].vector() -
u_0[ti].vector()) -
2 * dt[counter] * self.Assembler.b[ti].vector())
if ti == 0:
lp(self.MP.logger, 10, 'Tx on, counter',
counter, pre_dash=False)
elif self.source_type == 'djdt':
if counter < self.n_on:
b.append(
self.B * (4. * u_1[ti].vector() -
u_0[ti].vector()) -
2 * dt[counter] * self.Assembler.b[ti].vector() *
self.source_func[counter])
if ti == 0:
lp(self.MP.logger, 10, 'Tx on, counter',
counter, pre_dash=False)
else:
b.append(self.B*(4.*u_1[ti].vector() -
u_0[ti].vector()))
if ti == 0:
lp(self.MP.logger, 10, 'Tx off, counter',
counter, pre_dash=False)
if ti == 0:
if not np.isclose(dt[counter], dt[counter - 1],
atol=dt[counter] * self.t_tol):
D = 3.*self.B + 2.*dt[counter]*self.C
solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(),
D, "mumps")
solver.solve(u_2[ti].vector(), b[ti])
# update previous solution
u_0[ti].assign(u_1[ti])
u_1[ti].assign(u_2[ti])
# store E-field for export times
if self.export_flags[counter + 1]:
self.U.append(u_2)
self.times.append(times[counter + 1])
counter += 1
[docs]
def init_times(self):
"""
Initialize times for time-stepping. Depending on the *source_type*,
initial time steps before are added automatically to incorporate the
source signal before the observation times are calculated.
"""
# use custom source times
if self.source_times is not None:
all_times = self.source_times
self.n_on = len(all_times) - 1
# initialize default times for default ideal source functions
else:
if self.source_type == 'j':
all_times = np.linspace(self.t0_j, 0., self.n_on)
all_times = np.append(
all_times, np.linspace(10**(self.log_t_min - 2),
10**self.log_t_min, 101)[:-1])
if self.source_type == 'djdt':
if self.shut_off and self.MP.grounding[0]:
all_times = np.linspace(0., 10**(self.log_t_min - 1),
101) - 10**(self.log_t_min-1) -\
10**(self.log_t_max+1)
all_times = np.append(
all_times, np.linspace(-10. + 10**(self.log_t_min - 3),
-10**(self.log_t_min - 1) -
10**(self.log_t_min - 3), 100))
all_times = np.append(
all_times, np.linspace(0.,
10**(self.log_t_min-1),
101) - 10**(self.log_t_min-1))
self.n_on = 300
else:
all_times = np.linspace(0., 10**(self.log_t_min - 1),
101) - 10**(self.log_t_min - 1)
self.n_on = 100
all_times = np.append(
all_times, np.linspace(10**(self.log_t_min - 3),
10**self.log_t_min, 101)[:-1])
if self.be_order == 2:
self.n_on -= 1
if self.stepping_times is None:
# add times for calculating the transient response
if self.n_lin_steps == 0:
all_times = np.append(all_times, np.logspace(
self.log_t_min, self.log_t_max, self.n_log_steps))
else:
log_times = np.logspace(self.log_t_min, self.log_t_max,
self.n_log_steps)
for jj in range(len(log_times) - 1):
all_times = np.append(all_times, np.linspace(
log_times[jj], log_times[jj+1],
self.n_lin_steps + 1)[:-1])
all_times = np.append(all_times, log_times[-1])
else:
all_times = np.append(all_times, self.stepping_times)
export_flags = np.array([False for ll in range(len(all_times))])
for tt in self.times:
export_flags[np.abs(all_times - tt).argmin()] = True
if len(export_flags[export_flags]) != len(self.times):
lp(self.MP.logger, 50, 'Number of target times', len(self.times),
pre_dash=False)
lp(self.MP.logger, 50, 'Number of export flags',
len(export_flags[export_flags]), pre_dash=False)
lp(self.MP.logger, 50,
'Error! Not all target times can be exported with this '
'mix of linear and logarithmic time steps.\nChoose more time '
'steps or decrease the number of target times. Aborting ...')
raise SystemExit
return(all_times.reshape(-1, 1), export_flags)
[docs]
def gaussian(self, x, x0, alpha):
"""
Calculate gaussian pulse.
Required arguments
------------------
- x, type numpy array
time values
- x0, type float
central value of times values
- alpha, type float
stretching parameter of Dirac pulse function
"""
return (np.exp(-(x - x0)**2 / alpha**2)) / (alpha * np.sqrt(np.pi))
[docs]
def print_stepping_info(self, dt):
"""
Print some information before starting the time-stepping process.
Required arguments
------------------
- dt, type array
vector of time-steps
"""
lp(self.MP.logger, 20,
' - overall number of observation times --> ' +
str(len(self.times)) + ' - ', pre_dash=False)
lp(self.MP.logger, 20,
' - overall number of computation time steps --> ' +
str(len(dt)) + ' - ', pre_dash=False)
lp(self.MP.logger, 20,
'... calculating ' + str(self.n_tx) + ' transmitters '
'for each time step ...', pre_dash=False)
lp(self.MP.logger, 20,
'... time-stepping will take a while ...', pre_dash=False)
lp(self.MP.logger, 20,
' - choose *debug* (10) logging level to follow the time-stepping'
' process - ', pre_dash=False)
[docs]
class FourierTransformBased(ApproachBaseTD):
"""
FE implementation of inverse Fourier-transform based approach. It
calculates several tens of frequency-domain solutions with the total
E-field approach and converts them to time domain afterwards.
Methods
-------
- build_var_form()
set up attributes required in the frequency-domain variational
formulations
- initialize frequencies()
initialize frequencies for the frequency-domain computations
automatically if not given explicitly as keyword argument
- calc_frequency_solutions()
calculate frequency-domain solutions with total E-field approach
- transform_to_time_domain()
transform frequency-domain solutions to time domain
- fht_hahnstein_80()
use default set of 80 filter coefficients to perform the
fast Hankel transformation
RA approach specific parameters
-------------------------------
- n_mult = 6, type int
multiplyer of poles, do not change
- n_poles = 2, type int
number of poles to be used
- pole_shift = None, type float
shift poles in time; the default covered range is about 4 to 5 decades
from the earliest times on; this values is adjusted automatically and
it is not recommended to change it
- dc_field = None, type dolfin Function
the DC field is automatically calculated on the fly; for special
purposes, it might be computed externally and passed here
"""
def __init__(self, FS, MP):
"""
Initialize FE class for *E_FT* approach. See description of
**ApproachBaseTD** class for further information about parameters of
the time-domain modeling approaches.
Required arguments
------------------
- FS, type class
FunctionSpaces instance
- MP, type class
ModelParameters instance
"""
# inherit default attributes and common methods from ApproachBaseTD
super(self.__class__, self).__init__()
self.FS = FS
self.MP = MP
self.dx_0 = self.FS.DOM.dx_0
self.path_td = self.MP.r_dir + '/E_FT/' + self.MP.mesh_name
# initialize FT approach specific parameters
self.omegas = None
self.frequencies = None
self.fd_mod_name = 'FT_' + self.MP.mod_name
self.fd_approach = 'E_t'
self.interp_mesh = None
self.parallel_interpolation = False
if df.MPI.rank(self.MP.mpi_cw) == 0:
Path('.').joinpath(
self.path_td + '/' + self.MP.mod_name +'_interpolated').mkdir(
exist_ok=True, parents=True)
# mute print of the following FE class attributes and attributes only
# relevant for the other time-domain modeling approaches
self.mute = ['FS', 'MP', 'dx_0', 'mute', 'source_type', 'be_order',
'n_lin_steps', 'n_on', 't0_j', 'pulse_width',
'source_func', 'source_times', 'stepping_times',
't_tol', 'n_mult', 'n_poles', 'pole_shift', 'dc_field']
[docs]
def build_var_form(self, **fem_kwargs):
"""
Initialize parameters for the Fourier-transform based approach and
print FE parameters. A description of the keyword arguments is given
in the **ApproachBaseTD** and **FourierTransformBased**classes.
"""
self.update_fem_parameters(fem_kwargs)
self.path_fd = (self.MP.r_dir + '/' + self.fd_approach + '/' +
self.MP.mesh_name)
if self.times is None:
self.times = np.logspace(self.log_t_min, self.log_t_max,
self.n_log_steps)
self.n_times = len(self.times)
if self.omegas is None and self.frequencies is None:
self.initalize_frequencies()
else:
if self.frequencies is not None:
self.omegas = self.frequencies * 2. * np.pi
if self.omegas is not None:
self.frequencies = self.omegas / (2. * np.pi)
self.n_freqs = len(self.omegas)
if len(self.MP.currents) == 1:
self.MP.currents = np.tile(self.MP.currents[0], (self.n_freqs, 1))
lp(self.MP.logger, 20, '... storing *frequencies* and *times* to '
'files in export directory ...', pre_dash=False)
if df.MPI.rank(self.MP.mpi_cw) == 0:
np.savetxt(self.path_td + '/' + self.MP.mod_name +
'_frequencies.dat', self.frequencies)
np.savetxt(self.path_td + '/' + self.MP.mod_name +
'_times.dat', self.times)
[docs]
def initalize_frequencies(self):
"""
Initialize frequencies for the frequency-domain solutions
automatically based on the default 80 FHT filter coefficients.
"""
t0 = 10.**self.log_t_min # First time channel
tmax = 10.**self.log_t_max # Last time channel
nc = 80 # Hahnstein filters
q = 10. ** 0.1 # Factor for increasing the time channel
t_shift = 1.653801536E+03 # Factor for shift of frequency
nt = 1
tmp = t0
while True:
tmp = tmp * q
nt += 1
if(tmp > tmax):
break
omega = t_shift/t0
self.omegas = []
self.frequencies = []
for iteration in range(nc + nt - 1):
omega = omega/q
self.omegas.append(omega)
self.frequencies.append(omega / (2 * np.pi))
lp(self.MP.logger, 20,
' - using ' + str(len(self.omegas)) + ' frequencies - ',
pre_dash=False)
[docs]
def calc_frequency_solutions(self, auto_interpolate,
repeat_frequencies, **solve_kwargs):
"""
Calculate frequency-domain solutions. Keyword arguments are piped
to this method by the *build_var_form()* method.
"""
t0 = time.time()
# define model
M = MOD(self.fd_mod_name, self.MP.mesh_name, self.fd_approach,
p=self.FS.p, overwrite_results=True, m_dir=self.MP.m_dir,
r_dir=self.MP.r_dir, reuse_fs=self.FS,
serial_ordering=self.MP.serial_ordering,
mumps_debug=self.MP.mumps_debug)
# define frequency and condcutivities
M.MP.update_model_parameters(
omegas=self.omegas,
sigma_ground=self.MP.sigma_ground,
sigma_air=self.MP.sigma_air,
currents=self.MP.currents,
n_domains=self.MP.n_domains)
# let FEniCS set up the variational formulations
M.FE.build_var_form(s_type=self.s_type, grounding=self.grounding,
start=self.start, stop=self.stop,
origin=self.origin, azimuth=self.azimuth,
length=self.length, radius=self.radius,
n_tx=self.n_tx, tx=self.tx, bc=self.bc,
ip=self.ip, quasi_static=self.quasi_static,
tx_topo=self.tx_topo, a_tol=self.a_tol)
# call solver and convert H-fields if specified
M.solve_main_problem(auto_interpolate=auto_interpolate,
repeat_frequencies=repeat_frequencies,
**solve_kwargs)
self.assembly_time = M.FE.assembly_time
self.system_size = M.FE.system_size
self.FS.solution_time = time.time() - t0
lp(self.MP.logger, 20,
' - solution time for all frequencies (s) --> ' +
str(self.FS.solution_time) + ' - ',
pre_dash=False, post_dash=True)
[docs]
def transform_to_time_domain(self, interp_mesh, EH, shut_on=True,
shut_off=True, impulse=True):
"""
Transform frequency-domain solutions to time domain using the
fast Hankel transformation routine.
Required arguments
------------------
- interp_mesh, type str
interpolation mesh for the chosen frequency-domain results
which should be converted to time domain
- EH, type str
default is *'EH'*, that means both, electric and magentic
fields are converted. Users can choose only *'E'* or *'H'*.
Keyword arguments
-----------------
- shut_on = True, type str
transform to shut-on field transients or not if *False*
- shut_off = True, type str
transform to shut-off field transients or not if *False*
- impulse = True, type str
transform to impulse (dB/dt) repsonse transients or not if *False*
"""
self.interp_dir = self.path_fd + '/' + self.fd_mod_name + \
'_interpolated/'
interp_dir = self.path_td + '/' + self.MP.mod_name + '_interpolated/'
if 'rx_path' in interp_mesh:
self.rx_pos = self.MP.rx[0]
self.n_rx = len(self.rx_pos)
else:
if interp_mesh.find('line') != -1:
self.interp_mesh_dir = self.MP.m_dir + '/lines'
elif interp_mesh.find('slice') != -1:
self.interp_mesh_dir = self.MP.m_dir + '/slices'
elif interp_mesh.find('path') != -1:
self.interp_mesh_dir = self.MP.m_dir + '/paths'
else:
lp(self.MP.logger, 50,
'Error! Interpolation target mesh name must '
'end either with "line", "slice" or "path" so far. '
'Aborting ...', post_dash=True)
raise SystemExit
lp(self.MP.logger, 20,
'... transforming results to time-domain ...')
if shut_off and self.MP.grounding[0]:
lp(self.MP.logger, 20,
'... calculating DC-field ...', pre_dash=False)
self.calc_dc_field(store=True)
self.interpolate_dc(interp_mesh, export_pvd=False)
i_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir +
'/' + interp_mesh + '.xml')
self.rx_pos = i_mesh.coordinates()
self.n_rx = len(self.rx_pos)
rank = -1
for ti in range(self.n_tx):
for ci in range(self.n_rx):
rank += 1
if self.n_tx == 1:
dc_name = self.interp_dir + 'E_dc_' +\
interp_mesh + '.npy'
e_name = self.interp_dir + 'E_t_' + \
interp_mesh + '_rx_' + str(ci) + '.npy'
h_name = self.interp_dir + 'H_t_' + \
interp_mesh + '_rx_' + str(ci) + '.npy'
export_path = interp_dir
else:
dc_name = self.interp_dir + 'tx_' + str(ti) + \
'_E_dc_' + interp_mesh + '.npy'
e_name = self.interp_dir + 'tx_' + str(ti) + '_E_t_' +\
interp_mesh + '_rx_' + str(ci) + '.npy'
h_name = self.interp_dir + 'tx_' + str(ti) + '_H_t_' +\
interp_mesh + '_rx_' + str(ci) + '.npy'
export_path = interp_dir + 'tx_' + str(ti) + '_'
if df.MPI.rank(self.MP.mpi_cw) == rank:
fd_to_td = np.zeros((self.n_times, 3))
if 'E_t' in EH or 'E' in EH:
pass
if shut_on:
E_field = np.load(e_name)
lp(self.MP.logger, 10,
'... process ' + str(df.MPI.rank(
self.MP.mpi_cw)) +
' transforming to E shut-on ...',
pre_dash=False, root_only=False,
barrier=False)
for c in range(3):
fd_to_td[:, c] = self.fht_hanstein_80(
E_field[:, c], 1)
np.save(export_path + 'E_shut_on_rx_' +
str(ci) + '.npy', fd_to_td)
if shut_off:
E_field = np.load(e_name)
lp(self.MP.logger, 10,
'... process ' + str(df.MPI.rank(
self.MP.mpi_cw)) +
' transforming to E shut-off ...',
pre_dash=False, root_only=False,
barrier=False)
for c in range(3):
if self.MP.grounding[ti]:
dc_level = np.load(dc_name)
dc_level = dc_level[ci, c]
else:
dc_level = None
fd_to_td[:, c] = self.fht_hanstein_80(
E_field[:, c], -1,
dc_level=dc_level)
np.save(export_path + 'E_shut_off_rx_' +
str(ci) + '.npy', fd_to_td)
if 'H_t' in EH or 'H' in EH:
if shut_on:
H_field = np.load(h_name)
lp(self.MP.logger, 10,
'... process ' + str(df.MPI.rank(
self.MP.mpi_cw)) +
' transforming to H shut-on ...',
pre_dash=False, root_only=False,
barrier=False)
for c in range(3):
fd_to_td[:, c] = self.fht_hanstein_80(
H_field[:, c], 1)
np.save(export_path + 'H_shut_on_rx_' +
str(ci) + '.npy', fd_to_td)
if shut_off:
H_field = np.load(h_name)
lp(self.MP.logger, 10,
'... process ' + str(df.MPI.rank(
self.MP.mpi_cw)) +
' transforming to H shut-off ...',
pre_dash=False, root_only=False,
barrier=False)
for c in range(3):
fd_to_td[:, c] = self.fht_hanstein_80(
H_field[:, c], -1)
np.save(export_path + 'H_shut_off_rx_' +
str(ci) + '.npy', fd_to_td)
if impulse:
H_field = np.load(h_name)
lp(self.MP.logger, 10,
'... process ' + str(df.MPI.rank(
self.MP.mpi_cw)) +
' transforming to dH/dt ...',
pre_dash=False, root_only=False,
barrier=False)
for c in range(3):
fd_to_td[:, c] = self.fht_hanstein_80(
H_field[:, c], 0)
np.save(export_path + 'dH_rx_' +
str(ci) + '.npy', fd_to_td)
else:
pass
if rank == df.MPI.size(self.MP.mpi_cw) - 1:
rank = -1
lp(self.MP.logger, 10,
'... waiting for all mpi processes to finish '
'serial interpolation tasks ...', pre_dash=False)
df.MPI.barrier(df.MPI.comm_world)
[docs]
def fht_hanstein_80(self, fd_data, modus, dc_level=None):
"""
Default fast Hankel transformation routine with 80 filter coefficients.
Required arguments
------------------
- fd_data, type numpy array
frequency-domain data, only one component
- modus, type int
modus switch to calculate either shut-on, shut-off or impulse
response results
Keyword arguments
-----------------
dc_level = None, type float
used to apply explicitly a calculated DC level for shut-off E-field
transients calculations; otherwise, the DC level will be
approximated by the fields values of the lowest frequency
"""
fht_coeff = [5.001828e-11, 8.000316e-11, 1.256402e-10, 2.009589e-10,
3.155941e-10, 5.047858e-10, 7.927364e-10, 1.267965e-09,
1.991264e-09, 3.184983e-09, 5.001829e-09, 8.000316e-09,
1.256403e-08, 2.009589e-08, 3.155941e-08, 5.047858e-08,
7.927363e-08, 1.267965e-07, 1.991263e-07, 3.184982e-07,
5.001826e-07, 8.000310e-07, 1.256401e-06, 2.009585e-06,
3.155931e-06, 5.047835e-06, 7.927307e-06, 1.267950e-05,
1.991228e-05, 3.184892e-05, 5.001599e-05, 7.999741e-05,
1.256258e-04, 2.009226e-04, 3.155029e-04, 5.045568e-04,
7.921612e-04, 1.266520e-03, 1.987636e-03, 3.175872e-03,
4.978954e-03, 7.942905e-03, 1.242000e-02, 1.973483e-02,
3.065536e-02, 4.821916e-02, 7.364350e-02, 1.128325e-01,
1.647498e-01, 2.348622e-01, 3.004906e-01, 3.372419e-01,
2.337323e-01, -8.647107e-02, -6.612999e-01, -8.142974e-01,
3.308580e-01, 1.402432e-00, -1.565116e-00, 8.359842e-01,
-3.059907e-01, 9.125030e-02, -2.463153e-02, 6.369876e-03,
-1.618934e-03, 4.085877e-04, -1.028279e-04, 2.584884e-05,
-6.494904e-06, 1.631643e-06, -4.098700e-07, 1.029566e-07,
-2.586174e-08, 6.496194e-09, -1.631772e-09, 4.098828e-10,
-1.029579e-10, 2.586184e-11, -6.496186e-12, 1.631761e-12]
nc = len(fht_coeff)
td_data = np.zeros(self.n_times)
if modus == -1 and dc_level is None:
dc_level = fd_data[-1].real
if modus in [1, -1]: # step on / off
fd_data /= np.zeros(self.n_freqs) + 1j * np.array(self.omegas)
# fht routine
for nn in range(self.n_freqs):
ita = max(1, nn - nc)
ite = min(self.n_times, nn - 1)
for it in range(ita - 1, ite):
td_data[it] -= fd_data[nn].imag * fht_coeff[nc - nn + it - 1]
# post-processing
out = td_data * np.sqrt(2. / np.pi) / self.times
if modus == -1:
out -= dc_level
return(out)
[docs]
class RationalArnoldi(ApproachBaseTD):
"""
FE implementation of rational Arnoldi (Krylov-subspace) method after
Börner (2015).
Methods
-------
- rational_arnoldi()
runs the complete rational-arnoldi solution process as well as H-field
conversion and export methods, FE modeling and time stepping related
keyword arguments can be passed to this method
- build_var_form()
set up variational formulation and assemble mass and curl-curl matrices
as well as right-hand side (source) vector
- get_poles_from_table()
define poles used in rational Arnoldi method
- build_rational_arnoldi_basis()
build Krylov subspace basis
- expm_ra()
utility function to calculate matrix exponentials
- project_solution()
calculate solution on Krylov subspace and project it back
RA approach specific parameters
-------------------------------
- n_mult = 6, type int
multiplyer of poles, do not change
- n_poles = 2, type int
number of poles to be used
- pole_shift = None, type float
shift poles in time; the default covered range is about 4 to 5 decades
from the earliest times on; this values is adjusted automatically and
it is not recommended to change it
- dc_field = None, type dolfin Function
the DC field is automatically calculated on the fly; for special
purposes, it might be computed externally and passed here
"""
def __init__(self, FS, MP):
"""
Initialize FE class for *E_RA* approach. See description of
**ApproachBaseTD** class for further information about parameters of
the time-domain modeling approaches.
Required arguments
------------------
- FS, type class
FunctionSpaces instance
- MP, type class
ModelParameters instance
"""
# inherit default attributes and common methods from ApproachBaseTD
super(self.__class__, self).__init__()
self.FS = FS
self.MP = MP
self.Solver = Solver(self.FS, self, mumps_debug=self.MP.mumps_debug)
self.dx_0 = self.FS.DOM.dx_0
# initialize trial and test functions
self.v = df.TestFunction(FS.V)
self.u = df.TrialFunction(FS.V)
# initialize RA approach specific parameters
self.n_mult = 6
self.n_poles = 2
self.pole_shift = None
self.dc_field = None
# mute print of the following FE class attributes and attributes only
# relevant for the other time-domain modeling approaches
self.mute = ['FS', 'MP', 'dx_0', 'u', 'v', 'mute', 'source_type',
'be_order', 'n_lin_steps', 'n_on', 't0_j', 'pulse_width',
'source_func', 'source_times', 'stepping_times',
't_tol', 'omegas', 'path_td', 'fd_mod_name', 'Solver',
'parallel_interpolation', 'interp_mesh', 'frequencies',
'fd_approach']
[docs]
def rational_arnoldi(self):
"""
Conduct rational Arnoldi modeling.
"""
self.get_poles_from_table()
self.m = int(self.n_poles * self.n + 1)
self.xi = np.tile(self.poles[0, :], self.n)
self.xi_index = np.tile(np.arange(self.n_poles, dtype=int), self.n)
t0 = time.time()
lp(self.MP.logger, 20, 'Solution of main FE system(s) of equations:')
self.build_krylov_subspace_basis()
lp(self.MP.logger, 20, '... projecting solution ...', pre_dash=False)
self.project_solution()
self.FS.solution_time = time.time() - t0
lp(self.MP.logger, 20,
' - combined rational arnoldi solution time (s) --> ' +
str(self.FS.solution_time) + ' - ',
pre_dash=False, post_dash=True)
[docs]
def build_var_form(self, **fem_kwargs):
"""
Build variational formulation for rational Arnoldi approach and
print FE parameters. A description of the keyword arguments is given
in the **ApproachBaseTD** and **RationalArnoldi** classes.
"""
self.update_fem_parameters(fem_kwargs)
if self.times is None:
self.times = np.logspace(self.log_t_min, self.log_t_max,
self.n_log_steps)
if self.pole_shift is None:
self.pole_shift = 10**(-6 - np.log10(self.times[0]))
self.n_times = len(self.times)
self.n = int(12 * (self.n_mult) / self.n_poles)
self.mu1 = df.Constant(1./self.MP.mu)
if self.bc == 'ZD' or self.bc == 'ZeroDirichlet':
boundary = DirichletBoundary()
bc = df.DirichletBC(self.FS.V, df.Constant((0, 0, 0)), boundary)
else:
bc = None
t0 = time.time()
self.Assembler = TotalFieldAssembler(self, bc)
self.Assembler.assemble()
lp(self.MP.logger, 20,
'... assembling system matrix ...', pre_dash=False)
C_form = df.inner(self.MP.mu_inv_func * df.curl(self.u),
df.curl(self.v)) * self.dx_0
B_form = df.inner(self.MP.sigma_func * self.u, self.v) * self.dx_0
self.B, self.C = df.PETScMatrix(), df.PETScMatrix()
b, c = df.PETScVector(), df.PETScVector()
RHS = df.inner(df.Constant(("0.0", "0.0", "0.0")), self.v) * self.dx_0
df.assemble_system(B_form, RHS, bc, A_tensor=self.B, b_tensor=b)
df.assemble_system(C_form, RHS, bc, A_tensor=self.C, b_tensor=c)
self.system_size = str(self.B.mat().getSize()[0])
self.assembly_time = time.time() - t0
lp(self.MP.logger, 20,
' - assembly time (s) --> ' + str(self.assembly_time) + ' - ',
pre_dash=False)
[docs]
def get_poles_from_table(self):
"""
Define *poles* for ration Arnoldi method.
"""
if self.n_poles == 1:
self.poles = -np.array([[3.14e5]]) * self.pole_shift
elif self.n_poles == 2:
# self.poles = -np.array([[6.35e4, 7.58e6]])
# # # new poles 1st!
# self.poles = -np.array([[1.6082e+03, 8.1261e+05]])
# self.n = 20
# new poles 2nd !
# self.poles = -np.array([[238.75, 4.519e+5]]) #* 0.01
# (1e-6 / 10**self.log_t_min)
# self.n = 20
# new poles 3rd!
self.poles = -np.array([[323.1722, 6.0128e+5]]) * self.pole_shift
self.n = 24
elif self.n_poles == 3:
self.poles = -np.array([[2.60e4, 6.73e5,
1.96e7]]) * self.pole_shift
else:
lp(self.MP.logger, 50,
'Error! Only "1, 2, 3" are valid values for "n_poles" '
'right now. Aborting ...')
raise SystemExit
[docs]
def build_krylov_subspace_basis(self):
"""
Build the Krylov subspace basis.
"""
# calculate inital condition
lp(self.MP.logger, 20,
'... calculating initial condition ...', pre_dash=False)
self.u_0 = self.Solver.solve_system_mumps(self.B, self.Assembler.b,
self.FS.V, sym=True)
# subtract dc field if shut off response for grounded sources
if any(self.MP.grounding) and self.shut_off:
if self.dc_field is None:
lp(self.MP.logger, 20,
'... calculating DC-field ...', pre_dash=False)
self.calc_dc_field()
else:
lp(self.MP.logger, 20,
'... importing DC-field ...', pre_dash=False)
if self.n_tx > 1:
lp(self.MP.logger, 50,
'Error! Import instead of on-thy-fly DC field '
'calculation only works for n_tx=1 so far. '
'Aborting ...')
raise SystemExit
self.E_dc = [df.Function(self.FS.V) for x in range(self.n_tx)]
read_h5(self.MP.mpi_cw, self.E_dc[0],
self.dc_field + '.h5')
for ti in range(self.n_tx):
if self.MP.grounding[ti]:
self.u_0[ti].vector().set_local(
self.u_0[ti].vector().get_local() -
self.E_dc[ti].vector().get_local())
self.u_0[ti].vector().apply('insert')
self.u = [[df.Function(self.FS.V) for ti in range(self.n_tx)]
for j in range(self.m + 1)]
w = [df.Function(self.FS.V) for ti in range(self.n_tx)]
H = [np.zeros((self.m + 1, self.m)) for ti in range(self.n_tx)]
for ti in range(self.n_tx):
w[ti] = w[ti].vector()
self.u_0[ti] = self.u_0[ti].vector()
for j in range(self.m + 1):
self.u[j][ti] = self.u[j][ti].vector()
for ti in range(self.n_tx):
self.u[0][ti] = self.u_0[ti] * 1./np.sqrt(
self.u_0[ti].inner(self.B * self.u_0[ti]))
solver0, solver1, solver2, solver3 = None, None, None, None
lp(self.MP.logger, 20, '... building subspace basis ...')
for j in range(self.m):
if self.n_tx != 1 and j == 0:
lp(self.MP.logger, 20,
'... calculating ' + str(self.n_tx) + ' transmitters '
'for each basis ...', pre_dash=False)
lp(self.MP.logger, 10,
'... calculating basis ' + str(j) + ' ...', pre_dash=False)
for ti in range(self.n_tx):
if j == self.m - 1:
self.Solver.solver.solve(w[ti], self.C * self.u[j][ti])
elif self.xi_index[j] == 0:
if solver0 is None:
D0 = df.PETScMatrix(self.B.mat() - self.C.mat() *
(1. / self.xi[j]))
solver0 = df.PETScLUSolver(D0, "mumps")
solver0.solve(w[ti], self.C * self.u[j][ti])
elif self.xi_index[j] == 1:
if solver1 is None:
D1 = df.PETScMatrix(self.B.mat() - self.C.mat() *
(1. / self.xi[j]))
solver1 = df.PETScLUSolver(D1, "mumps")
solver1.solve(w[ti], self.C * self.u[j][ti])
elif self.xi_index[j] == 2:
if solver2 is None:
D2 = df.PETScMatrix(self.B.mat() - self.C.mat() *
(1. / self.xi[j]))
solver2 = df.PETScLUSolver(D2, "mumps")
solver2.solve(w[ti], self.C * self.u[j][ti])
elif self.xi_index[j] == 3:
if solver3 is None:
D3 = df.PETScMatrix(self.B.mat() - self.C.mat() *
(1. / self.xi[j]))
solver3 = df.PETScLUSolver(D3, "mumps")
solver3.solve(w[ti], self.C * self.u[j][ti])
H[ti][:j, j] = 0.
for reo in range(2):
for k in range(j+1):
h = self.u[k][ti].inner(self.B*w[ti])
H[ti][k, j] += h
w[ti] = w[ti] - self.u[k][ti] * h
H[ti][j + 1, j] = np.sqrt(w[ti].inner(self.B*w[ti]))
w[ti] *= 1./H[ti][j + 1, j]
self.u[j+1][ti][:] = w[ti]
[docs]
def expm_ra(self, A, E):
"""
Calculate matrix exponentials with eigenvalues and eigenvectors.
Required arguments
------------------
- A, E, type numpy ndarray
input matrices for the calculation
"""
DD, VV = la.eig(A, E)
DD = np.diag(DD.real)
return(la.solve(VV.T, np.dot(VV, np.diag(np.exp(np.diag(DD)))).T))
[docs]
def project_solution(self):
"""
Project subspace solution back to original solution space.
"""
self.U = [[df.Function(self.FS.V) for ti in range(self.n_tx)] for
k in range(self.n_times)]
for ti in range(self.n_tx):
prod = [self.C * self.u[j][ti] for j in range(self.m)]
Mm = np.eye(self.m)
Km = np.zeros((self.m, self.m))
for i in range(self.m):
for j in range(self.m):
Km[i, j] = self.u[i][ti].inner(prod[j])
proj = np.sqrt(self.u_0[ti].inner(self.B * self.u_0[ti])) * \
np.eye(self.m, 1)
Um = np.zeros((self.m, self.n_times))
if df.MPI.rank(self.MP.mpi_cw) == 0:
for k in range(self.n_times):
dt = self.times[k]
try:
tmp = np.dot(self.expm_ra(-dt * Km, Mm), proj).ravel()
except ValueError:
tmp = np.nan
Um[:, k] = tmp
else:
pass
Um = self.MP.mpi_cw.bcast(Um, root=0)
for j in range(self.m):
for k in range(self.n_times):
self.U[k][ti].vector()[:] += self.u[j][ti].get_local() * \
Um[j, k]