Source code for reddit_decider

import logging
import sys

from copy import deepcopy
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import IO
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union

from baseplate import RequestContext
from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib.events import DebugLogger
from baseplate.lib.events import EventLogger
from baseplate.lib.file_watcher import FileWatcher
from baseplate.lib.file_watcher import WatchedFileNotAvailableError
from reddit_edgecontext import ValidatedAuthenticationToken
from rust_decider import Decider as RustDecider
from rust_decider import DeciderException
from rust_decider import Decision
from rust_decider import FeatureNotFoundException
from rust_decider import ValueTypeMismatchException
from typing_extensions import Literal

from .prometheus_metrics import experiments_client_counter

# get package's version for prometheus metrics
if sys.version_info >= (3, 8):
    from importlib.metadata import version as pkg_version, PackageNotFoundError
else:
    from importlib_metadata import version as pkg_version, PackageNotFoundError

try:
    # The cast exists only to satisfy mypy; background in
    # https://github.com/python/mypy/issues/8823#issuecomment-1484368501
    _pkg_version = cast(Callable[[str], str], pkg_version)("reddit-experiments")
except PackageNotFoundError:
    # No installed distribution metadata: fall back to an empty version string.
    _pkg_version = ""

logger = logging.getLogger(__name__)

EMPLOYEE_ROLES = [
    "employee",
    "contractor",
]

# Values accepted as `identifier_type` by the `*_for_identifier*` methods.
IDENTIFIERS = [
    "user_id",
    "device_id",
    "canonical_url",
    "subreddit_id",
    "ad_account_id",
    "business_id",
]

# Python type of a dynamic config value -> its "type" string.
TYPE_STR_LOOKUP = {
    bool: "boolean",
    int: "integer",
    float: "float",
    str: "string",
    dict: "map",
}


class EventType(Enum):
    """Event kinds handed to the event logger as its ``event_type`` argument."""

    # Exposure event: the only kind defined here; used by the Decider's
    # exposure calls.
    EXPOSE = "expose"


@dataclass
class ExperimentConfig:
    """Plain record describing a single experiment.

    Instances are built either from a decider feature (see
    :code:`Decider.get_experiment()` / :code:`Decider.expose()`) or from the
    fields of an exposure event string, and are passed to the event logger as
    its :code:`experiment` argument.
    """

    id: int
    version: str
    name: str
    # Name of the context field the experiment buckets on, e.g. "user_id".
    bucket_val: str
    # NOTE(review): timestamp units are not visible in this file -- confirm.
    start_ts: int
    stop_ts: int
    owner: str
    # Copied from the feature's `emit_event` in `Decider.get_experiment()`;
    # left as None when the config is built for an exposure event.
    emit_event: Optional[bool] = None


class DeciderContext:
    """Container for every field used for bucketing, targeting, and overrides.

    A :code:`DeciderContext` is populated in :code:`make_object_for_context()`.

    :param user_id: user's t2 id
    :param device_id: device installation uuid
    :param country_code: 2-letter country codes
    :param locale: ISO 639-1 primary language subtag and an optional ISO 3166-1 alpha-2 region subtag
    :param user_is_employee:
    :param logged_in: is user logged in
    :param oauth_client_id: OAuth Client ID
    :param origin_service: Service where request originated
    :param cookie_created_timestamp: When the authentication cookie was created
    :param loid_created_timestamp: Epoch milliseconds when the current LoID cookie was created
    :param extracted_fields: Optional dict of additional fields, e.g. app_name & build_number
    """

    def __init__(
        self,
        user_id: Optional[str] = None,
        device_id: Optional[str] = None,
        country_code: Optional[str] = None,
        locale: Optional[str] = None,
        user_is_employee: Optional[bool] = None,
        logged_in: Optional[bool] = None,
        oauth_client_id: Optional[str] = None,
        origin_service: Optional[str] = None,
        cookie_created_timestamp: Optional[float] = None,
        loid_created_timestamp: Optional[float] = None,
        extracted_fields: Optional[dict] = None,
    ):
        # identity
        self._user_id = user_id
        self._device_id = device_id
        self._user_is_employee = user_is_employee
        self._logged_in = logged_in
        # locale / geo
        self._country_code = country_code
        self._locale = locale
        # request origin
        self._oauth_client_id = oauth_client_id
        self._origin_service = origin_service
        # cookie timestamps
        self._cookie_created_timestamp = cookie_created_timestamp
        self._loid_created_timestamp = loid_created_timestamp
        # free-form extras supplied by the caller
        self._extracted_fields = extracted_fields

    def to_dict(self) -> Dict:
        """Return the context as a flat dict.

        The extracted fields are deep-copied, stored under
        :code:`"other_fields"`, and additionally merged into the top level
        (where they replace any same-named key).
        """
        extras = deepcopy(self._extracted_fields or {})

        flat: Dict = {
            "user_id": self._user_id,
            "device_id": self._device_id,
            "country_code": self._country_code,
            "locale": self._locale,
            "user_is_employee": self._user_is_employee,
            "logged_in": self._logged_in,
            "oauth_client_id": self._oauth_client_id,
            "origin_service": self._origin_service,
            "cookie_created_timestamp": self._cookie_created_timestamp,
            "loid_created_timestamp": self._loid_created_timestamp,
        }
        flat["other_fields"] = extras
        # Merge last so extracted fields win over the fixed keys above.
        flat.update(extras)
        return flat

    def to_event_dict(self) -> Dict:
        """Return the context as a dict of event fields.

        Besides the flat top-level keys, the result carries the nested
        :code:`user`, :code:`app`, :code:`geo`, :code:`request` and
        :code:`platform` dicts; nested entries other than :code:`user` are only
        present when their source value is truthy.  A deep copy of the
        extracted fields is merged into the top level last.
        """
        extras = deepcopy(self._extracted_fields or {})

        # (key in the nested "app" dict, key in the extracted fields)
        app_sources = (
            ("name", "app_name"),
            ("version", "app_version"),
            ("build_number", "build_number"),
        )
        app_fields = {
            target: extras[source] for target, source in app_sources if extras.get(source)
        }
        if self._locale:
            app_fields["relevant_locale"] = self._locale

        geo_fields = {"country_code": self._country_code} if self._country_code else {}
        request_fields = (
            {"canonical_url": extras["canonical_url"]} if extras.get("canonical_url") else {}
        )
        platform_fields = {"device_id": self._device_id} if self._device_id else {}

        event: Dict = {
            "user_id": self._user_id,
            "country_code": self._country_code,
            "locale": self._locale,
            "user_is_employee": self._user_is_employee,
            "logged_in": self._logged_in,
            "device_id": self._device_id,
            "origin_service": self._origin_service,
            "cookie_created_timestamp": self._cookie_created_timestamp,
            "user": {
                "id": self._user_id,
                "logged_in": self._logged_in,
                "cookie_created_timestamp": self._cookie_created_timestamp,
                "is_employee": self._user_is_employee,
            },
            "app": app_fields,
            "geo": geo_fields,
            "request": request_fields,
            "platform": platform_fields,
        }
        # Merge last so extracted fields win over the fixed keys above.
        event.update(extras)
        return event


def init_decider_parser(file: IO) -> Any:
    """Build a :code:`RustDecider` from the path (:code:`file.name`) of an open file.

    Only the file's name is used; the file object itself is not read here.
    """
    config_path = file.name
    return RustDecider(config_path)


[docs]class Decider: """Access to experiments with automatic refresh when changed. This decider client allows access to the experiments cached on disk by the experiment configuration fetcher daemon. It will automatically reload the cache when changed. """ T = TypeVar("T") def __init__( self, decider_context: DeciderContext, internal: Optional[RustDecider], server_span: Span, context_name: str, event_logger: Optional[EventLogger] = None, ): self._decider_context = decider_context self._internal: RustDecider = internal self._span = server_span self._context_name = context_name if event_logger: self._event_logger = event_logger else: self._event_logger = DebugLogger() def internal_decider(self) -> RustDecider: return self._internal def _send_expose(self, event: str, exposure_fields: dict) -> None: event_fields = deepcopy(exposure_fields) try: ( _event_type, exp_id, name, version, event_variant, bucketing_value, bucket_val, start_ts, stop_ts, owner, ) = event.split("::::") except ValueError: logger.warning( f'Encountered error in event.split("::::") for event: {event}. Exposure not emitted.' ) return experiment = ExperimentConfig( id=self._cast_to_int(exp_id), name=name, version=version, bucket_val=bucket_val, start_ts=self._cast_to_int(start_ts), stop_ts=self._cast_to_int(stop_ts), owner=owner, ) event_fields = {**event_fields, **{bucket_val: bucketing_value}} self._event_logger.log( experiment=experiment, variant=event_variant, span=self._span, event_type=EventType.EXPOSE, inputs=event_fields, **event_fields, ) return def _send_expose_if_holdout(self, event: str, exposure_fields: dict) -> None: event_fields = deepcopy(exposure_fields) try: ( event_type, exp_id, name, version, event_variant, bucketing_value, bucket_val, start_ts, stop_ts, owner, ) = event.split("::::") except ValueError: logger.warning( f'Encountered error in event.split("::::") for event: {event}. Exposure not emitted.' 
) return # event_type enum: # 0: regular bucketing # 1: override # 2: holdout if event_type == "2": experiment = ExperimentConfig( id=self._cast_to_int(exp_id), name=name, version=version, bucket_val=bucket_val, start_ts=self._cast_to_int(start_ts), stop_ts=self._cast_to_int(stop_ts), owner=owner, ) event_fields = {**event_fields, **{bucket_val: bucketing_value}} self._event_logger.log( experiment=experiment, variant=event_variant, span=self._span, event_type=EventType.EXPOSE, inputs=event_fields, **event_fields, ) return @staticmethod def _cast_to_int(input: str) -> int: out = 1 try: out = int(input) except ValueError as e: logger.info(f"Encountered error casting to integer: {e}") return out
[docs] def get_variant( self, experiment_name: str, **exposure_kwargs: Optional[Dict[str, Any]] ) -> Optional[str]: """Return a bucketing variant, if any, with auto-exposure. Since calling :code:`get_variant()` will fire an exposure event, it is best to call it when you are sure the user will be exposed to the experiment. If you absolutely must check the status of an experiment before the user will be exposed to the experiment, use :code:`get_variant_without_expose()` to disable exposure events and call :code:`expose()` manually later. :param experiment_name: Name of the experiment you want a variant for. :param exposure_kwargs: Additional arguments that will be passed to :code:`events_logger` (keys must be part of v2 event schema, use dicts for nested fields) under :code:`inputs` and as :code:`kwargs` :return: Variant name if a variant is assigned, :code:`None` otherwise. """ ctx = self._decider_context.to_dict() decision = self._get_decision(experiment_name, ctx) if decision is None: return None event_context_fields = self._decider_context.to_event_dict() event_context_fields.update(exposure_kwargs or {}) for event in decision.events: self._send_expose(event=event, exposure_fields=event_context_fields) return decision.variant
[docs] def get_variant_without_expose(self, experiment_name: str) -> Optional[str]: """Return a bucketing variant, if any, without emitting exposure event. The :code:`expose()` function is available to be manually called afterward. However, experiments in Holdout Groups will still send an exposure for the holdout parent experiment, since it is not possible to manually expose the holdout later (because it's impossible to know if a returned :code:`None` or :code:`"control_1"` string came from the holdout group or its child experiment once this function exits). :param experiment_name: Name of the experiment you want a variant for. :return: Variant name if a variant is assigned, None otherwise. """ ctx = self._decider_context.to_dict() decision = self._get_decision(experiment_name, ctx) if decision is None: return None event_context_fields = self._decider_context.to_event_dict() for event in decision.events: self._send_expose_if_holdout(event=event, exposure_fields=event_context_fields) return decision.variant
[docs] def expose( self, experiment_name: str, variant_name: str, **exposure_kwargs: Optional[Dict[str, Any]] ) -> None: """Log an event to indicate that a user has been exposed to an experimental treatment. Meant to be used after calling :code:`get_variant_without_expose()` since :code:`get_variant()` emits exposure event automatically. :param experiment_name: Name of the experiment that was exposed. :param variant_name: Name of the variant that was exposed. :param exposure_kwargs: Additional arguments that will be passed to :code:`events_logger` (keys must be part of v2 event schema, use dicts for nested fields) under :code:`inputs` and as :code:`kwargs` """ if variant_name is None or variant_name == "": return if self._internal is None: logger.error("RustDecider is None--did not initialize.") return try: feature = self._internal.get_feature(experiment_name) except FeatureNotFoundException: return except DeciderException as exc: logger.error("[decider] %s", str(exc)) return # drop exposure for feature rollouts if not feature.emit_event: return event_context_fields = self._decider_context.to_event_dict() event_context_fields.update(exposure_kwargs or {}) event_fields = deepcopy(event_context_fields) experiment = ExperimentConfig( id=feature.id, name=feature.name, version=str(feature.version), bucket_val=feature.bucket_val, start_ts=feature.start_ts, stop_ts=feature.stop_ts, owner=feature.owner, ) self._event_logger.log( experiment=experiment, variant=variant_name, span=self._span, event_type=EventType.EXPOSE, inputs=event_fields, **event_fields, )
[docs] def get_variant_for_identifier( self, experiment_name: str, identifier: str, identifier_type: Literal[ "user_id", "device_id", "canonical_url", "subreddit_id", "ad_account_id", "business_id" ], **exposure_kwargs: Optional[Dict[str, Any]], ) -> Optional[str]: """Return a bucketing variant, if any, with auto-exposure for a given :code:`identifier`. Note: If the experiment's :code:`bucket_val` (e.g. "user_id", "device_id", "canonical_url", "subreddit_id", "ad_account_id", "business_id") does not match the :code:`identifier_type` param, the :code:`identifier` will be ignored and not used to bucket (:code:`{identifier_type: identifier}` is added to internal :code:`DeciderContext` instance, but doesn't act like a bucketing override). If the :code:`bucket_val` field exists on the :code:`DeciderContext` instance, that field will be used to bucket, since it corresponds to the experiment's config. Since calling :code:`get_variant_for_identifier()` will fire an exposure event, it is best to call it when you are sure the user will be exposed to the experiment. :param experiment_name: Name of the experiment you want a variant for. :param identifier: an arbitary string used to bucket the experiment by being set on :code:`DeciderContext`'s :code:`identifier_type` field. :param identifier_type: Sets :code:`{identifier_type: identifier}` on :code:`DeciderContext`. The experiment's :code:`bucket_val` will be looked up in :code:`DeciderContext` and be used to bucket. If the experiment's :code:`bucket_val` field does not match :code:`identifier_type` param, :code:`identifier` will be ignored, and the field corresponding :code:`bucket_val` will be looked up from :code:`DeciderContext` for bucketing. :param exposure_kwargs: Additional arguments that will be passed to :code:`events_logger` (keys must be part of v2 event schema, use dicts for nested fields) under :code:`inputs` and as :code:`kwargs` :return: Variant name if a variant is assigned, None otherwise. 
""" if identifier_type not in IDENTIFIERS: logger.warning( f'"{identifier_type}" is not one of supported "identifier_type": {IDENTIFIERS}.' ) return None ctx = self._decider_context.to_dict() ctx[identifier_type] = identifier decision = self._get_decision(experiment_name, ctx) if decision is None: return None event_context_fields = self._decider_context.to_event_dict() event_context_fields.update(exposure_kwargs or {}) for event in decision.events: self._send_expose(event=event, exposure_fields=event_context_fields) return decision.variant
[docs] def get_variant_for_identifier_without_expose( self, experiment_name: str, identifier: str, identifier_type: Literal[ "user_id", "device_id", "canonical_url", "subreddit_id", "ad_account_id", "business_id" ], ) -> Optional[str]: """Return a bucketing variant, if any, without emitting exposure event for a given :code:`identifier`. Note: If the experiment's :code:`bucket_val` (e.g. "user_id", "device_id", "canonical_url", "subreddit_id", "ad_account_id", "business_id") does not match the :code:`identifier_type` param, the :code:`identifier` will be ignored and not used to bucket (:code:`{identifier_type: identifier}` is added to internal :code:`DeciderContext` instance, but doesn't act like a bucketing override). If the :code:`bucket_val` field exists on the :code:`DeciderContext` instance, that field will be used to bucket, since it corresponds to the experiment's config. The :code:`expose()` function is available to be manually called afterward to emit exposure event. However, experiments in Holdout Groups will still send an exposure for the holdout parent experiment, since it is not possible to manually expose the holdout later (because it's impossible to know if a returned :code:`None` or :code:`"control_1"` string came from the holdout group or its child experiment once this function exits). :param experiment_name: Name of the experiment you want a variant for. :param identifier: an arbitary string used to bucket the experiment by being set on :code:`DeciderContext`'s :code:`identifier_type` field. :param identifier_type: Sets :code:`{identifier_type: identifier}` on :code:`DeciderContext`. The experiment's :code:`bucket_val` will be looked up in :code:`DeciderContext` and be used to bucket. If the experiment's :code:`bucket_val` field does not match :code:`identifier_type` param, :code:`identifier` will be ignored and the field corresponding :code:`bucket_val` will be looked up from :code:`DeciderContext` for bucketing. 
:return: Variant name if a variant is assigned, None otherwise. """ if identifier_type not in IDENTIFIERS: logger.warning( f'"{identifier_type}" is not one of supported "identifier_type": {IDENTIFIERS}.' ) return None ctx = self._decider_context.to_dict() ctx[identifier_type] = identifier decision = self._get_decision(experiment_name, ctx) if decision is None: return None event_context_fields = self._decider_context.to_event_dict() # expose Holdout if the experiment is part of one for event in decision.events: self._send_expose_if_holdout(event=event, exposure_fields=event_context_fields) return decision.variant
[docs] def get_all_variants_without_expose(self) -> List[Dict[str, Union[str, int]]]: """Return a list of experiment dicts in this format: .. code-block:: json [ { "id": 1, "name": "variant_1", "version": "1", "experimentName": "exp_1" } ] If an experiment has a variant of :code:`None`, it is not included in the returned list. All available experiments get bucketed. Exposure events are not emitted. The :code:`expose()` function is available to be manually called afterward to emit exposure event. However, experiments in Holdout Groups will still send an exposure for the holdout parent experiment, since it is not possible to manually expose the holdout later (because it's impossible to know if a returned :code:`None` or :code:`"control_1"` string came from the holdout group or its child experiment once this function exits). :return: list of experiment dicts with non-:code:`None` variants. """ ctx = self._decider_context.to_dict() all_decisions = self._get_all_decisions(ctx) if all_decisions is None: return [] parsed_choices = [] event_context_fields = self._decider_context.to_event_dict() for decision in all_decisions.values(): if decision.variant: parsed_choices.append(self._decision_to_dict(decision)) # expose Holdout if the experiment is part of one for event in decision.events: self._send_expose_if_holdout(event=event, exposure_fields=event_context_fields) return parsed_choices
def _decision_to_dict(self, decision: Decision) -> Dict[str, Any]: return { "name": decision.variant, "id": decision.feature_id, "version": str(decision.feature_version), "experimentName": decision.feature_name, }
[docs] def get_all_variants_for_identifier_without_expose( self, identifier: str, identifier_type: Literal[ "user_id", "device_id", "canonical_url", "subreddit_id", "ad_account_id", "business_id" ], ) -> List[Dict[str, Union[str, int]]]: """Return a list of experiment dicts for experiments having :code:`bucket_val` match :code:`identifier_type`, for a given :code:`identifier`, in this format: .. code-block:: json [ { "id": 1, "name": "variant_1", "version": "1", "experimentName": "exp_1" } ] If an experiment has a variant of :code:`None`, it is not included in the returned list. All available experiments get bucketed. Exposure events are not emitted. However, experiments in Holdout Groups will still send an exposure for the holdout parent experiment, since it is not possible to manually expose the holdout later (because it's impossible to know if a returned :code:`None` or :code:`"control_1"` string came from the holdout group or its child experiment once this function exits). :param identifier: an arbitary string used to bucket the experiment by being set on :code:`DeciderContext`'s :code:`identifier_type` field. :param identifier_type: Sets :code:`{identifier_type: identifier}` on DeciderContext and buckets all experiment with matching :code:`bucket_val`. :return: list of experiment dicts with non-:code:`None` variants. """ if identifier_type not in IDENTIFIERS: logger.warning( f'"{identifier_type}" is not one of supported "identifier_type": {IDENTIFIERS}.' 
) return [] ctx = self._decider_context.to_dict() ctx[identifier_type] = identifier all_decisions = self._get_all_decisions(ctx=ctx, bucketing_field_filter=identifier_type) if all_decisions is None: return [] parsed_choices = [] event_context_fields = self._decider_context.to_event_dict() for decision in all_decisions.values(): if decision.variant: parsed_choices.append(self._decision_to_dict(decision)) # expose Holdout if the experiment is part of one for event in decision.events: self._send_expose_if_holdout(event=event, exposure_fields=event_context_fields) return parsed_choices
[docs] def get_bool(self, feature_name: str, default: bool = False) -> bool: """Fetch a Dynamic Configuration of boolean type. :param feature_name: Name of the dynamic config you want a value for. :param default: what is returned if dynamic config is not active (:code:`False` unless overriden). :return: the boolean value of the dyanimc config if it is active/exists, :code:`default` parameter otherwise. """ if self._internal is None: logger.error("rs_decider is None--did not initialize.") return default return self._get_dynamic_config_value(feature_name, default, bool, self._internal.get_bool)
[docs] def get_int(self, feature_name: str, default: int = 0) -> int: """Fetch a Dynamic Configuration of int type. :param feature_name: Name of the dynamic config you want a value for. :param default: what is returned if dynamic config is not active (:code:`0` unless overriden). :return: the int value of the dyanimc config if it is active/exists, :code:`default` parameter otherwise. """ if self._internal is None: logger.error("rs_decider is None--did not initialize.") return default return self._get_dynamic_config_value(feature_name, default, int, self._internal.get_int)
[docs] def get_float(self, feature_name: str, default: float = 0.0) -> float: """Fetch a Dynamic Configuration of float type. :param feature_name: Name of the dynamic config you want a value for. :param default: what is returned if dynamic config is not active (:code:`0.0` unless overriden). :return: the float value of the dyanimc config if it is active/exists, :code:`default` parameter otherwise. """ if self._internal is None: logger.error("rs_decider is None--did not initialize.") return default return self._get_dynamic_config_value( feature_name, default, float, self._internal.get_float )
[docs] def get_string(self, feature_name: str, default: str = "") -> str: """Fetch a Dynamic Configuration of string type. :param feature_name: Name of the dynamic config you want a value for. :param default: what is returned if dynamic config is not active (:code:`""` unless overriden). :return: the string value of the dyanimc config if it is active/exists, :code:`default` parameter otherwise. """ if self._internal is None: logger.error("rs_decider is None--did not initialize.") return default return self._get_dynamic_config_value(feature_name, default, str, self._internal.get_string)
[docs] def get_map(self, feature_name: str, default: Optional[dict] = None) -> Optional[dict]: """Fetch a Dynamic Configuration of map type. :param feature_name: Name of the dynamic config you want a value for. :param default: what is returned if dynamic config is not active (:code:`None` unless overriden). :return: the map value of the dyanimc config if it is active/exists, :code:`default` parameter otherwise. """ if self._internal is None: logger.error("rs_decider is None--did not initialize.") return default return self._get_dynamic_config_value(feature_name, default, dict, self._internal.get_map)
[docs] def get_all_dynamic_configs(self) -> List[Dict[str, Any]]: """Return a list of dynamic configuration dicts in this format: .. code-block:: json [ { "name": "example_dc", "type": "float", "value": 1.0 } ] where "type" field can be one of: .. code-block:: python "boolean", "integer", "float", "string", "map" Dynamic Configurations that are malformed, fail parsing, or otherwise error for any reason are included in the response and have their respective default values set: .. code-block:: python "boolean" -> False "integer" -> 0 "float" -> 0.0 "string" -> "" "map" -> {} :return: list of all active dynamic config dicts. """ if self._internal is None: logger.error("rs_decider is None--did not initialize.") return [] ctx = self._decider_context.to_dict() try: values = self._internal.all_values(ctx) except DeciderException as exc: logger.error("[decider] %s", str(exc)) return [] parsed_configs = [] for feature_name, val in values.items(): parsed_configs.append(self._value_to_dc_dict(feature_name, val)) return parsed_configs
def _get_decision(
    self,
    experiment_name: str,
    ctx: Dict[str, Any],
) -> Optional[Decision]:
    """Ask the underlying decider to choose for a single experiment.

    :param experiment_name: Name of the experiment to bucket.
    :param ctx: context dict handed to the decider's ``choose()``.

    :return: the :code:`Decision` returned by ``choose()``, or :code:`None`
        when the decider is not initialized, the feature is not found
        (not logged), or any other decider error occurs (logged).
    """
    if self._internal is None:
        logger.error("RustDecider is None--did not initialize.")
        return None

    try:
        return self._internal.choose(experiment_name, ctx)
    except FeatureNotFoundException:
        # A missing feature is treated as "no decision", not as an error.
        return None
    except DeciderException as exc:
        logger.error("[decider] %s", str(exc))
        return None


def _get_all_decisions(
    self, ctx: Dict[str, Any], bucketing_field_filter: Optional[str] = None
) -> Optional[Dict[str, Decision]]:
    """Ask the underlying decider to choose for all experiments at once.

    :param ctx: context dict handed to the decider's ``choose_all()``.
    :param bucketing_field_filter: optional filter forwarded unchanged to
        ``choose_all()``.

    :return: the mapping returned by ``choose_all()``, or :code:`None` when
        the decider is not initialized or a decider error occurs (logged).
    """
    if self._internal is None:
        logger.error("RustDecider is None--did not initialize.")
        return None

    try:
        return self._internal.choose_all(ctx, bucketing_field_filter)
    except DeciderException as exc:
        # Exactly one placeholder for the one argument: a second "%s" would
        # make the logging module fail while formatting the record.
        logger.error("[decider] %s", str(exc))
        return None


def _get_dynamic_config_value(
    self,
    feature_name: str,
    default: Any,
    dc_type: Type[T],
    get_fn: Callable[..., T],
) -> T:
    """Fetch a dynamic config through ``get_fn`` and coerce it to ``dc_type``.

    :param feature_name: Name of the dynamic config to read.
    :param default: value returned whenever the lookup or coercion fails.
    :param dc_type: type the fetched value is converted to before returning.
    :param get_fn: getter called with ``feature_name=`` and ``context=``
        keyword arguments.

    :return: ``dc_type(value)`` on success, otherwise ``default``.
    """
    ctx = self._decider_context.to_dict()

    try:
        value = get_fn(feature_name=feature_name, context=ctx)
    except FeatureNotFoundException:
        return default
    except ValueTypeMismatchException as exc:
        # Type mismatches are logged at info level only.
        logger.info(str(exc))
        return default
    except DeciderException as exc:
        logger.error("[decider] %s", str(exc))
        return default

    try:
        return dc_type(value)  # type: ignore [call-arg]
    except (TypeError, ValueError):
        # Constructors such as int()/float()/dict() signal an unconvertible
        # value with either exception; both mean "fall back to the default".
        return default


def _value_to_dc_dict(self, feature_name: str, value: Optional[Any]) -> Dict[str, Any]:
    """Build the ``{"name", "value", "type"}`` dict for one dynamic config.

    The "type" entry is the ``TYPE_STR_LOOKUP`` name for ``type(value)``, or
    an empty string when ``value`` is :code:`None`.
    """
    return {
        "name": feature_name,
        "value": value,
        "type": "" if value is None else TYPE_STR_LOOKUP[type(value)],
    }
def get_experiment(self, experiment_name: str) -> Optional[ExperimentConfig]:
    """Fetch an experiment as an :py:class:`~reddit_decider.ExperimentConfig`
    `dataclass <https://github.com/reddit/experiments.py/blob/develop/reddit_decider/__init__.py#L44>`_.

    :param experiment_name: Name of the experiment to be fetched.

    :return: an :py:class:`~reddit_decider.ExperimentConfig` built from the
        feature the decider returns, or :code:`None` when the decider is not
        initialized, the experiment is not found, or a decider error occurs.
    """
    rust_decider = self._internal
    if rust_decider is None:
        logger.error("RustDecider is None--did not initialize.")
        return None

    try:
        feature = rust_decider.get_feature(experiment_name)
    except FeatureNotFoundException:
        # Unknown experiment: reported to the caller as None, not logged.
        return None
    except DeciderException as exc:
        logger.error("[decider] %s", str(exc))
        return None
    else:
        # Copy the feature's fields across; only `version` is converted (to str).
        experiment_fields = {
            "id": feature.id,
            "name": feature.name,
            "version": str(feature.version),
            "bucket_val": feature.bucket_val,
            "start_ts": feature.start_ts,
            "stop_ts": feature.stop_ts,
            "owner": feature.owner,
            "emit_event": feature.emit_event,
        }
        return ExperimentConfig(**experiment_fields)
class DeciderContextFactory(ContextFactory):
    """Decider client context factory.

    This factory will attach a new
    :py:class:`reddit_decider.Decider` to an attribute on the
    :py:class:`~baseplate.RequestContext`.

    :param path: Path to the experiment configuration file.
    :param event_logger: The logger to use to log experiment eligibility
        events. It is forwarded unchanged (possibly :code:`None`) to every
        :py:class:`reddit_decider.Decider` this factory creates; any fallback
        to a :py:class:`~baseplate.lib.events.DebugLogger` is expected to
        happen there, not in this class.
    :param timeout: How long, in seconds, to block instantiation waiting
        for the watched experiments file to become available (defaults to not
        blocking).
    :param backoff: retry backoff time for experiments file watcher. Defaults to
        None, which is mapped to DEFAULT_FILEWATCHER_BACKOFF.
    :param request_field_extractor: an optional function used to populate fields such as
        "app_name" & "build_number" in DeciderContext() that may be used for targeting
    """

    def __init__(
        self,
        path: str,
        event_logger: Optional[EventLogger] = None,
        timeout: Optional[float] = None,
        backoff: Optional[float] = None,
        request_field_extractor: Optional[
            Callable[[RequestContext], Dict[str, Union[str, int, float, bool]]]
        ] = None,
    ):
        # The watcher parses the experiments file with `init_decider_parser`.
        self._filewatcher = FileWatcher(
            path=path, parser=init_decider_parser, timeout=timeout, backoff=backoff
        )
        self._event_logger = event_logger
        self._request_field_extractor = request_field_extractor

    @staticmethod
    def _is_employee(edge_context: Any) -> bool:
        """Return True when the logged-in user holds any of ``EMPLOYEE_ROLES``."""
        if not edge_context.user.is_logged_in:
            return False
        return any(edge_context.user.has_role(role) for role in EMPLOYEE_ROLES)

    @staticmethod
    def _prune_extracted_dict(extracted_dict: dict) -> dict:
        """Return a new dict holding only the valid entries of ``extracted_dict``.

        An entry is kept when its key is a ``str`` and its value is ``None`` or
        an ``int``/``float``/``str``/``bool``; every dropped entry is logged at
        info level. The input dict is never modified.
        """
        # Build the result from scratch instead of deleting from a deep copy:
        # a key that is not equal to its own deep copy would make `del` raise
        # KeyError rather than simply being pruned.
        parsed_extracted_fields = {}

        for k, v in extracted_dict.items():
            # remove invalid keys
            if not isinstance(k, str):
                logger.info(
                    f"{k} key in request_field_extractor() dict is not of type str and is removed."
                )
                continue

            # remove invalid values
            if v is not None and not isinstance(v, (int, float, str, bool)):
                logger.info(
                    f"{k}: {v} value in `request_field_extractor()` dict is not one of type: [None, int, float, str, bool] and is removed."
                )
                continue

            parsed_extracted_fields[k] = v

        return parsed_extracted_fields

    def _minimal_decider(
        self,
        internal: Optional[RustDecider],
        name: str,
        span: Span,
        parsed_extracted_fields: Optional[Dict] = None,
    ) -> Decider:
        """Build a Decider whose context carries only the extracted fields.

        Used by ``make_object_for_context()`` when the span, the request, or
        the edge context is unavailable.
        """
        return Decider(
            decider_context=DeciderContext(extracted_fields=parsed_extracted_fields),
            internal=internal,
            server_span=span,
            context_name=name,
            event_logger=self._event_logger,
        )

    def make_object_for_context(self, name: str, span: Span) -> Decider:
        """Create the Decider for one request.

        Context fields are gathered on a best-effort basis from
        ``span.context.edge_context``; each field that cannot be read is
        logged and left as ``None``. Exceptions raised by the user-supplied
        ``request_field_extractor`` (or while pruning its result) are logged,
        counted, and re-raised.

        :param name: context attribute name, forwarded as ``context_name``.
        :param span: the server span for the current request.

        :return: a :py:class:`reddit_decider.Decider` for this request.
        """

        def inc_failure_counter(failure_type: str) -> None:
            experiments_client_counter.labels(
                operation="make_object_for_context",
                success="false",
                error_type=failure_type,
                pkg_version=_pkg_version,
            ).inc()

        # initialize rust decider from watched manifest file
        rs_decider = None
        try:
            rs_decider = self._filewatcher.get_data()
        except WatchedFileNotAvailableError as exc:
            inc_failure_counter("watched_file_not_available")
            logger.error(f"Experiment config file unavailable: {exc}")

        # check for `span`'s presence
        if span is None:
            inc_failure_counter("missing:'span'")
            logger.debug("`span` is `None` in reddit_decider `make_object_for_context()`.")
            return self._minimal_decider(internal=rs_decider, name=name, span=span)

        # check for `span.context`'s presence
        request = getattr(span, "context", None)
        if request is None:
            inc_failure_counter("missing:'span.context'")
            return self._minimal_decider(
                internal=rs_decider,
                name=name,
                span=span,
            )

        # extract fields from `span.context` if `self._request_field_extractor` is defined
        parsed_extracted_fields = None
        try:
            if self._request_field_extractor:
                extracted_fields = self._request_field_extractor(request)
                # prune any invalid keys/values
                parsed_extracted_fields = self._prune_extracted_dict(
                    extracted_dict=extracted_fields
                )
        except Exception as exc:
            inc_failure_counter("request_field_extractor")
            logger.error(
                f"Unable to extract fields from `request_field_extractor()` in `make_object_for_context()`. details: {exc}"
            )
            # re-raise exception raised by `_request_field_extractor`
            # since it's user-defined & should be made visible
            raise

        ec = getattr(request, "edge_context", None)

        # if `edge_context` is inaccessible, bail field extraction early
        if ec is None:
            inc_failure_counter("missing:'request.edge_context'")
            return self._minimal_decider(
                internal=rs_decider,
                name=name,
                span=span,
                parsed_extracted_fields=parsed_extracted_fields,
            )

        # All fields below are derived from `edge_context`
        user_id = None
        logged_in = None
        cookie_created_timestamp = None
        try:
            user_event_fields = ec.user.event_fields()
            user_id = user_event_fields.get("user_id")
            logged_in = user_event_fields.get("logged_in")
            cookie_created_timestamp = user_event_fields.get("cookie_created_timestamp")
        except Exception as exc:
            logger.info(
                f"Error while accessing `user.event_fields()` in `make_object_for_context()`. details: {exc}"
            )

        # Token-derived fields are only read from a validated token, and only
        # truthy values are kept.
        loid_created_timestamp = None
        try:
            if isinstance(ec.authentication_token, ValidatedAuthenticationToken):
                loid_cms = ec.authentication_token.loid_created_ms
                if loid_cms:
                    loid_created_timestamp = loid_cms
        except Exception as exc:
            logger.info(
                f"Unable to access `ec.authentication_token.loid_created_ms` in `make_object_for_context()`. details: {exc}"
            )

        oauth_client_id = None
        try:
            if isinstance(ec.authentication_token, ValidatedAuthenticationToken):
                oc_id = ec.authentication_token.oauth_client_id
                if oc_id:
                    oauth_client_id = oc_id
        except Exception as exc:
            logger.info(
                f"Unable to access `ec.authentication_token.oauth_client_id` in `make_object_for_context()`. details: {exc}"
            )

        country_code = None
        try:
            country_code = ec.geolocation.country_code
        except Exception as exc:
            logger.info(
                f"Unable to access `ec.geolocation.country_code` in `make_object_for_context()`. details: {exc}"
            )

        locale = None
        try:
            locale = ec.locale.locale_code
        except Exception as exc:
            logger.info(
                f"Unable to access `ec.locale.locale_code` in `make_object_for_context()`. details: {exc}"
            )

        origin_service = None
        try:
            origin_service = ec.origin_service.name
        except Exception as exc:
            logger.info(
                f"Unable to access `ec.origin_service.name` in `make_object_for_context()`. details: {exc}"
            )

        is_employee = None
        try:
            is_employee = self._is_employee(ec)
        except Exception as exc:
            logger.info(
                f"Error in `DeciderContextFactory.is_employee(ec)` in `make_object_for_context()`. details: {exc}"
            )

        device_id = None
        try:
            device_id = ec.device.id
        except Exception as exc:
            logger.info(
                f"Unable to access `ec.device.id` in `make_object_for_context()`. details: {exc}"
            )

        try:
            decider_context = DeciderContext(
                user_id=user_id,
                logged_in=logged_in,
                country_code=country_code,
                locale=locale,
                origin_service=origin_service,
                user_is_employee=is_employee,
                device_id=device_id,
                oauth_client_id=oauth_client_id,
                cookie_created_timestamp=cookie_created_timestamp,
                loid_created_timestamp=loid_created_timestamp,
                extracted_fields=parsed_extracted_fields,
            )
        except Exception as exc:
            inc_failure_counter("DeciderContext_init_failed")
            logger.warning(
                f"Could not create full DeciderContext() (defaulting to empty DeciderContext()): {exc}"
            )
            decider_context = DeciderContext()

        return Decider(
            decider_context=decider_context,
            internal=rs_decider,
            server_span=span,
            context_name=name,
            event_logger=self._event_logger,
        )
class DeciderClient(config.Parser):
    """Configure a decider client.

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    See :py:func:`decider_client_from_config` for available configuration settings.

    :param event_logger: The EventLogger instance to be used to log bucketing events.
    :param prefix: the prefix used to filter config keys (defaults to "experiments.").
    :param request_field_extractor: (optional) function used to populate fields such as
        :code:`"app_name"` & :code:`"build_number"` in :code:`DeciderContext()` that may be used for targeting
    """

    def __init__(
        self,
        event_logger: EventLogger,
        prefix: str = "experiments.",
        request_field_extractor: Optional[
            Callable[[RequestContext], Dict[str, Union[str, int, float, bool]]]
        ] = None,
    ):
        # Stored as-is; everything is consumed later by `parse()`.
        self._event_logger = event_logger
        self._prefix = prefix
        self._request_field_extractor = request_field_extractor

    def parse(self, _key_path: str, raw_config: config.RawConfig) -> DeciderContextFactory:
        """Build the :py:class:`DeciderContextFactory` from the raw app config."""
        # The prefix given to `__init__` is used instead of `_key_path`: most
        # services will not change `app_config` to use a "decider" key.
        factory_settings = {
            "app_config": raw_config,
            "event_logger": self._event_logger,
            "prefix": self._prefix,
            "request_field_extractor": self._request_field_extractor,
        }
        return decider_client_from_config(**factory_settings)
def decider_client_from_config(
    app_config: config.RawConfig,
    event_logger: EventLogger,
    prefix: str = "experiments.",
    request_field_extractor: Optional[
        Callable[[RequestContext], Dict[str, Union[str, int, float, bool]]]
    ] = None,
) -> DeciderContextFactory:
    """Configure and return an :py:class:`DeciderContextFactory` object.

    The keys used in your app's :code:`some_config.ini` file should be prefixed, e.g.
    ``experiments.path``, etc.

    Supported config keys:

    ``path`` (optional)
        The path to the experiment configuration file generated by the
        experiment configuration fetcher daemon.
        Defaults to :code:`"/var/local/experiments.json"`.
    ``timeout`` (optional)
        The time that we should wait for the file specified by ``path`` to
        exist. Defaults to blocking for :code:`30` seconds.
    ``backoff`` (optional)
        The base amount of time for exponential backoff when trying to find the
        experiments config file. When unset, :code:`None` is handed to the
        factory.

    :param app_config: The application configuration which should have
        settings for the decider client.
    :param event_logger: The EventLogger to be used to log bucketing events.
    :param prefix: the prefix used to filter keys (defaults to "experiments.").
    :param request_field_extractor: (optional) function used to populate fields such as
        "app_name" & "build_number" in DeciderContext() that may be used for targeting
    """
    assert prefix.endswith(".")
    # The config section name is the prefix without its trailing dot.
    section_name = prefix[:-1]

    section_spec = {
        "path": config.Optional(config.String, default="/var/local/experiments.json"),
        "timeout": config.Optional(config.Timespan, default=timedelta(seconds=30)),
        "backoff": config.Optional(config.Timespan),
    }
    parsed = config.parse_config(app_config, {section_name: section_spec})
    options = getattr(parsed, section_name)

    # pylint: disable=maybe-no-member
    # Timespans become plain seconds; a falsy (unset or zero) span becomes None.
    timeout = options.timeout.total_seconds() if options.timeout else None
    backoff = options.backoff.total_seconds() if options.backoff else None

    return DeciderContextFactory(
        path=options.path,
        event_logger=event_logger,
        timeout=timeout,
        backoff=backoff,
        request_field_extractor=request_field_extractor,
    )