# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""
import dolfin as df
from custEM.post import InterpolationBase
import os
import custEM as ce
from custEM.misc import logger_print as lp
from custEM.misc import run_multiple_serial as rs_multiple
from custEM.misc import petsc_transfer_function
from custEM.core import MOD
from custEM.misc import read_h5
from custEM.misc import write_h5
import numpy as np
[docs]
class FrequencyDomainInterpolator(InterpolationBase):
"""
Interpolator class for interpolating 3D modeling results on arbitrary
meshes, which can be generated using this class as well.
Notice
------
There are four ways of interpolation:
1. Interpolation is conducted in serial and might take some time,
because exported solutions need to be imported again in serial for
the serial interpolation. Anyway, multi-threading is automatically used
to run several interpolation tasks simultaneously.
2. Parallel interpolation based on a PETSc transfer matrix. Fields are
always interpolated on a discontinous Lagrange VectorFunctionSpace as
auxiliary step, but do not require parallel export of the solutions
on the computation mesh and serial import afterwards. That can save
lots of time and disc space if only results on interpolation positions
are of interest.
3. Function data are evaluated at single single locations
(not implemented in this class yet).
4. Automated parallel interpolation using maunally created
interpolation-matrices, see *InterpolationBase*
Class internal functions
------------------------
- interpolate()
this function is a wrapper to call the *interpolate_in_serial.py* file,
which calls the FEniCS interpolation routines.
- interpolate_parallel()
parallel interpolation between different function spaces and meshes.
- export_interpolation_data()
export interpolated data
"""
def __init__(self, PP, dg_interpolation, self_mode, fs_type=None):
"""
The Interpolator class is initialized via the **MOD** class.
Required Arguments:
-------------------
- PP; type class
**Pre**- or **PostProcessing** instance.
- dg_interpolation, type bool
specifies target function space to be discontinuous if *True*
- self_mode, type bool
flag for embedded serial MOD calls for interpolation
- fs_type, type str
either None, 'ned' or 'cg', only needed for interpolation purposes
"""
super(self.__class__, self).__init__(PP, dg_interpolation, self_mode)
self.print_dummy = True
if fs_type is None:
self.fs_type = fs_type
else:
self.fs_type = 'ned'
[docs]
def interpolate(self, interp_meshes, EH=['E_t', 'H_t'], interp_p=None,
fs_type=None, dg_interpolation=None, export_name=None,
export_pvd=True, interp_dir=None, freqs=None):
"""
Customized interpolation function.
Required arguments:
-------------------
- quantitiy, type str
Either **E_t, H_t, E_s** or **H_s**.
- interp_mesh, type str
name of the mesh to interpolate on.
Keyword arguments:
------------------
- mute = False, type bool
set True, if info-interpolation messages should not be printed.
- interp_p = 2, type int
polyonimal order of interpolation mesh *FunctionSpaces*
- fs_type = None, type str
Specify if source functions are defined on Nedelec
("ned", default) or Lagrange ("cg") function spaces
- dg_interpolation = None, type bool
set True or False to overwrite *self.dg_interpolation* for
manually enabling discontinuous or continuous interpolation
- export_name = None, type str
specify a custom export name of the interpolated data if desired.
- interp_dir, type str
specify, if a custom interpolation directory is desired, otherwise,
the defualt interpolation directory is located in the corresponding
results directory
"""
self.rank = -1
if interp_dir is not None:
self.interp_dir = interp_dir
if type(EH) is str:
EH = [EH]
if interp_p is None:
interp_p = self.interp_p
if dg_interpolation is None:
dg_interp = self.dg_interpolation
else:
dg_interp = dg_interpolation
if fs_type is None:
fs_type = self.fs_type
if self.print_dummy:
if df.MPI.rank(self.MP.mpi_cw) == 0:
if not os.path.isdir(self.interp_dir):
os.mkdir(self.interp_dir)
else:
pass
# support old syntax, using only a single interpolation mesh and
# multiple calls of interpolate
if type(interp_meshes) is not list:
interp_meshes = [interp_meshes]
interp_mesh_dir = self.check_interpolation_meshes(
interp_meshes, self.MP.m_dir, self.MP.logger)
if self.print_dummy:
# backup of original debug level
self.d_level = int(self.MP.logger.level)
lp(self.MP.logger, 20,
'Interpolation of results:')
lp(self.MP.logger, 20,
' - stored in "' + self.interp_dir +
'" - ', pre_dash=False)
if self.MP.n_freqs > 1:
lp(self.MP.logger, 20,
'... interpolating ' + str(self.MP.n_freqs) +
' frequencies ...', pre_dash=False)
if self.MP.n_tx > 1:
lp(self.MP.logger, 20,
'... interpolating ' + str(self.MP.n_tx) +
' transmitters per frequency ...', pre_dash=False)
self.print_dummy = False
for quantity in EH:
lp(self.MP.logger, 20,
'... interpolating ' + quantity + ' on ' +
str(len(interp_meshes)) + ' interpolation mesh(es) in serial '
'...', pre_dash=False)
# use warning log level to suppress unnecessary prints if debugging
# level is not specified explicitly
single_d_level = 30
if self.d_level < 11:
single_d_level = self.d_level
M = MOD(self.MP.mod_name, self.MP.mesh_name,
self.MP.approach, overwrite_results=False, p=self.PP.FS.p,
file_format=self.MP.file_format, self_mode=True,
load_existing=True, import_freq=0, debug_level=single_d_level,
r_dir=self.MP.r_dir, para_dir=self.MP.para_dir,
mute=True, test_mode=self.PP.FS.test_mode,
m_dir=self.MP.m_dir, fs_type=fs_type,
dg_interpolation=self.dg_interpolation)
for ii in range(len(interp_meshes)):
for fi in range(self.MP.n_freqs):
if freqs is not None:
if fi not in freqs:
continue
M.PP.import_freq = fi
for quantity in EH:
self.rank += 1
if export_name is None:
f_name = quantity + '_' + interp_meshes[ii]
else:
if len(interp_meshes) == 1:
f_name = export_name
else:
f_name = export_name + '_imesh_' + str(ii)
if df.MPI.rank(self.MP.mpi_cw) == self.rank:
lp(self.MP.logger, 10, '... process ' + str(int(
df.MPI.rank(self.MP.mpi_cw))) + ' starts ' +
quantity + ' interpolation for frequency ' +
str(fi) + ' ...',
pre_dash=False, barrier=False, root_only=False)
M.PP.import_selected_results([quantity],
self.MP.file_format)
M.IB.find_quantity(quantity, fs_type)
target_mesh = df.Mesh(
self.MP.mpi_cs, interp_mesh_dir[ii] +
'/' + interp_meshes[ii] + '.xml')
if dg_interp:
V_target = df.VectorFunctionSpace(
target_mesh, "DG", interp_p)
else:
V_target = df.VectorFunctionSpace(
target_mesh, "CG", interp_p)
for ti in range(self.MP.n_tx):
if M.IB.q1 is None or M.IB.q2 is None:
lp(self.MP.logger, 30,
'Warning! *' + quantity + '* for frequency '
+ str(fi) + ' could not be imported.\n'
'Skipping this interpolation task. '
'Continuing ...', pre_dash=False,
barrier=False, root_only=False)
break
d_interp1 = df.interpolate(M.IB.q1[ti], V_target)
d_interp2 = df.interpolate(M.IB.q2[ti], V_target)
d_interp1.rename(quantity + '_real', '')
d_interp2.rename(quantity + '_imag', '')
self.export_interpolation_data(
d_interp1, d_interp2, V_target,
fi, ti, f_name, export_pvd)
else:
pass
if self.rank == self.max_procs - 1:
self.rank = -1
lp(self.MP.logger, 10,
'... waiting for all mpi processes to finish '
'serial interpolation tasks ...', pre_dash=False)
# switch back to specified debug level of main model
self.MP.logger.handlers[0].setLevel(self.d_level)
self.MP.logger.setLevel(self.d_level)
# synchronize after one interpolation task has finished
lp(self.MP.logger, 10, '... synchronization finished ...',
pre_dash=False, post_dash=False)
self.rank = -1
[docs]
def interpolate_parallel(self, interp_mesh, quantity, mute=True,
interp_p=None, fs_type=None,
dg_interpolation=None, export_name=None,
export_pvd=True, interp_dir=None, fi=0):
"""
Interpolation in parallel between different function spaces and
different meshes. Note that a discontinous Lagrange VectorFunctionSpace
might be required as auxiliary step to interpolate between Nedelec and
Lagrange spaces.
"""
if export_name is None:
export_name = quantity + '_' + interp_mesh
if interp_dir is not None:
self.interp_dir = interp_dir
if interp_p is None:
interp_p = self.interp_p
if dg_interpolation is None:
dg_interp = self.dg_interpolation
else:
dg_interp = dg_interpolation
if fs_type is None:
fs_type = self.fs_type
if not mute:
lp(self.MP.logger, 20,
'Interpolation export directory is: "' + self.interp_dir[:-1] +
', "files named as: "' + export_name + '"', post_dash=True,
barrier=False)
if df.MPI.rank(self.MP.mpi_cw) == 0:
if not os.path.isdir(self.interp_dir):
os.mkdir(self.interp_dir)
else:
pass
if interp_mesh.find('line') != -1:
interp_mesh_dir = self.MP.m_dir + '/lines'
elif interp_mesh.find('slice') != -1:
interp_mesh_dir = self.MP.m_dir + '/slices'
elif interp_mesh.find('path') != -1:
interp_mesh_dir = self.MP.m_dir + '/paths'
else:
lp(self.MP.logger, 50,
'Error! Interpolation target mesh name must '
'end either with "line", "slice" or "path" so far. '
'Aborting ...', post_dash=True)
raise SystemExit
if os.path.isfile(interp_mesh_dir + '/' +
interp_mesh + '.xml') is False:
lp(self.MP.logger, 30,
'Warning! Interpolation mesh "' + str(interp_mesh) + '.xml" '
'could not be found. Continuing ...')
return
lp(self.MP.logger, 20,
'... interpolating ' + quantity + ' on ' + '"' + interp_mesh +
'" in parallel ...', pre_dash=False)
target_mesh = df.Mesh(self.MP.mpi_cw, interp_mesh_dir +
'/' + interp_mesh + '.xml')
if df.MPI.rank(self.MP.mpi_cw) == 0:
serial_mesh = df.Mesh(self.MP.mpi_cs, interp_mesh_dir +
'/' + interp_mesh + '.xml')
else:
pass
if dg_interp:
V_target = df.VectorFunctionSpace(target_mesh, "DG", interp_p)
if df.MPI.rank(self.MP.mpi_cw) == 0:
V_serial = df.VectorFunctionSpace(serial_mesh, "DG", interp_p)
real_serial = df.Function(V_serial)
imag_serial = df.Function(V_serial)
else:
V_serial, real_serial, imag_serial = None, None, None
else:
V_target = df.VectorFunctionSpace(target_mesh, "CG", interp_p)
if df.MPI.rank(self.MP.mpi_cw) == 0:
V_serial = df.VectorFunctionSpace(serial_mesh, "CG", interp_p)
real_serial = df.Function(V_serial)
imag_serial = df.Function(V_serial)
else:
V_serial, real_serial, imag_serial = None, None, None
self.find_quantity(quantity, fs_type)
if fi == 0:
self.Q1 = [None for ti in range(self.MP.n_tx)]
self.Q2 = [None for ti in range(self.MP.n_tx)]
for ti in range(self.MP.n_tx):
if ti == 0 and fi == 0:
lp(self.MP.logger, 30,
'Warning! If the interpolation mesh has not enough '
'nodes to build the petsc transfer function for '
'parallel interpolation, the run will crash here.\n'
'Unfortunately, there is no possibility '
'to raise the error. Continuing ...')
dg1 = df.interpolate(self.q1[ti], self.PP.FS.V_dg)
dg2 = df.interpolate(self.q2[ti], self.PP.FS.V_dg)
d_interp1, d_interp2 = df.Function(V_target), df.Function(V_target)
if self.MP.n_freqs != 1 and fi == 0:
self.Q1[ti] = petsc_transfer_function(dg1, d_interp1, True)
self.Q2[ti] = petsc_transfer_function(dg2, d_interp2, True)
else:
petsc_transfer_function(dg1, d_interp1, q=self.Q1[ti])
petsc_transfer_function(dg2, d_interp2, q=self.Q2[ti])
d_interp1.rename(quantity + '_real', '')
d_interp2.rename(quantity + '_imag', '')
self.export_interpolation_data(
d_interp1, d_interp2, V_serial, fi, ti, export_name,
export_pvd, aux_h5=True,
real_serial=real_serial, imag_serial=imag_serial)
[docs]
def export_interpolation_data(self, field_real, field_imag, V_serial, fi,
ti, export_name, export_pvd, aux_h5=False,
real_serial=None, imag_serial=None):
"""
Export interpolated results as complex numpy arrays and *pvd* files
to be viewed in Paraview. The flag *aux_h5* is used to enable a
temorary h5 export for the parallelized interpolation, dumping the
interpolated, parallelized FEniCS functions to file.
This file is read again in serial to export a conform numpy array.
Note, this step requires insignificant effort as functions on the
interpolation meshes are usually way smaller than functions defined
on the original mesh.
"""
if self.MP.n_tx != 1:
export_name = 'tx_' + str(ti) + '_' + export_name
if self.MP.n_freqs != 1:
export_name = 'f_' + str(fi) + '_' + export_name
if aux_h5:
write_h5(self.MP.mpi_cw, field_real, self.interp_dir +
'/' + export_name + '_real.h5')
write_h5(self.MP.mpi_cw, field_imag, self.interp_dir +
'/' + export_name + '_imag.h5')
if df.MPI.rank(self.MP.mpi_cw) == 0:
read_h5(self.MP.mpi_cs, real_serial, self.interp_dir +
'/' + export_name + '_real.h5')
read_h5(self.MP.mpi_cs, imag_serial, self.interp_dir +
'/' + export_name + '_imag.h5')
field_real_npy = np.hstack((real_serial.vector().get_local(
).reshape(-1, 3), V_serial.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
field_imag_npy = np.hstack((imag_serial.vector().get_local(
).reshape(-1, 3), V_serial.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
complex_data = field_real_npy + 1j * field_imag_npy
np.save(self.interp_dir + '/' + export_name + '.npy',
complex_data)
os.remove(self.interp_dir + '/' + export_name + '_real.h5')
os.remove(self.interp_dir + '/' + export_name + '_imag.h5')
else:
pass
df.MPI.barrier(df.MPI.comm_world)
if export_pvd:
df.File(self.MP.mpi_cs, self.interp_dir + '/' +
export_name + '_real.pvd') << field_real
df.File(self.MP.mpi_cs, self.interp_dir + '/' +
export_name + '_imag.pvd') << field_imag
else:
field_real_npy = np.hstack((field_real.vector().get_local(
).reshape(-1, 3), V_serial.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
field_imag_npy = np.hstack((field_imag.vector().get_local(
).reshape(-1, 3), V_serial.tabulate_dof_coordinates(
).reshape(-1, 3)[0::3, :]))
complex_data = field_real_npy + 1j * field_imag_npy
np.save(self.interp_dir + '/' + export_name + '.npy', complex_data)
if export_pvd:
df.File(self.MP.mpi_cs, self.interp_dir + '/' +
export_name + '_real.pvd') << field_real
df.File(self.MP.mpi_cs, self.interp_dir + '/' +
export_name + '_imag.pvd') << field_imag