Source code for custEM.core.post_proc_fd

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import os
from custEM.misc import logger_print as lp
from custEM.misc import write_h5
from custEM.core import Solver
from petsc4py import PETSc
from dolfin import as_backend_type



[docs] class PostProcessingFD: """ Frequency-domain PostProcessing class for computing and exporting electric and magnetic fields, called from MOD instance. Methods ------- - convert_results() convert main solution to other field quantities, e.g., E to H - export_E_fields() export total and/or secondary electric field data - export_H_fields() export total and/or secondary magnetic field data - export_A_fields() export total and/or secondary potential 'field' data - export_all_results() export all available EM-field and/or potential data - save_field() utility function for writing data on hard drive - convert_H_to_E() convert magnetic fields obtained with magnetic field or potential approaches to electric fields - dc_post_proc() conduct post-porcessing for direct current DC modeling approach """ def __init__(self, FE, export_domains,solvingapproach): """ Initializes instance for converting and exporting post-processing quantities and saves the domains of the mesh to '... Domains.pvd'. Required arguments ------------------ - FE, type class FE approach (e.g., **E_vector**) instance - export_domains, type bool specify if mesh with domains should be exported or not """ self.FS = FE.FS self.MP = FE.MP self.FE = FE self.export_dir = self.MP.out_dir + '/' + self.MP.mod_name self.fi = 0 self.solvingapproach=solvingapproach lp(self.MP.logger, 20, ' - {:<22}'.format('export directory') + ' = ' + self.MP.out_dir + ' - ', pre_dash=False) lp(self.MP.logger, 20, ' - {:<22}'.format('export name') + ' = ' + self.MP.mod_name + '_*...* - ', pre_dash=False) if export_domains: lp(self.MP.logger, 20, '... exporting domain pvd-file ' 'for model validation ...', pre_dash=False) FE.FS.DOM.domain_func.rename('Marker', '') df.File(self.export_dir + "_Domains.pvd") << FE.FS.DOM.domain_func self.dx_0 = self.FS.DOM.dx_0 self.Solver = Solver(self.FS, self.FE, self.FE.MP.mumps_debug, self.FE.MP.serial_ordering) if self.MP.approach[0] in ['A', 'F']: self.Solver_cg = Solver(self.FS, self.FE, self.FE.MP.mumps_debug, self.FE.MP.serial_ordering)
[docs] def convert_results(self, fi=0, convert_to_H=True, convert_to_E=False, export_cg=False, export_pvd=True, export_nedelec=True, **dummy_kwargs): """ Automatically convert results from E-fields to H-fields or vice versa or from potentials to E or H, depending on the utilized approach. Keyword arguments ----------------- - fi = 0, type int integer specifiying to which frequency the current solution belongs - convert_to_H = True, type bool calculate H-fields from E-field or potential approaches - convert_to_E = True, type bool calculate E-fields, if H-field or F-U approach was used - export_cg = False, type bool set **True** for exporting calculated data on Lagrange spaces - export_pvd = True, type bool set **True** to export fields in *.pvd* format for *Paraview* - export_nedelec = True, type bool set **True** for exporting calculated data on a Nedelec spaces """ self.fi = fi om = df.Constant(self.MP.omegas[fi]) om1 = df.Constant(1./self.MP.omegas[fi]) self.H_t_r, self.H_t_i = [], [] self.H_t_r_cg, self.H_t_i_cg = [], [] self.E_t_r, self.E_t_i = [], [] self.E_t_r_cg, self.E_t_i_cg = [], [] self.A_t_r, self.A_t_i = [], [] self.A_t_r_cg, self.A_t_i_cg = [], [] self.Phi_r, self.Phi_i = [], [] self.F_s_r, self.F_s_i = [], [] self.Om_r, self.Om_i = [], [] # initialize constant inner(u*v) system matrix for postprocessing if not hasattr(self, 'v_ned'): # on Nedelec-space for E/H field approaches self.A_ned, v_dummy_ned = df.PETScMatrix(), df.PETScVector() u_ned = df.TrialFunction(self.FS.V) self.v_ned = df.TestFunction(self.FS.V) df.assemble_system(df.inner(u_ned, self.v_ned) * self.dx_0, df.inner(df.Constant(('0', '0', '0')), self.v_ned) * self.dx_0, A_tensor=self.A_ned, b_tensor=v_dummy_ned) # also on vector-Lagrange-space for potential approaches if self.MP.approach[0] in ['A', 'F']: self.A_cg, v_dummy_cg = df.PETScMatrix(), df.PETScVector() u_cg = df.TrialFunction(self.FS.V_cg) self.v_cg = df.TestFunction(self.FS.V_cg) df.assemble_system(df.inner(u_cg, self.v_cg) * self.dx_0, df.inner(df.Constant(('0', '0', '0')), self.v_cg) * self.dx_0, A_tensor=self.A_cg, b_tensor=v_dummy_cg) if self.MP.approach == 'DC': self.dc_post_proc() return if type(self.FS.U) is not list: self.FS.U = [self.FS.U] lp(self.MP.logger, 30, 'Warning! Currently only processing first Tx in postproc.\n' 'Multiple Tx are currently not supported for all approaches. ' 'Continuing ...') if '_t' in self.MP.approach or self.MP.approach == 'MT': if 'E' in self.MP.approach or 'mt' in self.MP.approach.lower(): for ti in range(self.FE.n_tx): E_t_r, E_t_i = self.FS.U[ti].split(True) self.E_t_r.append(E_t_r) self.E_t_i.append(E_t_i) if export_cg or export_pvd: self.E_t_r_cg.append( df.interpolate(E_t_r, self.FS.V_cg)) self.E_t_i_cg.append( df.interpolate(E_t_i, self.FS.V_cg)) elif 'H' in self.MP.approach: for ti in range(self.FE.n_tx): H_t_r, H_t_i = self.FS.U[ti].split(True) self.H_t_r.append(H_t_r) self.H_t_i.append(H_t_i) if export_cg or export_pvd: self.H_t_r_cg.append( df.interpolate(H_t_r, self.FS.V_cg)) self.H_t_i_cg.append( df.interpolate(H_t_i, self.FS.V_cg)) # need to implement correct post-processing here if required # lp(self.MP.logger, 30, 'Warning! Calculating E with H_t-field approach is not ' 'implemented right now - setting E to ZERO function. ' 'Continuing ...') if convert_to_E: for ti in range(self.FE.n_tx): self.E_t_r.append(df.Function(self.FS.V)) self.E_t_i.append(df.Function(self.FS.V)) if export_cg or export_pvd: self.E_t_r_cg.append(df.Function(self.FS.V_cg)) self.E_t_i_cg.append(df.Function(self.FS.V_cg)) self.export_E_fields(export_pvd, export_cg, export_nedelec) else: b_real, b_imag = [], [] for ti in range(self.FE.n_tx): if 'An' in self.MP.approach: self.A_t_r_cg.append(df.Function(self.FS.V_cg)) self.A_t_i_cg.append(df.Function(self.FS.V_cg)) (A_x_r, A_y_r, A_z_r, Phi_r, A_x_i, A_y_i, A_z_i, Phi_i) = self.FS.U[ti].split(True) df.assign(self.A_t_r_cg[ti].sub(0), A_x_r) df.assign(self.A_t_i_cg[ti].sub(0), A_x_i) df.assign(self.A_t_r_cg[ti].sub(1), A_y_r) df.assign(self.A_t_i_cg[ti].sub(1), A_y_i) df.assign(self.A_t_r_cg[ti].sub(2), A_z_r) df.assign(self.A_t_i_cg[ti].sub(2), A_z_i) self.Phi_r.append(Phi_r) self.Phi_i.append(Phi_i) self.A_t_r.append(df.interpolate(self.A_t_r_cg[ti], self.FS.V)) self.A_t_i.append(df.interpolate(self.A_t_i_cg[ti], self.FS.V)) elif 'Am' in self.MP.approach: (A_t_r, A_t_i, Ph_r, Ph_i) = self.FS.U[ti].split(True) self.A_t_r.append(A_t_r) self.A_t_i.append(A_t_i) self.Phi_r.append(Ph_r) self.Phi_i.append(Ph_i) self.A_t_r_cg.append(df.interpolate(self.A_t_r[ti], self.FS.V_cg)) self.A_t_i_cg.append(df.interpolate(self.A_t_i[ti], self.FS.V_cg)) b_real.append(df.assemble( -om * df.inner(self.A_t_i_cg[ti], self.v_cg) * self.dx_0 - om * df.inner(df.grad(self.Phi_i[ti]), self.v_cg) * self.dx_0)) b_imag.append(df.assemble( om * df.inner(self.A_t_r_cg[ti], self.v_cg) * self.dx_0 + om * df.inner(df.grad(self.Phi_r[ti]), self.v_cg) * self.dx_0)) if not hasattr(self.Solver_cg, 'solver'): self.E_t_r_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_real, self.FS.V_cg, sym=True) else: self.E_t_r_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_real, self.FS.V_cg, sym=True, first_call=False) self.E_t_i_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_imag, self.FS.V_cg, sym=True, first_call=False) for ti in range(self.FE.n_tx): self.E_t_r.append(df.interpolate(self.E_t_r_cg[ti], self.FS.V)) self.E_t_i.append(df.interpolate(self.E_t_i_cg[ti], self.FS.V)) if '_s' in self.MP.approach: if self.FS.p != self.FS.PF.pf_p: lp(self.MP.logger, 20, '... converting primary fields from p2 to p1 or vice ' 'versa\n for a conform summation of vectors ...') self.FS.PF.E_0_r_cg2, self.FS.PF.E_0_i_cg2 = [], [] self.FS.PF.H_0_r_cg2, self.FS.PF.H_0_i_cg2 = [], [] for ti in range(self.FE.n_tx): self.FS.PF.E_0_i_cg2.append(df.interpolate( self.FS.PF.E_0_i_cg[ti], self.FS.V_cg)) self.FS.PF.E_0_r_cg2.append(df.interpolate( self.FS.PF.E_0_r_cg[ti], self.FS.V_cg)) self.FS.PF.H_0_i_cg2.append(df.interpolate( self.FS.PF.H_0_i_cg[ti], self.FS.V_cg)) self.FS.PF.H_0_r_cg2.append(df.interpolate( self.FS.PF.H_0_r_cg[ti], self.FS.V_cg)) self.FS.PF.E_0_r_cg = self.FS.PF.E_0_r_cg2 self.FS.PF.E_0_i_cg = self.FS.PF.E_0_i_cg2 self.FS.PF.H_0_r_cg = self.FS.PF.H_0_r_cg2 self.FS.PF.H_0_i_cg = self.FS.PF.H_0_i_cg2 if '_s' in self.MP.approach and self.FS.anom_flag: self.H_s_r, self.H_s_i = [], [] self.H_s_r_cg, self.H_s_i_cg = [], [] self.E_s_r, self.E_s_i = [], [] self.E_s_r_cg, self.E_s_i_cg = [], [] self.A_s_r, self.A_s_i = [], [] self.A_s_r_cg, self.A_s_i_cg = [], [] self.F_s_r, self.F_s_i = [], [] self.F_s_r_cg, self.F_s_i_cg = [], [] if 'E' in self.MP.approach or 'mt' in self.MP.approach.lower(): # hacked if self.FS.PF.nedelec_pf: self.FS.PF.E_0_r_cg = [] self.FS.PF.E_0_i_cg = [] # hacked end for ti in range(self.FE.n_tx): E_s_r, E_s_i = self.FS.U[ti].split(True) self.E_s_r.append(E_s_r) self.E_s_i.append(E_s_i) # hacked if self.FS.PF.nedelec_pf: self.FS.PF.E_0_r_cg.append(df.interpolate( self.FS.PF.E_0_r[ti], self.FS.V_cg)) self.FS.PF.E_0_i_cg.append(df.interpolate( self.FS.PF.E_0_i[ti], self.FS.V_cg)) # hacked end if export_cg or export_pvd: self.E_s_r_cg.append(df.interpolate(self.E_s_r[ti], self.FS.V_cg)) self.E_s_i_cg.append(df.interpolate(self.E_s_i[ti], self.FS.V_cg)) self.E_t_r_cg.append(df.Function(self.FS.V_cg)) self.E_t_i_cg.append(df.Function(self.FS.V_cg)) self.E_t_r_cg[ti].vector()[:] = \ self.E_s_r_cg[ti].vector().get_local() + \ self.FS.PF.E_0_r_cg[ti].vector().get_local() self.E_t_i_cg[ti].vector()[:] = \ self.E_s_i_cg[ti].vector().get_local() + \ self.FS.PF.E_0_i_cg[ti].vector().get_local() E_0_r = df.interpolate(self.FS.PF.E_0_r_cg[ti], self.FS.V) E_0_i = df.interpolate(self.FS.PF.E_0_i_cg[ti], self.FS.V) self.E_t_r.append(df.Function(self.FS.V)) self.E_t_i.append(df.Function(self.FS.V)) self.E_t_r[ti].vector()[:] = E_0_r.vector().get_local() +\ E_s_r.vector().get_local() self.E_t_i[ti].vector()[:] = E_0_i.vector().get_local() +\ E_s_i.vector().get_local() elif 'H' in self.MP.approach: for ti in range(self.FE.n_tx): H_s_r, H_s_i = self.FS.U[ti].split(True) self.H_s_r.append(H_s_r) self.H_s_i.append(H_s_i) if export_cg or export_pvd: self.H_s_r_cg.append(df.interpolate(self.H_s_r[ti], self.FS.V_cg)) self.H_s_i_cg.append(df.interpolate(self.H_s_i[ti], self.FS.V_cg)) self.H_t_r_cg.append(df.Function(self.FS.V_cg)) self.H_t_i_cg.append(df.Function(self.FS.V_cg)) if self.FS.PF.nedelec_pf: H_0_r_cg = df.interpolate(self.FS.PF.H_0_r[ti], self.FS.V_cg) H_0_i_cg = df.interpolate(self.FS.PF.H_0_i[ti], self.FS.V_cg) else: H_0_r_cg = self.FS.PF.H_0_r_cg[ti] H_0_i_cg = self.FS.PF.H_0_i_cg[ti] self.H_t_r_cg[ti].vector()[:] = \ self.H_s_r_cg[ti].vector().get_local() + \ H_0_r_cg.vector().get_local() self.H_t_i_cg[ti].vector()[:] = \ self.H_s_i_cg[ti].vector().get_local() + \ H_0_i_cg.vector().get_local() if self.FS.PF.nedelec_pf: H_0_r = df.interpolate(self.FS.PF.H_0_r[ti], self.FS.V) H_0_i = df.interpolate(self.FS.PF.H_0_i[ti], self.FS.V) else: H_0_r = df.interpolate(self.FS.PF.H_0_r_cg[ti], self.FS.V) H_0_i = df.interpolate(self.FS.PF.H_0_i_cg[ti], self.FS.V) self.H_t_r.append(df.Function(self.FS.V)) self.H_t_i.append(df.Function(self.FS.V)) self.H_t_r[ti].vector()[:] = H_0_r.vector()[:] +\ H_s_r.vector()[:] self.H_t_i[ti].vector()[:] = H_0_i.vector()[:] +\ H_s_i.vector()[:] self.export_H_fields(export_pvd, export_cg, export_nedelec) if convert_to_E: self.convert_H_to_E() self.export_E_fields(export_pvd, export_cg, export_nedelec) else: b_real, b_imag = [], [] for ti in range(self.FE.n_tx): if 'An' in self.MP.approach: self.A_s_r_cg.append(df.Function(self.FS.V_cg)) self.A_s_i_cg.append(df.Function(self.FS.V_cg)) (A_x_r, A_y_r, A_z_r, Phi_r, A_x_i, A_y_i, A_z_i, Phi_i) = self.FS.U[ti].split(True) df.assign(self.A_s_r_cg[ti].sub(0), A_x_r) df.assign(self.A_s_i_cg[ti].sub(0), A_x_i) df.assign(self.A_s_r_cg[ti].sub(1), A_y_r) df.assign(self.A_s_i_cg[ti].sub(1), A_y_i) df.assign(self.A_s_r_cg[ti].sub(2), A_z_r) df.assign(self.A_s_i_cg[ti].sub(2), A_z_i) self.Phi_r.append(Phi_r) self.Phi_i.append(Phi_i) A_s_r = df.interpolate(self.A_s_r_cg[ti], self.FS.V) A_s_i = df.interpolate(self.A_s_i_cg[ti], self.FS.V) self.A_s_r.append(A_s_r) self.A_s_i.append(A_s_i) elif 'Am' in self.MP.approach: (A_s_r, A_s_i, Ph_r, Ph_i) = self.FS.U[ti].split(True) self.A_s_r.append(A_s_r) self.A_s_i.append(A_s_i) self.Phi_r.append(Ph_r) self.Phi_i.append(Ph_i) self.A_s_r_cg.append(df.interpolate(self.A_s_r[ti], self.FS.V_cg)) self.A_s_i_cg.append(df.interpolate(self.A_s_i[ti], self.FS.V_cg)) elif 'Fm' in self.MP.approach: (F_s_r, F_s_i, Om_r, Om_i) = self.FS.U[ti].split(True) self.F_s_r.append(F_s_r) self.F_s_i.append(F_s_i) self.Om_r.append(Om_r) self.Om_i.append(Om_i) self.F_s_r_cg.append(df.interpolate(self.F_s_r[ti], self.FS.V_cg)) self.F_s_i_cg.append(df.interpolate(self.F_s_i[ti], self.FS.V_cg)) if 'A' in self.MP.approach: b_real.append(df.assemble( -om * df.inner(self.A_s_i_cg[ti], self.v_cg) * self.dx_0 - om * df.inner(df.grad(self.Phi_i[ti]), self.v_cg) * self.dx_0)) b_imag.append(df.assemble( om * df.inner(self.A_s_r_cg[ti], self.v_cg) * self.dx_0 + om * df.inner(df.grad(self.Phi_r[ti]), self.v_cg) * self.dx_0)) elif 'Fm' in self.MP.approach: b_real.append(df.assemble( df.inner(self.F_s_r_cg[ti], self.v_cg) * self.dx_0 + df.inner(df.grad(self.Om_r[ti]), self.v_cg) * self.dx_0)) b_imag.append(df.assemble( df.inner(self.F_s_i_cg[ti], self.v_cg) * self.dx_0 + df.inner(df.grad(self.Om_i[ti]), self.v_cg) * self.dx_0)) if 'A' in self.MP.approach: if not hasattr(self.Solver_cg, 'solver'): self.E_s_r_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_real, self.FS.V_cg, sym=True) else: self.E_s_r_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_real, self.FS.V_cg, sym=True, first_call=False) self.E_s_i_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_imag, self.FS.V_cg, sym=True, first_call=False) for ti in range(self.FE.n_tx): self.E_t_r_cg.append(df.Function(self.FS.V_cg)) self.E_t_i_cg.append(df.Function(self.FS.V_cg)) self.A_t_r_cg.append(df.Function(self.FS.V_cg)) self.A_t_i_cg.append(df.Function(self.FS.V_cg)) self.E_t_r_cg[ti].vector()[:] = ( self.E_s_r_cg[ti].vector().get_local() + self.FS.PF.E_0_r_cg[ti].vector().get_local()) self.E_t_i_cg[ti].vector()[:] = ( self.E_s_i_cg[ti].vector().get_local() + self.FS.PF.E_0_i_cg[ti].vector().get_local()) self.A_t_r_cg[ti].vector()[:] = ( self.A_s_r_cg[ti].vector().get_local() + self.FS.PF.E_0_i_cg[ti].vector().get_local() * om1) self.A_t_i_cg[ti].vector()[:] = ( self.A_s_i_cg[ti].vector().get_local() + self.FS.PF.E_0_r_cg[ti].vector().get_local() * om1) self.E_t_r.append(df.interpolate(self.E_t_r_cg[ti], self.FS.V)) self.E_t_i.append(df.interpolate(self.E_t_i_cg[ti], self.FS.V)) self.E_s_r.append(df.interpolate(self.E_s_r_cg[ti], self.FS.V)) self.E_s_i.append(df.interpolate(self.E_s_i_cg[ti], self.FS.V)) elif 'Fm' in self.MP.approach: if not hasattr(self.Solver_cg, 'solver'): self.H_s_r_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_real, self.FS.V_cg, sym=True) else: self.H_s_r_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_real, self.FS.V_cg, sym=True, first_call=False) self.H_s_i_cg = self.Solver_cg.solve_system_mumps( self.A_cg, b_imag, self.FS.V_cg, sym=True, first_call=False) for ti in range(self.FE.n_tx): self.H_t_r_cg.append(df.Function(self.FS.V_cg)) self.H_t_i_cg.append(df.Function(self.FS.V_cg)) self.H_t_r_cg[ti].vector()[:] = ( self.H_s_r_cg[ti].vector().get_local( ) + self.FS.PF.H_0_r_cg[ti].vector().get_local()) self.H_t_i_cg[ti].vector()[:] = ( self.H_s_i_cg[ti].vector().get_local( ) + self.FS.PF.H_0_i_cg[ti].vector().get_local()) self.H_t_r.append(df.interpolate(self.H_t_r_cg[ti], self.FS.V)) self.H_t_i.append(df.interpolate(self.H_t_i_cg[ti], self.FS.V)) self.H_s_r.append(df.interpolate(self.H_s_r_cg[ti], self.FS.V)) self.H_s_i.append(df.interpolate(self.H_s_i_cg[ti], self.FS.V)) self.export_H_fields(export_pvd, export_cg, export_nedelec) if convert_to_E: self.convert_H_to_E(from_potential=True) self.export_E_fields(export_pvd, export_cg, export_nedelec) if '_s' in self.MP.approach and not self.FS.anom_flag: for ti in range(self.FE.n_tx): lp(self.MP.logger, 20, ' - no anomaly --> primary fields exported - ', pre_dash=False) # # # avoid numerical issues with pyhed primary fields # # # if not self.FS.PF.nedelec_pf: self.FS.PF.E_0_r_cg[ti].vector()[:] += 1e-32 self.FS.PF.E_0_i_cg[ti].vector()[:] += 1e-32 self.FS.PF.H_0_r_cg[ti].vector()[:] += 1e-32 self.FS.PF.H_0_i_cg[ti].vector()[:] += 1e-32 self.E_t_r_cg.append(self.FS.PF.E_0_r_cg[ti]) self.E_t_i_cg.append(self.FS.PF.E_0_i_cg[ti]) self.H_t_r_cg.append(self.FS.PF.H_0_r_cg[ti]) self.H_t_i_cg.append(self.FS.PF.H_0_i_cg[ti]) if export_nedelec: lp(self.MP.logger, 20, '... interpolating primary fields on Nedelec ' 'spaces ...', pre_dash=False) self.E_t_r.append( df.interpolate(self.FS.PF.E_0_r_cg[ti], self.FS.V)) self.E_t_i.append( df.interpolate(self.FS.PF.E_0_i_cg[ti], self.FS.V)) self.H_t_r.append( df.interpolate(self.FS.PF.H_0_r_cg[ti], self.FS.V)) self.H_t_i.append( df.interpolate(self.FS.PF.H_0_i_cg[ti], self.FS.V)) # # # no anomaly support for custEM primary fields # # # else: self.E_t_r.append(df.interpolate(self.FS.PF.E_0_r[ti], self.FS.V)) self.E_t_i.append(df.interpolate(self.FS.PF.E_0_i[ti], self.FS.V)) self.E_t_r_cg.append( df.interpolate(self.FS.PF.E_0_r[ti], self.FS.V_cg)) self.E_t_i_cg.append( df.interpolate(self.FS.PF.E_0_i[ti], self.FS.V_cg)) self.H_t_r_cg.append( df.interpolate(self.FS.PF.H_0_r[ti], self.FS.V_cg)) self.H_t_i_cg.append( df.interpolate(self.FS.PF.H_0_i[ti], self.FS.V_cg)) # self.E_t_r = self.FS.PF.E_0_r # self.E_t_i = self.FS.PF.E_0_i # self.H_t_r = self.FS.PF.H_0_r # self.H_t_i = self.FS.PF.H_0_i self.export_H_fields(export_pvd, True, export_nedelec) self.export_E_fields(export_pvd, True, export_nedelec) if 'H' in self.MP.approach or 'F' in self.MP.approach: pass else: self.export_E_fields(export_pvd, export_cg, export_nedelec) if convert_to_H and self.MP.approach[0] not in ['H', 'F']: lp(self.MP.logger, 20, '... deriving H-fields from E ...', pre_dash=False) b_real, b_imag = [], [] if '_t' in self.MP.approach or self.MP.approach == 'MT': for ti in range(self.FE.n_tx): if 'E' in self.MP.approach or \ 'mt' in self.MP.approach.lower(): RHS1 = -om1 * df.inner( self.MP.mu_inv_func * self.E_t_i[ti], df.curl(self.v_ned)) * self.dx_0 RHS2 = om1 * df.inner( self.MP.mu_inv_func * self.E_t_r[ti], df.curl(self.v_ned)) * self.dx_0 elif 'Am' in self.MP.approach: RHS1 = -df.inner( self.MP.mu_inv_func * self.A_t_r[ti], df.curl(self.v_ned)) * self.dx_0 RHS2 = -df.inner( self.MP.mu_inv_func * self.A_t_i[ti], df.curl(self.v_ned)) * self.dx_0 elif 'An' in self.MP.approach: RHS1 = -df.inner( (1. / self.MP.mu) * self.A_t_r[ti], df.curl(self.v_ned)) * self.dx_0 RHS2 = -df.inner( (1. / self.MP.mu) * self.A_t_i[ti], df.curl(self.v_ned)) * self.dx_0 b_real.append(df.assemble(RHS1)) b_imag.append(df.assemble(RHS2)) if self.solvingapproach=='H_direct': if not hasattr(self.Solver, 'solver'): self.H_t_r = self.Solver.solve_system_mumps( self.A_ned, b_real, self.FS.V, sym=True) else: self.H_t_r = self.Solver.solve_system_mumps( self.A_ned, b_real, self.FS.V, sym=True, first_call=False) self.H_t_i = self.Solver.solve_system_mumps( self.A_ned, b_imag, self.FS.V, sym=True, first_call=False) elif self.solvingapproach=='H_iterative': if not hasattr(self.Solver, 'solver'): self.H_t_r=self.Solver.solve_system_iter_Hfield( self.A_ned,b_real,self.FS.V) else: self.H_t_r=self.Solver.solve_system_iter_Hfield( self.A_ned,b_real,self.FS.V) self.H_t_i=self.Solver.solve_system_iter_Hfield( self.A_ned,b_imag,self.FS.V) else: lp(self.logger, 50, "Unknown flag. Abort simulation!") raise SystemExit if export_cg or export_pvd: for ti in range(self.FE.n_tx): self.H_t_r_cg.append(df.interpolate(self.H_t_r[ti], self.FS.V_cg)) self.H_t_i_cg.append(df.interpolate(self.H_t_i[ti], self.FS.V_cg)) if '_s' in self.MP.approach and self.FS.anom_flag is True: for ti in range(self.FE.n_tx): if 'E' in self.MP.approach or 'mt' in \ self.MP.approach.lower(): RHS1 = -om1 * df.inner( self.MP.mu_inv_func * self.E_s_i[ti], df.curl(self.v_ned)) * self.dx_0 RHS2 = om1 * df.inner( self.MP.mu_inv_func * self.E_s_r[ti], df.curl(self.v_ned)) * self.dx_0 elif 'Am' in self.MP.approach: RHS1 = -df.inner( self.MP.mu_inv_func * self.A_s_r[ti], df.curl(self.v_ned)) * self.dx_0 RHS2 = -df.inner( self.MP.mu_inv_func * self.A_s_i[ti], df.curl(self.v_ned)) * self.dx_0 elif 'An' in self.MP.approach: RHS1 = -df.inner( (1. / self.MP.mu) * self.A_s_r[ti], df.curl(self.v_ned)) * self.dx_0 RHS2 = -df.inner( (1. / self.MP.mu) * self.A_s_i[ti], df.curl(self.v_ned)) * self.dx_0 b_real.append(df.assemble(RHS1)) b_imag.append(df.assemble(RHS2)) if self.solvingapproach=='H_direct': if not hasattr(self.Solver, 'solver'): self.H_t_r = self.Solver.solve_system_mumps( self.A_ned, b_real, self.FS.V, sym=True) else: self.H_t_r = self.Solver.solve_system_mumps( self.A_ned, b_real, self.FS.V, sym=True, first_call=False) self.H_t_i = self.Solver.solve_system_mumps( self.A_ned, b_imag, self.FS.V, sym=True, first_call=False) elif self.solvingapproach=='H_iterative': if not hasattr(self.Solver, 'solver'): self.H_t_r=self.Solver.solve_system_iter_Hfield( self.A_ned,b_real,self.FS.V) else: self.H_t_r=self.Solver.solve_system_iter_Hfield( self.A_ned,b_real,self.FS.V) self.H_t_i=self.Solver.solve_system_iter_Hfield( self.A_ned,b_imag,self.FS.V) else: lp(self.logger, 50, "Unknown flag. Abort simulation!") raise SystemExit for ti in range(self.FE.n_tx): H_0_r = df.interpolate(self.FS.PF.H_0_r_cg[ti], self.FS.V) H_0_i = df.interpolate(self.FS.PF.H_0_i_cg[ti], self.FS.V) self.H_t_r.append(df.Function(self.FS.V)) self.H_t_i.append(df.Function(self.FS.V)) self.H_t_r[ti].vector()[:] = H_0_r.vector().get_local() +\ self.H_s_r[ti].vector().get_local() self.H_t_i[ti].vector()[:] = H_0_i.vector().get_local() +\ self.H_s_i[ti].vector().get_local() if export_cg or export_pvd: self.H_s_r_cg.append(df.interpolate(self.H_s_r[ti], self.FS.V_cg)) self.H_s_i_cg.append(df.interpolate(self.H_s_i[ti], self.FS.V_cg)) self.H_t_r_cg.append(df.Function(self.FS.V_cg)) self.H_t_i_cg.append(df.Function(self.FS.V_cg)) self.H_t_r_cg[ti].vector()[:] = \ self.H_s_r_cg[ti].vector().get_local() + \ self.FS.PF.H_0_r_cg[ti].vector().get_local() self.H_t_i_cg[ti].vector()[:] = \ self.H_s_i_cg[ti].vector().get_local() + \ self.FS.PF.H_0_i_cg[ti].vector().get_local() self.export_H_fields(export_pvd, export_cg, export_nedelec)
[docs] def export_E_fields(self, export_pvd, export_cg, export_nedelec): """ Export electric fields using specified data (*xml/h5*) and/or *pvd* format from either Nedelec space or Lagrange VectorFunctionSpace. Required arguments ------------------ - export_pvd, type bool specify if fields are also exported in *.pvd* format for *Paraview* - export_cg, type bool specify if fields in data format should be exported directly after conversion - export_nedelec = True, type bool set **True** for exporting caluclated data on a NedelecSpace """ lp(self.MP.logger, 20, '... exporting E-fields ...', pre_dash=False) try: self.E_t_r_cg except AttributeError: lp(self.MP.logger, 30, 'Warning! "E_t_r_cg" not found in "PP". Continuing ...') pass else: self.save_field('_E_t', self.E_t_r_cg, self.E_t_i_cg, export_pvd, export_cg, export_nedelec=False) try: self.E_t_r except AttributeError: lp(self.MP.logger, 30, 'Warning! "E_t_r" not found in "PP". Continuing ...') pass else: self.save_field('_E_t', self.E_t_r, self.E_t_i, False, False, export_nedelec=export_nedelec) try: self.E_s_r_cg except AttributeError: if '_s' in self.FE.FS.approach: lp(self.MP.logger, 30, 'Warning! "E_s_r_cg" not found in "PP". Continuing ...') pass else: self.save_field('_E_s', self.E_s_r_cg, self.E_s_i_cg, export_pvd, export_cg, export_nedelec=False) try: self.E_s_r except AttributeError: if '_s' in self.FE.FS.approach: lp(self.MP.logger, 30, 'Warning! "E_s_r" not found in "PP". Continuing ...') pass else: self.save_field('_E_s', self.E_s_r, self.E_s_i, False, False, export_nedelec=export_nedelec)
[docs] def export_H_fields(self, export_pvd, export_cg, export_nedelec): """ Export magnetic fields using specified data (*xml/h5*) and/or *pvd* format from either Nedelec space or Lagrange VectorFunctionSpace. Required arguments ------------------ - export_pvd, type bool specify if fields are also exported in *.pvd* format for *Paraview* - export_cg, type bool specify if fields in data format should be exported directly after conversion - export_nedelec = True, type bool set **True** for exporting caluclated data on a Nedelec space """ lp(self.MP.logger, 20, '... exporting H-fields ...', pre_dash=False) try: self.H_t_r_cg except AttributeError: lp(self.MP.logger, 30, 'Warning! "H_t_r_cg" not found in "PP". Continuing ...') pass else: self.save_field('_H_t', self.H_t_r_cg, self.H_t_i_cg, export_pvd, export_cg, export_nedelec=False) try: self.H_t_r except AttributeError: lp(self.MP.logger, 30, 'Warning! "H_t_r" not found in "PP". Continuing ...') pass else: self.save_field('_H_t', self.H_t_r, self.H_t_i, False, False, export_nedelec=export_nedelec) try: self.H_s_r_cg except AttributeError: if '_s' in self.FE.FS.approach: lp(self.MP.logger, 30, 'Warning! "H_s_r_cg" not found in "PP". Continuing ...') pass else: self.save_field('_H_s', self.H_s_r_cg, self.H_s_i_cg, export_pvd, export_cg, export_nedelec=False) try: self.H_s_r except AttributeError: if '_s' in self.FE.FS.approach: lp(self.MP.logger, 30, 'Warning! "H_s_r" not found in "PP". Continuing ...') pass else: self.save_field('_H_s', self.H_s_r, self.H_s_i, False, False, export_nedelec=export_nedelec)
[docs] def export_A_fields(self, export_pvd, export_cg, export_nedelec): """ Export potentials using specified data (*xml/h5*) and/or *pvd* format from either Nedelec space or Lagrange VectorFunctionSpace. Required arguments ------------------ - export_cg, type bool specify if fields in data format should be exported directly after conversion - export_pvd, type bool specify if fields are also exported in *.pvd* format for *Paraview* - export_nedelec = True, type bool set **True** for exporting caluclated data on a Nedelec space """ lp(self.MP.logger, 20, '... exporting A-potentials ...', pre_dash=False) try: self.A_t_r_cg except AttributeError: lp(self.MP.logger, 30, 'Warning! "A_t_r_cg" not found in "PP". Continuing ...') pass else: self.save_field('_A_t', self.A_t_r_cg, self.A_t_i_cg, export_pvd, export_cg, export_nedelec=False) try: self.A_t_r except AttributeError: lp(self.MP.logger, 30, 'Warning! "A_t_r" not found in "PP". Continuing ...') pass else: self.save_field('_A_t', self.A_t_r, self.A_t_i, False, False, export_nedelec=export_nedelec) try: self.A_s_r_cg except AttributeError: if '_s' in self.FE.FS.approach: lp(self.MP.logger, 30, 'Warning! "A_s_r_cg" not found in "PP". Continuing ...') pass else: self.save_field('_A_s', self.A_s_r_cg, self.A_s_i_cg, export_pvd, export_cg, export_nedelec=False) try: self.A_s_r except AttributeError: if '_s' in self.FE.FS.approach: lp(self.MP.logger, 30, 'Warning! "A_s_r" not found in "PP". Continuing ...') pass else: self.save_field('_A_s', self.A_s_r, self.A_s_i, False, False, export_nedelec=export_nedelec)
# Exporting scalar potentials would need to be fixed at some point # if users are interested in that. # try: # self.Phi_r # except AttributeError: # pass # else: # if export_pvd: # df.File(self.export_dir + '_Phi_real.pvd') << self.Phi_r # df.File(self.export_dir + '_Phi_imag.pvd') << self.Phi_i # if export_cg: # lp(self.MP.logger, 30, # 'TO DO!, need to enable export of scalar Phi data ...') # # df.File(out_name + '_Phi_real.xml') << self.Phi_r # # df.File(out_name + '_Phi_imag.xml') << self.Phi_i
[docs] def export_all_results(self, quantities='EAH', export_cg=False, export_pvd=True, export_nedelec=True): """ Export a selection of calculated electric, magnetic or potential fields using specified data (*xml/h5*) and/or *pvd* format. Keyword arguments ----------------- - quantities = None, type str if **export_all** is False, specify which data should be exported, use a combination of **E**, **H** and/or **A** combined in one string - export_cg = False, type bool specify if fields in data format should be exported directly after conversion - export_pvd = True, type bool specify if fields are also exported in *.pvd* format for *Paraview* - export_nedelec = True, type bool set **True** for exporting caluclated data on a Nedelec space """ lp(self.MP.logger, 20, '... exporting selecetion of results ...', pre_dash=False) if 'E' in quantities: self.export_E_fields(export_pvd, export_cg, export_nedelec) if 'H' in quantities: self.export_H_fields(export_pvd, export_cg, export_nedelec) if 'A' in quantities: self.export_A_fields(export_pvd, export_cg, export_nedelec)
[docs] def save_field(self, quant, q1, q2, export_pvd, export_cg, export_nedelec): """ Store fields on hard disc drive. Required arguments ------------------ - quant, type str string specifying the field quantitiy to be exported - q1, q2, type dolfin Function functions containing the results - export_cg, type bool specify if fields in data format should be exported directly after conversion - export_pvd, type bool specify if fields are also exported in *.pvd* format for *Paraview* - export_nedelec = True, type bool set **True** for exporting caluclated data on a Nedelec space """ for ti in range(len(q1)): export_dir = self.export_dir if self.MP.n_freqs != 1: export_dir += '_f_{:d}'.format(self.fi) if self.FE.n_tx != 1: export_dir += '_tx_{:d}'.format(ti) if export_pvd: q1[ti].rename(quant + '_real', '') q2[ti].rename(quant + '_imag', '') df.File(export_dir + quant + '_real_cg.pvd') << q1[ti] df.File(export_dir + quant + '_imag_cg.pvd') << q2[ti] if export_cg and self.MP.file_format == 'xml': df.File(export_dir + quant + '_real_cg.xml') << q1[ti] df.File(export_dir + quant + '_imag_cg.xml') << q2[ti] if export_nedelec and self.MP.file_format == 'xml': df.File(export_dir + quant + '_real.xml') << q1[ti] df.File(export_dir + quant + '_imag.xml') << q2[ti] if self.MP.file_format == 'h5': if export_cg: out_name = self.export_dir + quant + '_cg.h5' if export_nedelec: out_name = self.export_dir + quant + '.h5' ti = 0 r_str = 'f{:d}'.format(self.fi) + '_real' i_str = 'f{:d}'.format(self.fi) + '_imag' if export_cg or export_nedelec: if self.FE.n_tx == 1: if self.fi == 0: self.h5_file = write_h5( self.MP.mpi_cw, q1[0], out_name, counter=0, close=False, ri=r_str) else: self.h5_file = write_h5( self.MP.mpi_cw, q1[0], out_name, new=False, append=True, counter=0, close=False, ri=r_str) write_h5(self.MP.mpi_cw, q2[0], out_name, counter=0, new=False, f=self.h5_file, close=True, ri=i_str) else: if self.fi == 0: self.h5_file = write_h5( self.MP.mpi_cw, q1[0], out_name, counter=0, close=False, ri=r_str) else: self.h5_file = write_h5( self.MP.mpi_cw, q1[0], out_name, new=False, append=True, counter=0, close=False,ri=r_str) write_h5(self.MP.mpi_cw, q2[0], out_name, counter=0, new=False, f=self.h5_file, close=False, ri=i_str) for ti in range(1, self.FE.n_tx - 1): write_h5(self.MP.mpi_cw, q1[ti], out_name, counter=ti, new=False, f=self.h5_file, close=False, ri=r_str) write_h5(self.MP.mpi_cw, q2[ti], out_name, counter=ti, new=False, f=self.h5_file, close=False, ri=i_str) write_h5( self.MP.mpi_cw, q1[ti+1], out_name, f=self.h5_file, new=False, counter=ti+1, ri=r_str, close=False) write_h5( self.MP.mpi_cw, q2[ti+1], out_name, f=self.h5_file, new=False, counter=ti+1, ri=i_str, close=True)
[docs] def convert_H_to_E(self, from_potential=False): """ Compute electric on basis of magnetic fields if enabled. """ lp(self.MP.logger, 20, '... deriving E-fields from H (or F) ...', pre_dash=False) b_real, b_imag = [], [] for ti in range(self.FE.n_tx): if not from_potential: Kr = df.inner(df.curl(self.H_t_r[ti]), self.MP.sigma0_inv_func * self.v_ned) * self.dx_0 Ki = df.inner(df.curl(self.H_t_i[ti]), self.MP.sigma0_inv_func * self.v_ned) * self.dx_0 else: Kr = df.inner(self.MP.sigma_inv_func * self.F_s_r[ti], df.curl(self.v_ned)) * self.dx_0 Ki = df.inner(self.MP.sigma_inv_func * self.F_s_i[ti], df.curl(self.v_ned)) * self.dx_0 b_real.append(df.assemble(Kr)) b_imag.append(df.assemble(Ki)) if not hasattr(self.Solver, 'solver'): self.E_s_r = self.Solver.solve_system_mumps( self.A_ned, b_real, self.FS.V, sym=True) else: self.E_s_r = self.Solver.solve_system_mumps( self.A_ned, b_real, self.FS.V, sym=True, first_call=False) self.E_s_i = self.Solver.solve_system_mumps( self.A_ned, b_imag, self.FS.V, sym=True, first_call=False) for ti in range(self.FE.n_tx): if not self.FS.PF.nedelec_pf: self.E_s_r_cg.append(df.interpolate(self.E_s_r[ti], self.FS.V_cg)) self.E_s_i_cg.append(df.interpolate(self.E_s_i[ti], self.FS.V_cg)) E_t_r_cg = df.Function(self.FS.V_cg) E_t_i_cg = df.Function(self.FS.V_cg) E_t_r = df.Function(self.FS.V) E_t_i = df.Function(self.FS.V) E_0_r = df.interpolate(self.FS.PF.E_0_r_cg[ti], self.FS.V) E_0_i = df.interpolate(self.FS.PF.E_0_i_cg[ti], self.FS.V) E_t_r_cg.vector()[:] = \ self.E_s_r_cg[ti].vector().get_local() + \ self.FS.PF.E_0_r_cg[ti].vector().get_local() E_t_i_cg.vector()[:] = \ self.E_s_i_cg[ti].vector().get_local() + \ self.FS.PF.E_0_i_cg[ti].vector().get_local() E_t_r.vector()[:] = \ self.E_s_r[ti].vector().get_local() + \ E_0_r.vector().get_local() E_t_i.vector()[:] = \ self.E_s_i[ti].vector().get_local() + \ E_0_i.vector().get_local() self.E_t_r_cg.append(E_t_r_cg) self.E_t_i_cg.append(E_t_i_cg) self.E_t_r.append(E_t_r) self.E_t_i.append(E_t_i) else: V = df.FunctionSpace(self.FS.mesh, 'N1curl', 2) E_t_r = df.Function(V) E_t_i = df.Function(V) E_s_r = df.interpolate(self.E_s_r[ti], V) E_s_i = df.interpolate(self.E_s_i[ti], V) self.E_s_r_cg.append(df.interpolate(self.E_s_r[ti], self.FS.V_cg)) self.E_s_i_cg.append(df.interpolate(self.E_s_i[ti], self.FS.V_cg)) E_t_r.vector()[:] = \ E_s_r.vector()[:] + \ self.FS.PF.E_0_r[ti].vector()[:] E_t_i.vector()[:] = \ E_s_i.vector()[:] + \ self.FS.PF.E_0_i[ti].vector()[:] print(E_s_r.vector()[:]) self.E_t_r.append(E_t_r) self.E_t_i.append(E_t_i)
[docs] def dc_post_proc(self, export_cg=True, export_pvd=True, export_nedelec=True): """ Post-processing for direct-current (DC) approach. Required arguments ------------------ - export_cg, type bool specify if fields in data format should be exported directly after conversion - export_pvd, type bool specify if fields are also exported in *.pvd* format for *Paraview* - export_nedelec = True, type bool set **True** for exporting caluclated data on a Nedelec space """ M, w, X = df.PETScMatrix(), df.PETScVector(), [] u = df.TrialFunction(self.FS.V) v = df.TestFunction(self.FS.V) K = df.inner(u, v) * self.FS.DOM.dx_0 for ti in range(self.FE.n_tx): # note additional sign change, might be incorporated everythere RHS = -df.inner(-df.grad(self.FE.U[ti]), v) * self.FS.DOM.dx_0 H, b = df.assemble_system(K, RHS, A_tensor=M, b_tensor=w) X.append(b) lp(self.MP.logger, 20, '... deriving electric field from potential ...', pre_dash=False) self.E = self.FE.Solver.solve_system_mumps( H, X, self.FS.V, sym=True, tx_selection=self.MP.grounding) del self.FE.Solver.solver if export_nedelec or export_cg or export_pvd: lp(self.MP.logger, 20, '... exporting direct-current E-fields ...', pre_dash=False) if export_cg or export_pvd: self.E_cg = [] for ti in range(len(self.E)): self.E_cg.append(df.interpolate(self.E[ti], self.FS.V)) export_dir = self.export_dir for ti in range(len(self.E)): if self.FE.n_tx != 1: export_dir = self.export_dir + '_tx_{:d}'.format(ti) if export_pvd: df.File(export_dir + '_E_cg.pvd') << self.E_cg[ti] if export_cg and self.MP.file_format == 'xml': df.File(export_dir + '_E_cg.xml') << self.E_cg[ti] if export_nedelec and self.MP.file_format == 'xml': df.File(export_dir + '_E.xml') << self.E[ti] if self.MP.file_format == 'h5': ti = 0 if export_cg: out_name = self.export_dir + 'E_cg.h5' if self.FE.n_tx == 1: self.h5_file = write_h5( self.MP.mpi_cw, self.E_cg[0], out_name, counter=0, close=True) else: self.h5_file = write_h5( self.MP.mpi_cw, self.E_cg[0], out_name, counter=0, close=False) for ti in range(1, self.FE.n_tx - 1): write_h5(self.MP.mpi_cw, self.E_cg[ti], out_name, counter=ti, new=False, f=self.h5_file, close=False) write_h5( self.MP.mpi_cw, self.E_cg[ti+1], out_name, f=self.h5_file, new=False, counter=ti+1, close=True) ti = 0 if export_nedelec: out_name = self.export_dir + 'E.h5' if self.FE.n_tx == 1: self.h5_file = write_h5( self.MP.mpi_cw, self.E[0], out_name, counter=0, close=True) else: self.h5_file = write_h5( self.MP.mpi_cw, self.E[0], out_name, counter=0, close=False) for ti in range(1, self.FE.n_tx - 1): write_h5(self.MP.mpi_cw, self.E[ti], out_name, counter=ti, new=False, f=self.h5_file, close=False) write_h5( self.MP.mpi_cw, self.E[ti+1], out_name, f=self.h5_file, new=False, counter=ti+1, close=True)