Source code for custEM.post.interp_tools_td

# -*- coding: utf-8 -*-
"""
@author: Rochlitz.R
"""


from custEM.post import InterpolationBase
import dolfin as df
import os
import numpy as np
from custEM.misc import logger_print as lp
from custEM.misc import read_h5
from custEM.misc import write_h5
from custEM.misc import petsc_transfer_function
from custEM.misc import specify_td_export_strings


class TimeDomainInterpolator(InterpolationBase):

    """
    Interpolator class for interpolating 3D time-domain modeling results
    on arbitrary meshes, which can be generated using this class as well.

    Methods
    -------

    - interpolate()
        serial interpolation of results on one or more target meshes

    - interpolate_parallel()
        deprecated parallel interpolation variant, aborts on call

    - export_interpolation_data()
        export interpolated results as numpy arrays and *pvd* files

    - eval_at_rx()
        placeholder for direct evaluation at receiver positions
    """

    def __init__(self, PP, dg_interpolation, self_mode):

        """
        The Interpolator class is initialized via the **MOD** class.

        Required arguments
        ------------------

        - PP, type class
            **Pre**- or **PostProcessing** instance

        - dg_interpolation, type bool
            specifies target function space to be discontinuous if *True*

        - self_mode, type bool
            flag for embedded serial MOD calls for interpolation
        """

        super(self.__class__, self).__init__(PP, dg_interpolation,
                                             self_mode)
        self.print_dummy = True
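    # Illustrative usage sketch only: the interpolator is not instantiated
    # by hand but created during the initialization of the **MOD** class.
    # The attribute name *M.IB* and the mesh name below are assumptions
    # for illustration, not verified custEM API:
    #
    #     M = MOD(...)                      # time-domain model setup
    #     M.IB.interpolate('obs_slice')     # hypothetical mesh name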
    def interpolate(self, interp_meshes, EH=['E', 'H'], interp_p=None,
                    dg_interp=False, export_pvd=True, interp_dir=None):

        """
        Customized interpolation function.

        Required arguments
        ------------------

        - interp_meshes, type str or list of str
            name(s) of the mesh(es) to interpolate on

        Keyword arguments
        -----------------

        - EH = ['E', 'H'], type list of str
            field quantities to be interpolated

        - interp_p = None, type int
            polynomial order of basis functions on the interpolation mesh;
            defaults to *self.interp_p*

        - dg_interp = False, type bool
            set True or False to overwrite *self.dg_interpolation* for
            manually enabling discontinuous or continuous interpolation

        - export_pvd = True, type bool
            set False to skip the export of *pvd* files for Paraview

        - interp_dir = None, type str
            specify, if a custom interpolation directory is desired;
            otherwise, the default interpolation directory is located in
            the corresponding results directory
        """

        if interp_dir is not None:
            self.interp_dir = interp_dir
        if interp_p is None:
            interp_p = self.interp_p
        if type(EH) is str:
            EH = [EH]

        if df.MPI.rank(self.MP.mpi_cw) == 0:
            if not os.path.isdir(self.interp_dir):
                os.mkdir(self.interp_dir)

        # support old syntax, using only a single interpolation mesh and
        # multiple calls of interpolate
        if type(interp_meshes) is not list:
            interp_meshes = [interp_meshes]

        interp_mesh_dir = self.check_interpolation_meshes(
            interp_meshes, self.MP.m_dir, self.MP.logger)

        if self.print_dummy:
            lp(self.MP.logger, 20,
               'Interpolation of results:')
            lp(self.MP.logger, 20,
               ' - stored in "' + self.interp_dir + '" - ',
               pre_dash=False)
            if self.PP.FE.n_times > 1:
                lp(self.MP.logger, 20,
                   '... interpolating ' + str(self.PP.FE.n_times) +
                   ' times ...', pre_dash=False)
            if self.MP.n_tx > 1:
                lp(self.MP.logger, 20,
                   '... interpolating ' + str(self.MP.n_tx) +
                   ' transmitters per observation time ...',
                   pre_dash=False)
            self.print_dummy = False

        E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.PP.FE)

        for quantity in EH:
            lp(self.MP.logger, 20,
               '... interpolating ' + quantity + ' on ' +
               str(len(interp_meshes)) + ' interpolation mesh(es) in '
               'serial ...', pre_dash=False)

        for ii in range(len(interp_meshes)):
            for j in range(self.PP.FE.n_times):
                t_str = 't{:d}'.format(j) + '_data'
                self.rank += 1
                if df.MPI.rank(self.MP.mpi_cw) == self.rank:
                    lp(self.MP.logger, 10,
                       '... process ' +
                       str(int(df.MPI.rank(self.MP.mpi_cw))) +
                       ' starts interpolation at time ' + str(j) + ' ...',
                       pre_dash=False, barrier=False, root_only=False)
                    target_mesh = df.Mesh(self.MP.mpi_cs,
                                          interp_mesh_dir[ii] + '/' +
                                          interp_meshes[ii] + '.xml')
                    V_target = df.VectorFunctionSpace(target_mesh, "CG",
                                                      interp_p)
                    mesh = df.Mesh(self.MP.mpi_cs)
                    hdf5 = df.HDF5File(
                        self.MP.mpi_cs,
                        self.MP.m_dir + '/_' + self.MP.file_format + '/' +
                        self.MP.mesh_name + '.' + self.MP.file_format, "r")
                    hdf5.read(mesh, '/mesh', False)
                    hdf5.close()
                    V = df.FunctionSpace(mesh, 'N1curl', self.PP.FE.FS.p)
                    for quantity in EH:
                        if 'E' in quantity:
                            string, mode = E_str, mode_e
                        if 'H' in quantity:
                            string, mode = H_str, mode_h
                        F = [df.Function(V) for ti in range(self.MP.n_tx)]
                        for ti in range(self.MP.n_tx):
                            export_name = string + '_' + mode + \
                                interp_meshes[ii]
                            if self.MP.n_tx != 1:
                                export_name = 'tx_{:d}_'.format(ti) + \
                                    export_name
                            export_name = 't_{:d}_'.format(j) + export_name
                            try:
                                read_h5(
                                    self.MP.mpi_cs, F[ti],
                                    self.MP.out_dir + '/' +
                                    self.MP.mod_name + '_' + string +
                                    '_' + mode[:-1] + '.' +
                                    self.MP.file_format,
                                    counter=ti, ri=t_str)
                            except Exception:
                                lp(self.MP.logger, 30,
                                   'Warning! *' + quantity + '* for time ' +
                                   str(j) + ' and Tx ' + str(ti) +
                                   ' could not be imported.\n'
                                   'Skipping this interpolation task. '
                                   'Continuing ...', pre_dash=False,
                                   barrier=False, root_only=False)
                                break
                            else:
                                d_interp = df.interpolate(F[ti], V_target)
                                data_npy = np.hstack((
                                    d_interp.vector().get_local(
                                        ).reshape(-1, 3),
                                    V_target.tabulate_dof_coordinates(
                                        ).reshape(-1, 3)[0::3, :]))
                                np.save(self.interp_dir + '/' +
                                        export_name + '.npy', data_npy)
                                d_interp.rename(quantity, '')
                                if export_pvd:
                                    df.File(self.MP.mpi_cs,
                                            self.interp_dir + '/' +
                                            export_name +
                                            '.pvd') << d_interp

                if self.rank == self.max_procs - 1:
                    self.rank = -1
                    lp(self.MP.logger, 10,
                       '... waiting for all mpi processes to finish '
                       'serial interpolation tasks ...', pre_dash=False)
                    # synchronize after one interpolation task has finished
                    lp(self.MP.logger, 10,
                       '... synchronization finished ...',
                       pre_dash=False, post_dash=False)

        self.rank = -1
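    # The *.npy* files written by *interpolate* stack the interpolated
    # field components and the corresponding dof coordinates column-wise
    # via np.hstack, yielding arrays of shape (n_points, 6). A minimal
    # loading sketch (the file name is hypothetical):
    #
    #     import numpy as np
    #     data = np.load('t_0_E_t_xz_slice.npy')
    #     fx, fy, fz = data[:, 0], data[:, 1], data[:, 2]   # field values
    #     x, y, z = data[:, 3], data[:, 4], data[:, 5]      # coordinates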
    def interpolate_parallel(self, interp_mesh, quantity='EH', mute=True,
                             interp_p=None, dg_interp=False,
                             interp_dir=None):

        """
        Customized parallel interpolation function (deprecated).

        Required arguments
        ------------------

        - interp_mesh, type str
            name of the mesh to interpolate on

        Keyword arguments
        -----------------

        - quantity = 'EH', type str
            field quantities to be interpolated

        - mute = True, type bool
            set False, if info-interpolation messages should be printed

        - interp_p = None, type int
            polynomial order of the interpolation-mesh *FunctionSpaces*;
            defaults to *self.interp_p*

        - dg_interp = False, type bool
            set True or False to overwrite *self.dg_interpolation* for
            manually enabling discontinuous or continuous interpolation

        - interp_dir = None, type str
            specify, if a custom interpolation directory is desired;
            otherwise, the default interpolation directory is located in
            the corresponding results directory
        """

        lp(self.MP.logger, 50,
           'Error! The deprecated version of the parallel interpolation '
           'was not updated to support multiple tx for the time-domain '
           'approaches, as it is slow compared to the serial '
           'interpolation, in particular for multiple tx. If required, '
           'it can be re-implemented with significantly better '
           'performance using manually built interpolation matrices. '
           'Aborting ...')
        raise SystemExit

        if interp_dir is not None:
            self.interp_dir = interp_dir
        if interp_p is None:
            interp_p = self.interp_p

        if df.MPI.rank(self.MP.mpi_cw) == 0:
            if not os.path.isdir(self.interp_dir):
                os.mkdir(self.interp_dir)

        if interp_mesh.find('line') != -1:
            interp_mesh_dir = self.MP.m_dir + '/lines'
        elif interp_mesh.find('slice') != -1:
            interp_mesh_dir = self.MP.m_dir + '/slices'
        elif interp_mesh.find('path') != -1:
            interp_mesh_dir = self.MP.m_dir + '/paths'
        else:
            lp(self.MP.logger, 50,
               'Error! Interpolation target mesh name must end either '
               'with "line", "slice" or "path" so far. Aborting ...',
               post_dash=True)
            raise SystemExit

        if not os.path.isfile(interp_mesh_dir + '/' +
                              interp_mesh + '.xml'):
            lp(self.MP.logger, 30,
               'Warning! Interpolation mesh "' + str(interp_mesh) +
               '.xml" could not be found. Continuing ...')
            return

        target_mesh = df.Mesh(self.MP.mpi_cw,
                              interp_mesh_dir + '/' + interp_mesh + '.xml')
        if df.MPI.rank(self.MP.mpi_cw) == 0:
            serial_mesh = df.Mesh(self.MP.mpi_cs,
                                  interp_mesh_dir + '/' +
                                  interp_mesh + '.xml')

        if dg_interp:
            V_target = df.VectorFunctionSpace(target_mesh, "DG", interp_p)
            if df.MPI.rank(self.MP.mpi_cw) == 0:
                V_serial = df.VectorFunctionSpace(serial_mesh, "DG",
                                                  interp_p)
                E_serial = df.Function(V_serial)
                H_serial = df.Function(V_serial)
            else:
                V_serial, E_serial, H_serial = None, None, None
        else:
            V_target = df.VectorFunctionSpace(target_mesh, "CG", interp_p)
            if df.MPI.rank(self.MP.mpi_cw) == 0:
                V_serial = df.VectorFunctionSpace(serial_mesh, "CG",
                                                  interp_p)
                E_serial = df.Function(V_serial)
                H_serial = df.Function(V_serial)
            else:
                V_serial, E_serial, H_serial = None, None, None

        V_dg = df.VectorFunctionSpace(self.PP.FE.FS.mesh, 'DG', 1)
        E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.PP.FE)

        for j in range(self.PP.FE.n_times):
            export_name = self.MP.mod_name + '_' + \
                interp_mesh + '_' + str(j)
            if not mute:
                lp(self.MP.logger, 20,
                   ' - interpolation export directory is: "' +
                   self.interp_dir[:-1] + '", files named as: "' +
                   export_name + '"', post_dash=True, barrier=False)
            if 'E' in quantity:
                lp(self.MP.logger, 20,
                   '... interpolating ' + E_str + ' on *' + interp_mesh +
                   '* in parallel ...', pre_dash=False)
                dg_E = df.interpolate(self.PP.FE.E[j], V_dg)
                d_interp_E = df.Function(V_target)
                petsc_transfer_function(dg_E, d_interp_E)
                d_interp_E.rename('E', '')
            if 'H' in quantity:
                lp(self.MP.logger, 20,
                   '... interpolating ' + H_str + ' on *' + interp_mesh +
                   '* in parallel ...', pre_dash=False)
                dg_H = df.interpolate(self.PP.FE.H[j], V_dg)
                d_interp_H = df.Function(V_target)
                petsc_transfer_function(dg_H, d_interp_H)
                d_interp_H.rename('H', '')
            self.export_interpolation_data(
                d_interp_E, d_interp_H, V_serial, j, export_name, True,
                aux_h5=True, E_serial=E_serial, H_serial=H_serial,
                EH=quantity)
            lp(self.MP.logger, 10,
               '... time ' + str(j) + ' interpolated ...', pre_dash=False)
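    # Note on the deprecated parallel variant: N1curl solutions are only
    # tangentially continuous, so they are first interpolated into a
    # component-wise DG space on the modeling mesh before
    # *petsc_transfer_function* moves the coefficients to the differently
    # partitioned target space. A minimal sketch of this two-step pattern,
    # assuming a solution *F* and a target space *V_target* as in the
    # method body:
    #
    #     V_dg = df.VectorFunctionSpace(self.PP.FE.FS.mesh, 'DG', 1)
    #     dg_F = df.interpolate(F, V_dg)            # step 1: nodal DG copy
    #     F_target = df.Function(V_target)
    #     petsc_transfer_function(dg_F, F_target)   # step 2: mesh transfer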
    def export_interpolation_data(self, electric, magnetic, V_serial,
                                  t_id, export_name, export_pvd,
                                  aux_h5=False, E_serial=None,
                                  H_serial=None, EH='EH'):

        """
        Export interpolated results as numpy arrays and *pvd* files to be
        viewed in Paraview. The flag *aux_h5* is used to enable a
        temporary h5 export for the parallelized interpolation, dumping
        the interpolated, parallelized FEniCS functions to file. This file
        is read again in serial to export a conforming numpy array. Note,
        this step requires insignificant effort, as functions on the
        interpolation meshes are usually much smaller than functions
        defined on the original mesh.
        """

        E_str, H_str, mode_e, mode_h = specify_td_export_strings(self.PP.FE)

        if 'E' in EH:
            write_h5(self.MP.mpi_cw, electric,
                     self.interp_dir + '/' + E_str + '_' + mode_e +
                     export_name + '.h5')
        if 'H' in EH:
            write_h5(self.MP.mpi_cw, magnetic,
                     self.interp_dir + '/' + H_str + '_' + mode_h +
                     export_name + '.h5')

        if df.MPI.rank(self.MP.mpi_cw) == 0:
            if 'E' in EH:
                read_h5(self.MP.mpi_cs, E_serial,
                        self.interp_dir + '/' + E_str + '_' + mode_e +
                        export_name + '.h5')
                os.remove(self.interp_dir + '/' + E_str + '_' + mode_e +
                          export_name + '.h5')
                electric_npy = np.hstack((
                    E_serial.vector().get_local().reshape(-1, 3),
                    V_serial.tabulate_dof_coordinates(
                        ).reshape(-1, 3)[0::3, :]))
                np.save(self.interp_dir + '/' + E_str + '_' + mode_e +
                        export_name + '.npy', electric_npy)
            if 'H' in EH:
                read_h5(self.MP.mpi_cs, H_serial,
                        self.interp_dir + '/' + H_str + '_' + mode_h +
                        export_name + '.h5')
                os.remove(self.interp_dir + '/' + H_str + '_' + mode_h +
                          export_name + '.h5')
                magnetic_npy = np.hstack((
                    H_serial.vector().get_local().reshape(-1, 3),
                    V_serial.tabulate_dof_coordinates(
                        ).reshape(-1, 3)[0::3, :]))
                np.save(self.interp_dir + '/' + H_str + '_' + mode_h +
                        export_name + '.npy', magnetic_npy)

        df.MPI.barrier(df.MPI.comm_world)
        if export_pvd:
            if 'E' in EH:
                df.File(self.MP.mpi_cs,
                        self.interp_dir + '/' + E_str + '_' + mode_e +
                        export_name + '.pvd') << electric
            if 'H' in EH:
                df.File(self.MP.mpi_cs,
                        self.interp_dir + '/' + H_str + '_' + mode_h +
                        export_name + '.pvd') << magnetic
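    # The *aux_h5* round trip above follows a generic write-parallel /
    # read-serial pattern: all ranks dump the distributed function with
    # the world communicator, then rank 0 re-reads it with a self
    # communicator to obtain a serially ordered coefficient vector for the
    # numpy export. A minimal sketch with the helpers used above
    # (*f_parallel*, *f_serial* and *tmp_path* are placeholders):
    #
    #     write_h5(self.MP.mpi_cw, f_parallel, tmp_path)    # all ranks
    #     if df.MPI.rank(self.MP.mpi_cw) == 0:
    #         read_h5(self.MP.mpi_cs, f_serial, tmp_path)   # rank 0 only
    #         np.save(npy_path, f_serial.vector().get_local())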
    def eval_at_rx(self, rx_positions, rx_name, EH='EH',
                   interp_dir=None, mute=True):

        """
        Will be added if required; so far, there is no reasonable usage.
        """

        pass
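    # A possible future implementation could evaluate the interpolated
    # dolfin functions directly at the receiver positions. A minimal,
    # serial-only sketch (not part of custEM; *d_interp* is a placeholder
    # for an interpolated vector function, and dolfin point evaluation
    # fails in parallel if a position is not owned by the local mesh
    # partition):
    #
    #     rx_values = np.array([d_interp(df.Point(*pos))
    #                           for pos in rx_positions])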