# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
import os
import numpy as np
import time
from custEM.misc import logger_print as lp
from custEM.misc import write_h5
from custEM.misc import specify_td_export_strings
from custEM.core import Solver
[docs]
class PostProcessingTD:
"""
Time-domain PostProcessing class for computing and exporting electric
and magnetic fields, called from MOD instance. Note that interpolated
results are currently only stored as *numpy* arrays. A method to export
time-dependent solutions as *.pvd* file for *Paraview* might be implemented
in future if required.
Methods
-------
- convert_results()
convert frequency-domain solutions of **FT** approach to time domain,
derive *H* from *E* in **IE** and **RA** approaches
- export_td_data()
save total electric and/or magnetic fields on hard disc drive
- interpolate_frequency_solutions()
interpolate frequency-domain results in **FT** approach
- reorder_results()
reorder interpolated frequency-domain results in **FT** approach
- merge_td_results()
combine all interpolated time-domain results in a common data structure
and export all time-domain results of an interpolation mesh as
*numpy* arrays (pvd files not supported yet)
"""
def __init__(self, FE, export_domains):
"""
Initialize instance for converting and exporting postprocessing
quantities and saves the domains of the mesh to '... Domains.pvd'.
Required arguments
------------------
- FE, type class
FE approach (e.g., **ImplicitEuler**) instance
- export_domains, type bool
specify if mesh with domains should be exported or not
"""
self.FS = FE.FS
self.MP = FE.MP
self.FE = FE
self.export_dir = self.MP.out_dir + '/' + self.MP.mod_name
lp(self.MP.logger, 20,
' - {:<22}'.format('export directory') + ' = ' +
self.MP.out_dir + ' - ', pre_dash=False)
lp(self.MP.logger, 20,
' - {:<22}'.format('export name') + ' = ' + self.MP.mod_name +
'_*...* - ', pre_dash=False)
if export_domains:
lp(self.MP.logger, 20,
'... exporting pvd-file '
'for model validation ...', pre_dash=False)
df.File(self.export_dir + "_Domains.pvd") << FE.FS.DOM.domain_func
self.dx_0 = self.FS.DOM.dx_0
self.Solver = Solver(self.FS, self.FE)
self.E_cg, self.H_cg, self.H = None, None, None
[docs]
def convert_results(self, convert_to_H=True, export_cg=False,
export_pvd=False, export_nedelec=True,
interp_mesh=None, interpolate_fd=True,
reorder_fd=True, transform_td=True):
"""
Automatically convert results from E-fields to H-fields.
Keyword arguments
-----------------
- convert_to_H = True, type bool
calculate H-fields from E-fields
- export_cg = False, type bool
set **True** for exporting calculated data on Lagrange spaces
- export_pvd = True, type bool
set **True** to export fields in *.pvd* format for *Paraview*
- export_nedelec = True, type bool
set **True** for exporting calculated data on a Nedelec spaces
- interp_mesh = None, type str
specify interpolation mesh for automated **FT** approach
post-processing
- interpolate_fd = True, type bool
enable automated interpolation of frequency-domain results in
**FT** approach
- reorder_fd = True, type bool
enable automated reordering of interpolated frequency-domain
results in **FT** approach
- transform_td = True, type bool
enable automated transformation of reordered frequency-domain
results in **FT** approach
"""
if self.MP.approach == 'E_FT':
if interp_mesh is None:
if self.FE.interp_mesh is None:
lp(self.MP.logger, 50,
'Error! An interpolation mesh must be specified for '
'the automated\npost-processing of the FT '
'(fourier-transform-based) method.\nIt can be specified'
' either during calling the\n'
'*build_var_form()* or *solve_main_problem()* methods. '
'Aborting ...')
raise SystemExit
else:
interp_mesh = self.FE.interp_mesh
EH = ['E_t']
if convert_to_H:
EH.append('H_t')
if interpolate_fd:
self.interpolate_frequency_solutions(interp_mesh, EH)
if reorder_fd:
self.reorder_results(interp_mesh, EH)
if transform_td:
self.FE.transform_to_time_domain(interp_mesh, EH)
return
self.E = self.FE.U
if export_cg:
lp(self.MP.logger, 30,
'Warning! CG field export is not implemented yet. If required '
'by users, please contact the authors and we will implement '
'this feature. Continuing ...')
# self.E_cg = [df.Function(self.FS.V)
# for k in range(self.FE.n_times)]
# for k in range(self.FE.n_times):
# self.E_cg[k] = df.interpolate(self.E[k], self.FS.V)
if convert_to_H:
from petsc4py import PETSc
lp(self.MP.logger, 20,
'... deriving H-fields from E for ' + str(self.FE.n_tx) +
' transmitter(s) ...', pre_dash=False)
if hasattr(self.FE, 'dc_solver'):
# reusing factorization for H-field conversion if dc fields
# have been comuted before
H_solver = self.FE.dc_solver.solver
v = df.TestFunction(self.FS.V)
else:
u, v = df.TrialFunction(self.FS.V), df.TestFunction(self.FS.V)
LHS = df.inner(u, v) * self.dx_0
RHS = df.inner(df.Constant(("0.0", "0.0", "0.0")),
v) * self.dx_0
A, b = df.PETScMatrix(), df.PETScVector()
df.assemble_system(LHS, RHS, A_tensor=A, b_tensor=b)
A.mat().setOption(PETSc.Mat.Option.SYMMETRIC, True)
H_solver = df.PETScLUSolver(self.FS.mesh.mpi_comm(),
A, "mumps")
self.H = [[df.Function(self.FS.V) for ti in range(
self.FE.n_tx)] for k in range(self.FE.n_times)]
for k in range(self.FE.n_times):
lp(self.MP.logger, 20,
'... converting E at time ' + str(k) + ' ...',
pre_dash=False)
for ti in range(self.FE.n_tx):
RHS = df.inner(self.FE.mu1 * df.curl(self.E[k][ti]),
v) * self.dx_0
b = df.assemble(RHS)
H_solver.solve(self.H[k][ti].vector(), b)
if export_nedelec or export_pvd or export_cg:
lp(self.MP.logger, 20,
'... exporting results ...', pre_dash=False)
self.export_td_data(export_cg, export_pvd, export_nedelec)
[docs]
def export_td_data(self, export_cg, export_pvd, export_nedelec):
"""
Export electric and or magnetic fields using specified data (*xml/h5*)
and/or *pvd* format from Nedelec spaces.
Required arguments
------------------
- export_pvd, type bool
specify if fields are also exported in *.pvd* format for *Paraview*
- export_cg, type bool
specify if fields in data format should be exported directly
after conversion
- export_nedelec = True, type bool
set **True** for exporting caluclated data on a NedelecSpace
"""
lp(self.MP.logger, 20, '... storing *times* to '
'file in export directory ...', pre_dash=False)
if df.MPI.rank(self.MP.mpi_cw) == 0:
np.savetxt(self.export_dir + '_times.dat', self.FE.times)
else:
pass
E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.FE)
if export_nedelec:
out_E = self.MP.out_dir + '/' + self.MP.mod_name + '_' + E_str +\
'_' + mode_e[:-1] + '.' + self.MP.file_format
out_H = self.MP.out_dir + '/' + self.MP.mod_name + '_' +\
H_str + '_' + mode_h[:-1] + '.' + self.MP.file_format
if self.MP.file_format in 'xml':
lp(self.MP.logger, 30,
'Warning! writing xml output of time domain data currently '
'not supported. Only last time step exported. '
'Continuing ...')
df.File(out_E + '.' + self.MP.file_format) << self.E[-1]
if self.H is not None:
df.File(out_H + '.' + self.MP.file_format) << self.H[-1]
elif self.MP.file_format in 'h5':
lp(self.MP.logger, 20,
'... exporting ' + E_str + '-fields in HDF5 format ...',
pre_dash=False)
for k in range(self.FE.n_times):
t_str = 't{:d}'.format(k) + '_data'
if k == 0:
self.E_file = write_h5(
self.MP.mpi_cw, self.E[0][0], out_E,
counter=0, ri=t_str, close=False)
else:
self.E_file = write_h5(
self.MP.mpi_cw, self.E[k][0], out_E, new=False,
append=True, counter=0, ri=t_str, close=False)
for ti in range(1, self.FE.n_tx):
write_h5(
self.MP.mpi_cw, self.E[k][ti], out_E,
counter=ti, new=False, f=self.E_file,
ri=t_str, close=False)
self.E_file.close()
if self.H is not None:
lp(self.MP.logger, 20,
'... exporting ' + H_str +
'-fields in HDF5 format ...', pre_dash=False)
for k in range(self.FE.n_times):
t_str = 't{:d}'.format(k) + '_data'
if k == 0:
self.H_file = write_h5(
self.MP.mpi_cw, self.H[0][0], out_H,
counter=0, ri=t_str, close=False)
else:
self.H_file = write_h5(
self.MP.mpi_cw, self.H[k][0], out_H, new=False,
append=True, counter=0, ri=t_str, close=False)
for ti in range(1, self.FE.n_tx):
write_h5(
self.MP.mpi_cw, self.H[k][ti], out_H,
counter=ti, new=False, f=self.H_file,
ri=t_str, close=False)
self.H_file.close()
if export_pvd:
out_name = self.MP.out_dir + '/' + self.MP.mod_name
lp(self.MP.logger, 20,
'... exporting ' + E_str + '-fields in pvd format ...',
pre_dash=False)
for ti in range(self.FE.n_tx):
if self.FE.n_tx != 1:
outfile_E = df.File(out_name + '_tx_{:d}_'.format(ti) +
E_str + '_' + mode_e[:-1] + '.pvd')
else:
outfile_E = df.File(out_name + '_' +
E_str + '_' + mode_e[:-1] + '.pvd')
for k in range(self.FE.n_times):
if self.E_cg is None:
E_out = df.interpolate(self.E[k][ti], self.FS.V_cg)
E_out.rename(E_str + '_field', 'label')
outfile_E << (E_out, self.FE.times[k])
if self.H is not None:
lp(self.MP.logger, 20,
'... exporting ' + H_str + '-fields in pvd format ...',
pre_dash=False)
for ti in range(self.FE.n_tx):
if self.FE.n_tx != 1:
outfile_H = df.File(out_name + '_tx_{:d}_'.format(ti) +
H_str + '_' + mode_h[:-1] + '.pvd')
else:
outfile_H = df.File(out_name + '_' +
H_str + '_' + mode_h[:-1] + '.pvd')
for k in range(self.FE.n_times):
if self.H_cg is None:
H_out = df.interpolate(self.H[k][ti], self.FS.V_cg)
H_out.rename(H_str + '_field', 'label')
outfile_H << (H_out, self.FE.times[k])
[docs]
def interpolate_frequency_solutions(self, interp_mesh, EH):
"""
Interpolate frequency-domain data on specified interpolation mesh.
Required arguments
------------------
- interp_mesh, type str
target interpolation mesh
- EH, type str
by default, EH='EH', that means that both, electric and magnetic
field results, are interpolated
"""
from custEM.core import MOD
M = MOD(self.FE.fd_mod_name, self.MP.mesh_name, 'E_t', p=self.FS.p,
load_existing=True, file_format=self.MP.file_format,
overwrite_results=False, debug_level=self.MP.logger.level,
m_dir=self.MP.m_dir, r_dir=self.MP.r_dir)
# interpolate fields on the observation line
M.IB.interpolate(interp_mesh, EH, fs_type='ned', export_pvd=False)
[docs]
def reorder_results(self, interp_mesh, EH):
"""
Reorder frequency-domain data station-wise by importing all
results and storing them in one file per station, containing the
values for all chosen frequencies.
Required arguments
------------------
- interp_mesh, type str
target interpolation mesh
- EH, type str
by default, EH='EH', that means that both, electric and magnetic
field results, are reordered
"""
path = self.FE.path_fd + '/' + self.FE.fd_mod_name + '_interpolated/'
rank = -1 # start with process 0, -1 because of +1 below
# all tx interpolation files format
single_tx = True
for q in EH:
if os.path.isfile(path + 'f_0_' + q + '_all_tx_' +
interp_mesh + '.npy'):
fname = '_' + q + '_all_tx_' + interp_mesh + '.npy'
field = np.load(path + 'f_0' + fname)
self.rx_pos = field[0, :, 3:].real
self.n_rx = len(self.rx_pos)
single_tx=False
# single tx interpolation files format
for ti in range(self.FE.n_tx):
rank += 1
if df.MPI.rank(self.MP.mpi_cw) == rank:
for q in EH:
if single_tx:
if self.FE.n_tx == 1:
path_rx = path + 'f_0_' + q + '_' +\
interp_mesh + '.npy'
else:
path_rx = path + 'f_0_tx_0_' + q + '_' +\
interp_mesh + '.npy'
self.rx_pos = np.load(path_rx)[:, 3:].real
self.n_rx = len(self.rx_pos)
all_frequencies = np.zeros(
(len(self.FE.omegas), self.n_rx, 3), dtype=complex)
for fi in range(self.FE.n_freqs):
if single_tx:
if self.FE.n_tx == 1:
path_f = path + 'f_' + str(fi) + '_' + q + \
'_' + interp_mesh + '.npy'
else:
path_f = path + 'f_' + str(fi) + '_' + \
'tx_' + str(ti) + '_' + q + '_' +\
interp_mesh + '.npy'
if os.path.isfile(path_f):
all_frequencies[fi, :, :] = \
np.ascontiguousarray(
np.load(path_f)[:, :3])
else:
lp(self.MP.logger, 30,
'Warning!' + path_f +
'\nis missing for reordering '
'frequency-domain results. Continuing ...')
else:
if os.path.isfile(path + 'f_' + str(fi) + fname):
all_frequencies[fi, :, :] = \
np.ascontiguousarray(np.load(
path + 'f_' + str(fi) +
fname)[ti, :, :3])
else:
lp(self.MP.logger, 30,
'Warning!' + path + 'f_' + str(fi) + fname +
'\nis missing for reordering '
'frequency-domain results. Continuing ...')
for ci, coord in enumerate(self.rx_pos):
if self.FE.n_tx == 1:
path_f = path + q + '_' + \
interp_mesh + '_rx_' + str(ci) + '.npy'
else:
path_f = path + 'tx_' + str(ti) + '_' + q + '_' +\
interp_mesh + '_rx_' + str(ci) + '.npy'
np.save(path_f, np.concatenate((
all_frequencies[:, ci, :])).reshape(-1, 3))
else:
pass
if rank == df.MPI.size(self.MP.mpi_cw) - 1:
rank = -1
lp(self.MP.logger, 10,
'... waiting for all mpi processes to finish '
'serial interpolation tasks ...', pre_dash=False)
df.MPI.barrier(df.MPI.comm_world)
[docs]
def merge_td_results(self, interp_mesh, EH='EH', interp_dir=None):
"""
Resort time-dependent results on interpolation meshes in a common data
format for all time-domain modeling approaches and export data as
*numpy* arrays.
Required arguments
------------------
- interp_mesh, type str
target interpolation mesh
Keyword arguments
-----------------
- EH, type str
by default, EH='EH', that means that both, electric and magnetic
field results, are sorted and exported
"""
if interp_dir is None:
# same directory as default *interp_dir* of InterpolationBase
interp_dir = (self.MP.r_dir + '/' + self.MP.approach + '/' +
self.MP.mesh_name + '/' +
self.MP.mod_name + '_interpolated/')
E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.FE)
rank = -1
if 'FT' in self.FE.FS.approach:
if 'rx_path' in interp_mesh:
self.rx_pos = np.array(self.MP.rx[0])
self.n_rx = len(self.rx_pos)
else:
# initialize again if transform_to_time_domain was not called
if interp_mesh.find('line') is not -1:
self.interp_mesh_dir = self.MP.m_dir + '/lines'
elif interp_mesh.find('slice') is not -1:
self.interp_mesh_dir = self.MP.m_dir + '/slices'
elif interp_mesh.find('path') is not -1:
self.interp_mesh_dir = self.MP.m_dir + '/paths'
i_mesh = df.Mesh(self.MP.mpi_cs, self.interp_mesh_dir +
'/' + interp_mesh + '.xml')
self.rx_pos = i_mesh.coordinates()
self.n_rx = len(self.rx_pos)
for ti in range(self.MP.n_tx):
rank += 1
if self.MP.n_tx == 1:
export_path = interp_dir
else:
export_path = interp_dir + 'tx_' + str(ti) + '_'
if df.MPI.rank(self.MP.mpi_cw) == 0:
rx_to_fill = np.repeat(self.rx_pos[0].reshape(1, 3),
self.FE.n_times, axis=0).T
for kk in range(len(self.rx_pos)):
if os.path.isfile(export_path + 'E_shut_on_' +
'rx_0.npy'):
tmp = np.load(export_path + 'E_shut_on_' +
'rx_' + str(kk) + '.npy')
if kk == 0:
E = np.empty((len(self.rx_pos), 6,
self.FE.n_times))
E[-kk -1, :3, :] = tmp.T
E[-kk -1, 3:, :] = rx_to_fill
np.save(export_path + 'E_shut_on_' +
interp_mesh + '.npy', E)
for kk in range(len(self.rx_pos)):
if os.path.isfile(export_path + 'E_shut_off_' +
'rx_0.npy'):
tmp = np.load(export_path + 'E_shut_off_' +
'rx_' + str(kk) + '.npy')
if kk == 0:
E = np.empty((len(self.rx_pos), 6,
self.FE.n_times))
E[-kk -1, :3, :] = tmp.T
E[-kk -1, 3:, :] = rx_to_fill
np.save(export_path + 'E_shut_off_' +
interp_mesh + '.npy', E)
for kk in range(len(self.rx_pos)):
if os.path.isfile(export_path + 'H_shut_on_' +
'rx_0.npy'):
tmp = np.load(export_path + 'H_shut_on_' +
'rx_' + str(kk) + '.npy')
if kk == 0:
H = np.empty((len(self.rx_pos), 6,
self.FE.n_times))
H[-kk -1, :3, :] = tmp.T
H[-kk -1, 3:, :] = rx_to_fill
np.save(export_path + 'H_shut_on_' +
interp_mesh + '.npy', H)
for kk in range(len(self.rx_pos)):
if os.path.isfile(export_path + 'H_shut_off_' +
'rx_0.npy'):
tmp = np.load(export_path + 'H_shut_off_' +
'rx_' + str(kk) + '.npy')
if kk == 0:
H = np.empty((len(self.rx_pos), 6,
self.FE.n_times))
H[-kk -1, :3, :] = tmp.T
H[-kk -1, 3:, :] = rx_to_fill
np.save(export_path + 'H_shut_off_' +
interp_mesh + '.npy', H)
for kk in range(len(self.rx_pos)):
if os.path.isfile(export_path + 'dH_' +
'rx_0.npy'):
tmp = np.load(export_path + 'dH_' +
'rx_' + str(kk) + '.npy')
if kk == 0:
dH = np.empty((len(self.rx_pos), 6,
self.FE.n_times))
dH[-kk -1, :3, :] = tmp.T
dH[-kk -1, 3:, :] = rx_to_fill
np.save(export_path + 'dH_' +
interp_mesh + '.npy', dH)
else:
pass
if rank == df.MPI.size(self.MP.mpi_cw) - 1:
rank = -1
lp(self.MP.logger, 10,
'... waiting for all mpi processes to finish '
'serial interpolation tasks ...', pre_dash=False)
else:
for ti in range(self.FE.n_tx):
if df.MPI.size(self.MP.mpi_cw) > 1:
rank += 1
if df.MPI.rank(self.MP.mpi_cw) == rank:
if 'E' in EH:
f_name = E_str + '_' + mode_e + interp_mesh
if self.MP.n_tx != 1:
f_name = 'tx_{:d}_'.format(ti) + f_name
tmp = np.load(interp_dir + 't_0_' + f_name + '.npy')
E = np.empty((tmp.shape[0], 6, self.FE.n_times))
for j in range(self.FE.n_times):
tmp = np.load(interp_dir + 't_{:d}_'.format(j) +
f_name + '.npy')
E[:, :, j] = tmp[::-1, :]
np.save(interp_dir + f_name + '.npy', E)
if 'H' in EH:
f_name = H_str + '_' + mode_h + interp_mesh
if self.MP.n_tx != 1:
f_name = 'tx_{:d}_'.format(ti) + f_name
tmp = np.load(interp_dir + 't_0_' + f_name + '.npy')
H = np.empty((tmp.shape[0], 6, self.FE.n_times))
for j in range(self.FE.n_times):
tmp = np.load(interp_dir + 't_{:d}_'.format(j) +
f_name + '.npy')
H[:, :, j] = tmp[::-1, :]
np.save(interp_dir + f_name + '.npy', H)
else:
pass
if rank == df.MPI.size(self.MP.mpi_cw) - 1:
rank = -1
lp(self.MP.logger, 10,
'... waiting for all mpi processes to finish '
'serial interpolation tasks ...', pre_dash=False)
df.MPI.barrier(df.MPI.comm_world)