Source code for gal_goku.init_power

"""Class to generate simulation ICS, separated out for clarity."""
from __future__ import print_function
from typing import Tuple, List, Type, Any
import os.path
import math
import subprocess
import json
import shutil
#To do crazy munging of types for the storage format
import importlib
import numpy as np
import configobj
import classylss
import classylss.binding as CLASS
#from . import utils
#from . import clusters
#from . import cambpower
import datetime

# DM-only
[docs] class SimulationICs(object): """ Class for creating the initial conditions for a simulation. There are a few things this class needs to do: - Generate CAMB input files - Generate MP-GenIC input files (to use CAMB output) - Run CAMB and MP-GenIC to generate ICs The class will store the parameters of the simulation. We also save a copy of the input and enough information to reproduce the results exactly in SimulationICs.json. Many things are left hard-coded. We assume flatness. Init parameters: ---- redshifts - redshifts at which to generate ICs omegab - baryon density. Note that if we do not have gas particles, still set omegab, but set separate_gas = False omega0 - Total matter density at z=0 (includes massive neutrinos and baryons) hubble - Hubble parameter, h, which is H0 / (100 km/s/Mpc) scalar_amp - A_s at k = 0.05, comparable to the Planck value. ns - Scalar spectral index m_nu - neutrino mass unitary - if true, do not scatter modes, but use a unitary gaussian amplitude. Remove: ---- separate_gas - if true the ICs will contain baryonic particles; If false, just DM. """ def __init__(self, *, seed : int = 9281110, redshifts: float = 99, redend: float = 0, omega0: float = 0.288, omegab: float = 0.0472, hubble: float = 0.7, scalar_amp: float = 2.427e-9, ns: float = 0.97, rscatter: bool = False, m_nu: float = 0, nu_hierarchy: str = 'normal', uvb: str = "pu", nu_acc: float = 1e-5, unitary: bool = True, w0_fld: float = -1., wa_fld: float = 0., N_ur: float = 3.044, alpha_s: float = 0, MWDM_therm: float = 0, python: str = "python") -> None: #Check that input is reasonable and set parameters #In Mpc/h print("__init__: initializing parameters...", datetime.datetime.now()) #Physically reasonable assert omega0 <= 1 and omega0 > 0 self.omega0 = omega0 assert omegab > 0 and omegab < 1 self.omegab = omegab #assert redshifts > 1 and redshifts < 1100 self.redshifts = redshifts assert redend >= 0 and redend < 1100 self.redend = redend assert hubble < 1 and hubble > 0 self.hubble = hubble assert scalar_amp < 1e-7 and scalar_amp > 0 self.scalar_amp = scalar_amp assert ns > 0 and ns < 2 self.ns = ns self.unitary = unitary # assert w0_fld < 0 self.w0_fld = w0_fld # assert wa_fld < 1 and wa_fld > -1 self.wa_fld = wa_fld assert N_ur >= 0 self.N_ur = N_ur assert alpha_s < 1 and alpha_s > -1 self.alpha_s = alpha_s T_CMB = 2.7255 # default cmb temperature omegag = 4.480075654158969e-07 * T_CMB**4 / self.hubble**2 self.omegag = omegag self.omega_ur = omegag * 0.22710731766023898 * (self.N_ur - 1.013198221453432*3) # MP-Gadget # assert self.omega_ur >= 0 assert MWDM_therm >= 0 self.MWDM_therm = MWDM_therm #Neutrino accuracy for CLASS self.nu_acc = nu_acc #UVB? Only matters if gas self.uvb = uvb assert self.uvb == "hm" or self.uvb == "fg" or self.uvb == "sh" or self.uvb == "pu" self.rscatter = rscatter #Structure seed. self.seed = seed #Baryons? self.separate_gas = False # separate_gas #If neutrinos are combined into the DM, #we want to use a different CAMB transfer when checking output power. self.separate_nu = False self.m_nu = m_nu self.nu_hierarchy = nu_hierarchy # initialize the cluster object: to store the submit info for specific # cluster in used #self._cluster = cluster_class( # gadget = self.gadgetexe, param = self.gadgetparam, # genic = self.genicexe, genicparam = self.genicout, # nproc = nproc, cores = cores, # gadget_dir = gadget_dir, mpi_ranks = mpi_ranks, threads = threads) # add nproc and cores # # make them optional #assert self._cluster.gadget_dir == os.path.expanduser(gadget_dir) #For repeatability, we store git hashes of Gadget, GenIC, CAMB and ourselves #at time of running. #self.simulation_git = utils.get_git_hash(os.path.dirname(__file__)) # sometime the path to python is slightly different, especially you have # multiple pythons and multiple virtual env self.python = python print("__init__: done.", datetime.datetime.now(),"\n") @property def json(self) -> dict: """ these are the variables will be saved into json """ return self.__dict__
[docs] def cambfile(self, maxk=0.5) -> Tuple[np.ndarray, np.ndarray]: """ Generate the IC power spectrum using classylss. Basically is using pre_params feed into class and compute the powerspec files based on the range of redshift and redend. All files are stored in the directory camb_out. Return: ---- camb_output (str) : power specs directory. default: "camb_linear/" """ #Load high precision defaults print("cambfile: loading defaults...", datetime.datetime.now()) pre_params = { 'tol_background_integration': 1e-9, 'tol_perturb_integration' : 1.e-7, 'tol_thermo_integration':1.e-5, 'k_per_decade_for_pk': 50,'k_bao_width': 8, 'k_per_decade_for_bao': 200, 'neglect_CMB_sources_below_visibility' : 1.e-30, 'transfer_neglect_late_source': 3000., 'l_max_g' : 50, 'l_max_ur':150, 'extra metric transfer functions': 'y'} #Set the neutrino density and subtract it from omega0 omeganu = self.m_nu/93.14/self.hubble**2 omcdm = (self.omega0 - self.omegab) - omeganu gparams = {'h': self.hubble, 'Omega_cdm': omcdm,'Omega_b': self.omegab, 'Omega_k': 0, 'n_s': self.ns, 'A_s': self.scalar_amp, 'alpha_s': self.alpha_s} #Lambda is computed self-consistently if self.w0_fld != -1.0 or self.wa_fld != 0.: gparams['Omega_Lambda'] = 0 gparams['w0_fld'] = self.w0_fld gparams['wa_fld'] = self.wa_fld if (gparams['w0_fld'] + gparams['wa_fld'] + 1) * (1 + gparams['w0_fld']) > 0: # if not phantom-crossing gparams['use_ppf'] = 'no' else: gparams['Omega_fld'] = 0 numass = get_neutrino_masses(self.m_nu, self.nu_hierarchy) #Set up massive neutrinos if self.m_nu > 0: print("cambfile: setting up massive neutrinos...") # gparams['m_ncdm'] = '%.8f,%.8f,%.8f' % (numass[2], numass[1], numass[0]) # gparams['N_ncdm'] = 3 m_ncdm = '' N_ncdm = 0 for i, numa in enumerate(numass): if numa == 0: continue if N_ncdm == 0: m_ncdm += '%.8f' % numa else: m_ncdm += ',%.8f' % numa N_ncdm += 1 gparams['m_ncdm'] = m_ncdm gparams['N_ncdm'] = N_ncdm # gparams['N_ur'] = 0.00641 # gparams['N_ur'] = self.N_ur - 3 gparams['N_ur'] = self.N_ur - gparams['N_ncdm'] * 1.013198221453432 # for N_ncdm = 3, N_ur = N_ur^desired - 3.0395946643602962 #Neutrino accuracy: Default pk_ref.pre has tol_ncdm_* = 1e-10, #which takes 45 minutes (!) on my laptop. #tol_ncdm_* = 1e-8 takes 20 minutes and is machine-accurate. #Default parameters are fast but off by 2%. #I chose 1e-5, which takes 6 minutes and is accurate to 1e-5 gparams['tol_ncdm_newtonian'] = self.nu_acc #min(self.nu_acc,1e-5) gparams['tol_ncdm_synchronous'] = self.nu_acc gparams['tol_ncdm_bg'] = 1e-10 gparams['l_max_ncdm'] = 50 #This disables the fluid approximations, which make P_nu not match # camb on small scales. #We need accurate P_nu to initialise our neutrino code. gparams['ncdm_fluid_approximation'] = 3 # 3: disable approximation; # enum ncdmfa_method {ncdmfa_mb,ncdmfa_hu,ncdmfa_CLASS,ncdmfa_none}; #Does nothing unless ncdm_fluid_approximation = 2 #Spend less time on neutrino power for smaller neutrino mass gparams['ncdm_fluid_trigger_tau_over_tau_k'] = 30000.* (self.m_nu / 0.4) else: # gparams['N_ur'] = 3.046 gparams['N_ur'] = self.N_ur # for mnu = 0, N_ur cannot be set to 0 in CLASS #Initial cosmology pre_params.update(gparams) #maxk = 2 * math.pi / self.box * self.npart * 8 maxk = maxk powerparams = {'output': 'dTk vTk mPk', 'P_k_max_h/Mpc' : maxk, "z_max_pk" : 99 + 1} pre_params.update(powerparams) #At which redshifts should we produce CAMB output: we want the start and # end redshifts of the simulation, but we also want some other values # for checking purposes camb_zz = self.redshifts classconf = configobj.ConfigObj() classconf.filename = None classconf.update(pre_params) classconf['z_pk'] = camb_zz classconf.write() # feed in the parameters and generate the powerspec object print("cambfile: generating the powerspec object...") engine = CLASS.ClassEngine(pre_params) powspec = CLASS.Spectra(engine) # powerspec is an object # bg = CLASS.Background(engine) # pre_params['Omega_fld'] = 1 - self.omega0 + bg.Omega0_lambda # so that Omega0_lambda == 0 (forced) # engine = CLASS.ClassEngine(pre_params) # powspec = CLASS.Spectra(engine) # powerspec is an object #Get transfer fucntion and linear power spectrum print(f"cambfile: getting the transfer functions... for z = {camb_zz} ") all_ks = [] all_pk_lins = [] for zz in camb_zz: trans = powspec.get_transfer(z=zz) #fp-roundoff trans['k (h/Mpc)'][-1] *= 0.9999 pk_lin = powspec.get_pklin(k=trans['k (h/Mpc)'], z=zz) all_ks.append(trans['k (h/Mpc)']) all_pk_lins.append(pk_lin) print("cambfile: done.", datetime.datetime.now(),"\n") return np.array(all_ks), np.array(all_pk_lins)
def _fromarray(self) -> None: """Convert the data stored as lists back to what it was.""" for arr in self._really_arrays: self.__dict__[arr] = np.array(self.__dict__[arr]) self._really_arrays = [] for arr in self._really_types: #Some crazy nonsense to convert the module, name #string tuple we stored back into a python type. mod = importlib.import_module(self.__dict__[arr][0]) self.__dict__[arr] = getattr(mod, self.__dict__[arr][1]) self._really_types = [] def _other_params(self, config: configobj.ConfigObj) -> configobj.ConfigObj: """Function to override to set other config parameters""" return config
[docs] def get_neutrino_masses(total_mass: float, hierarchy: str) -> np.ndarray: """Get the three neutrino masses, including the mass splittings. Hierarchy is 'inverted' (two heavy), 'normal' (two light) or degenerate.""" #Neutrino mass splittings nu_M21 = 7.53e-5 #Particle data group 2016: +- 0.18e-5 eV2 nu_M32n = 2.44e-3 #Particle data group: +- 0.06e-3 eV2 nu_M32i = 2.51e-3 #Particle data group: +- 0.06e-3 eV2 if hierarchy == 'normal': nu_M32 = nu_M32n #If the total mass is below that allowed by the hierarchy, #assign one active neutrino. # if total_mass < np.sqrt(nu_M32n) + np.sqrt(nu_M21): if total_mass < .06: return np.array([total_mass, 0, 0]) elif hierarchy == 'inverted': nu_M32 = -nu_M32i if total_mass < 2*np.sqrt(nu_M32i) - np.sqrt(nu_M21): return np.array([total_mass/2., total_mass/2., 0]) #Hierarchy == 0 is 3 degenerate neutrinos else: return np.ones(3)*total_mass/3. #DD is the summed masses of the two closest neutrinos DD1 = 4 * total_mass/3. - 2/3.*np.sqrt(total_mass**2 + 3*nu_M32 + 1.5*nu_M21) #Last term was neglected initially. This should be very well converged. DD = 4 * total_mass/3. - 2/3.*np.sqrt(total_mass**2 + 3*nu_M32 + 1.5*nu_M21+0.75*nu_M21**2/DD1**2) nu_masses = np.array([ total_mass - DD, 0.5*(DD + nu_M21/DD), 0.5*(DD - nu_M21/DD)]) assert np.isfinite(DD) assert np.abs(DD1/DD -1) < 2e-2 assert np.all(nu_masses >= 0) return nu_masses