"""
Module contains some common functions.
"""
# standard imports
import pickle
import numpy
import sys
import os
import platform
import commands
import tempfile
import zipfile
from numpy.lib import format
from pysph.base.particle_array import ParticleArray
from pysph.base.utils import get_particle_array, get_particles_info
HAS_PBAR = True
try:
import progressbar
except ImportError:
HAS_PBAR = False
import pysph
def check_array(x, y):
"""Check if two arrays are equal with an absolute tolerance of
1e-16."""
return numpy.allclose(x, y, atol=1e-16, rtol=0)
def zipfile_factory(*args, **kwargs):
if sys.version_info >= (2, 5):
kwargs['allowZip64'] = True
return zipfile.ZipFile(*args, **kwargs)
def savez(file, *args, **kwds):
"""
Save several arrays into a single file in uncompressed ``.npz`` format.
If arguments are passed in with no keywords, the corresponding variable
names, in the .npz file, are 'arr_0', 'arr_1', etc. If keyword arguments
are given, the corresponding variable names, in the ``.npz`` file will
match the keyword names.
Parameters
----------
file : str or file
Either the file name (string) or an open file (file-like object)
where the data will be saved. If file is a string, the ``.npz``
extension will be appended to the file name if it is not already there.
*args : Arguments, optional
Arrays to save to the file. Since it is not possible for Python to
know the names of the arrays outside `savez`, the arrays will be saved
with names "arr_0", "arr_1", and so on. These arguments can be any
expression.
**kwds : Keyword arguments, optional
Arrays to save to the file. Arrays will be saved in the file with the
keyword names.
Returns
-------
None
See Also
--------
save : Save a single array to a binary file in NumPy format.
savetxt : Save an array to a file as plain text.
Notes
-----
The ``.npz`` file format is a zipped archive of files named after the
variables they contain. The archive is not compressed and each file
in the archive contains one variable in ``.npy`` format. For a
description of the ``.npy`` format, see `format`.
When opening the saved ``.npz`` file with `load` a `NpzFile` object is
returned. This is a dictionary-like object which can be queried for
its list of arrays (with the ``.files`` attribute), and for the arrays
themselves.
Examples
--------
>>> from tempfile import TemporaryFile
>>> outfile = TemporaryFile()
>>> x = np.arange(10)
>>> y = np.sin(x)
Using `savez` with *args, the arrays are saved with default names.
>>> np.savez(outfile, x, y)
>>> outfile.seek(0) # Only needed here to simulate closing & reopening file
>>> npzfile = np.load(outfile)
>>> npzfile.files
['arr_1', 'arr_0']
>>> npzfile['arr_0']
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
Using `savez` with **kwds, the arrays are saved with the keyword names.
>>> outfile = TemporaryFile()
>>> np.savez(outfile, x=x, y=y)
>>> outfile.seek(0)
>>> npzfile = np.load(outfile)
>>> npzfile.files
['y', 'x']
>>> npzfile['x']
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
See Also
--------
numpy.savez_compressed : Save several arrays into a compressed .npz file
format
"""
_savez(file, args, kwds, False)
def savez_compressed(file, *args, **kwds):
"""
Save several arrays into a single file in compressed ``.npz`` format.
If keyword arguments are given, then filenames are taken from the keywords.
If arguments are passed in with no keywords, then stored file names are
arr_0, arr_1, etc.
Parameters
----------
file : string
File name of .npz file.
args : Arguments
Function arguments.
kwds : Keyword arguments
Keywords.
See Also
--------
numpy.savez : Save several arrays into an uncompressed .npz file format
"""
_savez(file, args, kwds, True)
def _savez(file, args, kwds, compress):
if isinstance(file, basestring):
if not file.endswith('.npz'):
file = file + '.npz'
namedict = kwds
for i, val in enumerate(args):
key = 'arr_%d' % i
if key in namedict.keys():
msg = "Cannot use un-named variables and keyword %s" % key
raise ValueError, msg
namedict[key] = val
if compress:
compression = zipfile.ZIP_DEFLATED
else:
compression = zipfile.ZIP_STORED
zip = zipfile_factory(file, mode="w", compression=compression)
# Stage arrays in a temporary file on disk, before writing to zip.
fd, tmpfile = tempfile.mkstemp(suffix='-numpy.npy')
os.close(fd)
try:
for key, val in namedict.iteritems():
fname = key + '.npy'
fid = open(tmpfile, 'wb')
try:
format.write_array(fid, numpy.asanyarray(val))
fid.close()
fid = None
zip.write(tmpfile, arcname=fname)
finally:
if fid:
fid.close()
finally:
os.remove(tmpfile)
zip.close()
#############################################################################
def get_distributed_particles(pa, comm, cell_size):
# FIXME: this can be removed once the examples all use Application.
from pysph.parallel.load_balancer import LoadBalancer
rank = comm.Get_rank()
num_procs = comm.Get_size()
if rank == 0:
lb = LoadBalancer.distribute_particles(pa, num_procs=num_procs,
block_size=cell_size)
else:
lb = None
particles = comm.scatter(lb, root=0)
return particles
################################################################################
# `PBar` class.
###############################################################################
class PBar(object):
"""A simple wrapper around the progressbar so it works if a user has
it installed or not.
"""
def __init__(self, maxval, show=True):
bar = None
self.count = 0
self.maxval = maxval
self.show = show
if HAS_PBAR and show:
widgets = [progressbar.Percentage(), ' ', progressbar.Bar(),
progressbar.ETA()]
bar = progressbar.ProgressBar(widgets=widgets,
maxval=maxval).start()
self.bar = bar
def update(self, delta=1):
self.count += delta
if self.bar is not None:
self.bar.update(self.count)
elif self.show:
sys.stderr.write('\r%d%%'%int(self.count*100/self.maxval))
sys.stderr.flush()
def finish(self):
if self.bar is not None:
self.bar.finish()
elif self.show:
sys.stderr.write('\r100%\n')
sys.stderr.flush()
class FloatPBar(object):
def __init__(self, t_initial, t_final, show=True):
self.ticks = 1000
self.bar = PBar(self.ticks, show)
self.t_initial = t_initial
self.t_final = t_final
self.t_diff = t_final - t_initial
def update(self, time):
expected_count = int((time - self.t_initial)/self.t_diff*self.ticks)
expected_count = min(expected_count, self.ticks)
diff = max(expected_count - self.bar.count, 0)
if diff > 0:
self.bar.update(diff)
def finish(self):
self.bar.finish()
##############################################################################
# friendly mkdir from http://code.activestate.com/recipes/82465/.
##############################################################################
def mkdir(newdir):
"""works the way a good mkdir should :)
- already exists, silently complete
- regular file in the way, raise an exception
- parent directory(ies) does not exist, make them as well
"""
if os.path.isdir(newdir):
pass
elif os.path.isfile(newdir):
raise OSError("a file with the same name as the desired " \
"dir, '%s', already exists." % newdir)
else:
head, tail = os.path.split(newdir)
if head and not os.path.isdir(head):
mkdir(head)
if tail:
try:
os.mkdir(newdir)
# To prevent race in mpi runs
except OSError as e:
import errno
if e.errno == errno.EEXIST and os.path.isdir(newdir):
pass
else:
raise
##############################################################################
# read pickled data from a file
##############################################################################
def get_pickled_data(fname):
f = open(fname, 'r')
data = pickle.load(f)
f.close()
return data
def get_pysph_root():
return os.path.split(pysph.__file__)[0]
##############################################################################
# Load an output file
##############################################################################
def _gather_array_data(all_array_data, comm):
"""Given array_data from the current processor and an MPI communicator,
return a joined array_data from all processors on rank 0 and the same
array_data on the other machines.
"""
array_names = all_array_data.keys()
# gather the data from all processors
collected_data = comm.gather(all_array_data, root=0)
if comm.Get_rank() == 0:
all_array_data = {}
size = comm.Get_size()
# concatenate the arrays
for array_name in array_names:
array_data = {}
all_array_data[array_name] = array_data
_props = collected_data[0][array_name].keys()
for prop in _props:
data = [collected_data[pid][array_name][prop]
for pid in range(size)]
prop_arr = numpy.concatenate(data)
array_data[prop] = prop_arr
return all_array_data
[docs]def dump(filename, particles, solver_data, detailed_output=False,
only_real=True, mpi_comm=None):
"""Dump the given particles and solver data to the given filename.
Parameters
----------
filename: str
Filename to dump to.
particles: sequence(ParticleArray)
Sequence of particle arrays to dump.
solver_data: dict
Additional information to dump about solver state.
detailed_output: bool
Specifies if all arrays should be dumped.
only_real: bool
Only dump the real particles.
mpi_comm: mpi4pi.MPI.Intracomm
An MPI communicator to use for parallel commmunications.
If `mpi_comm` is not passed or is set to None the local particles alone
are dumped, otherwise only rank 0 dumps the output.
"""
particle_data = get_particles_info(particles)
output_data = {"particles":particle_data, "solver_data":solver_data}
all_array_data = {}
for array in particles:
all_array_data[array.name] = array.get_property_arrays(
all=detailed_output, only_real=only_real
)
# Gather particle data on root if this is in parallel.
if mpi_comm is not None:
all_array_data = _gather_array_data(all_array_data, mpi_comm)
for name, arrays in all_array_data.iteritems():
particle_data[name]["arrays"] = arrays
if mpi_comm is None or mpi_comm.Get_rank() == 0:
savez(filename, version=2, **output_data)
def dump_v1(filename, particles, solver_data, detailed_output=False,
only_real=True, mpi_comm=None):
"""Dump the given particles and solver data to the given filename using
version 1. This is mainly used only for testing that we can continue
to load older versions of the data files.
"""
all_array_data = {}
output_data = {"arrays":all_array_data, "solver_data":solver_data}
for array in particles:
all_array_data[array.name] = array.get_property_arrays(
all=detailed_output, only_real=only_real)
# Gather particle data on root
if mpi_comm is not None:
all_array_data = _gather_array_data(all_array_data, mpi_comm)
output_data['arrays'] = all_array_data
if mpi_comm is None or mpi_comm.Get_rank() == 0:
savez(filename, version=1, **output_data)
[docs]def load(fname):
"""Load and return data from an output (.npz) file dumped by PySPH.
For output file version 1, the function returns a dictionary with
the keys:
``"solver_data"`` : Solver constants at the time of output like time,
time step and iteration count.
``"arrays"`` : ParticleArrays keyed on names with the ParticleArray object
as value.
Parameters
----------
fname : str
Name of the file.
Examples
--------
>>> data = load('elliptical_drop_100.npz')
>>> data.keys()
['arrays', 'solver_data']
>>> arrays = data['arrays']
>>> arrays.keys()
['fluid']
>>> fluid = arrays['fluid']
>>> type(fluid)
pysph.base.particle_array.ParticleArray
>>> data['solver_data']
{'count': 100, 'dt': 4.6416394784204199e-05, 't': 0.0039955855395528766}
"""
def _get_dict_from_arrays(arrays):
arrays.shape = (1,)
return arrays[0]
data = numpy.load(fname)
ret = {"arrays":{}}
if not 'version' in data.files:
msg = "Wrong file type! No version number recorded."
raise RuntimeError(msg)
version = data['version']
solver_data = _get_dict_from_arrays(data["solver_data"])
ret["solver_data"] = solver_data
if version == 1:
arrays = _get_dict_from_arrays(data["arrays"])
for array_name in arrays:
array = get_particle_array(name=array_name,
**arrays[array_name])
ret["arrays"][array_name] = array
elif version == 2:
particles = _get_dict_from_arrays(data["particles"])
for array_name, array_info in particles.iteritems():
array = ParticleArray(name=array_name,
constants=array_info["constants"],
**array_info["arrays"])
array.set_output_arrays(
array_info.get('output_property_arrays', [])
)
for prop, prop_info in array_info["properties"].iteritems():
if prop not in array_info["arrays"]:
array.add_property(**prop_info)
ret["arrays"][array_name] = array
else:
raise RuntimeError("Version not understood!")
return ret
[docs]def load_and_concatenate(prefix,nprocs=1,directory=".",count=None):
"""Load the results from multiple files.
Given a filename prefix and the number of processors, return a
concatenated version of the dictionary returned via load.
Parameters
----------
prefix : str
A filename prefix for the output file.
nprocs : int
The number of processors (files) to read
directory : str
The directory for the files
count : int
The file iteration count to read. If None, the last available
one is read
"""
if count is None:
counts = [i.rsplit('_',1)[1][:-4] for i in os.listdir(directory) if i.startswith(prefix) and i.endswith('.npz')]
counts = sorted( [int(i) for i in counts] )
count = counts[-1]
arrays_by_rank = {}
for rank in range(nprocs):
fname = os.path.join(directory, prefix+'_'+str(rank)+'_'+str(count)+'.npz')
data = load(fname)
arrays_by_rank[rank] = data["arrays"]
arrays = _concatenate_arrays(arrays_by_rank, nprocs)
data["arrays"] = arrays
return data
def _concatenate_arrays(arrays_by_rank, nprocs):
"""Concatenate arrays into one single particle array. """
if nprocs <= 0:
return 0
array_names = arrays_by_rank[0].keys()
first_processors_arrays = arrays_by_rank[0]
if nprocs > 1:
ret = {}
for array_name in array_names:
first_array = first_processors_arrays[array_name]
for rank in range(1,nprocs):
other_processors_arrays = arrays_by_rank[rank]
other_array = other_processors_arrays[array_name]
# append the other array to the first array
first_array.append_parray(other_array)
# remove the non local particles
first_array.remove_tagged_particles(1)
ret[array_name] = first_array
else:
ret = arrays_by_rank[0]
return ret
# SPH interpolation of data
from pyzoltan.core.carray import UIntArray
from pysph.base.kernels import Gaussian, get_compiled_kernel
from pysph.base.nnps import LinkedListNNPS as NNPS
class SPHInterpolate(object):
"""Class to perform SPH interpolation
Given solution data on possibly a scattered set, SPHInterpolate
can be used to interpolate solution data on a regular grid.
"""
def __init__(self, dim, dst, src, kernel=None):
self.dst = dst; self.src = src
if kernel is None:
self.kernel = get_compiled_kernel(Gaussian(dim))
# create the neighbor locator object
self.nnps = nnps = NNPS(dim=dim, particles=[dst, src],
radius_scale=self.kernel.radius_scale)
nnps.update()
def interpolate(self, arr):
"""Interpolate data given in arr onto coordinate positions"""
# the result array
np = self.dst.get_number_of_particles()
result = numpy.zeros(np)
nbrs = UIntArray()
# source arrays
src = self.src
sx, sy, sz, sh = src.get('x', 'y', 'z', 'h', only_real_particles=False)
# dest arrays
dst = self.dst
dx, dy, dz, dh = dst.x, dst.y, dst.z, dst.h
# kernel
kernel = self.kernel
for i in range(np):
self.nnps.get_nearest_particles(src_index=1, dst_index=0, d_idx=i, nbrs=nbrs)
nnbrs = nbrs.length
_wij = 0.0; _sum = 0.0
for indexj in range(nnbrs):
j = nbrs[indexj]
hij = 0.5 * (sh[j] + dh[i])
wij = kernel.kernel(dx[i], dy[i], dz[i], sx[j], sy[j], sz[j], hij)
_wij += wij
_sum += arr[j] * wij
# save the result
result[i] = _sum/_wij
return result
def gradient(self, arr):
"""Compute the gradient on the interpolated data"""
# the result array
np = self.dst.get_number_of_particles()
# result arrays
resultx = numpy.zeros(np)
resulty = numpy.zeros(np)
resultz = numpy.zeros(np)
nbrs = UIntArray()
# data arrays
# source arrays
src = self.src
sx, sy, sz, sh, srho, sm = src.get('x', 'y', 'z', 'h', 'rho', 'm', only_real_particles=False)
# dest arrays
dst = self.dst
dx, dy, dz, dh = dst.x, dst.y, dst.z, dst.h
# kernel
kernel = self.kernel
for i in range(np):
self.nnps.get_nearest_particles(src_index=1, dst_index=0, d_idx=i, nbrs=nbrs)
nnbrs = nbrs.length
_wij = 0.0; _sumx = 0.0; _sumy = 0.0; _sumz = 0.0
for indexj in range(nnbrs):
j = nbrs[indexj]
hij = 0.5 * (sh[j] + dh[i])
wij = kernel.kernel(dx[i], dy[i], dz[i], sx[j], sy[j], sz[j], hij)
rhoj = srho[j]; mj = sm[j]
fji = arr[j] - arr[i]
# kernel gradient
dwij = kernel.gradient(dx[i], dy[i], dz[i], sx[j], sy[j], sz[j], hij)
_wij += wij
tmp = 1./rhoj * fji * mj
_sumx += tmp * dwij.x
_sumy += tmp * dwij.y
_sumz += tmp * dwij.z
# save the result
resultx[i] = _sumx
resulty[i] = _sumy
resultz[i] = _sumz
return resultx, resulty, resultz
[docs]def get_files(dirname=None, fname=None, endswith=".npz"):
"""Get all solution files in a given directory, `dirname`.
Parameters
----------
dirname: str
Name of directory.
fname: str
An initial part of the filename, if not specified use the first
part of the dirname.
endswith: str
The extension of the file to load.
"""
if dirname is None:
return []
if fname is None:
fname = dirname.split("_output")[0]
path = os.path.abspath( dirname )
files = os.listdir( path )
# get all the output files in the directory
files = [f for f in files if f.startswith(fname) and f.endswith(endswith)]
files = [os.path.join(path, f) for f in files]
# sort the files
def _sort_func(x, y):
"""Sort the files correctly."""
def _process(arg):
a = os.path.splitext(arg)[0]
return int(a[a.rfind('_')+1:])
return cmp(_process(x), _process(y))
files.sort(_sort_func)
return files