Source code for pysph.tools.interpolator

# Standard library imports
from functools import reduce

# Library imports.
import numpy as np

# Package imports.
from pysph.base.utils import get_particle_array
from pysph.base.kernels import Gaussian
from pysph.base.nnps import LinkedListNNPS as NNPS
from pysph.sph.equation import Equation, Group
from pysph.sph.acceleration_eval import AccelerationEval
from pysph.sph.sph_compiler import SPHCompiler
from compyle.api import declare
from pysph.sph.wc.linalg import gj_solve, augmented_matrix
from pysph.sph.basic_equations import SummationDensity


class InterpolateFunction(Equation):
    """Shepard interpolation: the interpolated property is the kernel-weighted
    sum of the source property, normalized by the number density (the sum of
    the kernel weights).
    """
    def initialize(self, d_idx, d_prop, d_number_density):
        d_prop[d_idx] = 0.0
        d_number_density[d_idx] = 0.0

    def loop(self, s_idx, d_idx, s_temp_prop, d_prop, d_number_density, WIJ):
        d_number_density[d_idx] += WIJ
        d_prop[d_idx] += WIJ*s_temp_prop[s_idx]

    def post_loop(self, d_idx, d_prop, d_number_density):
        if d_number_density[d_idx] > 1e-12:
            d_prop[d_idx] /= d_number_density[d_idx]

class InterpolateSPH(Equation):
    """Standard SPH summation interpolation: the kernel-weighted sum of the
    source property with volume weights m_j/rho_j and no normalization.
    """
    def initialize(self, d_idx, d_prop):
        d_prop[d_idx] = 0.0

    def loop(self, d_idx, s_idx, s_rho, s_m, s_temp_prop, d_prop, WIJ):
        d_prop[d_idx] += s_m[s_idx]/s_rho[s_idx]*WIJ*s_temp_prop[s_idx]

class SPHFirstOrderApproximationPreStep(Equation):
    """Accumulate the 4x4 moment matrix used by `SPHFirstOrderApproximation`."""
    def __init__(self, dest, sources, dim=1):
        self.dim = dim

        super(SPHFirstOrderApproximationPreStep, self).__init__(dest, sources)

    def initialize(self, d_idx, d_moment):
        i, j = declare('int', 2)

        for i in range(4):
            for j in range(4):
                d_moment[16*d_idx + j+4*i] = 0.0

    def loop(self, d_idx, s_idx, d_h, s_h, s_x, s_y, s_z, d_x, d_y, d_z,
             s_rho, s_m, WIJ, XIJ, DWIJ, d_moment):
        Vj = s_m[s_idx] / s_rho[s_idx]
        i16 = declare('int')
        i16 = 16*d_idx

        d_moment[i16+0] += WIJ * Vj

        d_moment[i16+1] += -XIJ[0] * WIJ * Vj
        d_moment[i16+2] += -XIJ[1] * WIJ * Vj
        d_moment[i16+3] += -XIJ[2] * WIJ * Vj

        d_moment[i16+4] += DWIJ[0] * Vj
        d_moment[i16+8] += DWIJ[1] * Vj
        d_moment[i16+12] += DWIJ[2] * Vj

        d_moment[i16+5] += -XIJ[0] * DWIJ[0] * Vj
        d_moment[i16+6] += -XIJ[1] * DWIJ[0] * Vj
        d_moment[i16+7] += -XIJ[2] * DWIJ[0] * Vj

        d_moment[i16+9] += -XIJ[0] * DWIJ[1] * Vj
        d_moment[i16+10] += -XIJ[1] * DWIJ[1] * Vj
        d_moment[i16+11] += -XIJ[2] * DWIJ[1] * Vj

        d_moment[i16+13] += -XIJ[0] * DWIJ[2] * Vj
        d_moment[i16+14] += -XIJ[1] * DWIJ[2] * Vj
        d_moment[i16+15] += -XIJ[2] * DWIJ[2] * Vj

class SPHFirstOrderApproximation(Equation):
    """First order SPH approximation.

    The method used to solve the linear system here is not the same as in the
    reference. In this equation, :math:`Ax=b` is solved, where :math:`A` is
    the moment matrix (``moment``) and :math:`b` is the property evaluated
    using basic SPH (``p_sph``). The calculation requires ``moment`` to be
    evaluated before this step, which is done in
    `SPHFirstOrderApproximationPreStep`. A plain NumPy sketch of this solve
    is given after the class definition.

    References
    ----------
    .. [Liu2006] M.B. Liu, G.R. Liu, "Restoring particle consistency in
       smoothed particle hydrodynamics", Applied Numerical Mathematics,
       Volume 56, Issue 1, 2006, Pages 19-36, ISSN 0168-9274.
    """
    def _get_helpers_(self):
        return [gj_solve, augmented_matrix]

    def __init__(self, dest, sources, dim=1):
        self.dim = dim

        super(SPHFirstOrderApproximation, self).__init__(dest, sources)

    def initialize(self, d_idx, d_prop, d_p_sph):
        i = declare('int')

        for i in range(3):
            d_prop[4*d_idx+i] = 0.0
            d_p_sph[4*d_idx+i] = 0.0

    def loop(self, d_idx, d_h, s_h, s_x, s_y, s_z, d_x, d_y, d_z, s_rho,
             s_m, WIJ, DWIJ, s_temp_prop, d_p_sph, s_idx):
        i4 = declare('int')
        Vj = s_m[s_idx] / s_rho[s_idx]
        pj = s_temp_prop[s_idx]
        i4 = 4*d_idx

        d_p_sph[i4+0] += pj * WIJ * Vj
        d_p_sph[i4+1] += pj * DWIJ[0] * Vj
        d_p_sph[i4+2] += pj * DWIJ[1] * Vj
        d_p_sph[i4+3] += pj * DWIJ[2] * Vj

    def post_loop(self, d_idx, d_moment, d_prop, d_p_sph):
        a_mat = declare('matrix(16)')
        aug_mat = declare('matrix(20)')
        b = declare('matrix(4)')
        res = declare('matrix(4)')

        i, n, i16, i4 = declare('int', 4)
        i16 = 16*d_idx
        i4 = 4*d_idx

        for i in range(16):
            a_mat[i] = d_moment[i16+i]
        for i in range(20):
            aug_mat[i] = 0.0
        for i in range(4):
            b[i] = d_p_sph[4*d_idx+i]
            res[i] = 0.0

        n = self.dim + 1
        augmented_matrix(a_mat, b, n, 1, 4, aug_mat)
        gj_solve(aug_mat, n, 1, res)

        for i in range(4):
            d_prop[i4+i] = res[i]

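
# Illustrative sketch (not part of the original module): the post_loop above
# solves a small linear system A x = b per interpolation point, where A is the
# leading (dim+1) x (dim+1) block of the moment matrix and b holds the basic
# SPH estimates. The NumPy equivalent below mirrors that solve for a single
# point; `moment` and `p_sph` are hypothetical stand-ins for one particle's
# 16- and 4-element slices of d_moment and d_p_sph, and the moment block is
# assumed non-singular.
def _first_order_solve_sketch(moment, p_sph, dim=1):
    n = dim + 1
    a = np.asarray(moment, dtype=float).reshape(4, 4)[:n, :n]
    b = np.asarray(p_sph, dtype=float)[:n]
    res = np.zeros(4)
    # res[0] is the corrected property; res[1:n] are its first derivatives.
    res[:n] = np.linalg.solve(a, b)
    return res
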
def get_bounding_box(particle_arrays, tight=False, stretch=0.05):
    """Find the size of the domain given a sequence of particle arrays.

    If `tight` is True, the bounds are exact. Otherwise the domain is
    stretched along each dimension: a fraction `stretch` of the length along
    that dimension is added on either side.
    """
    xmin, xmax = 1e20, -1e20
    ymin, ymax = 1e20, -1e20
    zmin, zmax = 1e20, -1e20
    for pa in particle_arrays:
        x, y, z = pa.x, pa.y, pa.z
        xmin = min(xmin, x.min())
        xmax = max(xmax, x.max())
        ymin = min(ymin, y.min())
        ymax = max(ymax, y.max())
        zmin = min(zmin, z.min())
        zmax = max(zmax, z.max())

    bounds = np.asarray((xmin, xmax, ymin, ymax, zmin, zmax))
    if not tight:
        # Add the extra space.
        lengths = stretch*np.repeat(bounds[1::2] - bounds[::2], 2)
        lengths[::2] *= -1.0
        bounds += lengths

    return bounds

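
# Illustrative usage (not part of the original module): with one particle
# array spanning the unit square, the default 5% stretch pads each
# non-degenerate dimension by 5% of its length on both sides. `demo` is a
# hypothetical particle array built here only for the demonstration.
def _bounding_box_example():
    pa = get_particle_array(name='demo',
                            x=np.linspace(0.0, 1.0, 11),
                            y=np.linspace(0.0, 1.0, 11))
    print(get_bounding_box([pa]))              # stretched bounds
    print(get_bounding_box([pa], tight=True))  # exact particle extents
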
def get_nx_ny_nz(num_points, bounds):
    """Given a number of points to use and the bounds, return a triplet
    of integers for a uniform mesh with approximately that many points.
    """
    bounds = np.asarray(bounds, dtype=float)
    length = bounds[1::2] - bounds[::2]
    total_length = length.sum()
    rel_length = length/total_length
    non_zero = rel_length > 1e-3
    dim = int(non_zero.sum())
    volume = np.prod(length[non_zero])
    delta = pow(volume/num_points, 1.0/dim)
    dimensions = np.ones(3, dtype=int)
    for i in range(3):
        if rel_length[i] > 1e-4:
            dimensions[i] = int(round(length[i]/delta))

    return dimensions

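
# Illustrative usage (not part of the original module): for a 2D domain the
# z extent is zero, so the third entry stays 1 and roughly num_points points
# are split between x and y in proportion to the side lengths.
def _nx_ny_nz_example():
    bounds = (0.0, 2.0, 0.0, 1.0, 0.0, 0.0)
    print(get_nx_ny_nz(10000, bounds))  # approximately [141, 71, 1]
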
class Interpolator(object):
    """Convenient class to interpolate particle properties onto a uniform
    grid or a given set of particles. This is particularly handy for
    visualization. A usage sketch is given after this class definition.
    """

    def __init__(self, particle_arrays, num_points=125000, kernel=None,
                 x=None, y=None, z=None, domain_manager=None,
                 equations=None, method='shepard'):
        """
        The x, y, z coordinates need not be specified; if they are not, the
        bounds of the interpolated domain are computed automatically and
        `num_points` points are placed uniformly in that domain.

        Parameters
        ----------

        particle_arrays: list
            A list of particle arrays.
        num_points: int
            the number of points to interpolate on to.
        kernel: Kernel
            the kernel to use for interpolation.
        x: ndarray
            the x-coordinate of points on which to interpolate.
        y: ndarray
            the y-coordinate of points on which to interpolate.
        z: ndarray
            the z-coordinate of points on which to interpolate.
        domain_manager: DomainManager
            An optional DomainManager for periodic domains.
        equations: sequence
            A sequence of equations or groups. Defaults to None. This is
            used only if the default interpolation equations are inadequate.
        method : str
            String with one of the allowed methods: 'shepard', 'sph',
            'order1'.
        """
        self._set_particle_arrays(particle_arrays)
        bounds = get_bounding_box(self.particle_arrays)
        shape = get_nx_ny_nz(num_points, bounds)
        self.dim = 3 - list(shape).count(1)

        if kernel is None:
            self.kernel = Gaussian(dim=self.dim)
        else:
            self.kernel = kernel

        self.pa = None
        self.nnps = None
        self.equations = equations
        self.func_eval = None
        self.domain_manager = domain_manager
        self.method = method
        if method not in ['sph', 'shepard', 'order1']:
            raise RuntimeError('%s method is not implemented' % (method))
        if x is None and y is None and z is None:
            self.set_domain(bounds, shape)
        else:
            self.set_interpolation_points(x=x, y=y, z=z)

    # ## Interpolator protocol ###############################################

    def set_interpolation_points(self, x=None, y=None, z=None):
        """Set the points on which we must interpolate the arrays.

        If any of x, y, z is not passed it is assumed to be 0.0 and shaped
        like the other non-None arrays.

        Parameters
        ----------

        x: ndarray
            the x-coordinate of points on which to interpolate.
        y: ndarray
            the y-coordinate of points on which to interpolate.
        z: ndarray
            the z-coordinate of points on which to interpolate.
        """
        tmp = None
        for tmp in (x, y, z):
            if tmp is not None:
                break
        if tmp is None:
            raise RuntimeError('At least one non-None array must be given.')

        def _get_array(_t):
            return np.asarray(_t) if _t is not None else np.zeros_like(tmp)

        x, y, z = _get_array(x), _get_array(y), _get_array(z)

        self.shape = x.shape
        self.pa = self._create_particle_array(x, y, z)
        arrays = self.particle_arrays + [self.pa]
        if self.func_eval is None:
            self._compile_acceleration_eval(arrays)
        self.update_particle_arrays(self.particle_arrays)

    def set_domain(self, bounds, shape):
        """Set the domain to interpolate into.

        Parameters
        ----------

        bounds: tuple
            (xmin, xmax, ymin, ymax, zmin, zmax)
        shape: tuple
            (nx, ny, nz)
        """
        self.bounds = np.asarray(bounds)
        self.shape = np.asarray(shape)
        x, y, z = self._create_default_points(self.bounds, self.shape)
        self.set_interpolation_points(x, y, z)

    def interpolate(self, prop, comp=0):
        """Interpolate the given property.

        Parameters
        ----------

        prop: str
            The name of the property to interpolate.
        comp: int
            The component of the gradient required (only supported by the
            'order1' method).

        Returns
        -------
        A numpy array suitably shaped with the property interpolated.
        """
        assert isinstance(comp, int), 'Error: only an integer value is allowed'
        for array in self.particle_arrays:
            if prop not in array.properties:
                data = 0.0
            else:
                data = array.get(prop, only_real_particles=False)

            array.get('temp_prop', only_real_particles=False)[:] = data

        self.func_eval.compute(0.0, 0.1)  # These are junk arguments.
        if comp and (self.method in ['sph', 'shepard']):
            raise RuntimeError("Error: use the 'order1' method to evaluate "
                               "a gradient")
        elif self.method in ['sph', 'shepard']:
            result = self.pa.prop.copy()
        else:
            if comp > 3:
                raise RuntimeError("Error: Only 0, 1, 2, 3 allowed")
            result = self.pa.prop[comp::4].copy()

        result.shape = self.shape
        return result.squeeze()

    def update(self, update_domain=True):
        """Update the NNPS when particles have moved.

        If `update_domain` is False, the domain is not updated.

        Use this when the particle arrays are the same but the particles
        themselves have changed. If the particle arrays themselves change,
        use the `update_particle_arrays` method instead.
        """
        if update_domain:
            self.nnps.update_domain()
        self.nnps.update()

    def update_particle_arrays(self, particle_arrays):
        """Call this for a new set of particle arrays which have the same
        properties as before.

        For example, if you are reading the particle array data from files,
        each time you load a new file a new particle array is read with the
        same properties. Call this function to reset the arrays.
        """
        self._set_particle_arrays(particle_arrays)
        arrays = self.particle_arrays + [self.pa]
        self._create_nnps(arrays)
        self.func_eval.update_particle_arrays(arrays)

    # ### Private protocol ###################################################

    def _create_nnps(self, arrays):
        # Create the neighbor locator object.
        self.nnps = NNPS(dim=self.kernel.dim, particles=arrays,
                         radius_scale=self.kernel.radius_scale,
                         domain=self.domain_manager,
                         cache=True)
        self.func_eval.set_nnps(self.nnps)

    def _create_default_points(self, bounds, shape):
        b = bounds
        n = shape
        x, y, z = np.mgrid[
            b[0]:b[1]:n[0]*1j,
            b[2]:b[3]:n[1]*1j,
            b[4]:b[5]:n[2]*1j,
        ]
        return x, y, z

    def _create_particle_array(self, x, y, z):
        xr = x.ravel()
        yr = y.ravel()
        zr = z.ravel()
        self.x, self.y, self.z = x.squeeze(), y.squeeze(), z.squeeze()

        hmax = self._get_max_h_in_arrays()
        h = hmax*np.ones_like(xr)
        pa = get_particle_array(
            name='interpolate',
            x=xr, y=yr, z=zr, h=h,
            number_density=np.zeros_like(xr)
        )
        if self.method in ['sph', 'shepard']:
            pa.add_property('prop')
        else:
            pa.add_property('moment', stride=16)
            pa.add_property('p_sph', stride=4)
            pa.add_property('prop', stride=4)
        return pa

    def _compile_acceleration_eval(self, arrays):
        names = [x.name for x in self.particle_arrays]
        if self.equations is None:
            if self.method == 'shepard':
                equations = [
                    InterpolateFunction(dest='interpolate', sources=names)
                ]
            elif self.method == 'sph':
                equations = [
                    InterpolateSPH(dest='interpolate', sources=names)
                ]
            else:
                equations = [
                    Group(equations=[
                        SummationDensity(dest=name, sources=names)
                        for name in names],
                        real=False),
                    Group(equations=[
                        SPHFirstOrderApproximationPreStep(
                            dest='interpolate', sources=names, dim=self.dim)],
                        real=True),
                    Group(equations=[
                        SPHFirstOrderApproximation(
                            dest='interpolate', sources=names, dim=self.dim)],
                        real=True)
                ]
        else:
            equations = self.equations
        self.func_eval = AccelerationEval(arrays, equations, self.kernel)
        compiler = SPHCompiler(self.func_eval, None)
        compiler.compile()

    def _get_max_h_in_arrays(self):
        hmax = -1.0
        for array in self.particle_arrays:
            hmax = max(array.h.max(), hmax)
        return hmax

    def _set_particle_arrays(self, particle_arrays):
        self.particle_arrays = particle_arrays
        for array in self.particle_arrays:
            if 'temp_prop' not in array.properties:
                array.add_property('temp_prop')

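
# Usage sketch (not part of the original module), referenced from the class
# docstring above: interpolate a property from a saved PySPH output file onto
# an automatically sized uniform grid. The file name 'output_100.hdf5' and
# the property name 'p' are hypothetical; any output with a matching
# property would do.
def _interpolator_usage_sketch():
    from pysph.solver.utils import load
    data = load('output_100.hdf5')
    arrays = list(data['arrays'].values())

    interp = Interpolator(arrays, num_points=10000)
    p = interp.interpolate('p')  # Shepard-interpolated values on the grid
    print(p.shape, interp.x.shape)

    # The 'order1' method also provides first derivatives via comp=1, 2, 3.
    interp1 = Interpolator(arrays, num_points=10000, method='order1')
    dpdx = interp1.interpolate('p', comp=1)
    return p, dpdx
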
def main(fname, prop, npoint):
    from pysph.solver.utils import load
    print("Loading", fname)
    data = load(fname)
    arrays = list(data['arrays'].values())
    interp = Interpolator(arrays, num_points=npoint)
    print(interp.shape)
    print("Interpolating")
    prop = interp.interpolate(prop)
    print("Visualizing")
    from mayavi import mlab
    src = mlab.pipeline.scalar_field(interp.x, interp.y, interp.z, prop)
    if interp.dim == 3:
        mlab.pipeline.scalar_cut_plane(src)
    else:
        mlab.pipeline.surface(src)
    mlab.pipeline.outline(src)
    mlab.show()

if __name__ == '__main__':
    import sys
    if len(sys.argv) < 4:
        print("Usage: interpolator.py filename property num_points")
        sys.exit(1)
    else:
        main(sys.argv[1], sys.argv[2], int(sys.argv[3]))