Source code for custEM.core.post_proc_td

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
import os
import numpy as np
import time
from custEM.misc import logger_print as lp
from custEM.misc import write_h5
from custEM.misc import specify_td_export_strings
from custEM.core import Solver


[docs] class PostProcessingTD: """ Time-domain PostProcessing class for computing and exporting electric and magnetic fields, called from MOD instance. Note that interpolated results are currently only stored as *numpy* arrays. A method to export time-dependent solutions as *.pvd* file for *Paraview* might be implemented in future if required. Methods ------- - convert_results() convert frequency-domain solutions of **FT** approach to time domain, derive *H* from *E* in **IE** and **RA** approaches - export_td_data() save total electric and/or magnetic fields on hard disc drive - interpolate_frequency_solutions() interpolate frequency-domain results in **FT** approach - reorder_results() reorder interpolated frequency-domain results in **FT** approach - merge_td_results() combine all interpolated time-domain results in a common data structure and export all time-domain results of an interpolation mesh as *numpy* arrays (pvd files not supported yet) """ def __init__(self, FE, export_domains): """ Initialize instance for converting and exporting postprocessing quantities and saves the domains of the mesh to '... Domains.pvd'. Required arguments ------------------ - FE, type class FE approach (e.g., **ImplicitEuler**) instance - export_domains, type bool specify if mesh with domains should be exported or not """ self.FS = FE.FS self.MP = FE.MP self.FE = FE self.export_dir = self.MP.out_dir + '/' + self.MP.mod_name lp(self.MP.logger, 20, ' - {:<22}'.format('export directory') + ' = ' + self.MP.out_dir + ' - ', pre_dash=False) lp(self.MP.logger, 20, ' - {:<22}'.format('export name') + ' = ' + self.MP.mod_name + '_*...* - ', pre_dash=False) if export_domains: lp(self.MP.logger, 20, '... exporting pvd-file ' 'for model validation ...', pre_dash=False) df.File(self.export_dir + "_Domains.pvd") << FE.FS.DOM.domain_func self.dx_0 = self.FS.DOM.dx_0 self.Solver = Solver(self.FS, self.FE) self.E_cg, self.H_cg, self.H = None, None, None
[docs] def convert_results(self, convert_to_H=True, export_cg=False, export_pvd=False, export_nedelec=True, interp_mesh=None, interpolate_fd=True, reorder_fd=True, transform_td=True): """ Automatically convert results from E-fields to H-fields. Keyword arguments ----------------- - convert_to_H = True, type bool calculate H-fields from E-fields - export_cg = False, type bool set **True** for exporting calculated data on Lagrange spaces - export_pvd = True, type bool set **True** to export fields in *.pvd* format for *Paraview* - export_nedelec = True, type bool set **True** for exporting calculated data on a Nedelec spaces - interp_mesh = None, type str specify interpolation mesh for automated **FT** approach post-processing - interpolate_fd = True, type bool enable automated interpolation of frequency-domain results in **FT** approach - reorder_fd = True, type bool enable automated reordering of interpolated frequency-domain results in **FT** approach - transform_td = True, type bool enable automated transformation of reordered frequency-domain results in **FT** approach """ if self.MP.approach == 'E_FT': if interp_mesh is None: if self.FE.interp_mesh is None: lp(self.MP.logger, 50, 'Error! An interpolation mesh must be specified for ' 'the automated\npost-processing of the FT ' '(fourier-transform-based) method.\nIt can be specified' ' either during calling the\n' '*build_var_form()* or *solve_main_problem()* methods. ' 'Aborting ...') raise SystemExit else: interp_mesh = self.FE.interp_mesh EH = ['E_t'] if convert_to_H: EH.append('H_t') if interpolate_fd: self.interpolate_frequency_solutions(interp_mesh, EH) if reorder_fd: self.reorder_results(interp_mesh, EH) if transform_td: self.FE.transform_to_time_domain(interp_mesh, EH) return self.E = self.FE.U if export_cg: lp(self.MP.logger, 30, 'Warning! CG field export is not implemented yet. If required ' 'by users, please contact the authors and we will implement ' 'this feature. Continuing ...') # self.E_cg = [df.Function(self.FS.V) # for k in range(self.FE.n_times)] # for k in range(self.FE.n_times): # self.E_cg[k] = df.interpolate(self.E[k], self.FS.V) if convert_to_H: from petsc4py import PETSc lp(self.MP.logger, 20, '... deriving H-fields from E for ' + str(self.FE.n_tx) + ' transmitter(s) ...', pre_dash=False) if hasattr(self.FE, 'dc_solver'): # reusing factorization for H-field conversion if dc fields # have been comuted before H_solver = self.FE.dc_solver.solver v = df.TestFunction(self.FS.V) else: u, v = df.TrialFunction(self.FS.V), df.TestFunction(self.FS.V) LHS = df.inner(u, v) * self.dx_0 RHS = df.inner(df.Constant(("0.0", "0.0", "0.0")), v) * self.dx_0 A, b = df.PETScMatrix(), df.PETScVector() df.assemble_system(LHS, RHS, A_tensor=A, b_tensor=b) A.mat().setOption(PETSc.Mat.Option.SYMMETRIC, True) H_solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(), A, "mumps") self.H = [[df.Function(self.FS.V) for ti in range( self.FE.n_tx)] for k in range(self.FE.n_times)] for k in range(self.FE.n_times): lp(self.MP.logger, 20, '... converting E at time ' + str(k) + ' ...', pre_dash=False) for ti in range(self.FE.n_tx): RHS = df.inner(self.FE.mu1 * df.curl(self.E[k][ti]), v) * self.dx_0 b = df.assemble(RHS) H_solver.solve(self.H[k][ti].vector(), b) if export_nedelec or export_pvd or export_cg: lp(self.MP.logger, 20, '... exporting results ...', pre_dash=False) self.export_td_data(export_cg, export_pvd, export_nedelec)
[docs] def export_td_data(self, export_cg, export_pvd, export_nedelec): """ Export electric and or magnetic fields using specified data (*xml/h5*) and/or *pvd* format from Nedelec spaces. Required arguments ------------------ - export_pvd, type bool specify if fields are also exported in *.pvd* format for *Paraview* - export_cg, type bool specify if fields in data format should be exported directly after conversion - export_nedelec = True, type bool set **True** for exporting caluclated data on a NedelecSpace """ lp(self.MP.logger, 20, '... storing *times* to ' 'file in export directory ...', pre_dash=False) if df.MPI.rank(self.MP.mpi_cw) == 0: np.savetxt(self.export_dir + '_times.dat', self.FE.times) else: pass E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.FE) if export_nedelec: out_E = self.MP.out_dir + '/' + self.MP.mod_name + '_' + E_str +\ '_' + mode_e[:-1] + '.' + self.MP.file_format out_H = self.MP.out_dir + '/' + self.MP.mod_name + '_' +\ H_str + '_' + mode_h[:-1] + '.' + self.MP.file_format if self.MP.file_format in 'xml': lp(self.MP.logger, 30, 'Warning! writing xml output of time domain data currently ' 'not supported. Only last time step exported. ' 'Continuing ...') df.File(out_E + '.' + self.MP.file_format) << self.E[-1] if self.H is not None: df.File(out_H + '.' + self.MP.file_format) << self.H[-1] elif self.MP.file_format in 'h5': lp(self.MP.logger, 20, '... exporting ' + E_str + '-fields in HDF5 format ...', pre_dash=False) for k in range(self.FE.n_times): t_str = 't{:d}'.format(k) + '_data' if k == 0: self.E_file = write_h5( self.MP.mpi_cw, self.E[0][0], out_E, counter=0, ri=t_str, close=False) else: self.E_file = write_h5( self.MP.mpi_cw, self.E[k][0], out_E, new=False, append=True, counter=0, ri=t_str, close=False) for ti in range(1, self.FE.n_tx): write_h5( self.MP.mpi_cw, self.E[k][ti], out_E, counter=ti, new=False, f=self.E_file, ri=t_str, close=False) self.E_file.close() if self.H is not None: lp(self.MP.logger, 20, '... exporting ' + H_str + '-fields in HDF5 format ...', pre_dash=False) for k in range(self.FE.n_times): t_str = 't{:d}'.format(k) + '_data' if k == 0: self.H_file = write_h5( self.MP.mpi_cw, self.H[0][0], out_H, counter=0, ri=t_str, close=False) else: self.H_file = write_h5( self.MP.mpi_cw, self.H[k][0], out_H, new=False, append=True, counter=0, ri=t_str, close=False) for ti in range(1, self.FE.n_tx): write_h5( self.MP.mpi_cw, self.H[k][ti], out_H, counter=ti, new=False, f=self.H_file, ri=t_str, close=False) self.H_file.close() if export_pvd: out_name = self.MP.out_dir + '/' + self.MP.mod_name lp(self.MP.logger, 20, '... exporting ' + E_str + '-fields in pvd format ...', pre_dash=False) for ti in range(self.FE.n_tx): if self.FE.n_tx != 1: outfile_E = df.File(out_name + '_tx_{:d}_'.format(ti) + E_str + '_' + mode_e[:-1] + '.pvd') else: outfile_E = df.File(out_name + '_' + E_str + '_' + mode_e[:-1] + '.pvd') for k in range(self.FE.n_times): if self.E_cg is None: E_out = df.interpolate(self.E[k][ti], self.FS.V_cg) E_out.rename(E_str + '_field', 'label') outfile_E << (E_out, self.FE.times[k]) if self.H is not None: lp(self.MP.logger, 20, '... exporting ' + H_str + '-fields in pvd format ...', pre_dash=False) for ti in range(self.FE.n_tx): if self.FE.n_tx != 1: outfile_H = df.File(out_name + '_tx_{:d}_'.format(ti) + H_str + '_' + mode_h[:-1] + '.pvd') else: outfile_H = df.File(out_name + '_' + H_str + '_' + mode_h[:-1] + '.pvd') for k in range(self.FE.n_times): if self.H_cg is None: H_out = df.interpolate(self.H[k][ti], self.FS.V_cg) H_out.rename(H_str + '_field', 'label') outfile_H << (H_out, self.FE.times[k])
[docs] def interpolate_frequency_solutions(self, interp_mesh, EH): """ Interpolate frequency-domain data on specified interpolation mesh. Required arguments ------------------ - interp_mesh, type str target interpolation mesh - EH, type str by default, EH='EH', that means that both, electric and magnetic field results, are interpolated """ from custEM.core import MOD M = MOD(self.FE.fd_mod_name, self.MP.mesh_name, 'E_t', p=self.FS.p, load_existing=True, file_format=self.MP.file_format, overwrite_results=False, debug_level=self.MP.logger.level, m_dir=self.MP.m_dir, r_dir=self.MP.r_dir) # interpolate fields on the observation line M.IB.interpolate(interp_mesh, EH, fs_type='ned', export_pvd=False)
[docs] def reorder_results(self, interp_mesh, EH): """ Reorder frequency-domain data station-wise by importing all results and storing them in one file per station, containing the values for all chosen frequencies. Required arguments ------------------ - interp_mesh, type str target interpolation mesh - EH, type str by default, EH='EH', that means that both, electric and magnetic field results, are reordered """ path = self.FE.path_fd + '/' + self.FE.fd_mod_name + '_interpolated/' rank = -1 # start with process 0, -1 because of +1 below # all tx interpolation files format single_tx = True for q in EH: if os.path.isfile(path + 'f_0_' + q + '_all_tx_' + interp_mesh + '.npy'): fname = '_' + q + '_all_tx_' + interp_mesh + '.npy' field = np.load(path + 'f_0' + fname) self.rx_pos = field[0, :, 3:].real self.n_rx = len(self.rx_pos) single_tx=False # single tx interpolation files format for ti in range(self.FE.n_tx): rank += 1 if df.MPI.rank(self.MP.mpi_cw) == rank: for q in EH: if single_tx: if self.FE.n_tx == 1: path_rx = path + 'f_0_' + q + '_' +\ interp_mesh + '.npy' else: path_rx = path + 'f_0_tx_0_' + q + '_' +\ interp_mesh + '.npy' self.rx_pos = np.load(path_rx)[:, 3:].real self.n_rx = len(self.rx_pos) all_frequencies = np.zeros( (len(self.FE.omegas), self.n_rx, 3), dtype=complex) for fi in range(self.FE.n_freqs): if single_tx: if self.FE.n_tx == 1: path_f = path + 'f_' + str(fi) + '_' + q + \ '_' + interp_mesh + '.npy' else: path_f = path + 'f_' + str(fi) + '_' + \ 'tx_' + str(ti) + '_' + q + '_' +\ interp_mesh + '.npy' if os.path.isfile(path_f): all_frequencies[fi, :, :] = \ np.ascontiguousarray( np.load(path_f)[:, :3]) else: lp(self.MP.logger, 30, 'Warning!' + path_f + '\nis missing for reordering ' 'frequency-domain results. Continuing ...') else: if os.path.isfile(path + 'f_' + str(fi) + fname): all_frequencies[fi, :, :] = \ np.ascontiguousarray(np.load( path + 'f_' + str(fi) + fname)[ti, :, :3]) else: lp(self.MP.logger, 30, 'Warning!' + path + 'f_' + str(fi) + fname + '\nis missing for reordering ' 'frequency-domain results. Continuing ...') for ci, coord in enumerate(self.rx_pos): if self.FE.n_tx == 1: path_f = path + q + '_' + \ interp_mesh + '_rx_' + str(ci) + '.npy' else: path_f = path + 'tx_' + str(ti) + '_' + q + '_' +\ interp_mesh + '_rx_' + str(ci) + '.npy' np.save(path_f, np.concatenate(( all_frequencies[:, ci, :])).reshape(-1, 3)) else: pass if rank == df.MPI.size(self.MP.mpi_cw) - 1: rank = -1 lp(self.MP.logger, 10, '... waiting for all mpi processes to finish ' 'serial interpolation tasks ...', pre_dash=False) df.MPI.barrier(df.MPI.comm_world)
[docs] def merge_td_results(self, interp_mesh, EH='EH', interp_dir=None): """ Resort time-dependent results on interpolation meshes in a common data format for all time-domain modeling approaches and export data as *numpy* arrays. Required arguments ------------------ - interp_mesh, type str target interpolation mesh Keyword arguments ----------------- - EH, type str by default, EH='EH', that means that both, electric and magnetic field results, are sorted and exported """ if interp_dir is None: # same directory as default *interp_dir* of InterpolationBase interp_dir = (self.MP.r_dir + '/' + self.MP.approach + '/' + self.MP.mesh_name + '/' + self.MP.mod_name + '_interpolated/') E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.FE) rank = -1 if 'FT' in self.FE.FS.approach: if 'rx_path' in interp_mesh: self.rx_pos = np.array(self.MP.rx[0]) self.n_rx = len(self.rx_pos) else: # initialize again if transform_to_time_domain was not called if interp_mesh.find('line') is not -1: self.interp_mesh_dir = self.MP.m_dir + '/lines' elif interp_mesh.find('slice') is not -1: self.interp_mesh_dir = self.MP.m_dir + '/slices' elif interp_mesh.find('path') is not -1: self.interp_mesh_dir = self.MP.m_dir + '/paths' i_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir + '/' + interp_mesh + '.xml') self.rx_pos = i_mesh.coordinates() self.n_rx = len(self.rx_pos) for ti in range(self.MP.n_tx): rank += 1 if self.MP.n_tx == 1: export_path = interp_dir else: export_path = interp_dir + 'tx_' + str(ti) + '_' if df.MPI.rank(self.MP.mpi_cw) == 0: rx_to_fill = np.repeat(self.rx_pos[0].reshape(1, 3), self.FE.n_times, axis=0).T for kk in range(len(self.rx_pos)): if os.path.isfile(export_path + 'E_shut_on_' + 'rx_0.npy'): tmp = np.load(export_path + 'E_shut_on_' + 'rx_' + str(kk) + '.npy') if kk == 0: E = np.empty((len(self.rx_pos), 6, self.FE.n_times)) E[-kk -1, :3, :] = tmp.T E[-kk -1, 3:, :] = rx_to_fill np.save(export_path + 'E_shut_on_' + interp_mesh + '.npy', E) for kk in range(len(self.rx_pos)): if os.path.isfile(export_path + 'E_shut_off_' + 'rx_0.npy'): tmp = np.load(export_path + 'E_shut_off_' + 'rx_' + str(kk) + '.npy') if kk == 0: E = np.empty((len(self.rx_pos), 6, self.FE.n_times)) E[-kk -1, :3, :] = tmp.T E[-kk -1, 3:, :] = rx_to_fill np.save(export_path + 'E_shut_off_' + interp_mesh + '.npy', E) for kk in range(len(self.rx_pos)): if os.path.isfile(export_path + 'H_shut_on_' + 'rx_0.npy'): tmp = np.load(export_path + 'H_shut_on_' + 'rx_' + str(kk) + '.npy') if kk == 0: H = np.empty((len(self.rx_pos), 6, self.FE.n_times)) H[-kk -1, :3, :] = tmp.T H[-kk -1, 3:, :] = rx_to_fill np.save(export_path + 'H_shut_on_' + interp_mesh + '.npy', H) for kk in range(len(self.rx_pos)): if os.path.isfile(export_path + 'H_shut_off_' + 'rx_0.npy'): tmp = np.load(export_path + 'H_shut_off_' + 'rx_' + str(kk) + '.npy') if kk == 0: H = np.empty((len(self.rx_pos), 6, self.FE.n_times)) H[-kk -1, :3, :] = tmp.T H[-kk -1, 3:, :] = rx_to_fill np.save(export_path + 'H_shut_off_' + interp_mesh + '.npy', H) for kk in range(len(self.rx_pos)): if os.path.isfile(export_path + 'dH_' + 'rx_0.npy'): tmp = np.load(export_path + 'dH_' + 'rx_' + str(kk) + '.npy') if kk == 0: dH = np.empty((len(self.rx_pos), 6, self.FE.n_times)) dH[-kk -1, :3, :] = tmp.T dH[-kk -1, 3:, :] = rx_to_fill np.save(export_path + 'dH_' + interp_mesh + '.npy', dH) else: pass if rank == df.MPI.size(self.MP.mpi_cw) - 1: rank = -1 lp(self.MP.logger, 10, '... waiting for all mpi processes to finish ' 'serial interpolation tasks ...', pre_dash=False) else: for ti in range(self.FE.n_tx): if df.MPI.size(self.MP.mpi_cw) > 1: rank += 1 if df.MPI.rank(self.MP.mpi_cw) == rank: if 'E' in EH: f_name = E_str + '_' + mode_e + interp_mesh if self.MP.n_tx != 1: f_name = 'tx_{:d}_'.format(ti) + f_name tmp = np.load(interp_dir + 't_0_' + f_name + '.npy') E = np.empty((tmp.shape[0], 6, self.FE.n_times)) for j in range(self.FE.n_times): tmp = np.load(interp_dir + 't_{:d}_'.format(j) + f_name + '.npy') E[:, :, j] = tmp[::-1, :] np.save(interp_dir + f_name + '.npy', E) if 'H' in EH: f_name = H_str + '_' + mode_h + interp_mesh if self.MP.n_tx != 1: f_name = 'tx_{:d}_'.format(ti) + f_name tmp = np.load(interp_dir + 't_0_' + f_name + '.npy') H = np.empty((tmp.shape[0], 6, self.FE.n_times)) for j in range(self.FE.n_times): tmp = np.load(interp_dir + 't_{:d}_'.format(j) + f_name + '.npy') H[:, :, j] = tmp[::-1, :] np.save(interp_dir + f_name + '.npy', H) else: pass if rank == df.MPI.size(self.MP.mpi_cw) - 1: rank = -1 lp(self.MP.logger, 10, '... waiting for all mpi processes to finish ' 'serial interpolation tasks ...', pre_dash=False) df.MPI.barrier(df.MPI.comm_world)