"""Module containing standard functions for the dust."""
import dustpy.constants as c
from dustpy.std import dust_f as dp_dust_f
import dustpy.std.dust as dp_dust
from tripodpy.std import dust_f
from simframe.integration import Scheme
import numpy as np
import scipy.sparse as sp
[docs]
def dt(sim):
"""Function calculates the time step from dust.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
dt : float
Dust time step"""
dtSigma = dt_Sigma(sim) or 1e100
dtsmax = dt_smax(sim)
return np.minimum(dtSigma, dtsmax)
[docs]
def dt_Sigma(sim):
"""Function calculates the time step due to changes in Sigma.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
dt_Sigma : float
Time step due to changes in Sigma"""
if np.any(sim.dust.S.tot[1:-1, ...] < 0.):
mask = np.logical_and(
sim.dust.Sigma > sim.dust.SigmaFloor,
sim.dust.S.tot < 0.)
mask[0, :] = False
mask[-1:, :] = False
mask2 = sim.dust.S.tot[:, 1] < 0.
mask2 = sim.dust.S.tot[:,1] * sim.dust.Sigma[:,0] - sim.dust.S.tot[:,0] * sim.dust.Sigma[:,1] < 0.
f = sim.dust.Sigma[:,1]/sim.dust.Sigma.sum(-1)
mask2 = np.logical_and(mask2, f<0.43)
dsig_da = dsigda(sim)
dt_pred = 10 * ((-0.1* sim.dust.s.max[mask2] * dsig_da[mask2]) + sim.dust.Sigma[mask2,1] - sim.dust.f.crit* sim.dust.Sigma[mask2,:].sum(-1)) \
/(sim.dust.S.tot[mask2, 1] *(sim.dust.f.crit - 1.) + sim.dust.f.crit * sim.dust.S.tot[mask2, 0] + dsig_da[mask2] * sim.dust.s.sdot_coag[mask2] +sim.dust.S.smax_hyd[mask2]*dsig_da[mask2])
dt_pred = np.where(dt_pred != dt_pred, 0, dt_pred)
dt_pred = np.abs(dt_pred)
dt_pred = np.where(dt_pred == np.inf, 0, dt_pred)
dt = np.ones_like(sim.dust.Sigma)*1e100
dt[mask] = np.abs(sim.dust.Sigma[mask] / sim.dust.S.tot[mask])
dt[mask2,1] = np.minimum(dt[mask2,1],dt_pred)
return dt.min(initial=1e100)
return 1e100
[docs]
def dt_compo(sim):
"""Function calculates the time step due to changes in Sigma.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
dt_Sigma : float
Time step due to changes in Sigma"""
dt = 1e100
for key,comp in sim.components.__dict__.items():
if not key.startswith("_") and comp.dust._active:
if np.any(comp.dust.S.tot[1:-1, ...] < 0.):
mask = np.logical_and(
comp.dust.Sigma > sim.dust.SigmaFloor,
comp.dust.S.tot < 0.)
mask[0, :] = False
mask[-1:, :] = False
dt_int = np.ones_like(sim.dust.Sigma)*1e100
dt_int[mask] = np.abs(comp.dust.Sigma[mask] / comp.dust.S.tot[mask])
dt = min(dt,dt_int.min(initial=1e100))
return dt
[docs]
def S_smax_hyd(sim):
"""Function calculates the hydrodynamic source terms for s.max
-> assumes that I can callculate the hydrodynamic source term for the product and divide it by Sig1
"""
Fi = Fi_sig1smax(sim)
S_hyd = (dp_dust_f.s_hyd(Fi,sim.grid.ri)[:,0] - sim.dust.S.hyd[:,1]*sim.dust.s.max)/sim.dust.Sigma[:,1]
return S_hyd
[docs]
def S_hyd_compo(sim, group=None):
"""Function calculates the hydrodynamic source terms.
Parameters
----------
sim : Frame
Parent simulation frame
Sigma : Field, optional, default : None
Surface density to be used if not None
Returns
-------
Shyd : Field
Hydrodynamic source terms"""
return dp_dust_f.s_hyd(group.Fi,sim.grid.ri)
[docs]
def S_tot_compo(sim,group=None):
"""Function returns the external source terms for each component.
Parameters
----------
sim : Frame
Parent simulation frame
compkey : str, optional
Key of the component, by default "default"
Returns
-------
S_ext : Field
External source terms for each component"""
return group.S.ext + group.S.hyd + group.S.coag
[docs]
def rhos_compo(sim):
"""Function returns the material density of dust from the composition.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
rhos : Field
Material density of dust grains for each component"""
temp = np.zeros_like(sim.gas.Sigma)
sig_tot = np.zeros_like(sim.gas.Sigma)
for key, comp in sim.components.__dict__.items():
if not key.startswith("_") and comp.dust._active:
sig_tot += comp.dust.Sigma.sum(-1)
temp += comp.dust.Sigma.sum(-1)/comp.dust.pars.rhos
res = np.where(sim.dust.Sigma.sum(-1) > sim.dust.SigmaFloor.sum(-1), (sig_tot)/temp, sim.dust.rhos[:,0])
return res[:,None] * np.ones_like(sim.dust.rhos)
[docs]
def Fi_sig1smax (sim):
"""Function that calculates the total flux of the Sigma[1] and s.max
used to solved the advection equation for s.max
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
Fi_tot : Field
Total flux of Sigma[1] * s.max"""
Fi_diff = F_diff(sim,Sigma = sim.dust.Sigma*sim.dust.s.max[:,np.newaxis])
Fi_adv = F_adv(sim,Sigma = sim.dust.Sigma*sim.dust.s.max[:,np.newaxis])
Fi_tot = (Fi_diff + Fi_adv)[:,1]
return Fi_tot
[docs]
def dt_smax(sim):
"""Function calculates the time step due to changes in smax.
Change of smax during one integration step bound by smin and maximum growth factor.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
dt_smax : float
Particle growth time step"""
mask2 = sim.dust.S.tot[:, 1] < 0.
mask2 = sim.dust.S.tot[:,1] * sim.dust.Sigma[:,0] - sim.dust.S.tot[:,0] * sim.dust.Sigma[:,1] < 0.
f = sim.dust.Sigma[:,1]/sim.dust.Sigma.sum(-1)
mask2 = np.logical_and(mask2, f<0.43)
smax_dot_hyd = sim.dust.S.smax_hyd
smax_dot = np.minimum(np.abs(sim.dust.s.sdot_coag[1:-1]) , np.abs(sim.dust.s.sdot_coag[1:-1]+smax_dot_hyd[1:-1]))
dt = sim.dust.s.max[1:-1] / (smax_dot + 1e-100)
dt[mask2[1:-1]] = 1e100
return dt.min(initial=1e100)
[docs]
def prepare(sim):
"""Function prepares implicit dust integration step.
It stores the current value of the surface density in a hidden field.
Parameters
----------
sim : Frame
Parent simulation frame"""
Nm_s = int(sim.grid._Nm_short)
Nr = int(sim.grid.Nr)
# Copy values to state vector Y
sim.dust._Y[:Nm_s * Nr] = sim.dust.Sigma.ravel()
sim.dust._Y[Nm_s * Nr:] = sim.dust.s.max * sim.dust.Sigma[:, 1]
# Setting coagulation sources and external sources at boundaries to zero
sim.dust.S.coag[0] = 0.
sim.dust.S.coag[-1] = 0.
# Storing current surface density
sim.dust._SigmaOld[...] = sim.dust.Sigma[...]
sim.dust.s._maxOld = sim.dust.s.max
sim.dust.s._prev_sdot_coag = sim.dust.s.sdot_coag
s_max_deriv = sim.dust.s.max.derivative()
enforce_f(sim)
sim.dust.S.ext.update()
sim.dust.S.coag.update()
sim.dust.S.hyd.update()
sim.dust.S.coag[0] = 0.
sim.dust.S.coag[-1] = 0.
sim.dust.S.ext[0] = 0.
sim.dust.S.ext[-1] = 0.
[docs]
def finalize(sim):
"""Function finalizes implicit integration step.
Parameters
----------
sim : Frame
Parent integration frame"""
Nm_s = int(sim.grid._Nm_short)
Nr = int(sim.grid.Nr)
# Copy values from state vector to fields
sim.dust.Sigma[...] = sim.dust._Y[:Nr * Nm_s].reshape(sim.dust.Sigma.shape)
# Making sure smax is not smaller than 1.5 smin to ensure minimal distribution width])
sim.dust.s.max = np.maximum(
1.5 * sim.dust.s.min, sim.dust._Y[Nr * Nm_s:] / sim.dust.Sigma[..., 1])
# sims up the dust suface density if there are active dust components if there are any
if sim._dust_compo:
temp = np.zeros_like(sim.dust.Sigma)
for nm, cint in sim.components.__dict__.items():
if(nm.startswith("_")):
continue
if(cint.dust._active):
temp += cint.dust.Sigma
cint.dust.Sigma = np.where(
cint.dust.Sigma > sim.dust.SigmaFloor,
cint.dust.Sigma,0.1*sim.dust.SigmaFloor)
sim.dust.Sigma = np.maximum(temp, sim.dust.SigmaFloor)
dp_dust.boundary(sim)
dp_dust.enforce_floor_value(sim)
enforce_f(sim)
sim.dust.s.max.derivative()
sim.dust.v.rad.update()
sim.dust.Fi.update()
sim.dust.S.coag.update()
sim.dust.S.ext.update()
sim.dust.S.hyd.update()
dp_dust.set_implicit_boundaries(sim)
[docs]
def smax_initial(sim):
"""Function calculates the initial maximum particle sizes
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
smax : Field
initial maximum particle sizes"""
# Helper variables
smin = sim.dust.s.min
# Routine for excluding initially drifting particles
if not sim.ini.dust.allowDriftingParticles:
# Calculating pressure gradient
P = sim.gas.P
Pi = dust_f.interp1d(sim.grid.ri, sim.grid.r, P)
gamma = (Pi[1:] - Pi[:-1]) / (sim.grid.ri[1:] - sim.grid.ri[:-1])
gamma = np.abs(gamma)
# Exponent of pressure gradient
gamma *= sim.grid.r / P
# Maximum drift limited particle size with safety margin
ad = 5e-3 * 2. / np.pi * sim.ini.dust.d2gRatio * sim.gas.Sigma / sim.dust.fill[:, 0] \
* sim.dust.rhos[:, 0] * (sim.grid.OmegaK * sim.grid.r) ** 2. / sim.gas.cs ** 2. / gamma
aIni = np.minimum(sim.ini.dust.aIniMax, ad)
# Enforce initial drift limit
# sim.dust.q.eff = np.where(
# aIni < sim.ini.dust.aIniMax, sim.dust.q.sweep, sim.dust.q.eff)
return np.maximum(1.5 * smin, aIni)
# Return the ini value if drifting particles are allowed
return np.ones_like(smin) * sim.ini.dust.aIniMax
[docs]
def Sigma_initial(sim):
"""Function calculates the initial condition of the dust surface densities
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
Sigma : Field
Initial dust surface density"""
# Helper variables
q = sim.dust.q.eff
qp4 = q + 4.
smin = sim.dust.s.min
smax = sim.dust.s.max
sint = np.sqrt(smin * smax)
SigmaFloor = sim.dust.SigmaFloor
# Values for q != -4
S0 = np.zeros_like(sim.grid.r)
S1 = np.zeros_like(sim.grid.r)
for i in range(int(sim.grid.Nr)):
if smax[i] <= 1.5* smin[i]:
S0[i] = SigmaFloor[i, 0]
S1[i] = SigmaFloor[i, 1]
else:
S0[i] = (sint[i] ** qp4[i] - smin[i] ** qp4[i]) / \
(smax[i] ** qp4[i] - smin[i] ** qp4[i])
S1[i] = 1. - S0[i]
S = np.array([S0, S1]).T
# Values for q == -4
S0_4 = np.zeros_like(sim.grid.r)
S1_4 = np.zeros_like(sim.grid.r)
for i in range(int(sim.grid.Nr)):
if smax[i] == smin[i]:
S0_4[i] = SigmaFloor[i, 0]
S1_4[i] = SigmaFloor[i, 1]
else:
S0_4[i] = np.log(sint[i] / smin[i]) / np.log(smax[i] / smin[i])
S1_4[i] = 1. - S0_4[i]
S_4 = np.array([S0_4, S1_4]).T
Sigma = sim.ini.dust.d2gRatio * \
sim.gas.Sigma[:, None] * np.where(q[:, None] == -4., S_4, S)
Sigma = np.where(Sigma <= sim.dust.SigmaFloor,
0.1 * sim.dust.SigmaFloor,
Sigma)
return Sigma
[docs]
def jacobian(sim, x, dx=None, *args, **kwargs):
"""Function calculates the Jacobian for implicit dust integration.
Parameters
----------
sim : Frame
Parent simulation frame
x : IntVar
Integration variable
dx : float, optional, default : None
stepsize
args : additional positional arguments
kwargs : additional keyword arguments
Returns
-------
jac : Sparse matrix
Dust Jacobian
Notes
-----
Function returns the Jacobian for ``Simulation.dust.Sigma.ravel()``
instead of ``Simulation.dust.Sigma``. The Jacobian is stored as
sparse matrix."""
# Helper variables for convenience
if dx is None:
dt = x.stepsize
else:
dt = dx
r = sim.grid.r
ri = sim.grid.ri
area = sim.grid.A
Nr = int(sim.grid.Nr)
Nm_s = int(sim.grid._Nm_short)
# Total problem size
Ntot = int((Nr * Nm_s))
# Getting data vector and coordinates for the coagulation sparse matrix
dat_coag, row_coag, col_coag = dust_f.jacobian_coagulation_generator(
# here we compute the cross section where the first entry is
# the cross section of (a0, a1) and the second of (a1, fudge * a1)
np.pi * (sim.dust.a[:, [0, 2]]+sim.dust.a[:, [2, 1]])**2,
# same for the relative velocities
sim.dust.v.rel.tot[:, [0, 2], [2, 1]],
sim.dust.H[:, [0, 2]],
sim.dust.m[:, [0, 2]],
sim.dust.Sigma,
sim.dust.s.min,
sim.dust.s.max,
sim.dust.q.eff
)
# Getting data vector and coordinates for the hydrodynamic sparse matrix
A, B, C = dp_dust_f.jacobian_hydrodynamic_generator(
area,
sim.dust.D[:, [0, 2]],
r,
ri,
sim.gas.Sigma,
sim.dust.v.rad_flux[:, [0, 2]]
)
row_hyd = np.hstack(
(np.arange(Ntot - Nm_s) + Nm_s, np.arange(Ntot), np.arange(Ntot - Nm_s)))
col_hyd = np.hstack(
(np.arange(Ntot - Nm_s), np.arange(Ntot), np.arange(Ntot - Nm_s) + Nm_s))
dat_hyd = np.hstack((A.ravel()[Nm_s:], B.ravel(), C.ravel()[:-Nm_s]))
# Right-hand side
sim.dust._rhs[Nm_s:-Nm_s] = sim.dust.Sigma.ravel()[Nm_s:-Nm_s]
# BOUNDARIES
# Inner boundary
# Initializing data and coordinate vectors for sparse matrix
dat_in = np.zeros(int(3. * Nm_s))
row0 = np.arange(int(Nm_s))
col0 = np.arange(int(Nm_s))
col1 = np.arange(int(Nm_s)) + Nm_s
col2 = np.arange(int(Nm_s)) + 2. * Nm_s
row_in = np.concatenate((row0, row0, row0))
col_in = np.concatenate((col0, col1, col2))
# Filling data vector depending on boundary condition
if sim.dust.boundary.inner is not None:
# Given value
if sim.dust.boundary.inner.condition == "val":
sim.dust._rhs[:Nm_s] = sim.dust.boundary.inner.value
# Constant value
elif sim.dust.boundary.inner.condition == "const_val":
dat_in[Nm_s:2 * Nm_s] = 1. / dt
sim.dust._rhs[:Nm_s] = 0.
# Given gradient
elif sim.dust.boundary.inner.condition == "grad":
K1 = - r[1] / r[0]
dat_in[Nm_s:2 * Nm_s] = -K1 / dt
sim.dust._rhs[:Nm_s] = - ri[1] / r[0] * \
(r[1] - r[0]) * sim.dust.boundary.inner.value
# Constant gradient
elif sim.dust.boundary.inner.condition == "const_grad":
Di = ri[1] / ri[2] * (r[1] - r[0]) / (r[2] - r[0])
K1 = - r[1] / r[0] * (1. + Di)
K2 = r[2] / r[0] * Di
dat_in[:Nm_s] = 0.
dat_in[Nm_s:2 * Nm_s] = -K1 / dt
dat_in[2 * Nm_s:] = -K2 / dt
sim.dust._rhs[:Nm_s] = 0.
# Given power law
elif sim.dust.boundary.inner.condition == "pow":
p = sim.dust.boundary.inner.value
sim.dust._rhs[:Nm_s] = sim.dust.Sigma[1] * (r[0] / r[1]) ** p
# Constant power law
elif sim.dust.boundary.inner.condition == "const_pow":
p = np.log(sim.dust.Sigma[2,0] /
sim.dust.Sigma[1,0]) / np.log(r[2] / r[1])
K1 = - (r[0] / r[1]) ** p
dat_in[Nm_s:2 * Nm_s] = -K1 / dt
sim.dust._rhs[:Nm_s] = 0.
# Outer boundary
# Initializing data and coordinate vectors for sparse matrix
dat_out = np.zeros(int(3. * Nm_s))
row0 = np.arange(int(Nm_s))
col0 = np.arange(int(Nm_s))
col1 = np.arange(int(Nm_s)) - Nm_s
col2 = np.arange(int(Nm_s)) - 2. * Nm_s
offset = (Nr - 1) * Nm_s
row_out = np.concatenate((row0, row0, row0)) + offset
col_out = np.concatenate((col2, col1, col0)) + offset
# Filling data vector depending on boundary condition
if sim.dust.boundary.outer is not None:
# Given value
if sim.dust.boundary.outer.condition == "val":
sim.dust._rhs[-Nm_s:] = sim.dust.boundary.outer.value
# Constant value
elif sim.dust.boundary.outer.condition == "const_val":
dat_out[-2 * Nm_s:-Nm_s] = 1. / dt
sim.dust._rhs[-Nm_s:] = 0.
# Given gradient
elif sim.dust.boundary.outer.condition == "grad":
KNrm2 = -r[-2] / r[-1]
dat_out[-2 * Nm_s:-Nm_s] = -KNrm2 / dt
sim.dust._rhs[-Nm_s:] = ri[-2] / r[-1] * \
(r[-1] - r[-2]) * sim.dust.boundary.outer.value
# Constant gradient
elif sim.dust.boundary.outer.condition == "const_grad":
Do = ri[-2] / ri[-3] * (r[-1] - r[-2]) / (r[-2] - r[-3])
KNrm2 = - r[-2] / r[-1] * (1. + Do)
KNrm3 = r[-3] / r[-1] * Do
dat_out[-2 * Nm_s:-Nm_s] = -KNrm2 / dt
dat_out[-3 * Nm_s:-2 * Nm_s] = -KNrm3 / dt
sim.dust._rhs[-Nm_s:] = 0.
# Given power law
elif sim.dust.boundary.outer.condition == "pow":
p = sim.dust.boundary.outer.value
sim.dust._rhs[-Nm_s:] = sim.dust.Sigma[-2] * (r[-1] / r[-2]) ** p
# Constant power law
elif sim.dust.boundary.outer.condition == "const_pow":
p = np.log(sim.dust.Sigma[-2] /
sim.dust.Sigma[-3]) / np.log(r[-2] / r[-3])
KNrm2 = - (r[-1] / r[-2]) ** p
dat_out[-2 * Nm_s:-Nm_s] = -KNrm2 / dt
sim.dust._rhs[-Nm_s:] = 0.
# Stitching together the generators
row = np.hstack((row_coag, row_hyd, row_in, row_out))
col = np.hstack((col_coag, col_hyd, col_in, col_out))
dat = np.hstack((dat_coag, dat_hyd, dat_in, dat_out))
gen = (dat, (row, col))
# Building sparse matrix of coagulation Jacobian
J = sp.csc_matrix(
gen,
shape=(Ntot, Ntot)
)
# Adding and returning all matrix components
return J
[docs]
def a(sim):
"""Function calculates the particle sizes from the characteristic particle sizes and the distribution exponent.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
a : Field [a0, fudge * a1, a1, fudge * amax, amax]
Particle sizes"""
# interpolate between drift and turbulence dominated pre-factor
return dust_f.calculate_a(sim.dust.s.min, sim.dust.s.max, sim.dust.qrec, sim.dust.f.dv, sim.grid._Nm_long)
[docs]
def F_adv(sim, Sigma=None):
"""Function calculates the advective flux at the cell interfaces. It is linearly interpolating
the velocities onto the grid cell interfaces and is assuming
vi(0, :) = vi(1, :) and vi(Nr, :) = vi(Nr-1, :).
Parameters
----------
sim : Frame
Parent simulation frame
Sigma : Field, optional, default : None
Surface density to be used if not None
Returns
-------
Fi : Field
Advective mass fluxes through the grid cell interfaces"""
if Sigma is None:
Sigma = sim.dust.Sigma
return dp_dust_f.fi_adv(Sigma, sim.dust.v.rad_flux[:, [0, 2]], sim.grid.r, sim.grid.ri)
[docs]
def F_diff(sim, Sigma=None):
"""Function calculates the diffusive flux at the cell interfaces
Parameters
----------
sim : Frame
Parent simulation frame
Sigma : Field, optional, default : None
Surface density to be used if defaults to sim.dust.Sigma
Returns
-------
Fi : Field
Diffusive mass fluxes through the grid cell interfaces"""
if Sigma is None:
Sigma = sim.dust.Sigma
Fi = dust_f.fi_diff_no_limit(sim.dust.D[:, [0, 2]],
Sigma,
sim.gas.Sigma,
sim.dust.St[:, [0, 2]]*sim.dust.f.drift,
np.sqrt(sim.dust.delta.rad * sim.gas.cs ** 2),
sim.grid.r,
sim.grid.ri)
Fi[:1, :] = 0.
Fi[-1:, :] = 0.
return Fi
[docs]
def m(sim):
"""Function calculates the particle mass from the particle sizes.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
m : Field
Particle masses"""
return dust_f.calculate_m(sim.dust.a, sim.dust.rhos, sim.dust.fill)
[docs]
def p_frag(sim):
"""Function calculates the fragmentation probability.
The type of assumed transition between sticking and
fragmentation is given as an argument.
This uses the relative velocities between 0.5 * amax and amax.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
pf : Field
Fragmentation probability."""
return dust_f.pfrag(sim.dust.v.rel.tot[:, -2, -1], sim.dust.v.frag)
[docs]
def p_stick(sim):
"""Function calculates the sticking probability.
The sticking probability is simply 1 minus the
fragmentation probability.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
ps : Field
Sticking probability"""
p = 1. - sim.dust.p.frag
return p
[docs]
def H(sim):
"""Computes the dust scale height by using the
dustpy function, for all Nm_long dust sizes.
Parameters
----------
sim : the simulation frame
"""
return dp_dust_f.h_dubrulle1995(
sim.gas.Hp,
sim.dust.St,
sim.dust.delta.vert)
[docs]
def rho_midplane(sim):
"""Function calculates the midplane mass density.
Note that we have more particle sizes than surface densities,
so we use the appropriate sigma for each size:
[a0, fudge * a1, a1, 0.5 * amax, amax]
->
[Sig0, Sig1, Sig1, Sig1, Sig1]
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
rho : Field
Midplane mass density"""
# The scale height H has a longer shape than Sigma and has to be adjusted
return sim.dust.Sigma[:, [0, 1, 1, 1, 1]] / (np.sqrt(2 * c.pi) * sim.dust.H)
[docs]
def smax_deriv(sim, t, smax):
"""Function calculates the derivative of smax.
Parameters
----------
sim : Frame
Parent simulation frame
t : IntVar
Current time
smax : Field, optional, defaul : None
Current smax to be used if not None
Returns
-------
smax_dot: Field
Derivative of smax"""
if smax is None:
smax = sim.dust.s.max
ds_coag = dust_f.smax_deriv(
sim.dust.v.rel.tot[:, -2, -1],
sim.dust.rho[:, 2],
sim.dust.rhos[:, 2],
sim.dust.s.min,
sim.dust.s.max,
sim.dust.v.frag,
sim.dust.Sigma,
sim.dust.SigmaFloor)
# Prevents unwanted growth of smax at the inner boundary Experimental
sim.dust.s.sdot_shrink = np.zeros_like(sim.dust.s.max)
sim.dust.s.sdot_coag = ds_coag
return ds_coag
[docs]
def S_coag(sim, Sigma=None):
"""Function calculates the source terms from dust growth
Parameters
----------
sim : Frame
Parent simulation frame
Sigma : Field, optional, default : None
Surface density to be used if not None
Returns
-------
Scoag : Field
Source terms from dust growth"""
if Sigma is None:
Sigma = sim.dust.Sigma
s_coag = dust_f.s_coag(
# here we compute the cross section where the first entry is
# the cross section of (a0, a1) and the second of (a1, fudge * a1)
np.pi * (sim.dust.a[:, [0, 2]]+sim.dust.a[:, [2, 1]])**2,
# same for the relative velocities
sim.dust.v.rel.tot[:, [0, 2], [2, 1]],
sim.dust.H[:, [0, 2]],
sim.dust.m[:, [0, 2]],
Sigma,
sim.dust.s.min,
sim.dust.s.max,
sim.dust.q.eff,
sim.dust.SigmaFloor
)
# Prevents unwanted growth of smax
return s_coag
[docs]
def S_tot_ext(sim, Sigma=None):
"""Function calculates the total source terms. including contributions from components
Parameters
----------
sim : Frame
Parent simulation frame
Sigma : Field, optional, default : None
Surface density to be used if not None
Returns
-------
Stot : Field
Total source terms of surface density"""
if Sigma is None:
Sigma = sim.dust.Sigma
#global source terms
ret = sim.dust.S.ext
#source terms from components
for key, comp in sim.components.__dict__.items():
if key.startswith("_"):
continue
ret += comp.dust.S_Sigma
return ret
[docs]
def enforce_f(sim):
"""Function enforces the minimum mass fraction in large particles.
Parameters
----------
sim : Frame
Parent simulation frame
"""
delta = np.maximum( 0., sim.dust.f.crit * sim.dust.Sigma[...].sum(-1) - sim.dust.Sigma[:,1])
sim.dust.s.max = np.maximum( sim.dust.s.lim*np.ones_like(sim.dust.s.max) ,sim.dust.s.max + delta * dadsig(sim))
sim.dust.Sigma[:,1] += delta
sim.dust.Sigma[:,0] -= delta
for key, comp in sim.components.__dict__.items():
if (not key.startswith("_")) and comp.dust._active:
comp.dust.Sigma[:,1] += delta * comp.dust.Sigma[:,1]/sim.dust.Sigma[:,1]
comp.dust.Sigma[:,0] -= delta * comp.dust.Sigma[:,0]/sim.dust.Sigma[:,0]
sim.dust.qrec.update()
[docs]
def dadsig(sim):
"""Function calculates the derivative of smax with respect to Sigma1.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
dadsig : Field
Derivative of smax with respect to Sigma1"""
return dust_f.dadsig(sim.dust.s.lim, sim.dust.qrec,sim.dust.f.crit, sim.dust.s.max, sim.dust.s.min, sim.dust.Sigma)
[docs]
def dsigda(sim):
"""Function calculates the derivative of Sigma1 with respect to smax.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
dsigda : Field
Derivative of Sigma1 with respect to smax"""
return dust_f.dsigda(sim.dust.s.lim, sim.dust.qrec,sim.dust.f.crit, sim.dust.s.max, sim.dust.s.min, sim.dust.Sigma)
[docs]
def S_tot(sim, Sigma=None):
"""Function calculates the total source terms.
Parameters
----------
sim : Frame
Parent simulation frame
Sigma : Field, optional, default : None
Surface density to be used if not None
Returns
-------
Stot : Field
Total source terms of surface density"""
Sext = sim.dust.S.ext
if Sigma is None:
Scoag = sim.dust.S.coag
Shyd = sim.dust.S.hyd
Scomp = sim.dust.S.compo
else:
Scoag = sim.dust.S.coag.updater.beat(sim, Sigma=Sigma)
if Scoag is None:
Scoag = sim.dust.S.coag
Shyd = sim.dust.S.hyd.updater.beat(sim, Sigma=Sigma)
if Shyd is None:
Shyd = sim.dust.S.hyd
Scomp = sim.dust.S.compo.updater.beat(sim, Sigma=Sigma)
if Scomp is None:
Scomp = sim.dust.S.compo
return Scoag + Shyd + Sext + Scomp
[docs]
def S_compo(sim, Sigma=None):
"""Function calculates the source terms from dust composion source terms
Parameters
----------
sim : Frame
Parent simulation frame
Sigma : Field, optional, default : None
Surface density to be used if not None
Returns
-------
Scoag : Field
Source terms from dust gomponents"""
if Sigma is None:
Sigma = sim.dust.Sigma
Scomp = np.zeros_like(Sigma)
if hasattr(sim,"components") == False:
return Scomp
for key, comp in sim.components.__dict__.items():
if key.startswith("_"):
continue
Scomp += comp.dust.S_Sigma
return Scomp
[docs]
def vrel_brownian_motion(sim):
"""Function calculates the relative particle velocities due to Brownian motion.
The maximum value is set to the sound speed.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
vrel : Field
Relative velocities"""
return dust_f.vrel_brownian_motion(sim.gas.cs, sim.dust.m, sim.gas.T)
[docs]
def q_eff(sim):
"""Function calculates the equilibrium exponent of the distribution.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
q_eff : Field
Calculated exponent of distribution"""
return sim.dust.q.frag * sim.dust.p.frag + sim.dust.q.sweep * (1.0 - sim.dust.p.frag)
[docs]
def q_frag(sim):
"""
Calculate the effective fragmentation power-law of the size distribution.
Parameters:
sim (Simulation): The simulation object containing the dust and gas properties.
Returns:
float: The effective fragmentation power-law of the size distribution.
"""
return dust_f.qfrag(
sim.dust.p.driftfrag,
sim.dust.v.rel.tot[:, -1, -2],
sim.dust.v.frag,
sim.dust.St[:, -1],
sim.dust.q.turb1,
sim.dust.q.turb2,
sim.dust.q.drfrag,
sim.gas.alpha,
sim.gas.Sigma,
sim.gas.mu)
[docs]
def q_rec(sim):
"""
Function computes the power law exponent of the size distribution
n(a) da = a^q da
Parameters
----------
Sigma : array-like
Dust surface densities
smin : array-like
Minimum particle sizes
smax : array-like
Maximum particle sizes
Returns
-------
q : array-like
Size distribution exponent
"""
return np.log(sim.dust.Sigma[..., 1]/sim.dust.Sigma[..., 0]) / np.log(np.sqrt(sim.dust.s.max/sim.dust.s.min)) - 4.
[docs]
def p_frag_trans(sim):
"""
Calculate the transition between the two turbulent regime.
Parameters:
sim (Simulation): The simulation object containing the dust and gas properties.
Returns:
float: Transition between the two turbulent regime.
"""
return dust_f.pfrag_trans(sim.dust.St[:, -1],
sim.gas.alpha,
sim.gas.Sigma,
sim.gas.mu)
[docs]
def p_drift_frag(sim):
"""Calculate the fudge factor for the relative velocities.
Parameters
----------
sim : simulation frame
"""
""" # Pfeil+2024, Eq. A.3
dv_drmax = np.sqrt(
sim.dust.v.rel.rad[:, -1, -2]**2 + sim.dust.v.rel.azi[:, -1, -2]**2)
# where does the 0.3 come from
st_mx = sim.dust.St[...,-1]
st_mn = 0.5*st_mx
Re = 0.5 * sim.gas.alpha * 2e-15 * sim.gas.Sigma/ sim.gas.mu
vgas = (1.5*sim.gas.alpha)**0.5*sim.gas.cs
vsmall = vgas * ((st_mx-st_mn)/(st_mx+st_mn) * (st_mx**2./(st_mx+Re**-0.5) - st_mn**2./(st_mn+Re**-0.5)))**0.5
vinter = vgas * (2.292*st_mx)**0.5
pint = p_frag_trans(sim)
psmall = 1. - pint
vtr_simp = psmall*vsmall + pint*vinter
f_dt = np.where(dv_drmax != 0, 0.3 * vtr_simp / dv_drmax, 1e100)
# Eq. A.4
pdr = np.where( f_dt < 1e100, 0.5 + 0.5 * ((1.0 - f_dt**6) / (f_dt**6 + 1.0)),0.0)"""
return dust_f.pdriftfrag(sim.dust.v.rel.rad[:,-1,-2],sim.dust.v.rel.azi[:,-1,-2],sim.dust.St[:,-1],sim.gas.alpha,sim.gas.Sigma,sim.gas.mu,sim.gas.cs,sim.dust.p.fragtrans)
[docs]
def D_mod(sim):
"""Function calculates the dust diffusivity.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
D : Field
Dust diffusivity
Notes
-----
The diffusivity at the first and last two radial
grid cells will be set to zero to avoid unwanted
behavior at the boundaries."""
# warning this is only done beacuse opf the pluto code -> times gammma factor athe the end
v2 = sim.dust.delta.rad * sim.gas.cs**2
Diff = dp_dust_f.d(v2, sim.grid.OmegaK, sim.dust.St*sim.dust.f.drift)
Diff[:1, ...] = 0.
Diff[-2:, ...] = 0.
return Diff
[docs]
def vrad_mod(sim):
"""Function calculated the radial velocity of the dust.
Parameters
----------
sim : Frame
Parent simulation frame
Returns
-------
vrad : Field
Radial dust velocity"""
return dp_dust_f.vrad(sim.dust.St*sim.dust.f.drift, sim.dust.v.driftmax, sim.gas.v.rad)
[docs]
def Y_jacobian(sim, x, dx=None, *args, **kwargs):
"""Function calculates the Jacobian for the statevector that includes smax. i.e. [Sigma, smax*Sigma]
Parameters
----------
sim : Frame
Parent simulation frame
x : IntVar
Integration variable
dx : float, optional, default : None
stepsize defaults to x.stepsize
args : additional positional arguments
kwargs : additional keyword arguments
Returns
-------
jac : Sparse csc_matrix
Dust Jacobian including smax with size (3*Nr, 3*Nr)"""
# Helper variables for convenience
if dx is None:
dt = x.stepsize
else:
dt = dx
r = sim.grid.r
ri = sim.grid.ri
area = sim.grid.A
Nr = int(sim.grid.Nr)
Nm_s = int(sim.grid._Nm_short)
# Getting the Jacobian of Sigma
J_Sigma = sim.dust.Sigma.jacobian(x, dx=dt)
# Building the hydrodynamic Jacobian of smax
# We are advecting smax*Sigma[1], which is stored in Y
smaxSig = sim.dust._Y[Nm_s * Nr:]
smaxSig_rhs = smaxSig[...]
# Creating the sparse matrix
sim.dust.v.rad_flux.update()
A, B, C = dp_dust_f.jacobian_hydrodynamic_generator(
area,
sim.dust.D[:, 2],
r,
ri,
sim.gas.Sigma,
sim.dust.v.rad_flux[:, 2]
)
# Setting boundary conditions for the Jacobian of smax*Sigma
# The boundary condition is constant value on both boundaries
row_max_adv = np.hstack((np.arange(Nr-1)+1 , np.arange(Nr), np.arange(Nr - 1)))
col_max_adv= np.hstack((np.arange(Nr - 1), np.arange(Nr), np.arange(Nr - 1) + 1))
dat_max_adv = np.hstack((A.ravel()[1:], B.ravel(), C.ravel()[:-1]))
dat_in = np.zeros(3)
row_in = np.zeros(3)
col_in = np.arange(3)
if sim.dust.s.boundary.inner.condition is not None:
# Given value
if sim.dust.s.boundary.inner.condition == "const_grad":
Di = ri[1] / ri[2] * (r[1] - r[0]) / (r[2] - r[0])
K1 = - r[1] / r[0] * (1. + Di)
K2 = r[2] / r[0] * Di
dat_in[1] = -K1 / dt
dat_in[2] = -K2 / dt
smaxSig_rhs[0] = 0.
elif sim.dust.s.boundary.inner.condition == "grad":
K1 = -r[1] / r[0]
dat_in[1] = -K1 / dt
smaxSig_rhs[0] = ri[1] / r[0] * \
(r[1] - r[0]) * sim.dust.s.boundary.inner.value
elif sim.dust.s.boundary.inner.condition == "val":
#dust value times the maximal size at the time
smaxSig_rhs[0] = sim.dust.s.boundary.inner.value
elif sim.dust.s.boundary.inner.condition == "const_val":
# const_val
dat_in[1] = 1. / dt
smaxSig_rhs[0] = 0.
elif sim.dust.s.boundary.inner.condition == "pow":
p = sim.dust.s.boundary.inner.value
smaxSig_rhs[0] = smaxSig_rhs[1]* (r[0]/r[1])**p
elif sim.dust.s.boundary.inner.condition == "const_pow":
p = np.log(smaxSig[2] / smaxSig[1]) / \
np.log(r[2]/r[1])
K1 = -(r[0]/r[1])**p
dat_in[1] = -K1/dt
smaxSig_rhs[0] = 0.
#outer boundary
dat_out = np.zeros(3)
row_out = np.zeros(3)+(Nr-1)
col_out = np.arange(0, -3, -1)+(Nr-1)
if sim.dust.boundary.outer is not None:
# Given value
if sim.dust.s.boundary.outer.condition == "const_grad":
Do = ri[-2] / ri[-3] * (r[-1] - r[-2]) / (r[-2] - r[-3])
KNrm2 = - r[-2] / r[-1] * (1. + Do)
KNrm3 = r[-3] / r[-1] * Do
dat_out[1] = -KNrm2 / dt
dat_out[2] = -KNrm3 / dt
smaxSig_rhs[-1] = 0.
elif sim.dust.s.boundary.outer.condition == "grad":
KNrm2 = -r[-2] / r[-1]
dat_out[1] = -KNrm2 / dt
smaxSig_rhs[-1] = ri[-2] / r[-1] * \
(r[-1] - r[-2]) * sim.dust.s.boundary.outer.value
# Given value
elif sim.dust.s.boundary.outer.condition == "val":
smaxSig_rhs[-1] = sim.dust.s.boundary.outer.value
elif sim.dust.s.boundary.outer.condition == "const_val":
dat_out[1] = 1. / dt
smaxSig_rhs[-1] = 0.
elif sim.dust.s.boundary.outer.condition == "pow":
p = sim.dust.s.boundary.outer.value
smaxSig_rhs[-1] = smaxSig_rhs[-2]* (r[-1]/r[-2])**p
elif sim.dust.s.boundary.outer.condition == "const_pow":
p = np.log(smaxSig[-2] / smaxSig[-3]) / \
np.log(r[-2]/r[-3])
KNrm2 = -(r[-1]/r[-2])**p
dat_out[1] = -KNrm2/dt
smaxSig_rhs[-1] = 0.
# to do pow and const_pow
# Stitching together the generators
row = np.hstack((row_max_adv, row_in, row_out))
col = np.hstack((col_max_adv, col_in, col_out))
dat = np.hstack((dat_max_adv, dat_in, dat_out))
gen = (dat, (row, col))
# Building sparse matrix of coagulation Jacobian
J_smax_hyd = sp.csc_matrix(
gen,
shape=(Nr, Nr)
)
# Stitching together both matrices
# This is the fastest possibility I could find
J = J_Sigma.copy()
J.data = np.hstack((J_Sigma.data, J_smax_hyd.data))
J.indices = np.hstack(
(J_Sigma.indices, J_smax_hyd.indices + J_Sigma.shape[0]))
J.indptr = np.hstack(
(J_Sigma.indptr, J_smax_hyd.indptr[1:] + len(J_Sigma.data)))
Ntot = J_Sigma.shape[0] + J_smax_hyd.shape[0]
J._shape = (Ntot, Ntot)
# Stitching together the right hand sides of the equations
Sigma_rhs = sim.dust._rhs[...]
sim.dust._Y_rhs[:] = np.hstack((Sigma_rhs, smaxSig_rhs))
return J
def _f_impl_1_direct(x0, Y0, dx, jac=None, rhs=None, *args, **kwargs):
"""Implicit 1st-order integration scheme with direct matrix inversion
Parameters
----------
x0 : Intvar
Integration variable at beginning of scheme
Y0 : Field
Variable to be integrated at the beginning of scheme
dx : IntVar
Stepsize of integration variable
jac : Field, optional, defaul : None
Current Jacobian. Will be calculated, if not set
args : additional positional arguments
kwargs : additional keyworda arguments
Returns
-------
dY : Field
Delta of variable to be integrated
Butcher tableau
---------------
1 | 1
---|---
| 1
"""
if jac is None:
jac = Y0.jacobian(x0, dx)
if rhs is None:
rhs = np.array(Y0.ravel())
# Add external/explicit source terms to right-hand side
dust = Y0._owner.dust
# Note: s.max.derivative also sets s.sdot_shrink which is used below
s_max_deriv = dust.s.max.derivative()
S_Sigma_ext = np.zeros_like(dust.Sigma)
S_Sigma_ext[1:-1, ...] += dust.S.ext[1:-1, ...] + dust.S.compo[1:-1, ...]
# smax*Sigma (product rule)
S_smax_expl = np.zeros_like(dust.s.max)
S_smax_expl[1:-1] = s_max_deriv[1:-1] * dust.Sigma[1:-1, 1] \
+ (dust.S.ext[1:-1,1] + dust.S.coag[1:-1,1]) * dust.s.max[1:-1]
# Stitching both parts together
S = np.hstack((S_Sigma_ext.ravel(), S_smax_expl))
# Right hand side
rhs[...] += dx * S
N = jac.shape[0]
eye = sp.identity(N, format="csc")
A = eye - dx * jac
A_LU = sp.linalg.splu(A,
permc_spec="MMD_AT_PLUS_A",
diag_pivot_thresh=0.0,
options=dict(SymmetricMode=True))
Y1_ravel = A_LU.solve(rhs)
Y1 = Y1_ravel.reshape(Y0.shape)
return Y1 - Y0
[docs]
class impl_1_direct(Scheme):
"""Modified class for implicit dust integration."""
def __init__(self):
super().__init__(_f_impl_1_direct, description="Implicit 1st-order direct solver")