import dustpy as dp
import dustpy.constants as c
import numpy as np
from simframe import Instruction
from simframe import Integrator
from simframe import schemes
from simframe.frame import Field
from simframe.io.writers import hdf5writer
from dustpy.utils import Boundary
from functools import partial
from . import std
import types
[docs]
class Simulation(dp.Simulation):
"""The main simulation class for running dust evolution simulations.
`tripodpy.Simulation`` is a child of ``dustpy.Simulation``,
which is in turn a child of ``simframe.Frame``.
For setting simple initial conditions use ``Simulation.ini``,
For making the simulation grids use ``Simulation.makegrids()``,
For initialization use ``Simulation.initialize()``,
For running simulations use ``Simulation.run()``.
Please have a look at the documentation of ``simframe`` for further details."""
# Exclude the following functions from the from DustPy inherited object
_excludefromdustpy = [
"checkmassconservation",
"setdustintegrator"
]
__name__ = "tripodpy"
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Add new fields
self.dust.m = None
self.dust.addgroup("q", description="Distribution exponents")
self.dust.q.drfrag = None
self.dust.q.eff = None
self.dust.q.frag = None
self.dust.q.sweep = None
self.dust.q.turb1 = None
self.dust.q.turb2 = None
self.dust.qrec = None
self.dust.S.smax_hyd = None
self.dust.q.updater = ["frag", "eff"]
self.dust.addgroup("s", description="Characteristic particle sizes")
self.dust.s.min = None
self.dust.s.max = None
self.dust.s.addgroup("boundary", description="boundary conditions of smax")
self.dust.s.boundary.inner = None
self.dust.s.boundary.outer = None
self.dust.s.lim = None
self.dust.addgroup(
"f", description="Fudge factors")
self.dust.f.crit = None
self.dust.f.drift = None
self.dust.f.dv = None
self.dust.f.updater = ["dv"]
self.dust.p.frag = None
self.dust.p.stick = None
self.dust.p.fragtrans = None
self.dust.p.driftfrag = None
self.dust.v.rad_flux = None
self.dust.p.updater = ["frag", "stick","fragtrans", "driftfrag"]
# Adjusting update orders
# Adding new elements to update order in a relative way
def addelemtafter(lst, elem, after):
idx = lst.index(after)
lst.insert(idx + 1, elem)
# Adjusting the updater of main simulation frame
updtordr = self.dust.updateorder
# Add "f" after "p"
addelemtafter(updtordr, "f", "p")
# move "a" after "f"
updtordr.remove("a")
updtordr.remove("v")
updtordr.remove("p")
updtordr.remove("St")
updtordr.remove("H")
updtordr.remove("rho")
updtordr.remove("D")
updtordr.remove("eps")
addelemtafter(updtordr, "qrec", "f")
addelemtafter(updtordr, "a", "qrec")
# Add "m" after "a"
addelemtafter(updtordr, "m", "a")
# Add St after "m"
addelemtafter(updtordr, "St", "m")
# Add "H" after "St"
addelemtafter(updtordr, "H", "St")
# Add "rho" after "H"
addelemtafter(updtordr, "rho", "H")
# Add "D" after "rho"
addelemtafter(updtordr, "D", "rho")
#add "eps" after "D"
addelemtafter(updtordr, "eps", "D")
# Add "v" after "eps"
addelemtafter(updtordr, "v", "eps")
# Add "p" after "v"
addelemtafter(updtordr, "p", "v")
# Add "q" after "p"
addelemtafter(updtordr, "q", "p")
# Add "SigmaFloor" after "m"
addelemtafter(updtordr, "SigmaFloor", "q")
# Removing elements that are not used
updtordr.remove("kernel")
# Assign updateorder
self.dust.updater = updtordr
# Deleting Fields that are not needed
del self.ini.grid.Nmbpd
del self.ini.grid.mmax
del self.ini.dust.erosionMassRatio
del self.ini.dust.excavatedMass
del self.ini.dust.fragmentDistribution
del self.dust.coagulation
del self.dust.kernel
del self.grid.m
del self.grid.Nm
# Note: the next two functions are to hide methods from DustPy that are not used in TriPoD
def __dir__(self):
'''This function hides all attributes in _excludefromparten from inherited DustPy object.
It is only hiding them. They can still be accessed.'''
exclude = set(self._excludefromdustpy) - set(self.__dict__.keys())
return sorted((set(dir(self.__class__)) | set(self.__dict__.keys())) - exclude)
def __getattribute__(self, name):
'''This function raises an attribute error for elements that should not be inherited from DustPy if they
were not manually set in tripodpy.'''
in_tp = name in super(
dp.Simulation, self).__getattribute__("__dict__")
in_dp = name in dp.Simulation.__dict__
in_ex = name in name in super(
dp.Simulation, self).__getattribute__("_excludefromdustpy")
if not in_tp and in_dp and in_ex:
raise AttributeError(name)
return super(dp.Simulation, self).__getattribute__(name)
[docs]
def run(self):
'''This functions runs the simulation.'''
# Print welcome message
if self.verbosity > 0:
msg = ""
msg += "\ntripodpy v{}".format(self.__version__)
msg += "\n"
print(msg)
# Actually run the simulation
super(dp.Simulation, self).run()
[docs]
def makegrids(self):
'''Function creates radial grid.
Notes
-----
The grids are set up with the parameters given in ``Simulation.ini``.
If you want to have a custom radial grid you have to set the array of grid cell interfaces
``Simulation.grid.ri``, before calling ``Simulation.makegrids()``.'''
# Number of mass species. Hard coded.
# The surface densities have Nm_short particle species.
# Other quantities need Nm_long particle species.
# We store them in hidden variables.
Nm_short = 2
Nm_long = 5
self.grid.addfield(
"_Nm_short",
Nm_short,
description="# of particle species for surface densities",
constant=True
)
self.grid.addfield(
"_Nm_long",
Nm_long,
description="# of particle species for calculations",
constant=True
)
self._makeradialgrid()
def _makeradialgrid(self):
'''Function sets the mass grid using the parameters set in ``Simulation.ini``.'''
if self.grid.ri is None:
ri = np.logspace(np.log10(self.ini.grid.rmin), np.log10(
self.ini.grid.rmax), num=self.ini.grid.Nr + 1, base=10.)
Nr = self.ini.grid.Nr
else:
ri = self.grid.ri
Nr = ri.shape[0] - 1
r = 0.5 * (ri[:-1] + ri[1:])
A = np.pi * (ri[1:] ** 2 - ri[:-1] ** 2)
self.grid.addfield(
"Nr", Nr, description="# of radial grid cells", constant=True)
self.grid.addfield(
"r", r, description="Radial grid cell centers [cm]", constant=True)
self.grid.addfield(
"ri", ri, description="Radial grid cell interfaces [cm]", constant=True)
self.grid.addfield(
"A", A, description="Radial grid annulus area [cm²]", constant=True)
[docs]
def initialize(self):
'''Function initializes the simulation frame.
Function sets all fields that are None with a standard value.
If the grids are not set, it will call ``Simulation.makegrids()`` first.'''
if not isinstance(self.grid.Nr, Field):
self.makegrids()
# Set integration variable
if self.t is None:
self.addintegrationvariable("t", 0., description="Time [s]")
self.t.cfl = 0.1
self.t.updater = std.sim.dt
self.t.snapshots = np.logspace(3., 5., num=21, base=10.) * c.year
# Initialize groups
self._initializestar()
self._initializegrid()
self._initializegas()
self._initializedust()
# Set integrator
if self.integrator is None:
instructions = [
Instruction(
std.dust.impl_1_direct,
self.dust._Y,
controller={"rhs": self.dust._Y_rhs},
description="Dust (state vector): implicit 1st-order direct solver"
),Instruction(schemes.update, self.dust._Y, description="Update all int variable after dust step")
]
self.integrator = Integrator(
self.t, description="Default integrator")
self.integrator.instructions = instructions
self.integrator.preparator = std.sim.prepare_implicit_dust
self.integrator.finalizer = std.sim.finalize_implicit_dust
# setup gas_compo
# Adding gas components to gas group
self.addgroup("components", description="components")
self.components.addfield("_gas_updated", False, description="Flag to indicate if gas has been updated in this timestep", constant=False)
self.components.addfield("_dust_updated", False, description="Flag to indicate if gas has been updated in this timestep", constant=False)
self.addfield("_dust_compo",False, description="Flag to indicate if composition affects dust", constant=False)
self.components.updater = []
lst = self.updateorder.copy()
lst.insert(lst.index("gas"), "components")
self.updater = lst
#self.addgascomponent("default", self.gas.Sigma[...], self.ini.gas.mu, tracer=False, description="Molecular hydrogen")
# Add component for H2
self.addcomponent = types.MethodType(std.addcomponent, self)
std.addcomponent(self,"Default", self.gas.Sigma, self.ini.gas.mu, gas_active=True, description="Default gas component")
# Adding dust surface density updater to gas updater
self.gas.updater = ["Sigma"] + self.gas.updateorder
#change the updater of Sigma to include composition
self.gas.Sigma.updater = std.gas.Sigma_tot
self.gas.mu.updater = std.gas.mu
self.gas.S.ext.updater = std.gas.S_ext_total
# Bring the entire object to initial state
dp.std.gas.enforce_floor_value(self)
self.update()
# Set writer
if self.writer is None:
self.writer = dp.hdf5writer()
def _initializedust(self):
'''Function to initialize dust quantities'''
# Shapes needed to initialize arrays
# Only radial grid
shape1 = (int(self.grid.Nr))
# Radial grid and long particle grid
shape2 = (int(self.grid.Nr), int(self.grid._Nm_long))
# Radial grid and short particle grid for surface densities
shape2Sigma = (int(self.grid.Nr), int(self.grid._Nm_short))
# Length of vector of implicit integration
shape2Sigmaravel = (int(self.grid.Nr * self.grid._Nm_short))
# Radial grid interfaces and short particle grid
shape2p1Sigma = (int(self.grid.Nr) + 1, int(self.grid._Nm_short))
# Radial grid and twice long mass grid for relative velocities etc.
shape3 = (int(self.grid.Nr), int(
self.grid._Nm_long), int(self.grid._Nm_long))
# Particle size
if self.dust.a is None:
self.dust.addfield(
"a", np.ones(shape2), description="Particle sizes in cm: [a0, fudge * a1, a1, 0.5 * amax, amax]"
)
self.dust.a.updater = std.dust.a
# Particle mass
if self.dust.m is None:
self.dust.addfield(
"m", np.ones(shape2), description="Particle mass [g]"
)
self.dust.m.updater = std.dust.m
# Diffusivity
if self.dust.D is None:
self.dust.addfield(
"D", np.zeros(shape2), description="Diffusivity [cm²/s]"
)
self.dust.D.updater = std.dust.D_mod
# Deltas
if self.dust.delta.rad is None:
delta = self.ini.gas.alpha * np.ones(shape1)
self.dust.delta.addfield(
"rad", delta, description="Radial mixing parameter"
)
if self.dust.delta.turb is None:
delta = self.ini.gas.alpha * np.ones(shape1)
self.dust.delta.addfield(
"turb", delta, description="Turbulent mixing parameter"
)
if self.dust.delta.vert is None:
delta = self.ini.gas.alpha * np.ones(shape1)
self.dust.delta.addfield(
"vert", delta, description="Vertical mixing parameter"
)
# Vertically integrated dust to gas ratio
if self.dust.eps is None:
self.dust.addfield(
"eps", np.zeros(shape1), description="Dust-to-gas ratio"
)
self.dust.eps.updater = dp.std.dust.eps
# Fluxes
if self.dust.Fi.adv is None:
self.dust.Fi.addfield(
"adv", np.zeros(shape2p1Sigma), description="Advective flux [g/cm/s]"
)
self.dust.Fi.adv.updater = std.dust.F_adv
if self.dust.Fi.diff is None:
self.dust.Fi.addfield(
"diff", np.zeros(shape2p1Sigma), description="Diffusive flux [g/cm/s]"
)
self.dust.Fi.diff.updater = std.dust.F_diff
if self.dust.Fi.tot is None:
self.dust.Fi.addfield(
"tot", np.zeros(shape2p1Sigma), description="Total flux [g/cm/s]"
)
self.dust.Fi.tot.updater = dp.std.dust.F_tot
# Filling factor
if self.dust.fill is None:
self.dust.addfield(
"fill", np.ones(shape2), description="Filling factor"
)
# Scale height
if self.dust.H is None:
self.dust.addfield(
"H", np.zeros(shape2), description="Scale heights [cm]"
)
self.dust.H.updater = std.dust.H
# Midplane mass density
if self.dust.rho is None:
self.dust.addfield(
"rho", np.zeros(shape2), description="Midplane mass density per mass bin [g/cm³]"
)
self.dust.rho.updater = std.dust.rho_midplane
# Solid state density
if self.dust.rhos is None:
rhos = self.ini.dust.rhoMonomer * np.ones(shape2)
self.dust.addfield(
"rhos", rhos, description="Solid state density [g/cm³]"
)
# Probabilities
if self.dust.p.frag is None:
self.dust.p.frag = Field(self, np.zeros(
shape1), description="Fragmentation probability")
self.dust.p.frag.updater = std.dust.p_frag
if self.dust.p.stick is None:
self.dust.p.stick = Field(self, np.zeros(
shape1), description="Sticking probability")
self.dust.p.stick.updater = std.dust.p_stick
if self.dust.p.fragtrans is None:
self.dust.p.fragtrans = Field(self, np.zeros(
shape1), description="transition probability between fragmentation regimes")
self.dust.p.fragtrans.updater = std.dust.p_frag_trans
if self.dust.p.driftfrag is None:
self.dust.p.driftfrag = Field(self, np.zeros(
shape1), description="Transition function from drift to turbulence")
self.dust.p.driftfrag.updater = std.dust.p_drift_frag
# Source terms
if self.dust.S.ext is None:
self.dust.S.addfield(
"ext", np.zeros(shape2Sigma), description="External sources [g/cm²/s]"
)
if self.dust.S.hyd is None:
self.dust.S.addfield(
"hyd", np.zeros(shape2Sigma), description="Hydrodynamic sources [g/cm²/s]"
)
self.dust.S.hyd.updater = dp.std.dust.S_hyd
if self.dust.S.coag is None:
self.dust.S.addfield(
"coag", np.zeros(shape2Sigma), description="Coagulation sources [g/cm²/s]"
)
self.dust.S.coag.updater = std.dust.S_coag
self.dust.S.addfield("compo", np.zeros(shape2Sigma), description="Sources due to composition changes [g/cm²/s]")
self.dust.S.compo.updater = std.dust.S_compo
lst = self.dust.S.updateorder
lst.insert(0, "compo")
self.dust.S.updater = lst
if self.dust.S.tot is None:
self.dust.S.addfield(
"tot", np.zeros(shape2Sigma), description="Total sources [g/cm²/s]"
)
self.dust.S.tot.updater = std.dust.S_tot
if self.dust.S.smax_hyd is None:
self.dust.S.addfield(
"smax_hyd", np.zeros(shape1), description="Total sources [g/cm²/s]"
)
self.dust.S.smax_hyd.updater = std.dust.S_smax_hyd
updtordr = self.dust.S.updateorder
updtordr.append("smax_hyd")
self.dust.S.updater = updtordr
# Stokes number
if self.dust.St is None:
self.dust.addfield(
"St", np.zeros(shape2), description="Stokes number"
)
self.dust.St.updater = dp.std.dust.St_Epstein_StokesI
# Velocities
if self.dust.v.frag is None:
vfrag = self.ini.dust.vFrag * np.ones(shape1)
self.dust.v.addfield(
"frag", vfrag, description="Fragmentation velocity [cm/s]"
)
if self.dust.v.rel.azi is None:
self.dust.v.rel.addfield(
"azi", np.zeros(shape3), description="Relative azimuthal velocity [cm/s]"
)
self.dust.v.rel.azi.updater = dp.std.dust.vrel_azimuthal_drift
if self.dust.v.rel.brown is None:
self.dust.v.rel.addfield(
"brown", np.zeros(shape3), description="Relative Brownian motion velocity [cm/s]"
)
self.dust.v.rel.brown.updater = std.dust.vrel_brownian_motion
if self.dust.v.rel.rad is None:
self.dust.v.rel.addfield(
"rad", np.zeros(shape3), description="Relative radial velocity [cm/s]"
)
self.dust.v.rel.rad.updater = dp.std.dust.vrel_radial_drift
if self.dust.v.rel.turb is None:
self.dust.v.rel.addfield(
"turb", np.zeros(shape3), description="Relative turbulent velocity [cm/s]"
)
self.dust.v.rel.turb.updater = dp.std.dust.vrel_turbulent_motion
if self.dust.v.rel.vert is None:
self.dust.v.rel.addfield(
"vert", np.zeros(shape3), description="Relative vertical settling velocity [cm/s]"
)
self.dust.v.rel.vert.updater = dp.std.dust.vrel_vertical_settling
if self.dust.v.rel.tot is None:
self.dust.v.rel.addfield(
"tot", np.zeros(shape3), description="Total relative velocity [cm/s]"
)
self.dust.v.rel.tot.updater = dp.std.dust.vrel_tot
if self.dust.v.driftmax is None:
self.dust.v.addfield(
"driftmax", np.zeros(shape1), description="Maximum drift velocity [cm/s]"
)
self.dust.v.driftmax.updater = dp.std.dust.vdriftmax
if self.dust.v.rad is None:
self.dust.v.addfield(
"rad", np.zeros(shape2), description="Radial velocity [cm/s]"
)
self.dust.v.rad.updater = dp.std.dust.vrad
if self.dust.v.rad_flux is None:
self.dust.v.addfield(
"rad_flux", np.zeros(shape2), description="Radial velocity modified to claulate the proper flux[cm/s]"
)
self.dust.v.rad_flux.updater = std.dust.vrad_mod
# Distribution exponents
if self.dust.q.eff is None:
q = np.ones(shape1)*-3.5 # will be computed in the updater
self.dust.q.addfield(
"eff", q, description="Calculated distribution exponent"
)
self.dust.q.eff.updater = std.dust.q_eff
if self.dust.q.frag is None:
self.dust.q.addfield(
"frag", np.ones(shape1), description="Fragmentation distribution exponent"
)
self.dust.q.frag.updater = std.dust.q_frag
if self.dust.qrec is None:
self.dust.addfield(
"qrec",q , description="reconstructed distribution exponent"
)
self.dust.qrec.updater = std.dust.q_rec
if self.dust.q.turb1 is None:
self.dust.q.addfield(
"turb1", -3.5, description="Size distribution exponent in first turbulence regime"
)
if self.dust.q.turb2 is None:
self.dust.q.addfield(
"turb2", -3.75, description="Size distribution exponent in second turbulence regime"
)
if self.dust.q.drfrag is None:
self.dust.q.addfield(
"drfrag", -3.75, description="Size distribution exponent in drift-induced fragmentation regime"
)
if self.dust.q.sweep is None:
self.dust.q.addfield(
"sweep", -3.0, description="Size distribution exponent in the sweep-up regime"
)
# Specific particle sizes
if self.dust.s.min is None:
rho = self.dust.rhos[:, 0] * self.dust.fill[:, 0]
smin = (3. * self.ini.grid.mmin / (4. * np.pi * rho)) ** (1. / 3.)
self.dust.s.addfield(
"min", smin, description="Minimum particle size"
)
# Particle size variation factors
if self.dust.f.crit is None:
self.dust.f.addfield(
"crit", 0.425, description="Critical mass depletion coefficient for shrinking"
)
if self.dust.f.drift is None:
self.dust.f.addfield(
"drift", 0.8, description="Drift velocity calibration factor"
)
if self.dust.f.dv is None:
self.dust.f.addfield(
"dv", 0.4, description="effective collision speed parameter"
)
# Initialize dust quantities partly to calculate Sigma
try:
self.dust.update()
except:
pass
# Initialize the initial maximum particle size
# This needs the pressure gradiant. Therefore, the other quantities
# need to be initialized previously
if self.dust.s.max is None:
smax = std.dust.smax_initial(self)
self.dust.s.addfield(
"max", smax, description="Maximum particle size"
)
self.dust.s.addfield(
"_maxOld", smax, description="Maximum particle size")
self.dust.s.max.differentiator = std.dust.smax_deriv
if self.dust.s.lim is None:
self.dust.s.addfield(
'lim', 1e-4, description="Limiting size for shrinking")
# Floor value
if self.dust.SigmaFloor is None:
SigmaFloor = 1.e-10 * np.ones(shape2Sigma)
self.dust.addfield(
"SigmaFloor", SigmaFloor, description="Floor value of surface density [g/cm²]"
)
# Surface density, if not set
if self.dust.Sigma is None:
Sigma = std.dust.Sigma_initial(self)
self.dust.addfield(
"Sigma", Sigma, description="Surface density per mass bin [g/cm²]"
)
self.dust.Sigma.jacobinator = std.dust.jacobian
# Fully initialize dust quantities
self.dust.update()
# Hidden fields
# We store the old values of the surface density in a hidden field
# to calculate the fluxes through the boundaries in case of implicit integration.
self.dust._SigmaOld = Field(
self, self.dust.Sigma, description="Previous value of surface density [g/cm²]"
)
self.dust.s._damp = Field(
self, np.ones(shape1), description="Previous value of surface density [g/cm²]"
)
# The right-hand side of the matrix equation is stored in a hidden field
self.dust._rhs = Field(self, np.zeros(
shape2Sigmaravel), description="Right-hand side of matrix equation [g/cm²]"
)
# storing coagulation and shrinkage separately to be able to ignore the shrinkate in the timestep computation
self.dust.s.sdot_coag = Field(
self, np.zeros(shape1),
description="coagulation source term for amax [cm/s]"
)
self.dust.s.sdot_shrink = Field(
self, np.zeros(shape1),
description="shrinkage source term for amax [cm²/s]",
)
# State vector
self.dust.addfield("_Y", np.zeros((int(self.grid._Nm_short) + 1) * int(self.grid.Nr)),
description="Dust state vector (Sig1, Sig2, a_max * Sig1)")
self.dust._Y.jacobinator = std.dust.Y_jacobian
# The right-hand side of the state vector matrix equation is stored in a hidden field
self.dust._Y_rhs = Field(self, np.zeros_like(
self.dust._Y), description="Right-hand side of state vector matrix equation"
)
# Boundary conditions
if self.dust.boundary.inner is None:
self.dust.boundary.inner = dp.utils.Boundary(
self.grid.r,
self.grid.ri,
self.dust.Sigma,
condition="const_grad"
)
if self.dust.boundary.outer is None:
self.dust.boundary.outer = dp.utils.Boundary(
self.grid.r[::-1],
self.grid.ri[::-1],
self.dust.Sigma[::-1],
condition="val",
value=0.1 * self.dust.SigmaFloor[-1,1]
)
# Boundary conditions of smax
if self.dust.s.boundary.inner is None:
self.dust.s.boundary.inner = dp.utils.Boundary(
self.grid.r,
self.grid.ri,
self.dust.Sigma[...,1]*self.dust.s.max,
condition="const_grad"
)
if self.dust.s.boundary.outer is None:
self.dust.s.boundary.outer = dp.utils.Boundary(
self.grid.r[::-1],
self.grid.ri[::-1],
self.dust.Sigma[::-1,1]*self.dust.s.max[::-1],
condition="val",
value= self.dust.SigmaFloor[-1,1]*self.dust.s.max[-1]
)