# Source code for custEM.post.interp_tools_fd

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""

import dolfin as df
from custEM.post import InterpolationBase
import os
import custEM as ce
from custEM.misc import logger_print as lp
from custEM.misc import run_multiple_serial as rs_multiple
from custEM.misc import petsc_transfer_function
from custEM.core import MOD
from custEM.misc import read_h5
from custEM.misc import write_h5
import numpy as np


class FrequencyDomainInterpolator(InterpolationBase):

    """
    Interpolator class for interpolating 3D modeling results on arbitrary
    meshes, which can be generated using this class as well.

    Notice
    ------

    There are four ways of interpolation:

    1. Interpolation is conducted in serial and might take some time,
    because exported solutions need to be imported again in serial for the
    serial interpolation. Anyway, multi-threading is automatically used to
    run several interpolation tasks simultaneously.

    2. Parallel interpolation based on a PETSc transfer matrix. Fields are
    always interpolated on a discontinuous Lagrange VectorFunctionSpace as
    auxiliary step, but do not require parallel export of the solutions on
    the computation mesh and serial import afterwards. That can save lots
    of time and disc space if only results on interpolation positions are
    of interest.

    3. Function data are evaluated at single locations
    (not implemented in this class yet).

    4. Automated parallel interpolation using manually created
    interpolation-matrices, see *InterpolationBase*

    Class internal functions
    ------------------------

    - interpolate()
      this function is a wrapper to call the *interpolate_in_serial.py*
      file, which calls the FEniCS interpolation routines.

    - interpolate_parallel()
      parallel interpolation between different function spaces and meshes.

    - export_interpolation_data()
      export interpolated data
    """

    def __init__(self, PP, dg_interpolation, self_mode, fs_type=None):

        """
        The Interpolator class is initialized via the **MOD** class.

        Required Arguments:
        -------------------

        - PP, type class
          **Pre**- or **PostProcessing** instance.

        - dg_interpolation, type bool
          specifies target function space to be discontinuous if *True*

        - self_mode, type bool
          flag for embedded serial MOD calls for interpolation

        Keyword arguments:
        ------------------

        - fs_type = None, type str
          either None, 'ned' or 'cg', only needed for interpolation
          purposes
        """

        super(self.__class__, self).__init__(PP, dg_interpolation, self_mode)

        # suppress repeated directory creation and log headers on
        # subsequent interpolate() calls
        self.print_dummy = True

        # BUG FIX: the original branch read
        #     if fs_type is None: self.fs_type = fs_type
        #     else:               self.fs_type = 'ned'
        # which silently replaced a caller-supplied 'cg' by 'ned' and kept
        # None as None — i.e. the explicit argument was always discarded.
        # Store the given value as-is; None and 'ned' behave exactly as
        # before, 'cg' is now honored as documented.
        self.fs_type = fs_type
    def interpolate(self, interp_meshes, EH=['E_t', 'H_t'], interp_p=None,
                    fs_type=None, dg_interpolation=None, export_name=None,
                    export_pvd=True, interp_dir=None, freqs=None):

        """
        Customized serial interpolation function. Each MPI process picks
        up one (quantity, frequency, mesh) task in round-robin order and
        interpolates it on a serially loaded target mesh via an embedded
        serial **MOD** instance.

        Required arguments:
        -------------------

        - interp_meshes, type str or list of str
          name(s) of the mesh(es) to interpolate on.

        Keyword arguments:
        ------------------

        - EH = ['E_t', 'H_t'], type str or list of str
          quantities to interpolate, e.g. **E_t, H_t, E_s** or **H_s**.

        - interp_p = None, type int
          polynomial order of interpolation mesh *FunctionSpaces*;
          defaults to *self.interp_p*.

        - fs_type = None, type str
          Specify if source functions are defined on Nedelec ("ned",
          default) or Lagrange ("cg") function spaces

        - dg_interpolation = None, type bool
          set True or False to overwrite *self.dg_interpolation* for
          manually enabling discontinuous or continuous interpolation

        - export_name = None, type str
          specify a custom export name of the interpolated data if
          desired.

        - export_pvd = True, type bool
          forwarded to *export_interpolation_data* to control *pvd*
          export.

        - interp_dir = None, type str
          specify, if a custom interpolation directory is desired,
          otherwise, the default interpolation directory is located in
          the corresponding results directory

        - freqs = None, type list of int
          if given, only frequency indices contained in *freqs* are
          interpolated.
        """

        # task counter; incremented per task and matched against the MPI
        # rank so tasks are distributed round-robin over the processes
        self.rank = -1
        if interp_dir is not None:
            self.interp_dir = interp_dir
        if type(EH) is str:
            EH = [EH]
        if interp_p is None:
            interp_p = self.interp_p
        if dg_interpolation is None:
            dg_interp = self.dg_interpolation
        else:
            dg_interp = dg_interpolation
        if fs_type is None:
            fs_type = self.fs_type

        # create the export directory once (root process only)
        if self.print_dummy:
            if df.MPI.rank(self.MP.mpi_cw) == 0:
                if not os.path.isdir(self.interp_dir):
                    os.mkdir(self.interp_dir)
            else:
                pass

        # support old syntax, using only a single interpolation mesh and
        # multiple calls of interpolate
        if type(interp_meshes) is not list:
            interp_meshes = [interp_meshes]
        interp_mesh_dir = self.check_interpolation_meshes(
            interp_meshes, self.MP.m_dir, self.MP.logger)

        if self.print_dummy:
            # backup of original debug level
            self.d_level = int(self.MP.logger.level)
            lp(self.MP.logger, 20,
               'Interpolation of results:')
            lp(self.MP.logger, 20,
               ' - stored in "' + self.interp_dir + '" - ', pre_dash=False)
            if self.MP.n_freqs > 1:
                lp(self.MP.logger, 20,
                   '... interpolating ' + str(self.MP.n_freqs) +
                   ' frequencies ...', pre_dash=False)
            if self.MP.n_tx > 1:
                lp(self.MP.logger, 20,
                   '... interpolating ' + str(self.MP.n_tx) +
                   ' transmitters per frequency ...', pre_dash=False)
            # one-time header printed; suppress on later calls
            self.print_dummy = False

        for quantity in EH:
            lp(self.MP.logger, 20,
               '... interpolating ' + quantity + ' on ' +
               str(len(interp_meshes)) +
               ' interpolation mesh(es) in serial '
               '...', pre_dash=False)

        # use warning log level to suppress unnecessary prints if debugging
        # level is not specified explicitly
        single_d_level = 30
        if self.d_level < 11:
            single_d_level = self.d_level

        # embedded serial MOD instance used to re-import the exported
        # solutions for serial interpolation (self_mode=True)
        M = MOD(self.MP.mod_name, self.MP.mesh_name, self.MP.approach,
                overwrite_results=False, p=self.PP.FS.p,
                file_format=self.MP.file_format, self_mode=True,
                load_existing=True, import_freq=0,
                debug_level=single_d_level, r_dir=self.MP.r_dir,
                para_dir=self.MP.para_dir, mute=True,
                test_mode=self.PP.FS.test_mode, m_dir=self.MP.m_dir,
                fs_type=fs_type, dg_interpolation=self.dg_interpolation)

        for ii in range(len(interp_meshes)):
            for fi in range(self.MP.n_freqs):
                if freqs is not None:
                    # skip frequencies not requested by the caller
                    if fi not in freqs:
                        continue
                M.PP.import_freq = fi
                for quantity in EH:
                    self.rank += 1
                    if export_name is None:
                        f_name = quantity + '_' + interp_meshes[ii]
                    else:
                        if len(interp_meshes) == 1:
                            f_name = export_name
                        else:
                            # disambiguate multiple meshes with one custom
                            # export name
                            f_name = export_name + '_imesh_' + str(ii)
                    # only the process whose MPI rank matches the current
                    # task counter executes this task
                    if df.MPI.rank(self.MP.mpi_cw) == self.rank:
                        lp(self.MP.logger, 10,
                           '... process ' + str(int(
                               df.MPI.rank(self.MP.mpi_cw))) + ' starts ' +
                           quantity + ' interpolation for frequency ' +
                           str(fi) + ' ...', pre_dash=False,
                           barrier=False, root_only=False)
                        M.PP.import_selected_results([quantity],
                                                     self.MP.file_format)
                        M.IB.find_quantity(quantity, fs_type)
                        # target mesh is loaded on the self-communicator,
                        # i.e. serially on this process
                        target_mesh = df.Mesh(
                            self.MP.mpi_cs,
                            interp_mesh_dir[ii] + '/' +
                            interp_meshes[ii] + '.xml')
                        if dg_interp:
                            V_target = df.VectorFunctionSpace(
                                target_mesh, "DG", interp_p)
                        else:
                            V_target = df.VectorFunctionSpace(
                                target_mesh, "CG", interp_p)
                        for ti in range(self.MP.n_tx):
                            # q1/q2 hold real and imaginary parts; both are
                            # required, otherwise skip this task
                            if M.IB.q1 is None or M.IB.q2 is None:
                                lp(self.MP.logger, 30,
                                   'Warning! *' + quantity +
                                   '* for frequency ' + str(fi) +
                                   ' could not be imported.\n'
                                   'Skipping this interpolation task. '
                                   'Continuing ...', pre_dash=False,
                                   barrier=False, root_only=False)
                                break
                            d_interp1 = df.interpolate(M.IB.q1[ti],
                                                       V_target)
                            d_interp2 = df.interpolate(M.IB.q2[ti],
                                                       V_target)
                            d_interp1.rename(quantity + '_real', '')
                            d_interp2.rename(quantity + '_imag', '')
                            self.export_interpolation_data(
                                d_interp1, d_interp2, V_target, fi, ti,
                                f_name, export_pvd)
                    else:
                        pass
                    # all processes received a task: restart the
                    # round-robin assignment for the next batch
                    # NOTE(review): block nesting reconstructed from a
                    # formatting-mangled source — confirm against upstream
                    if self.rank == self.max_procs - 1:
                        self.rank = -1
                        lp(self.MP.logger, 10,
                           '... waiting for all mpi processes to finish '
                           'serial interpolation tasks ...',
                           pre_dash=False)

        # switch back to specified debug level of main model
        self.MP.logger.handlers[0].setLevel(self.d_level)
        self.MP.logger.setLevel(self.d_level)

        # synchronize after one interpolation task has finished
        lp(self.MP.logger, 10, '... synchronization finished ...',
           pre_dash=False, post_dash=False)
        self.rank = -1
    def interpolate_parallel(self, interp_mesh, quantity, mute=True,
                             interp_p=None, fs_type=None,
                             dg_interpolation=None, export_name=None,
                             export_pvd=True, interp_dir=None, fi=0):

        """
        Interpolation in parallel between different function spaces and
        different meshes. Note that a discontinuous Lagrange
        VectorFunctionSpace might be required as auxiliary step to
        interpolate between Nedelec and Lagrange spaces.

        Required arguments:
        -------------------

        - interp_mesh, type str
          name of the target mesh; must contain "line", "slice" or "path"
          so the matching subdirectory of the mesh directory is used.

        - quantity, type str
          name of the quantity to interpolate (resolved by
          *find_quantity*).

        Keyword arguments:
        ------------------

        - mute = True, type bool
          suppress the export-directory info message if *True*.

        - interp_p = None, type int
          polynomial order of the target *FunctionSpace*; defaults to
          *self.interp_p*.

        - fs_type = None, type str
          source function space type, forwarded to *find_quantity*.

        - dg_interpolation = None, type bool
          overwrite *self.dg_interpolation* if not *None*.

        - export_name = None, type str
          custom export file name; defaults to "<quantity>_<interp_mesh>".

        - export_pvd = True, type bool
          forwarded to *export_interpolation_data*.

        - interp_dir = None, type str
          custom interpolation directory.

        - fi = 0, type int
          frequency index; transfer matrices are (re-)built at *fi == 0*.
        """

        if export_name is None:
            export_name = quantity + '_' + interp_mesh
        if interp_dir is not None:
            self.interp_dir = interp_dir
        if interp_p is None:
            interp_p = self.interp_p
        if dg_interpolation is None:
            dg_interp = self.dg_interpolation
        else:
            dg_interp = dg_interpolation
        if fs_type is None:
            fs_type = self.fs_type

        if not mute:
            # NOTE(review): the quote placement in this message
            # (', "files named as: "') looks off, but it is runtime output
            # and left byte-identical here
            lp(self.MP.logger, 20,
               'Interpolation export directory is: "' +
               self.interp_dir[:-1] + ', "files named as: "' +
               export_name + '"', post_dash=True, barrier=False)

        # create the export directory once (root process only)
        if df.MPI.rank(self.MP.mpi_cw) == 0:
            if not os.path.isdir(self.interp_dir):
                os.mkdir(self.interp_dir)
        else:
            pass

        # choose the mesh subdirectory from the target mesh name
        if interp_mesh.find('line') != -1:
            interp_mesh_dir = self.MP.m_dir + '/lines'
        elif interp_mesh.find('slice') != -1:
            interp_mesh_dir = self.MP.m_dir + '/slices'
        elif interp_mesh.find('path') != -1:
            interp_mesh_dir = self.MP.m_dir + '/paths'
        else:
            lp(self.MP.logger, 50,
               'Error! Interpolation target mesh name must '
               'end either with "line", "slice" or "path" so far. '
               'Aborting ...', post_dash=True)
            raise SystemExit

        # missing target mesh is a soft error: warn and return
        if os.path.isfile(interp_mesh_dir + '/' +
                          interp_mesh + '.xml') is False:
            lp(self.MP.logger, 30,
               'Warning! Interpolation mesh "' + str(interp_mesh) +
               '.xml" '
               'could not be found. Continuing ...')
            return

        lp(self.MP.logger, 20,
           '... interpolating ' + quantity + ' on ' + '"' +
           interp_mesh + '" in parallel ...', pre_dash=False)

        # distributed target mesh (world communicator) for the parallel
        # interpolation ...
        target_mesh = df.Mesh(self.MP.mpi_cw,
                              interp_mesh_dir + '/' +
                              interp_mesh + '.xml')
        # ... plus a serial copy on the root process, used for the export
        if df.MPI.rank(self.MP.mpi_cw) == 0:
            serial_mesh = df.Mesh(self.MP.mpi_cs,
                                  interp_mesh_dir + '/' +
                                  interp_mesh + '.xml')
        else:
            pass

        if dg_interp:
            V_target = df.VectorFunctionSpace(target_mesh, "DG", interp_p)
            if df.MPI.rank(self.MP.mpi_cw) == 0:
                V_serial = df.VectorFunctionSpace(serial_mesh, "DG",
                                                  interp_p)
                real_serial = df.Function(V_serial)
                imag_serial = df.Function(V_serial)
            else:
                # non-root processes hold no serial objects
                V_serial, real_serial, imag_serial = None, None, None
        else:
            V_target = df.VectorFunctionSpace(target_mesh, "CG", interp_p)
            if df.MPI.rank(self.MP.mpi_cw) == 0:
                V_serial = df.VectorFunctionSpace(serial_mesh, "CG",
                                                  interp_p)
                real_serial = df.Function(V_serial)
                imag_serial = df.Function(V_serial)
            else:
                V_serial, real_serial, imag_serial = None, None, None

        # resolve self.q1/self.q2 (real and imaginary source functions)
        self.find_quantity(quantity, fs_type)
        if fi == 0:
            # per-transmitter PETSc transfer-matrix caches
            self.Q1 = [None for ti in range(self.MP.n_tx)]
            self.Q2 = [None for ti in range(self.MP.n_tx)]

        for ti in range(self.MP.n_tx):
            if ti == 0 and fi == 0:
                lp(self.MP.logger, 30,
                   'Warning! If the interpolation mesh has not enough '
                   'nodes to build the petsc transfer function for '
                   'parallel interpolation, the run will crash here.\n'
                   'Unfortunately, there is no possibility '
                   'to raise the error. Continuing ...')
            # auxiliary step: interpolate on the DG space of the
            # computation mesh before transferring to the target mesh
            dg1 = df.interpolate(self.q1[ti], self.PP.FS.V_dg)
            dg2 = df.interpolate(self.q2[ti], self.PP.FS.V_dg)
            d_interp1, d_interp2 = df.Function(V_target), \
                df.Function(V_target)
            # NOTE(review): the transfer matrices are cached only when
            # n_freqs != 1; for a single frequency the else branch passes
            # q=None — presumably petsc_transfer_function then builds the
            # matrix on the fly; confirm against its implementation
            if self.MP.n_freqs != 1 and fi == 0:
                self.Q1[ti] = petsc_transfer_function(dg1, d_interp1, True)
                self.Q2[ti] = petsc_transfer_function(dg2, d_interp2, True)
            else:
                petsc_transfer_function(dg1, d_interp1, q=self.Q1[ti])
                petsc_transfer_function(dg2, d_interp2, q=self.Q2[ti])
            d_interp1.rename(quantity + '_real', '')
            d_interp2.rename(quantity + '_imag', '')
            self.export_interpolation_data(
                d_interp1, d_interp2, V_serial, fi, ti,
                export_name, export_pvd, aux_h5=True,
                real_serial=real_serial, imag_serial=imag_serial)
    def export_interpolation_data(self, field_real, field_imag, V_serial,
                                  fi, ti, export_name, export_pvd,
                                  aux_h5=False, real_serial=None,
                                  imag_serial=None):

        """
        Export interpolated results as complex numpy arrays and *pvd*
        files to be viewed in Paraview. The flag *aux_h5* is used to
        enable a temporary h5 export for the parallelized interpolation,
        dumping the interpolated, parallelized FEniCS functions to file.
        This file is read again in serial to export a conform numpy
        array. Note, this step requires insignificant effort as functions
        on the interpolation meshes are usually way smaller than
        functions defined on the original mesh.

        Required arguments:
        -------------------

        - field_real, field_imag, type dolfin Function
          real and imaginary parts of the interpolated field.

        - V_serial, type dolfin FunctionSpace or None
          serial function space used to tabulate the dof coordinates that
          are stored alongside the field values (None on non-root
          processes in the *aux_h5* path).

        - fi, ti, type int
          frequency and transmitter indices, used as file-name prefixes.

        - export_name, type str
          base export file name.

        - export_pvd, type bool
          additionally write *pvd*/*vtu* files if *True*.

        Keyword arguments:
        ------------------

        - aux_h5 = False, type bool
          enable the temporary h5 round-trip for parallel functions.

        - real_serial, imag_serial = None, type dolfin Function
          serial receive buffers for the h5 round-trip (root process
          only).
        """

        # prefix the file name with transmitter and frequency indices if
        # more than one of either exists
        if self.MP.n_tx != 1:
            export_name = 'tx_' + str(ti) + '_' + export_name
        if self.MP.n_freqs != 1:
            export_name = 'f_' + str(fi) + '_' + export_name

        if aux_h5:
            # dump the distributed functions to temporary h5 files ...
            write_h5(self.MP.mpi_cw, field_real,
                     self.interp_dir + '/' + export_name + '_real.h5')
            write_h5(self.MP.mpi_cw, field_imag,
                     self.interp_dir + '/' + export_name + '_imag.h5')
            if df.MPI.rank(self.MP.mpi_cw) == 0:
                # ... and re-read them serially on the root process to
                # obtain a consistent dof ordering for the numpy export
                read_h5(self.MP.mpi_cs, real_serial,
                        self.interp_dir + '/' + export_name + '_real.h5')
                read_h5(self.MP.mpi_cs, imag_serial,
                        self.interp_dir + '/' + export_name + '_imag.h5')
                # columns 0-2: vector components, columns 3-5: dof
                # coordinates; vector fields have 3 components, hence the
                # reshape(-1, 3) and the [0::3, :] stride over coordinates
                field_real_npy = np.hstack((
                    real_serial.vector().get_local(
                        ).reshape(-1, 3),
                    V_serial.tabulate_dof_coordinates(
                        ).reshape(-1, 3)[0::3, :]))
                field_imag_npy = np.hstack((
                    imag_serial.vector().get_local(
                        ).reshape(-1, 3),
                    V_serial.tabulate_dof_coordinates(
                        ).reshape(-1, 3)[0::3, :]))
                complex_data = field_real_npy + 1j * field_imag_npy
                np.save(self.interp_dir + '/' + export_name + '.npy',
                        complex_data)
                # remove the temporary h5 files after the numpy export
                os.remove(self.interp_dir + '/' +
                          export_name + '_real.h5')
                os.remove(self.interp_dir + '/' +
                          export_name + '_imag.h5')
            else:
                pass
            # all processes wait until root has finished the serial export
            df.MPI.barrier(df.MPI.comm_world)
            if export_pvd:
                df.File(self.MP.mpi_cs, self.interp_dir + '/' +
                        export_name + '_real.pvd') << field_real
                df.File(self.MP.mpi_cs, self.interp_dir + '/' +
                        export_name + '_imag.pvd') << field_imag
        else:
            # serial path: fields are already serial functions on
            # V_serial, export directly
            field_real_npy = np.hstack((
                field_real.vector().get_local(
                    ).reshape(-1, 3),
                V_serial.tabulate_dof_coordinates(
                    ).reshape(-1, 3)[0::3, :]))
            field_imag_npy = np.hstack((
                field_imag.vector().get_local(
                    ).reshape(-1, 3),
                V_serial.tabulate_dof_coordinates(
                    ).reshape(-1, 3)[0::3, :]))
            complex_data = field_real_npy + 1j * field_imag_npy
            np.save(self.interp_dir + '/' + export_name + '.npy',
                    complex_data)
            if export_pvd:
                df.File(self.MP.mpi_cs, self.interp_dir + '/' +
                        export_name + '_real.pvd') << field_real
                df.File(self.MP.mpi_cs, self.interp_dir + '/' +
                        export_name + '_imag.pvd') << field_imag