r"""This module contains the main loop for the DIS calculations.
There are two ways of using ``yadism``:
* ``Runner``: this class provides a runner that get the *theory* and
*observables* descriptions as input and manage the whole observables'
calculation process
* ``run_dis``: a function that wraps the construction of a ``Runner`` object
and calls the proper method to get the requested output
.. todo::
decide about ``run_dis`` and document it properly in module header
"""
import copy
import inspect
import io
import logging
import time
import numpy as np
import rich
import rich.align
import rich.box
import rich.console
import rich.markdown
import rich.panel
import rich.progress
from eko import basis_rotation as br
from eko import matchings
from eko.interpolation import InterpolatorDispatcher, XGrid
from eko.quantities.heavy_quarks import MatchingScales
from . import log, observable_name
from .coefficient_functions.coupling_constants import CouplingConstants
from .esf import scale_variations as sv
from .input import compatibility
from .output import Output
from .sf import StructureFunction as SF
from .xs import CrossSection as XS
logger = logging.getLogger(__name__)
[docs]
class Runner:
"""Wrapper to compute a process.
Parameters
----------
theory : dict
Dictionary with the theory parameters for the evolution (currently
including PDFSet and DIS process indication).
observables : dict
DIS parameters: process description, kinematic specification for the
requested output.
Notes
-----
For a full description of the content of `theory` and `dis_observables`
dictionaries read ??.
.. todo::
* reference on theory template
* detailed description of dis_observables entries
"""
banner = rich.align.Align(
rich.panel.Panel.fit(
inspect.cleandoc(
r""" __ __ _ _
\ \ / / | (_)
\ \_/ /_ _ __| |_ ___ _ __ ___
\ / _` |/ _` | / __| '_ ` _ \
| | (_| | (_| | \__ \ | | | | |
|_|\__,_|\__,_|_|___/_| |_| |_|
"""
),
rich.box.SQUARE,
padding=1,
style="magenta",
),
"center",
)
def __init__(self, theory: dict, observables: dict):
new_theory, new_observables = compatibility.update(theory, observables)
# Store inputs
self._theory = new_theory
self._observables = new_observables
# Setup eko stuffs
xgrid = XGrid(
self._observables["interpolation_xgrid"],
self._observables["interpolation_is_log"],
)
interpolator = InterpolatorDispatcher(
xgrid, self._observables["interpolation_polynomial_degree"], mode_N=False
)
# Non-eko theory
coupling_constants = CouplingConstants.from_dict(theory, self._observables)
pto = new_theory["PTODIS"]
pto_evol = new_theory["PTO"]
sv_manager = sv.ScaleVariations(
order=pto,
interpolator=interpolator,
activate_ren=new_theory["RenScaleVar"],
activate_fact=new_theory["FactScaleVar"],
)
# Initialize structure functions
masses = np.power([new_theory["mc"], new_theory["mb"], new_theory["mt"]], 2)
thresholds_ratios = np.power(
[new_theory["kcThr"], new_theory["kbThr"], new_theory["ktThr"]], 2
)
managers = dict(
interpolator=interpolator,
threshold=matchings.Atlas(
matching_scales=MatchingScales(list(masses * thresholds_ratios)),
origin=(new_theory["Q0"] ** 2, new_theory["nf0"]),
),
coupling_constants=coupling_constants,
sv_manager=sv_manager,
)
# pass theory params
theory_params = dict(
pto=pto,
pto_evol=pto_evol,
scheme=theory["FNS"],
nf_ff=theory["NfFF"],
ZMq=(new_theory["ZMc"], new_theory["ZMb"], new_theory["ZMt"]),
m2hq=masses,
TMC=theory["TMC"],
target=new_observables["TargetDIS"],
GF=theory["GF"],
M2W=theory["MW"] ** 2,
M2target=theory["MP"] ** 2,
fonllparts=new_theory["FONLLParts"],
n3lo_cf_variation=theory["n3lo_cf_variation"],
)
logger.info(
"PTO: %d, PTO@evolution: %d, process: %s",
pto,
pto_evol,
new_observables["prDIS"],
)
self.configs = RunnerConfigs(theory=theory_params, managers=managers)
logger.info("FNS: %s, NfFF: %d", theory["FNS"], theory["NfFF"])
logger.info(
"projectile: %s, target: {Z: %g, A: %g}",
new_observables["ProjectileDIS"],
*new_observables["TargetDIS"].values(),
)
self.observables = {}
for obs_name, kins in self._observables["observables"].items():
on = observable_name.ObservableName(obs_name)
if on.kind in observable_name.xs:
obs = XS(on, self)
else:
# TODO use get_sf?
obs = SF(on, self)
# read kinematics
obs.load(kins)
self.observables[obs_name] = obs
# output console
if log.silent_mode:
file = io.StringIO()
else:
file = None
self.console = rich.console.Console(file=file)
log.setup(self.console)
# ==============================
# Initialize output
# ==============================
self._output = Output()
self._output.theory = theory
self._output.observables = observables
self._output.update(interpolator.to_dict())
self._output["pids"] = br.flavor_basis_pids
self._output["projectilePID"] = coupling_constants.obs_config["projectilePID"]
[docs]
def get_sf(self, obs_name):
"""Return associated SF object."""
if obs_name.name not in self.observables:
self.observables[obs_name.name] = SF(obs_name, self)
return self.observables[obs_name.name]
[docs]
def drop_cache(self):
"""Drop the whole cache for all observables.
This preserves final results, since they are not part of the
cache.
"""
for obs in self.observables.values():
if isinstance(obs, SF):
obs.drop_cache()
[docs]
def replace_nans_with_0(self, out):
"""Replace any NaNs in output with 0.0
The small-x (i.e. large eta) limit is not addressed in LeProHQ because
the high energy limit of the polarized case is not known and that is the
main purpose of LeProHQ. As a result LeProHQ may return NaNs in the
small-x limit, which this function replaces with 0.0.
Note that the value of x where this plays a role is below the
experimental regime and thus this does not affect the description of
data, but only e.g. the grids used for the FIATLUX photon computation.
"""
out2 = copy.deepcopy(out)
# Loop through each observable in the dictionary
for observable, points in out2.items():
# Skip the keys that are not an observable
if observable not in observable_name.kinds:
continue
# Loop over the kinematic points
for point in points:
# Loop over each perturbative order
for values in point.orders.values():
# `values` is a tuple containing the computed values and the integration error
for tup in range(2):
# Set any NaN or inf values in the array to 0
values[tup][~np.isfinite(values[tup])] = 0.0
return out2
[docs]
def get_result(self):
"""Compute coefficient functions grid for requested kinematic points.
Returns
-------
:obj:`Output`
output object, it will store the coefficient functions grid
(flavour, interpolation-index) for each requested kinematic
point (x, Q2)
"""
self.console.print(self.banner)
# precomputing the plan of calculation
precomputed_plan = {}
printable_plan = []
for name, obs in self.observables.items():
if name in self._observables["observables"].keys():
precomputed_plan[name] = obs
printable_plan.append(
f"- {name} at {len(self._observables['observables'][name])} pts"
)
self.console.print(rich.markdown.Markdown("## Plan"))
self.console.print(rich.markdown.Markdown("\n".join(printable_plan)))
# running the calculation
self.console.print(rich.markdown.Markdown("## Calculation"))
self.console.print("yadism took off! please stay tuned ...")
start = time.time()
with rich.progress.Progress(transient=True, console=self.console) as progress:
task = progress.add_task(
"Starting...",
total=sum(len(obs) for obs in precomputed_plan.values()),
)
for name, obs in precomputed_plan.items():
# init slots
results = [None] * len(obs)
Q2 = None
# compute
for idx, elem in sorted(
enumerate(obs.elements), key=lambda indexed: indexed[1].Q2
):
# if we're changing Q2, drop cache
if Q2 is not None and Q2 != elem.Q2:
self.drop_cache()
Q2 = elem.Q2
results[idx] = elem.get_result()
progress.update(
task,
description=f"Computing [bold green]{name}",
advance=1,
)
self.drop_cache()
self._output[name] = results
end = time.time()
diff = end - start
self.console.print(f"[cyan]took {diff:.2f} s")
out = copy.deepcopy(self._output)
out = self.replace_nans_with_0(out)
return out
[docs]
class RunnerConfigs:
"""Runner Configuration."""
def __init__(self, theory, managers):
self.theory = theory
self.managers = managers
[docs]
def __getattribute__(self, name):
managers = object.__getattribute__(self, "managers")
if name == "managers":
return managers
if name in managers:
return self.managers[name]
theory = object.__getattribute__(self, "theory")
if name == "theory":
return theory
if name in theory:
return theory[name]
return super().__getattribute__(name)