
"""
Module contains some common functions.
"""

# standard imports
import pickle
import numpy
import sys
import os
import platform
import tempfile
import zipfile
from numpy.lib import format

from pysph.base.particle_array import ParticleArray
from pysph.base.utils import get_particle_array, get_particles_info

HAS_PBAR = True
try:
    import progressbar
except ImportError:
    HAS_PBAR = False

import pysph

def check_array(x, y):
    """Check if two arrays are equal with an absolute tolerance of
    1e-16."""
    return numpy.allclose(x, y, atol=1e-16, rtol=0)
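
# A quick illustration (not part of the module): differences below the
# absolute tolerance compare as equal, larger ones do not.
#
#     >>> check_array(numpy.array([1.0, 2.0]), numpy.array([1.0, 2.0]))
#     True
#     >>> check_array(numpy.array([1.0]), numpy.array([1.0 + 1e-10]))
#     False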


def zipfile_factory(*args, **kwargs):
    if sys.version_info >= (2, 5):
        kwargs['allowZip64'] = True
    return zipfile.ZipFile(*args, **kwargs)

def savez(file, *args, **kwds):
    """
    Save several arrays into a single file in uncompressed ``.npz`` format.

    If arguments are passed in with no keywords, the corresponding variable
    names in the ``.npz`` file are 'arr_0', 'arr_1', etc. If keyword
    arguments are given, the corresponding variable names in the ``.npz``
    file will match the keyword names.

    Parameters
    ----------
    file : str or file
        Either the file name (string) or an open file (file-like object)
        where the data will be saved. If file is a string, the ``.npz``
        extension will be appended to the file name if it is not already there.
    *args : Arguments, optional
        Arrays to save to the file. Since it is not possible for Python to
        know the names of the arrays outside `savez`, the arrays will be saved
        with names "arr_0", "arr_1", and so on. These arguments can be any
        expression.
    **kwds : Keyword arguments, optional
        Arrays to save to the file. Arrays will be saved in the file with the
        keyword names.

    Returns
    -------
    None

    See Also
    --------
    save : Save a single array to a binary file in NumPy format.
    savetxt : Save an array to a file as plain text.
    savez_compressed : Save several arrays into a compressed ``.npz`` file.

    Notes
    -----
    The ``.npz`` file format is a zipped archive of files named after the
    variables they contain.  The archive is not compressed and each file
    in the archive contains one variable in ``.npy`` format. For a
    description of the ``.npy`` format, see `format`.

    When opening the saved ``.npz`` file with `load` a `NpzFile` object is
    returned. This is a dictionary-like object which can be queried for
    its list of arrays (with the ``.files`` attribute), and for the arrays
    themselves.

    Examples
    --------
    >>> from tempfile import TemporaryFile
    >>> outfile = TemporaryFile()
    >>> x = np.arange(10)
    >>> y = np.sin(x)

    Using `savez` with *args, the arrays are saved with default names.

    >>> np.savez(outfile, x, y)
    >>> outfile.seek(0) # Only needed here to simulate closing & reopening file
    >>> npzfile = np.load(outfile)
    >>> npzfile.files
    ['arr_1', 'arr_0']
    >>> npzfile['arr_0']
    array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

    Using `savez` with **kwds, the arrays are saved with the keyword names.

    >>> outfile = TemporaryFile()
    >>> np.savez(outfile, x=x, y=y)
    >>> outfile.seek(0)
    >>> npzfile = np.load(outfile)
    >>> npzfile.files
    ['y', 'x']
    >>> npzfile['x']
    array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

    """
    _savez(file, args, kwds, False)

def savez_compressed(file, *args, **kwds):
    """
    Save several arrays into a single file in compressed ``.npz`` format.

    If keyword arguments are given, then filenames are taken from the keywords.
    If arguments are passed in with no keywords, then stored file names are
    arr_0, arr_1, etc.

    Parameters
    ----------
    file : string
        File name of .npz file.
    args : Arguments
        Function arguments.
    kwds : Keyword arguments
        Keywords.

    See Also
    --------
    numpy.savez : Save several arrays into an uncompressed .npz file format

    """
    _savez(file, args, kwds, True)
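
# A short sketch (the file name is hypothetical): compressed archives are
# read back exactly like the uncompressed form.
#
#     savez_compressed('results.npz', x=numpy.arange(10))
#     data = numpy.load('results.npz')
#     data['x']   # -> array([0, 1, ..., 9])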

def _savez(file, args, kwds, compress):
    if isinstance(file, basestring):
        if not file.endswith('.npz'):
            file = file + '.npz'

    namedict = kwds
    for i, val in enumerate(args):
        key = 'arr_%d' % i
        if key in namedict:
            msg = "Cannot use un-named variables and keyword %s" % key
            raise ValueError(msg)
        namedict[key] = val

    if compress:
        compression = zipfile.ZIP_DEFLATED
    else:
        compression = zipfile.ZIP_STORED

    zipf = zipfile_factory(file, mode="w", compression=compression)

    # Stage arrays in a temporary file on disk, before writing to zip.
    fd, tmpfile = tempfile.mkstemp(suffix='-numpy.npy')
    os.close(fd)
    try:
        for key, val in namedict.iteritems():
            fname = key + '.npy'
            fid = open(tmpfile, 'wb')
            try:
                format.write_array(fid, numpy.asanyarray(val))
                fid.close()
                fid = None
                zipf.write(tmpfile, arcname=fname)
            finally:
                if fid:
                    fid.close()
    finally:
        os.remove(tmpfile)

    zipf.close()

#############################################################################


def get_distributed_particles(pa, comm, cell_size):
    # FIXME: this can be removed once the examples all use Application.
    from pysph.parallel.load_balancer import LoadBalancer
    rank = comm.Get_rank()
    num_procs = comm.Get_size()

    if rank == 0:
        lb = LoadBalancer.distribute_particles(pa, num_procs=num_procs,
                                               block_size=cell_size)
    else:
        lb = None

    particles = comm.scatter(lb, root=0)

    return particles


################################################################################
# `PBar` class.
###############################################################################
class PBar(object):
    """A simple wrapper around the progressbar so it works if a user has
    it installed or not.
    """
    def __init__(self, maxval, show=True):
        bar = None
        self.count = 0
        self.maxval = maxval
        self.show = show
        if HAS_PBAR and show:
            widgets = [progressbar.Percentage(), ' ', progressbar.Bar(),
                       progressbar.ETA()]
            bar = progressbar.ProgressBar(widgets=widgets,
                                          maxval=maxval).start()
        self.bar = bar

    def update(self, delta=1):
        self.count += delta
        if self.bar is not None:
            self.bar.update(self.count)
        elif self.show:
            sys.stderr.write('\r%d%%'%int(self.count*100/self.maxval))
            sys.stderr.flush()

    def finish(self):
        if self.bar is not None:
            self.bar.finish()
        elif self.show:
            sys.stderr.write('\r100%\n')
            sys.stderr.flush()


class FloatPBar(object):
    def __init__(self, t_initial, t_final, show=True):
        self.ticks = 1000
        self.bar = PBar(self.ticks, show)
        self.t_initial = t_initial
        self.t_final = t_final
        self.t_diff = t_final - t_initial

    def update(self, time):
        expected_count = int((time - self.t_initial)/self.t_diff*self.ticks)
        expected_count = min(expected_count, self.ticks)
        diff = max(expected_count - self.bar.count, 0)
        if diff > 0:
            self.bar.update(diff)

    def finish(self):
        self.bar.finish()
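
# Illustrative usage sketch (not part of the module): ``FloatPBar`` maps a
# floating point time interval onto the integer ticks of ``PBar`` so that a
# solver loop can report progress in simulation time.  The loop below is
# hypothetical.
#
#     bar = FloatPBar(t_initial=0.0, t_final=1.0, show=True)
#     t, dt = 0.0, 1e-3
#     while t < 1.0:
#         t += dt
#         bar.update(t)
#     bar.finish()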

##############################################################################
# friendly mkdir  from http://code.activestate.com/recipes/82465/.
##############################################################################
def mkdir(newdir):
    """works the way a good mkdir should :)
        - already exists, silently complete
        - regular file in the way, raise an exception
        - parent directory(ies) do not exist, make them as well
    """
    if os.path.isdir(newdir):
        pass

    elif os.path.isfile(newdir):
        raise OSError("a file with the same name as the desired "
                      "dir, '%s', already exists." % newdir)

    else:
        head, tail = os.path.split(newdir)

        if head and not os.path.isdir(head):
            mkdir(head)

        if tail:
            try:
                os.mkdir(newdir)
            # To prevent race in mpi runs
            except OSError as e:
                import errno
                if e.errno == errno.EEXIST and os.path.isdir(newdir):
                    pass
                else:
                    raise
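
# For example (hypothetical paths), one call creates the whole chain of
# missing parent directories and calling it again is a silent no-op:
#
#     mkdir('results/run_01/plots')
#     mkdir('results/run_01/plots')  # already exists, nothing happens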



##############################################################################
# read pickled data from a file
##############################################################################
def get_pickled_data(fname):
    # Open in binary mode since pickles are byte streams.
    f = open(fname, 'rb')
    data = pickle.load(f)
    f.close()

    return data


def get_pysph_root():
    return os.path.split(pysph.__file__)[0]


##############################################################################
# Load an output file
##############################################################################

def _gather_array_data(all_array_data, comm):
    """Given array_data from the current processor and an MPI communicator,
    return the combined array_data from all processors on rank 0; on the
    other ranks the input is returned unchanged.
    """
    array_names = all_array_data.keys()

    # gather the data from all processors
    collected_data = comm.gather(all_array_data, root=0)

    if comm.Get_rank() == 0:
        all_array_data = {}
        size = comm.Get_size()

        # concatenate the arrays
        for array_name in array_names:
            array_data = {}
            all_array_data[array_name] = array_data

            _props = collected_data[0][array_name].keys()
            for prop in _props:
                data = [collected_data[pid][array_name][prop]
                            for pid in range(size)]
                prop_arr = numpy.concatenate(data)
                array_data[prop] = prop_arr

    return all_array_data
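
# The layout of ``all_array_data`` passed to the function above is a nested
# dict, keyed first on particle array name and then on property name
# (illustrative values):
#
#     {'fluid': {'x': numpy.array([...]), 'rho': numpy.array([...])}}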

def dump(filename, particles, solver_data, detailed_output=False,
         only_real=True, mpi_comm=None):
    """Dump the given particles and solver data to the given filename.

    Parameters
    ----------

    filename: str
        Filename to dump to.

    particles: sequence(ParticleArray)
        Sequence of particle arrays to dump.

    solver_data: dict
        Additional information to dump about solver state.

    detailed_output: bool
        Specifies if all arrays should be dumped.

    only_real: bool
        Only dump the real particles.

    mpi_comm: mpi4py.MPI.Intracomm
        An MPI communicator to use for parallel communications.

    If `mpi_comm` is not passed or is set to None the local particles alone
    are dumped, otherwise only rank 0 dumps the output.

    """
    particle_data = get_particles_info(particles)
    output_data = {"particles": particle_data, "solver_data": solver_data}

    all_array_data = {}
    for array in particles:
        all_array_data[array.name] = array.get_property_arrays(
            all=detailed_output, only_real=only_real
        )

    # Gather particle data on root if this is in parallel.
    if mpi_comm is not None:
        all_array_data = _gather_array_data(all_array_data, mpi_comm)

    for name, arrays in all_array_data.iteritems():
        particle_data[name]["arrays"] = arrays

    if mpi_comm is None or mpi_comm.Get_rank() == 0:
        savez(filename, version=2, **output_data)

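# A minimal round-trip sketch (illustrative; the file name and values are
# made up):
#
#     pa = get_particle_array(name='fluid', x=numpy.linspace(0, 1, 10))
#     dump('fluid_0.npz', [pa], solver_data={'t': 0.0, 'dt': 1e-4, 'count': 0})
#     data = load('fluid_0.npz')
#     fluid = data['arrays']['fluid']
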
def dump_v1(filename, particles, solver_data, detailed_output=False,
            only_real=True, mpi_comm=None):
    """Dump the given particles and solver data to the given filename
    using version 1.  This is mainly used only for testing that we can
    continue to load older versions of the data files.
    """
    all_array_data = {}
    output_data = {"arrays": all_array_data, "solver_data": solver_data}

    for array in particles:
        all_array_data[array.name] = array.get_property_arrays(
            all=detailed_output, only_real=only_real)

    # Gather particle data on root
    if mpi_comm is not None:
        all_array_data = _gather_array_data(all_array_data, mpi_comm)
        output_data['arrays'] = all_array_data

    if mpi_comm is None or mpi_comm.Get_rank() == 0:
        savez(filename, version=1, **output_data)

def load(fname):
    """Load and return data from an output (.npz) file dumped by PySPH.

    For output file version 1, the function returns a dictionary with
    the keys:

    ``"solver_data"`` : Solver constants at the time of output like time,
    time step and iteration count.

    ``"arrays"`` : ParticleArrays keyed on names with the ParticleArray
    object as value.

    Parameters
    ----------

    fname : str
        Name of the file.

    Examples
    --------

    >>> data = load('elliptical_drop_100.npz')
    >>> data.keys()
    ['arrays', 'solver_data']
    >>> arrays = data['arrays']
    >>> arrays.keys()
    ['fluid']
    >>> fluid = arrays['fluid']
    >>> type(fluid)
    pysph.base.particle_array.ParticleArray
    >>> data['solver_data']
    {'count': 100, 'dt': 4.6416394784204199e-05, 't': 0.0039955855395528766}

    """
    def _get_dict_from_arrays(arrays):
        arrays.shape = (1,)
        return arrays[0]

    data = numpy.load(fname)
    ret = {"arrays": {}}

    if 'version' not in data.files:
        msg = "Wrong file type! No version number recorded."
        raise RuntimeError(msg)

    version = data['version']

    solver_data = _get_dict_from_arrays(data["solver_data"])
    ret["solver_data"] = solver_data

    if version == 1:
        arrays = _get_dict_from_arrays(data["arrays"])
        for array_name in arrays:
            array = get_particle_array(name=array_name,
                                       **arrays[array_name])
            ret["arrays"][array_name] = array

    elif version == 2:
        particles = _get_dict_from_arrays(data["particles"])
        for array_name, array_info in particles.iteritems():
            array = ParticleArray(name=array_name,
                                  constants=array_info["constants"],
                                  **array_info["arrays"])
            array.set_output_arrays(
                array_info.get('output_property_arrays', [])
            )
            for prop, prop_info in array_info["properties"].iteritems():
                if prop not in array_info["arrays"]:
                    array.add_property(**prop_info)
            ret["arrays"][array_name] = array

    else:
        raise RuntimeError("Version not understood!")

    return ret

def load_and_concatenate(prefix, nprocs=1, directory=".", count=None):
    """Load the results from multiple files.

    Given a filename prefix and the number of processors, return a
    concatenated version of the dictionary returned via load.

    Parameters
    ----------

    prefix : str
        A filename prefix for the output file.

    nprocs : int
        The number of processors (files) to read.

    directory : str
        The directory for the files.

    count : int
        The file iteration count to read. If None, the last available
        one is read.

    """
    if count is None:
        counts = [i.rsplit('_', 1)[1][:-4] for i in os.listdir(directory)
                  if i.startswith(prefix) and i.endswith('.npz')]
        counts = sorted([int(i) for i in counts])
        count = counts[-1]

    arrays_by_rank = {}

    for rank in range(nprocs):
        fname = os.path.join(
            directory, prefix + '_' + str(rank) + '_' + str(count) + '.npz')

        data = load(fname)
        arrays_by_rank[rank] = data["arrays"]

    arrays = _concatenate_arrays(arrays_by_rank, nprocs)

    data["arrays"] = arrays

    return data

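# For instance (hypothetical file names): with fluid_0_10.npz and
# fluid_1_10.npz written by a two processor run,
#
#     data = load_and_concatenate('fluid', nprocs=2, directory='fluid_output')
#
# returns the same structure as ``load`` with the particles of both ranks
# merged into single arrays.
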
def _concatenate_arrays(arrays_by_rank, nprocs):
    """Concatenate arrays into one single particle array. """
    if nprocs <= 0:
        return 0

    array_names = arrays_by_rank[0].keys()
    first_processors_arrays = arrays_by_rank[0]

    if nprocs > 1:
        ret = {}
        for array_name in array_names:
            first_array = first_processors_arrays[array_name]
            for rank in range(1, nprocs):
                other_processors_arrays = arrays_by_rank[rank]
                other_array = other_processors_arrays[array_name]

                # append the other array to the first array
                first_array.append_parray(other_array)

                # remove the non local particles
                first_array.remove_tagged_particles(1)

            ret[array_name] = first_array
    else:
        ret = arrays_by_rank[0]

    return ret

# SPH interpolation of data
from pyzoltan.core.carray import UIntArray
from pysph.base.kernels import Gaussian, get_compiled_kernel
from pysph.base.nnps import LinkedListNNPS as NNPS

class SPHInterpolate(object):
    """Class to perform SPH interpolation

    Given solution data on a possibly scattered set of points,
    SPHInterpolate can be used to interpolate the solution data onto a
    regular grid.

    """
    def __init__(self, dim, dst, src, kernel=None):
        self.dst = dst
        self.src = src
        # use the given kernel, or default to a Gaussian
        if kernel is None:
            kernel = get_compiled_kernel(Gaussian(dim))
        self.kernel = kernel

        # create the neighbor locator object
        self.nnps = nnps = NNPS(dim=dim, particles=[dst, src],
                                radius_scale=self.kernel.radius_scale)
        nnps.update()

    def interpolate(self, arr):
        """Interpolate data given in arr onto coordinate positions"""
        # the result array
        np = self.dst.get_number_of_particles()
        result = numpy.zeros(np)

        nbrs = UIntArray()

        # source arrays
        src = self.src
        sx, sy, sz, sh = src.get('x', 'y', 'z', 'h',
                                 only_real_particles=False)

        # dest arrays
        dst = self.dst
        dx, dy, dz, dh = dst.x, dst.y, dst.z, dst.h

        # kernel
        kernel = self.kernel

        for i in range(np):
            self.nnps.get_nearest_particles(src_index=1, dst_index=0,
                                            d_idx=i, nbrs=nbrs)
            nnbrs = nbrs.length

            _wij = 0.0
            _sum = 0.0
            for indexj in range(nnbrs):
                j = nbrs[indexj]

                hij = 0.5 * (sh[j] + dh[i])

                wij = kernel.kernel(dx[i], dy[i], dz[i],
                                    sx[j], sy[j], sz[j], hij)
                _wij += wij
                _sum += arr[j] * wij

            # save the result
            result[i] = _sum / _wij

        return result

    def gradient(self, arr):
        """Compute the gradient on the interpolated data"""
        # the result array
        np = self.dst.get_number_of_particles()

        # result arrays
        resultx = numpy.zeros(np)
        resulty = numpy.zeros(np)
        resultz = numpy.zeros(np)

        nbrs = UIntArray()

        # source arrays
        src = self.src
        sx, sy, sz, sh, srho, sm = src.get(
            'x', 'y', 'z', 'h', 'rho', 'm', only_real_particles=False)

        # dest arrays
        dst = self.dst
        dx, dy, dz, dh = dst.x, dst.y, dst.z, dst.h

        # kernel
        kernel = self.kernel

        for i in range(np):
            self.nnps.get_nearest_particles(src_index=1, dst_index=0,
                                            d_idx=i, nbrs=nbrs)
            nnbrs = nbrs.length

            _wij = 0.0
            _sumx = 0.0
            _sumy = 0.0
            _sumz = 0.0
            for indexj in range(nnbrs):
                j = nbrs[indexj]

                hij = 0.5 * (sh[j] + dh[i])

                wij = kernel.kernel(dx[i], dy[i], dz[i],
                                    sx[j], sy[j], sz[j], hij)

                rhoj = srho[j]
                mj = sm[j]
                fji = arr[j] - arr[i]

                # kernel gradient
                dwij = kernel.gradient(dx[i], dy[i], dz[i],
                                       sx[j], sy[j], sz[j], hij)

                _wij += wij

                tmp = 1. / rhoj * fji * mj
                _sumx += tmp * dwij.x
                _sumy += tmp * dwij.y
                _sumz += tmp * dwij.z

            # save the result
            resultx[i] = _sumx
            resulty[i] = _sumy
            resultz[i] = _sumz

        return resultx, resulty, resultz

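# Illustrative usage sketch (the arrays here are hypothetical): interpolate
# the scattered pressure data of ``src`` onto the positions of ``dst``.
#
#     src = get_particle_array(name='src', x=xs, y=ys, h=hs, rho=rhos,
#                              m=ms, p=ps)
#     dst = get_particle_array(name='grid', x=xg, y=yg, h=hg)
#     interp = SPHInterpolate(dim=2, dst=dst, src=src)
#     p_grid = interp.interpolate(src.p)
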
def get_files(dirname=None, fname=None, endswith=".npz"):
    """Get all solution files in a given directory, `dirname`.

    Parameters
    ----------

    dirname: str
        Name of directory.
    fname: str
        An initial part of the filename; if not specified, use the first
        part of the dirname.
    endswith: str
        The extension of the files to load.

    """
    if dirname is None:
        return []

    if fname is None:
        fname = dirname.split("_output")[0]

    path = os.path.abspath(dirname)
    files = os.listdir(path)

    # get all the output files in the directory
    files = [f for f in files
             if f.startswith(fname) and f.endswith(endswith)]
    files = [os.path.join(path, f) for f in files]

    # sort the files
    def _sort_func(x, y):
        """Sort the files correctly."""
        def _process(arg):
            a = os.path.splitext(arg)[0]
            return int(a[a.rfind('_') + 1:])
        return cmp(_process(x), _process(y))

    files.sort(_sort_func)

    return files
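
# For example (hypothetical directory): collect the sorted solution files of
# a run stored in 'elliptical_drop_output':
#
#     files = get_files('elliptical_drop_output')
#     # -> ['.../elliptical_drop_0.npz', '.../elliptical_drop_100.npz', ...]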