"""
Predictive-Corrective Incompressible SPH (PCISPH)
#################################################
References
-----------
.. [SolPaj2009] B. Solenthaler, R. Pajarola "Predictive-Corrective
Incompressible SPH", ACM Trans. Graph 28 (2009), pp. 1--6.
"""
import numpy as np
from pysph.sph.integrator import Integrator
from pysph.sph.equation import Equation, Group
from pysph.base.utils import get_particle_array
from pysph.sph.integrator_step import IntegratorStep
from pysph.sph.scheme import Scheme, add_bool_argument
[docs]def get_particle_array_pcisph(constants=None, **props):
pcisph_props = [
'au', 'av', 'aw', 'arho', 'dwij2', 'u0', 'v0', 'w0', 'aup', 'avp',
'awp', 'x0', 'y0', 'z0', 'rho0'
]
pa = get_particle_array(
constants=constants, additional_props=pcisph_props, **props
)
pa.add_constant('iters', np.zeros(10000))
pa.add_property('dw', stride=3)
pa.add_output_arrays(['p', 'dwij2'])
return pa
[docs]class PCISPHIntegrator(Integrator):
[docs] def one_timestep(self, t, dt):
self.initialize()
self.compute_accelerations(0)
self.stage1()
self.update_domain()
self.do_post_stage(dt, 1)
[docs] def initial_acceleration(self, t, dt):
pass
[docs]class PCISPHStep(IntegratorStep):
def __init__(self, show_itercount=False):
self.show_itercount = show_itercount
self.index = 0
[docs] def initialize(self, d_idx, d_u, d_v, d_w, d_u0, d_v0, d_w0,
d_x, d_y, d_z, d_x0, d_y0, d_z0, d_rho, d_rho0):
d_u0[d_idx] = d_u[d_idx]
d_v0[d_idx] = d_v[d_idx]
d_w0[d_idx] = d_w[d_idx]
d_x0[d_idx] = d_x[d_idx]
d_y0[d_idx] = d_y[d_idx]
d_z0[d_idx] = d_z[d_idx]
d_rho0[d_idx] = d_rho[d_idx]
[docs] def py_stage1(self, dst, t, dt):
if self.show_itercount:
print("Iteration count = ", dst.iters[self.index])
self.index += 1
[docs] def stage1(self, d_idx, d_u, d_v, d_w, d_au, d_av, d_aw, d_x, d_y, d_z,
d_aup, d_avp, d_awp, d_u0, d_v0, d_w0, d_x0, d_y0, d_z0, dt):
d_u[d_idx] = d_u0[d_idx] + dt * (d_au[d_idx] + d_aup[d_idx])
d_v[d_idx] = d_v0[d_idx] + dt * (d_av[d_idx] + d_avp[d_idx])
d_w[d_idx] = d_w0[d_idx] + dt * (d_aw[d_idx] + d_awp[d_idx])
d_x[d_idx] = d_x0[d_idx] + dt * d_u[d_idx]
d_y[d_idx] = d_y0[d_idx] + dt * d_v[d_idx]
d_z[d_idx] = d_z0[d_idx] + dt * d_w[d_idx]
[docs]class MomentumEquationViscosity(Equation):
r"""**Momentum Equation Viscosity**
See "pysph.sph.wc.viscosity.LaminarViscocity"
"""
def __init__(self, dest, sources, nu=0.0, gx=0.0, gy=0.0, gz=0.0):
self.nu = nu
self.gx = gx
self.gy = gy
self.gz = gz
super(MomentumEquationViscosity, self).__init__(dest, sources)
[docs] def initialize(self, d_idx, d_au, d_av, d_aw):
d_au[d_idx] = self.gx
d_av[d_idx] = self.gy
d_aw[d_idx] = self.gz
[docs] def loop(self, d_idx, s_idx, s_m, d_rho, s_rho, d_au, d_av, d_aw, DWIJ,
XIJ, VIJ, R2IJ, EPS):
mb = s_m[s_idx]
rhoij = (d_rho[d_idx] + s_rho[s_idx])
xdotdwij = DWIJ[0] * XIJ[0] + DWIJ[1] * XIJ[1] + DWIJ[2] * XIJ[2]
tmp = mb * 4 * self.nu * xdotdwij / (rhoij * (R2IJ + EPS))
d_au[d_idx] += tmp * VIJ[0]
d_av[d_idx] += tmp * VIJ[1]
d_aw[d_idx] += tmp * VIJ[2]
[docs] def post_loop(self, d_idx, d_au, d_av, d_aw, d_u, d_v, d_w, d_p, d_aup,
d_avp, d_awp, dt):
d_u[d_idx] += dt * d_au[d_idx]
d_v[d_idx] += dt * d_av[d_idx]
d_w[d_idx] += dt * d_aw[d_idx]
# Retaining the old pressure seems to give better results for the
# TG problem.
#d_p[d_idx] = 0.0
d_aup[d_idx] = 0.0
d_avp[d_idx] = 0.0
d_awp[d_idx] = 0.0
[docs]class Predict(Equation):
r"""**Predict velocity and position**
.. math::
\mathbf{v}^{*}(t+1) = \mathbf{v}(t) + dt
\left(\frac{d \mathbf{v}_{visc, g}(t)}{dt} +
\frac{d \mathbf{v}_{p} (t)}{dt} \right)
.. math::
\mathbf{x}^{*}(t+1) = \mathbf{x}(t) + dt * \mathbf{v}(t+1)
"""
[docs] def initialize(self, d_idx, d_u, d_v, d_w, d_aup, d_avp, d_awp, d_x, d_y,
d_z, d_au, d_av, d_aw, d_u0, d_v0, d_w0, d_x0, d_y0, d_z0,
dt):
d_u[d_idx] = d_u0[d_idx] + dt * (d_au[d_idx] + d_aup[d_idx])
d_v[d_idx] = d_v0[d_idx] + dt * (d_av[d_idx] + d_avp[d_idx])
d_w[d_idx] = d_w0[d_idx] + dt * (d_aw[d_idx] + d_awp[d_idx])
d_x[d_idx] = d_x0[d_idx] + dt * d_u[d_idx]
d_y[d_idx] = d_y0[d_idx] + dt * d_v[d_idx]
d_z[d_idx] = d_z0[d_idx] + dt * d_w[d_idx]
[docs]class ComputePressure(Equation):
r"""**Compute Pressure**
Compute pressure iteratively maintaining density within a given tolerance.
.. math::
p_i += \delta \rho^{*}_{{err}_i}
where,
.. math::
\rho_{err_i} = \rho_i^{*} - \rho_0
.. math::
\delta = \frac{-1}{\beta (-\sum_j \nabla W_{ij} \cdot
\sum_j \nabla W_{ij} - \sum_j \nabla W_{ij} \nabla W_{ij})}
"""
def __init__(self, dest, sources, rho0):
self.rho0 = rho0
super(ComputePressure, self).__init__(dest, sources)
[docs] def initialize(self, d_idx, d_dw, d_dwij2):
d_dw[d_idx * 3 + 0] = 0.0
d_dw[d_idx * 3 + 1] = 0.0
d_dw[d_idx * 3 + 2] = 0.0
d_dwij2[d_idx] = 0.0
[docs] def loop(self, d_idx, d_dw, d_dwij2, DWIJ):
d_dw[d_idx * 3 + 0] += DWIJ[0]
d_dw[d_idx * 3 + 1] += DWIJ[1]
d_dw[d_idx * 3 + 2] += DWIJ[2]
dwij2 = DWIJ[0] * DWIJ[0] + DWIJ[1] * DWIJ[1] + DWIJ[2] * DWIJ[2]
d_dwij2[d_idx] += dwij2
[docs] def post_loop(self, d_idx, d_dw, d_m, dt, d_dwij2, d_p, d_rho):
dwx = d_dw[d_idx * 3 + 0]
dwy = d_dw[d_idx * 3 + 1]
dwz = d_dw[d_idx * 3 + 2]
tmp = dwx * dwx + dwy * dwy + dwz * dwz
mi = d_m[d_idx]
rho0 = self.rho0
beta = 2 * mi * mi * (dt / rho0) * (dt / rho0)
delta = 1.0 / (beta * (tmp + d_dwij2[d_idx]))
rho_err = d_rho[d_idx] - rho0
d_p[d_idx] += delta * rho_err
[docs]class MomentumEquationPressureGradient(Equation):
r"""**Momentum Equation pressure gradient**
Standard WCSPH pressure gradient,
.. math::
\frac{d\mathbf{v}}{dt} = - \sum_j m_j \left(\frac{p_i}{\rho_i^2}
+ \frac{p_i}{\rho_i^2}\right) \nabla W(x_{ij}, h)
"""
def __init__(self, dest, sources, rho0, tolerance, debug):
self.rho0 = rho0
self.tolerance = tolerance
self.debug = debug
self.rho_err = 0.0
self.ctr = 0
super(MomentumEquationPressureGradient, self).__init__(dest, sources)
[docs] def loop(self, d_idx, s_idx, d_p, s_p, d_rho, s_rho, s_m, d_aup, d_avp,
d_awp, DWIJ):
rhoi2 = 1.0 / (d_rho[d_idx] * d_rho[d_idx])
rhoj2 = 1.0 / (s_rho[s_idx] * s_rho[s_idx])
mj = s_m[d_idx]
pij = -1.0 * mj * (d_p[d_idx] * rhoi2 + s_p[s_idx] * rhoj2)
d_aup[d_idx] += pij * DWIJ[0]
d_avp[d_idx] += pij * DWIJ[1]
d_awp[d_idx] += pij * DWIJ[2]
[docs] def reduce(self, dst, t, dt):
import numpy as np
self.rho_err = np.mean(np.abs(dst.rho / self.rho0 - 1.0))
dst.iters[self.ctr] += 1
[docs] def converged(self):
debug = self.debug
rho_err = self.rho_err
if rho_err > self.tolerance:
if debug:
print("Not converged:", rho_err)
return -1.0
else:
self.ctr += 1
if debug:
print("Converged:", rho_err)
return 1.0
[docs]class PCISPHScheme(Scheme):
def __init__(self, fluids, dim, rho0, nu, gx=0.0, gy=0.0, gz=0.0,
tolerance=0.1, debug=False, show_itercount=False):
self.fluids = fluids
self.solver = None
self.dim = dim
self.rho0 = rho0
self.nu = nu
self.gx = gx
self.gy = gy
self.gz = gz
self.tolerance = tolerance
self.debug = debug
self.show_itercount = show_itercount
[docs] def add_user_options(self, group):
group.add_argument(
'--pcisph-tol', action='store', type=float, dest='tolerance',
default=None,
help='relative error tolerance for convergence as a percentage.'
)
add_bool_argument(
group, 'pcisph-debug', dest='debug', default=None,
help="Produce some debugging output on convergence of iterations."
)
add_bool_argument(
group, 'pcisph-itercount', dest='show_itercount', default=False,
help="Produce some debugging output on convergence of iterations."
)
[docs] def consume_user_options(self, options):
vars = ['tolerance', 'debug', 'show_itercount']
data = dict((var, self._smart_getattr(options, var))
for var in vars)
self.configure(**data)
[docs] def get_equations(self):
from pysph.sph.basic_equations import SummationDensity
all = self.fluids
equations = []
eq1 = []
for fluid in self.fluids:
eq1.append(
MomentumEquationViscosity(
dest=fluid, sources=all, nu=self.nu, gx=self.gx,
gy=self.gy, gz=self.gz
)
)
equations.append(Group(equations=eq1))
eq1, g2 = [], []
for fluid in self.fluids:
eq1.append(Predict(dest=fluid, sources=None))
g2.append(Group(equations=eq1, update_nnps=True))
eq2 = []
for fluid in self.fluids:
eq2.append(SummationDensity(dest=fluid, sources=all))
g2.append(Group(equations=eq2))
eq3 = []
for fluid in self.fluids:
eq3.append(
ComputePressure(dest=fluid, sources=all, rho0=self.rho0)
)
g2.append(Group(equations=eq3, update_nnps=True))
eq4 = []
for fluid in self.fluids:
eq4.append(
MomentumEquationPressureGradient(
dest=fluid, sources=all, rho0=self.rho0,
tolerance=self.tolerance, debug=self.debug
),
)
g2.append(Group(equations=eq4))
equations.append(
Group(equations=g2, iterate=True,
max_iterations=500, min_iterations=2)
)
return equations
[docs] def setup_properties(self, particles, clean=True):
particle_arrays = dict([(p.name, p) for p in particles])
dummy = get_particle_array_pcisph(name='junk')
props = list(dummy.properties.keys())
props += [dict(name=x, stride=y) for x, y in dummy.stride.items()]
output_props = dummy.output_property_arrays
constants = [dict(name=x, data=y) for x, y in dummy.constants.items()]
for fluid in self.fluids:
pa = particle_arrays[fluid]
self._ensure_properties(pa, props, clean)
pa.set_output_arrays(output_props)
for const in constants:
pa.add_constant(**const)