Source code for custEM.fem.time_domain_approaches

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import numpy as np
import scipy.linalg as la
from pathlib import Path
import time
from custEM.misc import logger_print as lp
from custEM.misc import read_h5
from custEM.core import MOD
from custEM.core import Solver
from custEM.fem import ApproachBaseTD
from custEM.fem import check_source
from custEM.fem import add_sigma_vals_to_list
from custEM.fem import TotalFieldAssembler

try:
    from custEM.misc import DirichletBoundary
except Exception as e:
    print(e)
    print('Warning! DirichletBoundary not imported. Continuing  ...')


[docs] class ImplicitEuler(ApproachBaseTD): """ FE implementation of Implicit Euler time-stepping method, e.g., (Um, 2010). Users can use either a first- or second-order discretization in time and, with cautioan, a formulation considering displacement currents which needs some more tests. Methods ------- - time_stepping() runs the complete time-stepping solution process as well as H-field conversion and export methods, FE modeling and time stepping related keyword arguments can be passed to this method. - build_var_form() set up variational formulation and assemble mass and curl-curl matrices as well right-hand side (source) vector - init_initial_condition() initialize initial condition for time-stepping approach - first_order_IE() perform first-order implicit Euler time-stepping approach - second_order_IE() perform second-order implicit Euler time-stepping approach - first_order_IE_full() perform first-order implicit Euler time-stepping approach including displacement currents - init_times() initalize internally used time vector for the time stepping - gaussian() define gaussian source pulse for impulse response modeling - print_stepping_info() print some information before starting the time-stepping process IE approach specific parameters ------------------------------- - source_type = 'djdt', type str alternatively, use *'j'* to calculate B-fields instead of dB/dt - source_func = None, type numpy array specify custom source (transmitter) function over time; needs to correspond to suitable *source_times* - source_times = None, type numpy array specify custom *source_times* if no default ramp-free source functions are used - stepping_times = None, type numpy array specify custom calculation times for the time-stepping after the source pulse - be_order = 1, type int order of discretization in time for backward Euler method, alterntively set to *2* - n_lin_steps = 30, type int number of linear steps between each logarithmic time step, each linear step requires only an inexpensive back-substitution for solving the system of linear equations - n_on = 101, type int number of linear time steps for creating source signal, not recommended to be changed - t0_j = -10., type float specify start time (negative) for specifying the duration of the source signal ramp if the source type is *j* - pulse_width = 1e-2, type float relative width of dirac function, not recommended to be changed - t_tol = 1e-2, type float tolerance to check if a calculated time step is linear or not (if not linear, a new matrix factorization will be computed) """ def __init__(self, FS, MP): """ Initialize FE class for *E_IE* approach. See description of **ApproachBaseTD** class for further information about parameters of the time-domain modeling approaches. Required arguments ------------------ - FS, type class FunctionSpaces instance - MP, type class ModelParameters instance """ # inherit default attributes and common methods from ApproachBaseTD super(self.__class__, self).__init__() self.FS = FS self.MP = MP self.dx_0 = self.FS.DOM.dx_0 # initialize trial and test functions self.v = df.TestFunction(FS.V) self.u = df.TrialFunction(FS.V) # initialize IE approach specific parameters self.source_type = 'djdt' self.be_order = 1 self.n_lin_steps = 30 self.n_on = 101 self.t0_j = -10. self.pulse_width = 1e-2 self.t_tol = 1e-2 self.source_func = None self.source_times = None self.stepping_times = None # mute print of the following FE class attributes and attributes only # relevant for the other time-domain modeling approaches self.mute = ['FS', 'MP', 'dx_0', 'u', 'v', 'mute', 'omegas', 'path_td', 'fd_mod_name', 'parallel_interpolation', 'interp_mesh', 'n_mult', 'n_poles', 'pole_shift', 'dc_field', 'frequencies', 'fd_approach']
[docs] def time_stepping(self): """ Initialize time-discfretization and variational formulations for assembly. """ self.U = [] t0 = time.time() lp(self.MP.logger, 20, 'Solution of main FE system(s) of equations:') if self.be_order == 1 and self.quasi_static: u_0 = self.init_initial_condition(None) self.first_order_IE(u_0) elif self.be_order == 2: u_0, u_1 = self.init_initial_condition(None, order=2) self.second_order_IE(u_0, u_1) elif self.be_order == 1 and not self.quasi_static: u_0, u_1 = self.init_initial_condition(None, order=2) self.first_order_IE_full(u_0, u_1) else: lp(self.MP.logger, 50, 'Error!, Only order *1* or *2* for implicit Euler method' ' are supported. Aborting ...') raise SystemExit self.FS.solution_time = time.time() - t0 lp(self.MP.logger, 50, ' - solution time for time stepping (s) --> ' + str(time.time() - t0) + ' - ', pre_dash=False, post_dash=True)
[docs] def build_var_form(self, **fem_kwargs): """ Build variational formulation for E-field time-stepping approach and print FE parameters. A description of the keyword arguments is given in the **ApproachBaseTD** and **ImplicitEuler**classes. """ self.update_fem_parameters(fem_kwargs) if self.source_type not in ['j', 'djdt']: lp(self.MP.logger, 50, 'Error! *source_type* must be either "j" or "djdt."') raise SystemExit if 'times' not in fem_kwargs: self.times = np.logspace( self.log_t_min, self.log_t_max, self.n_log_steps) self.n_times = len(self.times) if self.source_func is None: self.source_func = self.gaussian( np.linspace(0., 10**(self.log_t_min-1), 101), 0.5 * 10**(self.log_t_min-1), self.pulse_width * 10**(self.log_t_min-1)) if self.shut_off and self.MP.grounding[0]: self.source_func = np.append(self.source_func, np.zeros(100)) self.source_func = np.append(self.source_func, -self.source_func[:101]) check_source(self.s_type, self.tx) mu = df.Constant(self.MP.mu) self.mu1 = df.Constant(1./self.MP.mu) if self.bc == 'ZD' or self.bc == 'ZeroDirichlet': boundary = DirichletBoundary() bc = df.DirichletBC(self.FS.V, df.Constant((0, 0, 0)), boundary) else: bc = None t0 = time.time() self.Assembler = TotalFieldAssembler(self, bc) self.Assembler.assemble() lp(self.MP.logger, 20, '... assembling system matrix ...', pre_dash=False) C_form = df.inner(self.MP.mu_inv_func * df.curl(self.u), df.curl(self.v)) * self.dx_0 B_form = df.inner(self.MP.sigma_func * self.u, self.v) * self.dx_0 if not self.quasi_static: A_form = df.inner(self.MP.eps_func * self.u, self.v) * self.dx_0 self.A, self.B, self.C = df.PETScMatrix(), df.PETScMatrix(), \ df.PETScMatrix() a, b, c = df.PETScVector(), df.PETScVector(), df.PETScVector() RHS = df.inner(df.Constant(("0.0", "0.0", "0.0")), self.v) * self.dx_0 df.assemble_system(B_form, RHS, bc, A_tensor=self.B, b_tensor=b) df.assemble_system(C_form, RHS, bc, A_tensor=self.C, b_tensor=c) self.system_size = str(self.B.mat().getSize()[0]) if not self.quasi_static: df.assemble_system(A_form, RHS, bc, A_tensor=self.A, b_tensor=a) self.assembly_time = time.time() - t0 lp(self.MP.logger, 20, ' - assembly time (s) --> ', self.assembly_time, pre_dash=False)
[docs] def init_initial_condition(self, ini_con, order=1): """ Initialize the initial fields u_0 and u1 (if second order scheme or full formulation) for the time-stepping. Required arguments ------------------ - ini_con, type dolfin function use customized initial fields, not supported yet Keyword arguments ----------------- - order = 1, type int specify first or second order time-stepping scheme """ # further choices for *ini_con* might be added for incorporating # custom inital conditions if ini_con is None: if order == 1: return([df.Function(self.FS.V) for ti in range(self.n_tx)]) else: return([df.Function(self.FS.V) for ti in range(self.n_tx)], [df.Function(self.FS.V) for ti in range(self.n_tx)])
[docs] def first_order_IE(self, u_0): """ First-order backward Euler time-stepping scheme using diffusive approximation (no displacement currents). Required arguments ------------------ - u_0, type dolfin function initial solution at first time step """ counter = 0 times, self.export_flags = self.init_times() self.times = [] dt = np.diff(times, axis=0).ravel() lp(self.MP.logger, 20, '... starting 1st order time stepping ...', pre_dash=False) self.print_stepping_info(dt) if self.export_flags[0]: lp(self.MP.logger, 20, ' - first observation time is equal to zeroth calculation ' 'time - \n' '... setting solution of first observation time to ' 'zero-function ...', pre_dash=False) self.U.append([df.Function(self.FS.V) for ti in range(self.n_tx)]) # first-order time-stepping after step 0 while counter < len(dt): lp(self.MP.logger, 10, 'starting step: %d, t =%8.3e, dt = %8.3e' % ( counter, times[counter + 1], dt[counter])) b = [] u_1 = [df.Function(self.FS.V) for ti in range(self.n_tx)] for ti in range(self.n_tx): if self.source_type == 'j' and self.shut_off: if counter < self.n_on - 1: b.append(self.B*u_0[ti].vector() - dt[counter] * self.Assembler.b[ti].vector()) if ti == 0: lp(self.MP.logger, 10, 'Tx on, counter', counter, pre_dash=False) else: b.append(self.B*u_0[ti].vector()) if ti == 0: lp(self.MP.logger, 10, 'Tx off, counter', counter, pre_dash=False) elif self.source_type == 'j' and not self.shut_off: if counter < self.n_on - 1: b.append(self.B*u_0[ti].vector()) if ti == 0: lp(self.MP.logger, 10, 'Tx off, counter', counter, pre_dash=False) else: b.append(self.B*u_0[ti].vector() - dt[counter] * self.Assembler.b[ti].vector()) if ti == 0: lp(self.MP.logger, 10, 'Tx on, counter', counter, pre_dash=False) elif self.source_type == 'djdt': if counter < self.n_on: b.append(self.B*u_0[ti].vector() + dt[counter] * self.Assembler.b[ti].vector() * self.source_func[counter]) if ti == 0: lp(self.MP.logger, 10, 'Tx on, counter', counter, pre_dash=False) else: b.append(self.B*u_0[ti].vector()) if ti == 0: lp(self.MP.logger, 10, 'Tx off, counter', counter, pre_dash=False) if ti == 0: if not np.isclose(dt[counter], dt[counter - 1], atol=dt[counter] * self.t_tol): lp(self.MP.logger, 10, '... factorizing new ' 'system matrix ...', pre_dash=False) D = self.B + dt[counter]*self.C solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(), D, "mumps") solver.solve(u_1[ti].vector(), b[ti]) # update previous solution u_0[ti].assign(u_1[ti]) # store E-field for export times if self.export_flags[counter + 1]: self.U.append(u_1) self.times.append(times[counter + 1]) counter += 1
[docs] def first_order_IE_full(self, u_0, u_1): """ First-order backward Euler time-stepping scheme for full formulation considering displacement currents. Required arguments ------------------ - u_0, type dolfin function initial solution at first time step - u_1, type dolfin function intital solution at second time step """ lp(self.MP.logger, 50, 'Error! Updates to new class design need to be implemented for ' 'full formulation of first order IE method. If required.\nPlease ' 'contact the authors and we will update this method accordingly. ' 'Aborting ...') raise SystemExit counter = 0 times, self.export_flags = self.init_times() self.times = [] dt = np.diff(times, axis=0).ravel() lp(self.MP.logger, 20, '... starting 1st order time stepping including ' 'displacement currents ...', pre_dash=False) self.print_stepping_info(dt) gauss = self.gaussian(np.linspace(0., 10**self.log_t_min, 101), 0.5 * 10**self.log_t_min, 0.1 * 10**self.log_t_min) if self.export_flags[0]: lp(self.MP.logger, 20, ' - first observation time is equal to zeroth calculation ' 'time - \n' '... setting solution of first observation time to ' 'zero-function ...', pre_dash=False) self.U.append([df.Function(self.FS.V) for ti in range(self.n_tx)]) while counter < len(dt): lp(self.MP.logger, 10, 'starting step: %d, t =%8.3e, dt = %8.3e' % ( counter, times[counter + 1], dt[counter])) u_2 = df.Function(self.FS.V) if self.source_type == 'j': if counter < self.n_on - 1: b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\ (1./dt[counter]) * self.A * u_0.vector() +\ dt[counter] * self.Assembler.b[ti].vector() lp(self.MP.logger, 10, 'on', counter) else: b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\ (1./dt[counter]) * self.A * u_0.vector() lp(self.MP.logger, 10, 'off', counter) elif self.source_type == 'djdt': if counter < self.n_on - 1: b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\ (1./dt[counter]) * self.A * u_0.vector() -\ dt[counter] * self.Assembler.b[ti].vector() *\ gauss[counter] lp(self.MP.logger, 10, 'on', counter) else: b = ((2./dt[counter]) * self.A + self.B) * u_1.vector() -\ (1./dt[counter]) * self.A * u_0.vector() lp(self.MP.logger, 10, 'off', counter) if not np.isclose(dt[counter], dt[counter - 1], atol=dt[counter] * self.t_tol): lp(self.MP.logger, 10, '... factorizing new ' 'system matrix ...', pre_dash=False) D = (1./dt[counter]) * self.A + self.B + dt[counter] * self.C solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(), D, "mumps") solver.solve(u_2.vector(), b) # update previous solution and save E-field if self.export_flags[counter + 1]: self.U.append(u_1) self.times.append(times[counter + 1]) counter += 1 u_0.assign(u_1)
[docs] def second_order_IE(self, u_0, u_1): """ Second-order backward Euler time-stepping scheme. Required arguments ------------------ - u_0, type dolfin function initial solution at first time step - u_1, type dolfin function intital solution at second time step """ counter = 0 times, self.export_flags = self.init_times() self.times = [] dt = np.diff(times, axis=0).ravel() lp(self.MP.logger, 20, '... starting 2nd order time stepping ...', pre_dash=False) self.print_stepping_info(dt) if self.export_flags[0]: lp(self.MP.logger, 20, ' - first observation time is equal to zeroth calculation ' 'time - \n' '... setting solution of first observation time to ' 'zero-function ...', pre_dash=False) self.U.append([df.Function(self.FS.V) for ti in range(self.n_tx)]) # second-order time-stepping after step 1 while counter < len(dt): lp(self.MP.logger, 10, 'starting step: %d, t =%8.3e, dt = %8.3e' % ( counter, times[counter + 1], dt[counter])) b = [] u_2 = [df.Function(self.FS.V) for ti in range(self.n_tx)] for ti in range(self.n_tx): if self.source_type == 'j' and self.shut_off: if counter < self.n_on - 1: b.append( self.B * (4. * u_1[ti].vector() - u_0[ti].vector()) - 2 * dt[counter] * self.Assembler.b[ti].vector()) if ti == 0: lp(self.MP.logger, 10, 'Tx on, counter', counter, pre_dash=False) else: b.append(self.B*(4.*u_1[ti].vector() - u_0[ti].vector())) if ti == 0: lp(self.MP.logger, 10, 'Tx off, counter', counter, pre_dash=False) elif self.source_type == 'j' and not self.shut_off: if counter < self.n_on - 1: b.append(self.B*(4.*u_1[ti].vector() - u_0[ti].vector())) if ti == 0: lp(self.MP.logger, 10, 'Tx off, counter', counter, pre_dash=False) else: b.append( self.B * (4. * u_1[ti].vector() - u_0[ti].vector()) - 2 * dt[counter] * self.Assembler.b[ti].vector()) if ti == 0: lp(self.MP.logger, 10, 'Tx on, counter', counter, pre_dash=False) elif self.source_type == 'djdt': if counter < self.n_on: b.append( self.B * (4. * u_1[ti].vector() - u_0[ti].vector()) - 2 * dt[counter] * self.Assembler.b[ti].vector() * self.source_func[counter]) if ti == 0: lp(self.MP.logger, 10, 'Tx on, counter', counter, pre_dash=False) else: b.append(self.B*(4.*u_1[ti].vector() - u_0[ti].vector())) if ti == 0: lp(self.MP.logger, 10, 'Tx off, counter', counter, pre_dash=False) if ti == 0: if not np.isclose(dt[counter], dt[counter - 1], atol=dt[counter] * self.t_tol): D = 3.*self.B + 2.*dt[counter]*self.C solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(), D, "mumps") solver.solve(u_2[ti].vector(), b[ti]) # update previous solution u_0[ti].assign(u_1[ti]) u_1[ti].assign(u_2[ti]) # store E-field for export times if self.export_flags[counter + 1]: self.U.append(u_2) self.times.append(times[counter + 1]) counter += 1
[docs] def init_times(self): """ Initialize times for time-stepping. Depending on the *source_type*, initial time steps before are added automatically to incorporate the source signal before the observation times are calculated. """ # use custom source times if self.source_times is not None: all_times = self.source_times self.n_on = len(all_times) - 1 # initialize default times for default ideal source functions else: if self.source_type == 'j': all_times = np.linspace(self.t0_j, 0., self.n_on) all_times = np.append( all_times, np.linspace(10**(self.log_t_min - 2), 10**self.log_t_min, 101)[:-1]) if self.source_type == 'djdt': if self.shut_off and self.MP.grounding[0]: all_times = np.linspace(0., 10**(self.log_t_min - 1), 101) - 10**(self.log_t_min-1) -\ 10**(self.log_t_max+1) all_times = np.append( all_times, np.linspace(-10. + 10**(self.log_t_min - 3), -10**(self.log_t_min - 1) - 10**(self.log_t_min - 3), 100)) all_times = np.append( all_times, np.linspace(0., 10**(self.log_t_min-1), 101) - 10**(self.log_t_min-1)) self.n_on = 300 else: all_times = np.linspace(0., 10**(self.log_t_min - 1), 101) - 10**(self.log_t_min - 1) self.n_on = 100 all_times = np.append( all_times, np.linspace(10**(self.log_t_min - 3), 10**self.log_t_min, 101)[:-1]) if self.be_order == 2: self.n_on -= 1 if self.stepping_times is None: # add times for calculating the transient response if self.n_lin_steps == 0: all_times = np.append(all_times, np.logspace( self.log_t_min, self.log_t_max, self.n_log_steps)) else: log_times = np.logspace(self.log_t_min, self.log_t_max, self.n_log_steps) for jj in range(len(log_times) - 1): all_times = np.append(all_times, np.linspace( log_times[jj], log_times[jj+1], self.n_lin_steps + 1)[:-1]) all_times = np.append(all_times, log_times[-1]) else: all_times = np.append(all_times, self.stepping_times) export_flags = np.array([False for ll in range(len(all_times))]) for tt in self.times: export_flags[np.abs(all_times - tt).argmin()] = True if len(export_flags[export_flags]) != len(self.times): lp(self.MP.logger, 50, 'Number of target times', len(self.times), pre_dash=False) lp(self.MP.logger, 50, 'Number of export flags', len(export_flags[export_flags]), pre_dash=False) lp(self.MP.logger, 50, 'Error! Not all target times can be exported with this ' 'mix of linear and logarithmic time steps.\nChoose more time ' 'steps or decrease the number of target times. Aborting ...') raise SystemExit return(all_times.reshape(-1, 1), export_flags)
[docs] def gaussian(self, x, x0, alpha): """ Calculate gaussian pulse. Required arguments ------------------ - x, type numpy array time values - x0, type float central value of times values - alpha, type float stretching parameter of Dirac pulse function """ return (np.exp(-(x - x0)**2 / alpha**2)) / (alpha * np.sqrt(np.pi))
[docs] def print_stepping_info(self, dt): """ Print some information before starting the time-stepping process. Required arguments ------------------ - dt, type array vector of time-steps """ lp(self.MP.logger, 20, ' - overall number of observation times --> ' + str(len(self.times)) + ' - ', pre_dash=False) lp(self.MP.logger, 20, ' - overall number of computation time steps --> ' + str(len(dt)) + ' - ', pre_dash=False) lp(self.MP.logger, 20, '... calculating ' + str(self.n_tx) + ' transmitters ' 'for each time step ...', pre_dash=False) lp(self.MP.logger, 20, '... time-stepping will take a while ...', pre_dash=False) lp(self.MP.logger, 20, ' - choose *debug* (10) logging level to follow the time-stepping' ' process - ', pre_dash=False)
[docs] class FourierTransformBased(ApproachBaseTD): """ FE implementation of inverse Fourier-transform based approach. It calculates several tens of frequency-domain solutions with the total E-field approach and converts them to time domain afterwards. Methods ------- - build_var_form() set up attributes required in the frequency-domain variational formulations - initialize frequencies() initialize frequencies for the frequency-domain computations automatically if not given explicitly as keyword argument - calc_frequency_solutions() calculate frequency-domain solutions with total E-field approach - transform_to_time_domain() transform frequency-domain solutions to time domain - fht_hahnstein_80() use default set of 80 filter coefficients to perform the fast Hankel transformation RA approach specific parameters ------------------------------- - n_mult = 6, type int multiplyer of poles, do not change - n_poles = 2, type int number of poles to be used - pole_shift = None, type float shift poles in time; the default covered range is about 4 to 5 decades from the earliest times on; this values is adjusted automatically and it is not recommended to change it - dc_field = None, type dolfin Function the DC field is automatically calculated on the fly; for special purposes, it might be computed externally and passed here """ def __init__(self, FS, MP): """ Initialize FE class for *E_FT* approach. See description of **ApproachBaseTD** class for further information about parameters of the time-domain modeling approaches. Required arguments ------------------ - FS, type class FunctionSpaces instance - MP, type class ModelParameters instance """ # inherit default attributes and common methods from ApproachBaseTD super(self.__class__, self).__init__() self.FS = FS self.MP = MP self.dx_0 = self.FS.DOM.dx_0 self.path_td = self.MP.r_dir + '/E_FT/' + self.MP.mesh_name # initialize FT approach specific parameters self.omegas = None self.frequencies = None self.fd_mod_name = 'FT_' + self.MP.mod_name self.fd_approach = 'E_t' self.interp_mesh = None self.parallel_interpolation = False if df.MPI.rank(self.MP.mpi_cw) == 0: Path('.').joinpath( self.path_td + '/' + self.MP.mod_name +'_interpolated').mkdir( exist_ok=True, parents=True) # mute print of the following FE class attributes and attributes only # relevant for the other time-domain modeling approaches self.mute = ['FS', 'MP', 'dx_0', 'mute', 'source_type', 'be_order', 'n_lin_steps', 'n_on', 't0_j', 'pulse_width', 'source_func', 'source_times', 'stepping_times', 't_tol', 'n_mult', 'n_poles', 'pole_shift', 'dc_field']
[docs] def build_var_form(self, **fem_kwargs): """ Initialize parameters for the Fourier-transform based approach and print FE parameters. A description of the keyword arguments is given in the **ApproachBaseTD** and **FourierTransformBased**classes. """ self.update_fem_parameters(fem_kwargs) self.path_fd = (self.MP.r_dir + '/' + self.fd_approach + '/' + self.MP.mesh_name) if self.times is None: self.times = np.logspace(self.log_t_min, self.log_t_max, self.n_log_steps) self.n_times = len(self.times) if self.omegas is None and self.frequencies is None: self.initalize_frequencies() else: if self.frequencies is not None: self.omegas = self.frequencies * 2. * np.pi if self.omegas is not None: self.frequencies = self.omegas / (2. * np.pi) self.n_freqs = len(self.omegas) if len(self.MP.currents) == 1: self.MP.currents = np.tile(self.MP.currents[0], (self.n_freqs, 1)) lp(self.MP.logger, 20, '... storing *frequencies* and *times* to ' 'files in export directory ...', pre_dash=False) if df.MPI.rank(self.MP.mpi_cw) == 0: np.savetxt(self.path_td + '/' + self.MP.mod_name + '_frequencies.dat', self.frequencies) np.savetxt(self.path_td + '/' + self.MP.mod_name + '_times.dat', self.times)
[docs] def initalize_frequencies(self): """ Initialize frequencies for the frequency-domain solutions automatically based on the default 80 FHT filter coefficients. """ t0 = 10.**self.log_t_min # First time channel tmax = 10.**self.log_t_max # Last time channel nc = 80 # Hahnstein filters q = 10. ** 0.1 # Factor for increasing the time channel t_shift = 1.653801536E+03 # Factor for shift of frequency nt = 1 tmp = t0 while True: tmp = tmp * q nt += 1 if(tmp > tmax): break omega = t_shift/t0 self.omegas = [] self.frequencies = [] for iteration in range(nc + nt - 1): omega = omega/q self.omegas.append(omega) self.frequencies.append(omega / (2 * np.pi)) lp(self.MP.logger, 20, ' - using ' + str(len(self.omegas)) + ' frequencies - ', pre_dash=False)
[docs] def calc_frequency_solutions(self, auto_interpolate, repeat_frequencies, **solve_kwargs): """ Calculate frequency-domain solutions. Keyword arguments are piped to this method by the *build_var_form()* method. """ t0 = time.time() # define model M = MOD(self.fd_mod_name, self.MP.mesh_name, self.fd_approach, p=self.FS.p, overwrite_results=True, m_dir=self.MP.m_dir, r_dir=self.MP.r_dir, reuse_fs=self.FS, serial_ordering=self.MP.serial_ordering, mumps_debug=self.MP.mumps_debug) # define frequency and condcutivities M.MP.update_model_parameters( omegas=self.omegas, sigma_ground=self.MP.sigma_ground, sigma_air=self.MP.sigma_air, currents=self.MP.currents, n_domains=self.MP.n_domains) # let FEniCS set up the variational formulations M.FE.build_var_form(s_type=self.s_type, grounding=self.grounding, start=self.start, stop=self.stop, origin=self.origin, azimuth=self.azimuth, length=self.length, radius=self.radius, n_tx=self.n_tx, tx=self.tx, bc=self.bc, ip=self.ip, quasi_static=self.quasi_static, tx_topo=self.tx_topo, a_tol=self.a_tol) # call solver and convert H-fields if specified M.solve_main_problem(auto_interpolate=auto_interpolate, repeat_frequencies=repeat_frequencies, **solve_kwargs) self.assembly_time = M.FE.assembly_time self.system_size = M.FE.system_size self.FS.solution_time = time.time() - t0 lp(self.MP.logger, 20, ' - solution time for all frequencies (s) --> ' + str(self.FS.solution_time) + ' - ', pre_dash=False, post_dash=True)
[docs] def transform_to_time_domain(self, interp_mesh, EH, shut_on=True, shut_off=True, impulse=True): """ Transform frequency-domain solutions to time domain using the fast Hankel transformation routine. Required arguments ------------------ - interp_mesh, type str interpolation mesh for the chosen frequency-domain results which should be converted to time domain - EH, type str default is *'EH'*, that means both, electric and magentic fields are converted. Users can choose only *'E'* or *'H'*. Keyword arguments ----------------- - shut_on = True, type str transform to shut-on field transients or not if *False* - shut_off = True, type str transform to shut-off field transients or not if *False* - impulse = True, type str transform to impulse (dB/dt) repsonse transients or not if *False* """ self.interp_dir = self.path_fd + '/' + self.fd_mod_name + \ '_interpolated/' interp_dir = self.path_td + '/' + self.MP.mod_name + '_interpolated/' if 'rx_path' in interp_mesh: self.rx_pos = self.MP.rx[0] self.n_rx = len(self.rx_pos) else: if interp_mesh.find('line') != -1: self.interp_mesh_dir = self.MP.m_dir + '/lines' elif interp_mesh.find('slice') != -1: self.interp_mesh_dir = self.MP.m_dir + '/slices' elif interp_mesh.find('path') != -1: self.interp_mesh_dir = self.MP.m_dir + '/paths' else: lp(self.MP.logger, 50, 'Error! Interpolation target mesh name must ' 'end either with "line", "slice" or "path" so far. ' 'Aborting ...', post_dash=True) raise SystemExit lp(self.MP.logger, 20, '... transforming results to time-domain ...') if shut_off and self.MP.grounding[0]: lp(self.MP.logger, 20, '... calculating DC-field ...', pre_dash=False) self.calc_dc_field(store=True) self.interpolate_dc(interp_mesh, export_pvd=False) i_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir + '/' + interp_mesh + '.xml') self.rx_pos = i_mesh.coordinates() self.n_rx = len(self.rx_pos) rank = -1 for ti in range(self.n_tx): for ci in range(self.n_rx): rank += 1 if self.n_tx == 1: dc_name = self.interp_dir + 'E_dc_' +\ interp_mesh + '.npy' e_name = self.interp_dir + 'E_t_' + \ interp_mesh + '_rx_' + str(ci) + '.npy' h_name = self.interp_dir + 'H_t_' + \ interp_mesh + '_rx_' + str(ci) + '.npy' export_path = interp_dir else: dc_name = self.interp_dir + 'tx_' + str(ti) + \ '_E_dc_' + interp_mesh + '.npy' e_name = self.interp_dir + 'tx_' + str(ti) + '_E_t_' +\ interp_mesh + '_rx_' + str(ci) + '.npy' h_name = self.interp_dir + 'tx_' + str(ti) + '_H_t_' +\ interp_mesh + '_rx_' + str(ci) + '.npy' export_path = interp_dir + 'tx_' + str(ti) + '_' if df.MPI.rank(self.MP.mpi_cw) == rank: fd_to_td = np.zeros((self.n_times, 3)) if 'E_t' in EH or 'E' in EH: pass if shut_on: E_field = np.load(e_name) lp(self.MP.logger, 10, '... process ' + str(df.MPI.rank( self.MP.mpi_cw)) + ' transforming to E shut-on ...', pre_dash=False, root_only=False, barrier=False) for c in range(3): fd_to_td[:, c] = self.fht_hanstein_80( E_field[:, c], 1) np.save(export_path + 'E_shut_on_rx_' + str(ci) + '.npy', fd_to_td) if shut_off: E_field = np.load(e_name) lp(self.MP.logger, 10, '... process ' + str(df.MPI.rank( self.MP.mpi_cw)) + ' transforming to E shut-off ...', pre_dash=False, root_only=False, barrier=False) for c in range(3): if self.MP.grounding[ti]: dc_level = np.load(dc_name) dc_level = dc_level[ci, c] else: dc_level = None fd_to_td[:, c] = self.fht_hanstein_80( E_field[:, c], -1, dc_level=dc_level) np.save(export_path + 'E_shut_off_rx_' + str(ci) + '.npy', fd_to_td) if 'H_t' in EH or 'H' in EH: if shut_on: H_field = np.load(h_name) lp(self.MP.logger, 10, '... process ' + str(df.MPI.rank( self.MP.mpi_cw)) + ' transforming to H shut-on ...', pre_dash=False, root_only=False, barrier=False) for c in range(3): fd_to_td[:, c] = self.fht_hanstein_80( H_field[:, c], 1) np.save(export_path + 'H_shut_on_rx_' + str(ci) + '.npy', fd_to_td) if shut_off: H_field = np.load(h_name) lp(self.MP.logger, 10, '... process ' + str(df.MPI.rank( self.MP.mpi_cw)) + ' transforming to H shut-off ...', pre_dash=False, root_only=False, barrier=False) for c in range(3): fd_to_td[:, c] = self.fht_hanstein_80( H_field[:, c], -1) np.save(export_path + 'H_shut_off_rx_' + str(ci) + '.npy', fd_to_td) if impulse: H_field = np.load(h_name) lp(self.MP.logger, 10, '... process ' + str(df.MPI.rank( self.MP.mpi_cw)) + ' transforming to dH/dt ...', pre_dash=False, root_only=False, barrier=False) for c in range(3): fd_to_td[:, c] = self.fht_hanstein_80( H_field[:, c], 0) np.save(export_path + 'dH_rx_' + str(ci) + '.npy', fd_to_td) else: pass if rank == df.MPI.size(self.MP.mpi_cw) - 1: rank = -1 lp(self.MP.logger, 10, '... waiting for all mpi processes to finish ' 'serial interpolation tasks ...', pre_dash=False) df.MPI.barrier(df.MPI.comm_world)
[docs] def fht_hanstein_80(self, fd_data, modus, dc_level=None): """ Default fast Hankel transformation routine with 80 filter coefficients. Required arguments ------------------ - fd_data, type numpy array frequency-domain data, only one component - modus, type int modus switch to calculate either shut-on, shut-off or impulse response results Keyword arguments ----------------- dc_level = None, type float used to apply explicitly a calculated DC level for shut-off E-field transients calculations; otherwise, the DC level will be approximated by the fields values of the lowest frequency """ fht_coeff = [5.001828e-11, 8.000316e-11, 1.256402e-10, 2.009589e-10, 3.155941e-10, 5.047858e-10, 7.927364e-10, 1.267965e-09, 1.991264e-09, 3.184983e-09, 5.001829e-09, 8.000316e-09, 1.256403e-08, 2.009589e-08, 3.155941e-08, 5.047858e-08, 7.927363e-08, 1.267965e-07, 1.991263e-07, 3.184982e-07, 5.001826e-07, 8.000310e-07, 1.256401e-06, 2.009585e-06, 3.155931e-06, 5.047835e-06, 7.927307e-06, 1.267950e-05, 1.991228e-05, 3.184892e-05, 5.001599e-05, 7.999741e-05, 1.256258e-04, 2.009226e-04, 3.155029e-04, 5.045568e-04, 7.921612e-04, 1.266520e-03, 1.987636e-03, 3.175872e-03, 4.978954e-03, 7.942905e-03, 1.242000e-02, 1.973483e-02, 3.065536e-02, 4.821916e-02, 7.364350e-02, 1.128325e-01, 1.647498e-01, 2.348622e-01, 3.004906e-01, 3.372419e-01, 2.337323e-01, -8.647107e-02, -6.612999e-01, -8.142974e-01, 3.308580e-01, 1.402432e-00, -1.565116e-00, 8.359842e-01, -3.059907e-01, 9.125030e-02, -2.463153e-02, 6.369876e-03, -1.618934e-03, 4.085877e-04, -1.028279e-04, 2.584884e-05, -6.494904e-06, 1.631643e-06, -4.098700e-07, 1.029566e-07, -2.586174e-08, 6.496194e-09, -1.631772e-09, 4.098828e-10, -1.029579e-10, 2.586184e-11, -6.496186e-12, 1.631761e-12] nc = len(fht_coeff) td_data = np.zeros(self.n_times) if modus == -1 and dc_level is None: dc_level = fd_data[-1].real if modus in [1, -1]: # step on / off fd_data /= np.zeros(self.n_freqs) + 1j * np.array(self.omegas) # fht routine for nn in range(self.n_freqs): ita = max(1, nn - nc) ite = min(self.n_times, nn - 1) for it in range(ita - 1, ite): td_data[it] -= fd_data[nn].imag * fht_coeff[nc - nn + it - 1] # post-processing out = td_data * np.sqrt(2. / np.pi) / self.times if modus == -1: out -= dc_level return(out)
[docs] class RationalArnoldi(ApproachBaseTD): """ FE implementation of rational Arnoldi (Krylov-subspace) method after Börner (2015). Methods ------- - rational_arnoldi() runs the complete rational-arnoldi solution process as well as H-field conversion and export methods, FE modeling and time stepping related keyword arguments can be passed to this method - build_var_form() set up variational formulation and assemble mass and curl-curl matrices as well as right-hand side (source) vector - get_poles_from_table() define poles used in rational Arnoldi method - build_rational_arnoldi_basis() build Krylov subspace basis - expm_ra() utility function to calculate matrix exponentials - project_solution() calculate solution on Krylov subspace and project it back RA approach specific parameters ------------------------------- - n_mult = 6, type int multiplyer of poles, do not change - n_poles = 2, type int number of poles to be used - pole_shift = None, type float shift poles in time; the default covered range is about 4 to 5 decades from the earliest times on; this values is adjusted automatically and it is not recommended to change it - dc_field = None, type dolfin Function the DC field is automatically calculated on the fly; for special purposes, it might be computed externally and passed here """ def __init__(self, FS, MP): """ Initialize FE class for *E_RA* approach. See description of **ApproachBaseTD** class for further information about parameters of the time-domain modeling approaches. Required arguments ------------------ - FS, type class FunctionSpaces instance - MP, type class ModelParameters instance """ # inherit default attributes and common methods from ApproachBaseTD super(self.__class__, self).__init__() self.FS = FS self.MP = MP self.Solver = Solver(self.FS, self, mumps_debug=self.MP.mumps_debug) self.dx_0 = self.FS.DOM.dx_0 # initialize trial and test functions self.v = df.TestFunction(FS.V) self.u = df.TrialFunction(FS.V) # initialize RA approach specific parameters self.n_mult = 6 self.n_poles = 2 self.pole_shift = None self.dc_field = None # mute print of the following FE class attributes and attributes only # relevant for the other time-domain modeling approaches self.mute = ['FS', 'MP', 'dx_0', 'u', 'v', 'mute', 'source_type', 'be_order', 'n_lin_steps', 'n_on', 't0_j', 'pulse_width', 'source_func', 'source_times', 'stepping_times', 't_tol', 'omegas', 'path_td', 'fd_mod_name', 'Solver', 'parallel_interpolation', 'interp_mesh', 'frequencies', 'fd_approach']
[docs] def rational_arnoldi(self): """ Conduct rational Arnoldi modeling. """ self.get_poles_from_table() self.m = int(self.n_poles * self.n + 1) self.xi = np.tile(self.poles[0, :], self.n) self.xi_index = np.tile(np.arange(self.n_poles, dtype=int), self.n) t0 = time.time() lp(self.MP.logger, 20, 'Solution of main FE system(s) of equations:') self.build_krylov_subspace_basis() lp(self.MP.logger, 20, '... projecting solution ...', pre_dash=False) self.project_solution() self.FS.solution_time = time.time() - t0 lp(self.MP.logger, 20, ' - combined rational arnoldi solution time (s) --> ' + str(self.FS.solution_time) + ' - ', pre_dash=False, post_dash=True)
[docs] def build_var_form(self, **fem_kwargs): """ Build variational formulation for rational Arnoldi approach and print FE parameters. A description of the keyword arguments is given in the **ApproachBaseTD** and **RationalArnoldi** classes. """ self.update_fem_parameters(fem_kwargs) if self.times is None: self.times = np.logspace(self.log_t_min, self.log_t_max, self.n_log_steps) if self.pole_shift is None: self.pole_shift = 10**(-6 - np.log10(self.times[0])) self.n_times = len(self.times) self.n = int(12 * (self.n_mult) / self.n_poles) self.mu1 = df.Constant(1./self.MP.mu) if self.bc == 'ZD' or self.bc == 'ZeroDirichlet': boundary = DirichletBoundary() bc = df.DirichletBC(self.FS.V, df.Constant((0, 0, 0)), boundary) else: bc = None t0 = time.time() self.Assembler = TotalFieldAssembler(self, bc) self.Assembler.assemble() lp(self.MP.logger, 20, '... assembling system matrix ...', pre_dash=False) C_form = df.inner(self.MP.mu_inv_func * df.curl(self.u), df.curl(self.v)) * self.dx_0 B_form = df.inner(self.MP.sigma_func * self.u, self.v) * self.dx_0 self.B, self.C = df.PETScMatrix(), df.PETScMatrix() b, c = df.PETScVector(), df.PETScVector() RHS = df.inner(df.Constant(("0.0", "0.0", "0.0")), self.v) * self.dx_0 df.assemble_system(B_form, RHS, bc, A_tensor=self.B, b_tensor=b) df.assemble_system(C_form, RHS, bc, A_tensor=self.C, b_tensor=c) self.system_size = str(self.B.mat().getSize()[0]) self.assembly_time = time.time() - t0 lp(self.MP.logger, 20, ' - assembly time (s) --> ' + str(self.assembly_time) + ' - ', pre_dash=False)
[docs] def get_poles_from_table(self): """ Define *poles* for ration Arnoldi method. """ if self.n_poles == 1: self.poles = -np.array([[3.14e5]]) * self.pole_shift elif self.n_poles == 2: # self.poles = -np.array([[6.35e4, 7.58e6]]) # # # new poles 1st! # self.poles = -np.array([[1.6082e+03, 8.1261e+05]]) # self.n = 20 # new poles 2nd ! # self.poles = -np.array([[238.75, 4.519e+5]]) #* 0.01 # (1e-6 / 10**self.log_t_min) # self.n = 20 # new poles 3rd! self.poles = -np.array([[323.1722, 6.0128e+5]]) * self.pole_shift self.n = 24 elif self.n_poles == 3: self.poles = -np.array([[2.60e4, 6.73e5, 1.96e7]]) * self.pole_shift else: lp(self.MP.logger, 50, 'Error! Only "1, 2, 3" are valid values for "n_poles" ' 'right now. Aborting ...') raise SystemExit
[docs] def build_krylov_subspace_basis(self): """ Build the Krylov subspace basis. """ # calculate inital condition lp(self.MP.logger, 20, '... calculating initial condition ...', pre_dash=False) self.u_0 = self.Solver.solve_system_mumps(self.B, self.Assembler.b, self.FS.V, sym=True) # subtract dc field if shut off response for grounded sources if any(self.MP.grounding) and self.shut_off: if self.dc_field is None: lp(self.MP.logger, 20, '... calculating DC-field ...', pre_dash=False) self.calc_dc_field() else: lp(self.MP.logger, 20, '... importing DC-field ...', pre_dash=False) if self.n_tx > 1: lp(self.MP.logger, 50, 'Error! Import instead of on-thy-fly DC field ' 'calculation only works for n_tx=1 so far. ' 'Aborting ...') raise SystemExit self.E_dc = [df.Function(self.FS.V) for x in range(self.n_tx)] read_h5(self.MP.mpi_cw, self.E_dc[0], self.dc_field + '.h5') for ti in range(self.n_tx): if self.MP.grounding[ti]: self.u_0[ti].vector().set_local( self.u_0[ti].vector().get_local() - self.E_dc[ti].vector().get_local()) self.u_0[ti].vector().apply('insert') self.u = [[df.Function(self.FS.V) for ti in range(self.n_tx)] for j in range(self.m + 1)] w = [df.Function(self.FS.V) for ti in range(self.n_tx)] H = [np.zeros((self.m + 1, self.m)) for ti in range(self.n_tx)] for ti in range(self.n_tx): w[ti] = w[ti].vector() self.u_0[ti] = self.u_0[ti].vector() for j in range(self.m + 1): self.u[j][ti] = self.u[j][ti].vector() for ti in range(self.n_tx): self.u[0][ti] = self.u_0[ti] * 1./np.sqrt( self.u_0[ti].inner(self.B * self.u_0[ti])) solver0, solver1, solver2, solver3 = None, None, None, None lp(self.MP.logger, 20, '... building subspace basis ...') for j in range(self.m): if self.n_tx != 1 and j == 0: lp(self.MP.logger, 20, '... calculating ' + str(self.n_tx) + ' transmitters ' 'for each basis ...', pre_dash=False) lp(self.MP.logger, 10, '... calculating basis ' + str(j) + ' ...', pre_dash=False) for ti in range(self.n_tx): if j == self.m - 1: self.Solver.solver.solve(w[ti], self.C * self.u[j][ti]) elif self.xi_index[j] == 0: if solver0 is None: D0 = df.PETScMatrix(self.B.mat() - self.C.mat() * (1. / self.xi[j])) solver0 = df.PETScLUSolver(D0, "mumps") solver0.solve(w[ti], self.C * self.u[j][ti]) elif self.xi_index[j] == 1: if solver1 is None: D1 = df.PETScMatrix(self.B.mat() - self.C.mat() * (1. / self.xi[j])) solver1 = df.PETScLUSolver(D1, "mumps") solver1.solve(w[ti], self.C * self.u[j][ti]) elif self.xi_index[j] == 2: if solver2 is None: D2 = df.PETScMatrix(self.B.mat() - self.C.mat() * (1. / self.xi[j])) solver2 = df.PETScLUSolver(D2, "mumps") solver2.solve(w[ti], self.C * self.u[j][ti]) elif self.xi_index[j] == 3: if solver3 is None: D3 = df.PETScMatrix(self.B.mat() - self.C.mat() * (1. / self.xi[j])) solver3 = df.PETScLUSolver(D3, "mumps") solver3.solve(w[ti], self.C * self.u[j][ti]) H[ti][:j, j] = 0. for reo in range(2): for k in range(j+1): h = self.u[k][ti].inner(self.B*w[ti]) H[ti][k, j] += h w[ti] = w[ti] - self.u[k][ti] * h H[ti][j + 1, j] = np.sqrt(w[ti].inner(self.B*w[ti])) w[ti] *= 1./H[ti][j + 1, j] self.u[j+1][ti][:] = w[ti]
[docs] def expm_ra(self, A, E): """ Calculate matrix exponentials with eigenvalues and eigenvectors. Required arguments ------------------ - A, E, type numpy ndarray input matrices for the calculation """ DD, VV = la.eig(A, E) DD = np.diag(DD.real) return(la.solve(VV.T, np.dot(VV, np.diag(np.exp(np.diag(DD)))).T))
[docs] def project_solution(self): """ Project subspace solution back to original solution space. """ self.U = [[df.Function(self.FS.V) for ti in range(self.n_tx)] for k in range(self.n_times)] for ti in range(self.n_tx): prod = [self.C * self.u[j][ti] for j in range(self.m)] Mm = np.eye(self.m) Km = np.zeros((self.m, self.m)) for i in range(self.m): for j in range(self.m): Km[i, j] = self.u[i][ti].inner(prod[j]) proj = np.sqrt(self.u_0[ti].inner(self.B * self.u_0[ti])) * \ np.eye(self.m, 1) Um = np.zeros((self.m, self.n_times)) if df.MPI.rank(self.MP.mpi_cw) == 0: for k in range(self.n_times): dt = self.times[k] try: tmp = np.dot(self.expm_ra(-dt * Km, Mm), proj).ravel() except ValueError: tmp = np.nan Um[:, k] = tmp else: pass Um = self.MP.mpi_cw.bcast(Um, root=0) for j in range(self.m): for k in range(self.n_times): self.U[k][ti].vector()[:] += self.u[j][ti].get_local() * \ Um[j, k]