# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
from custEM.post import InterpolationBase
import dolfin as df
import os
import numpy as np
from custEM.misc import logger_print as lp
from custEM.misc import read_h5
from custEM.misc import write_h5
from custEM.misc import petsc_transfer_function
from custEM.misc import specify_td_export_strings
[docs]
class TimeDomainInterpolator(InterpolationBase):
"""
Interpolator class for interpolating 3D modeling results on arbitrary
meshes, which can be generated using this class as well.
DOCS TO BE FILLED
"""
def __init__(self, PP, dg_interpolation, self_mode):
"""
The Interpolator class is initialized via the **MOD** class.
Required Arguments:
-------------------
- PP; type class
**Pre**- or **PostProcessing** instance.
- dg_interpolation, type bool
specifies target function space to be discontinuous if *True*
- self_mode, type bool
flag for embedded serial MOD calls for interpolation
"""
super(self.__class__, self).__init__(PP, dg_interpolation, self_mode)
self.print_dummy = True
[docs]
def interpolate(self, interp_meshes, EH=['E', 'H'], interp_p=None,
dg_interp=False, export_pvd=True, interp_dir=None):
"""
Customized interpolation function.
Required arguments
------------------
- interp_mesh, type str
name of the mesh to interpolate on
Keyword arguments
-----------------
- mute = False, type bool
set True, if info-interpolation messages should not be printed
- interp_p = 2, type int
polyonimal order of basis functions on interpolation mesh
- interp_dir, type str
specify, if a custom interpolation directory is desired; otherwise,
the defualt interpolation directory is located in the corresponding
results directory
- dg_interpolation = None, type bool
set True or False to overwrite *self.dg_interpolation* for
manually enabling discontinuous or continuous interpolation
"""
if interp_dir is not None:
self.interp_dir = interp_dir
if interp_p is None:
interp_p = self.interp_p
if type(EH) is str:
EH = [EH]
if df.MPI.rank(self.MP.mpi_cw) == 0:
if not os.path.isdir(self.interp_dir):
os.mkdir(self.interp_dir)
else:
pass
# support old syntax, using only a single interpolation mesh and
# multiple calls of interpolate
if type(interp_meshes) is not list:
interp_meshes = [interp_meshes]
interp_mesh_dir = self.check_interpolation_meshes(
interp_meshes, self.MP.m_dir, self.MP.logger)
if self.print_dummy:
lp(self.MP.logger, 20,
'Interpolation of results:')
lp(self.MP.logger, 20,
' - stored in "' + self.interp_dir +
'" - ', pre_dash=False)
if self.PP.FE.n_times > 1:
lp(self.MP.logger, 20,
'... interpolating ' + str(self.PP.FE.n_times) +
' times ...', pre_dash=False)
if self.MP.n_tx > 1:
lp(self.MP.logger, 20,
'... interpolating ' + str(self.MP.n_tx) +
' transmitters per observation time ...', pre_dash=False)
self.print_dummy = False
E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.PP.FE)
for quantity in EH:
lp(self.MP.logger, 20,
'... interpolating ' + quantity + ' on ' +
str(len(interp_meshes)) + ' interpolation mesh(es) in serial '
'...', pre_dash=False)
for ii in range(len(interp_meshes)):
for j in range(self.PP.FE.n_times):
t_str = 't{:d}'.format(j) + '_data'
self.rank += 1
if df.MPI.rank(self.MP.mpi_cw) == self.rank:
lp(self.MP.logger, 10, '... process ' +
str(int(df.MPI.rank(self.MP.mpi_cw))) +
' starts interpolation at time ' + str(j) + ' ...',
pre_dash=False, barrier=False, root_only=False)
target_mesh = df.Mesh(self.MP.mpi_cs, interp_mesh_dir[ii] +
'/' + interp_meshes[ii] + '.xml')
V_target = df.VectorFunctionSpace(target_mesh, "CG",
self.interp_p)
mesh = df.Mesh(self.MP.mpi_cs)
hdf5 = df.HDF5File(self.MP.mpi_cs, self.MP.m_dir + '/_' +
self.MP.file_format + '/' +
self.MP.mesh_name +
'.' + self.MP.file_format, "r")
hdf5.read(mesh, '/mesh', False)
V = df.FunctionSpace(mesh, 'N1curl', self.PP.FE.FS.p)
for quantity in EH:
if 'E' in quantity:
string, mode = E_str, mode_e
if 'H' in quantity:
string, mode = H_str, mode_h
F = [df.Function(V) for ti in range(self.MP.n_tx)]
for ti in range(self.MP.n_tx):
export_name = string + '_' + mode + \
interp_meshes[ii]
if self.MP.n_tx != 1:
export_name = 'tx_{:d}_'.format(ti) + \
export_name
export_name = 't_{:d}_'.format(j) + export_name
try:
read_h5(
self.MP.mpi_cs, F[ti], self.MP.out_dir +
'/' + self.MP.mod_name + '_' + string +
'_' + mode[:-1] + '.' +
self.MP.file_format, counter=ti, ri=t_str)
except Exception as e:
lp(self.MP.logger, 30,
'Warning! *' + quantity + '* for time '
+ str(j) + ' and Tx ' + str(ti) +
' could not be imported.\n'
'Skipping this interpolation task. '
'Continuing ...', pre_dash=False,
barrier=False, root_only=False)
break
else:
d_interp = df.interpolate(F[ti], V_target)
data_npy = np.hstack((d_interp.vector(
).get_local().reshape(-1, 3),
V_target.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
np.save(self.interp_dir + '/' +
export_name + '.npy', data_npy)
d_interp.rename(quantity, '')
if export_pvd:
df.File(self.MP.mpi_cs,
self.interp_dir + '/' +
export_name + '.pvd') << d_interp
else:
pass
if self.rank == self.max_procs - 1:
self.rank = -1
lp(self.MP.logger, 10,
'... waiting for all mpi processes to finish '
'serial interpolation tasks ...', pre_dash=False)
# synchronize after one interpolation task has finished
lp(self.MP.logger, 10, '... synchronization finished ...',
pre_dash=False, post_dash=False)
self.rank = -1
[docs]
def interpolate_parallel(self, interp_mesh, quantity='EH', mute=True,
interp_p=None, dg_interp=False, interp_dir=None):
"""
Customized interpolation function.
Required arguments:
-------------------
- interp_mesh, type str
name of the mesh to interpolate on.
Keyword arguments:
------------------
- mute = False, type bool
set True, if info-interpolation messages should not be printed.
- interp_p = 2, type int
polyonimal order of interpolation mesh *FunctionSpaces*
- interp_dir, type str
specify, if a custom interpolation directory is desired, otherwise,
the defualt interpolation directory is located in the corresponding
results directory
- dg_interpolation = None, type bool
set True or False to overwrite *self.dg_interpolation* for
manually enabling discontinuous or continuous interpolation
"""
lp(self.MP.logger, 50,
'Error! Deprecated version of parallel interpolation was '
'not updated to the '
'support for multiple tx for the time-domain approaches as it '
'is comparatively slow compared to serial interpolation, '
'in particular, for multiple tx. If required, it can be '
're-implemented with significantly better performance using '
'manually built interpolation matrices. Aborting ...')
raise SystemExit
if interp_dir is not None:
self.interp_dir = interp_dir
if interp_p is None:
interp_p = self.interp_p
if df.MPI.rank(self.MP.mpi_cw) == 0:
if not os.path.isdir(self.interp_dir):
os.mkdir(self.interp_dir)
else:
pass
if interp_mesh.find('line') is not -1:
interp_mesh_dir = self.MP.m_dir + '/lines'
elif interp_mesh.find('slice') is not -1:
interp_mesh_dir = self.MP.m_dir + '/slices'
elif interp_mesh.find('path') is not -1:
interp_mesh_dir = self.MP.m_dir + '/paths'
else:
lp(self.MP.logger, 50,
'Error! Interpolation target mesh name must '
'end either with "line", "slice" or "path" so far. '
'Aborting ...', post_dash=True)
raise SystemExit
if os.path.isfile(interp_mesh_dir + '/' +
interp_mesh + '.xml') is False:
lp(self.MP.logger, 30,
'Warning! Interpolation mesh "' + str(interp_mesh) + '.xml" '
'could not be found. Continuing ...')
return
target_mesh = df.Mesh(self.MP.mpi_cw, interp_mesh_dir +
'/' + interp_mesh + '.xml')
if df.MPI.rank(self.MP.mpi_cw) == 0:
serial_mesh = df.Mesh(self.MP.mpi_cs, interp_mesh_dir +
'/' + interp_mesh + '.xml')
else:
pass
if dg_interp:
V_target = df.VectorFunctionSpace(target_mesh, "DG", interp_p)
if df.MPI.rank(self.MP.mpi_cw) == 0:
V_serial = df.VectorFunctionSpace(serial_mesh, "DG", interp_p)
E_serial = df.Function(V_serial)
H_serial = df.Function(V_serial)
else:
V_serial, E_serial, H_serial = None, None, None
else:
V_target = df.VectorFunctionSpace(target_mesh, "CG", interp_p)
if df.MPI.rank(self.MP.mpi_cw) == 0:
V_serial = df.VectorFunctionSpace(serial_mesh, "CG", interp_p)
E_serial = df.Function(V_serial)
H_serial = df.Function(V_serial)
else:
V_serial, E_serial, H_serial = None, None, None
V_dg = df.VectorFunctionSpace(self.PP.FE.FS.mesh, 'DG', 1)
for j in range(self.PP.FE.n_times):
export_name = self.MP.mod_name + '_' +\
interp_mesh + '_' + str(j)
if not mute:
lp(self.MP.logger, 20,
' - interpolation export directory is: "' +
self.interp_dir[:-1] + ', "files named as: "' +
export_name + '"', post_dash=True, barrier=False)
if 'E' in quantity:
lp(self.MP.logger, 20,
'... interpolating ' + E_str + ' on *' +
interp_mesh + '* in parallel ...', pre_dash=False)
dg_E = df.interpolate(self.PP.FE.E[j], V_dg)
d_interp_E = df.Function(V_target)
petsc_transfer_function(dg_E, d_interp_E)
d_interp_E.rename('E', '')
if 'H' in quantity:
lp(self.MP.logger, 20,
'... interpolating ' + H_str + ' on *' +
interp_mesh + '* in parallel ...', pre_dash=False)
dg_H = df.interpolate(self.PP.FE.H[j], V_dg)
d_interp_H = df.Function(V_target)
petsc_transfer_function(dg_H, d_interp_H)
d_interp_H.rename('H', '')
self.export_interpolation_data(
d_interp_E, d_interp_H, V_serial, j, export_name,
True, aux_h5=True,
E_serial=E_serial, H_serial=H_serial, EH=EH)
lp(self.MP.logger, 10,
'... time ' + str(j) + ' interpolated ...', pre_dash=False)
[docs]
def export_interpolation_data(self, electric, magnetic, V_serial, t_id,
export_name, export_pvd, aux_h5=False,
E_serial=None, H_serial=None, EH='EH'):
"""
Export interpolated results as complex numpy arrays and *pvd* files
to be viewed in Paraview. The flag *aux_h5* is used to enable a
temorary h5 export for the parallelized interpolation, dumping the
interpolated, parallelized FEniCS functions to file.
This file is read again in serial to export a conform numpy array.
Note, this step requires insignificant effort as functions on the
interpolation meshes are usually way smaller than functions defined
on the original mesh.
"""
E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.PP.FE)
if 'E' in EH:
write_h5(self.MP.mpi_cw, electric, self.interp_dir +
'/' + E_str + '_' + export_name + '.h5')
if 'H' in EH:
write_h5(self.MP.mpi_cw, magnetic, self.interp_dir +
'/' + H_str + '_' + export_name + '.h5')
if df.MPI.rank(self.MP.mpi_cw) == 0:
if 'E' in EH:
read_h5(self.MP.mpi_cs, E_serial, self.interp_dir +
'/' + E_str + '_' + mode_e + export_name + '.h5')
os.remove(self.interp_dir + '/' + E_str + '_' + mode_e +
export_name + '.h5')
electric_npy = np.hstack((E_serial.vector().get_local(
).reshape(-1, 3), V_serial.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
np.save(self.interp_dir + '/' + E_str + '_' + mode_e +
export_name + '.npy', electric_npy)
if 'H' in EH:
read_h5(self.MP.mpi_cs, H_serial, self.interp_dir +
'/' + H_str + '_' + mode_h + export_name + '.h5')
os.remove(self.interp_dir + '/' + H_str + '_' + mode_h +
export_name + '.h5')
magnetic_npy = np.hstack((H_serial.vector().get_local(
).reshape(-1, 3), V_serial.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
np.save(self.interp_dir + '/' + H_str + '_' + mode_h +
export_name + '.npy', magnetic_npy)
else:
pass
df.MPI.barrier(df.MPI.comm_world)
if export_pvd:
if 'E' in EH:
df.File(self.MP.mpi_cs, self.interp_dir +
'/' + E_str + '_' + mode_e +
export_name + '.pvd') << electric
if 'H' in EH:
df.File(self.MP.mpi_cs, self.interp_dir +
'/' + H_str + '_' + mode_h +
export_name + '.pvd') << magnetic
[docs]
def eval_at_rx(self, rx_positions, rx_name, EH='EH',
interp_dir=None, mute=True):
"""
will be added if required, so far no reasonable usage
"""
pass