#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
Fivetran data processing functions.

These functions help transform Fivetran's standardized schemas into formats suitable
for PyMC-Marketing models.

Example usage for MMM:

.. code-block:: python

    from pymc_marketing.data.fivetran import (
        process_fivetran_ad_reporting,
        process_fivetran_shopify_unique_orders,
    )
    from pymc_marketing.mmm import MMM

    # Process ad spend data for media channels
    x = process_fivetran_ad_reporting(
        campaign_df, value_columns="spend", rename_date_to="date"
    )
    # Result: date | facebook_ads_spend | google_ads_spend | ...

    # Process conversion data (orders) as target variable
    y = process_fivetran_shopify_unique_orders(orders_df)
    # Result: date | orders

    # Use in MMM model
    mmm = MMM(...)
    mmm.fit(X=x, y=y["orders"])

There are also pandas accessors for these functions, which allow calling them directly
on a pandas DataFrame. The accessors are registered under the ``fivetran`` namespace
and become available once ``pymc_marketing`` is imported.

.. code-block:: python

    import pandas as pd

    from pymc_marketing.mmm import MMM


    campaign_df: pd.DataFrame = ...
    orders_df: pd.DataFrame = ...

    X: pd.DataFrame = campaign_df.fivetran.process_ad_reporting(value_columns="spend")
    y: pd.DataFrame = orders_df.fivetran.process_shopify_unique_orders()

    # Use in MMM model
    mmm = MMM(...)
    mmm.fit(X=X, y=y["orders"])

"""

from collections.abc import Sequence

import pandas as pd

from pymc_marketing.decorators import copy_docstring


def _normalize_and_validate_inputs(
    df: pd.DataFrame,
    value_columns: str | Sequence[str],
    date_col: str,
    platform_col: str,
) -> tuple[pd.DataFrame, list[str]]:
    """Validate required columns, coerce date, and normalize metrics list.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe to validate and normalize.
    value_columns : str or Sequence[str]
        Metric column(s) to process.
    date_col : str
        Name of the date column.
    platform_col : str
        Name of the platform column.

    Returns
    -------
    tuple[pandas.DataFrame, list[str]]
        - A copy of ``df`` with ``date_col`` coerced to datetime64[ns].
        - A normalized list of metric column names.

    Raises
    ------
    ValueError
        If any of the required columns are missing.
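
    Examples
    --------
    A minimal sketch with hypothetical data:

    .. code-block:: python

        df = pd.DataFrame(
            {"date_day": ["2025-01-01"], "platform": ["google_ads"], "spend": [10.0]}
        )
        df_norm, metrics = _normalize_and_validate_inputs(
            df, value_columns="spend", date_col="date_day", platform_col="platform"
        )
        # metrics == ["spend"]; df_norm["date_day"] now has dtype datetime64[ns]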
    """
    metrics = [value_columns] if isinstance(value_columns, str) else list(value_columns)

    required_columns: list[str] = [date_col, platform_col, *metrics]
    missing_columns = [c for c in required_columns if c not in df.columns]
    if missing_columns:
        raise ValueError(
            f"Missing required columns: {missing_columns}. Present columns: {list(df.columns)}"
        )

    df_local = df.copy()
    df_local[date_col] = pd.to_datetime(df_local[date_col])
    return df_local, metrics


def _aggregate_and_pivot(
    df: pd.DataFrame,
    date_col: str,
    platform_col: str,
    value_columns: list[str],
    agg: str,
) -> pd.DataFrame:
    """Aggregate metrics by date and platform, then pivot and flatten columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe with coerced date column.
    date_col : str
        Date column name.
    platform_col : str
        Platform column name.
    value_columns : list[str]
        Metric columns to aggregate and pivot.
    agg : str
        Aggregation function passed to pandas ``agg``.

    Returns
    -------
    pandas.DataFrame
        Wide-format dataframe indexed by date with columns named ``{platform}_{metric}``.
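
    Examples
    --------
    A sketch with hypothetical data: two platforms on one day pivot into a
    single row with ``{platform}_{metric}`` columns:

    .. code-block:: python

        df = pd.DataFrame(
            {
                "date_day": pd.to_datetime(["2025-01-01", "2025-01-01"]),
                "platform": ["facebook_ads", "google_ads"],
                "spend": [120.0, 80.0],
            }
        )
        wide = _aggregate_and_pivot(
            df,
            date_col="date_day",
            platform_col="platform",
            value_columns=["spend"],
            agg="sum",
        )
        # list(wide.columns) == ["facebook_ads_spend", "google_ads_spend"]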
    """
    grouped = (
        df[[date_col, platform_col, *value_columns]]
        .groupby([date_col, platform_col], dropna=False)
        .agg(agg)
    )

    wide = grouped.reset_index().pivot(
        index=date_col, columns=platform_col, values=value_columns
    )

    if isinstance(wide.columns, pd.MultiIndex):
        flat_cols = [f"{platform}_{metric}" for metric, platform in wide.columns]
    else:
        metric = value_columns[0]
        flat_cols = [f"{platform}_{metric}" for platform in wide.columns]
    wide.columns = flat_cols
    return wide


def _finalize_wide_output(
    wide: pd.DataFrame,
    *,
    date_col: str,
    rename_date_to: str | None,
    include_missing_dates: bool,
    freq: str,
    fill_value: float | None,
) -> pd.DataFrame:
    """Complete wide output: fill dates, fill values, reset, rename, standardize.

    Parameters
    ----------
    wide : pandas.DataFrame
        Dataframe indexed by date.
    date_col : str
        Name of the date column in the input.
    rename_date_to : str or None
        Optional new name for the date column.
    include_missing_dates : bool
        If ``True``, reindex to a continuous date range using ``freq``.
    freq : str
        Frequency for ``pandas.date_range`` when including missing dates.
    fill_value : float or None
        Value used to fill missing values. If ``None``, do not fill.

    Returns
    -------
    pandas.DataFrame
        Finalized wide-format dataframe with standardized columns and first column as the date.
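
    Examples
    --------
    A sketch with hypothetical data: a one-day gap is reindexed and zero-filled:

    .. code-block:: python

        wide = pd.DataFrame(
            {"google_ads_spend": [10.0, 30.0]},
            index=pd.to_datetime(["2025-01-01", "2025-01-03"]),
        )
        out = _finalize_wide_output(
            wide,
            date_col="date_day",
            rename_date_to="date",
            include_missing_dates=True,
            freq="D",
            fill_value=0.0,
        )
        # out has rows for 2025-01-01 through 2025-01-03; the gap day is 0.0,
        # and the first column is named "date"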
    """
    if include_missing_dates:
        full_index = pd.date_range(
            start=wide.index.min(), end=wide.index.max(), freq=freq
        )
        wide = wide.reindex(full_index)

    if fill_value is not None:
        wide = wide.fillna(fill_value)

    wide = wide.sort_index()
    wide.index.name = date_col
    wide = wide.reset_index()

    if rename_date_to is not None and rename_date_to != date_col:
        wide = wide.rename(columns={date_col: rename_date_to})

    # standardize column names
    wide.columns = [col.lower().replace(" ", "_") for col in wide.columns]

    # Ensure date column is first and normalized to midnight
    first_col = (
        (rename_date_to if rename_date_to is not None else date_col)
        .lower()
        .replace(" ", "_")
    )
    ordered_cols = [first_col] + [c for c in wide.columns if c != first_col]
    wide[first_col] = pd.to_datetime(wide[first_col]).dt.normalize()
    return wide[ordered_cols]


def process_fivetran_ad_reporting(
    df: pd.DataFrame,
    value_columns: str | Sequence[str] = "impressions",
    *,
    date_col: str = "date_day",
    platform_col: str = "platform",
    agg: str = "sum",
    fill_value: float | None = 0.0,
    include_missing_dates: bool = False,
    freq: str = "D",
    rename_date_to: str | None = "date",
) -> pd.DataFrame:
    """Process Fivetran Ad Reporting tables into wide, model-ready features.

    Compatible with Fivetran's Ad Reporting schema tables:

    - ad_reporting__account_report: daily metrics by account
    - ad_reporting__campaign_report: daily metrics by campaign and account
    - ad_reporting__ad_group_report: daily metrics by ad group, campaign and account
    - ad_reporting__ad_report: daily metrics by ad, ad group, campaign and account

    The input data must include a date column, a platform column (e.g., vendor
    name), and one or more metric columns such as ``spend`` or ``impressions``.
    The output is a wide dataframe with one row per date and columns named
    ``{platform}_{metric}``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe in long format with at least the date, platform, and
        metric columns.
    value_columns : str or Sequence[str], default "impressions"
        Column name(s) to aggregate and pivot. Example: "spend" or
        ["spend", "impressions"].
    date_col : str, default "date_day"
        Name of the date column.
    platform_col : str, default "platform"
        Name of the platform (vendor) column.
    agg : str, default "sum"
        Aggregation method applied during groupby.
    fill_value : float or None, default 0.0
        Value used to fill missing values in the wide output. If ``None``,
        missing values are left as NaN.
    include_missing_dates : bool, default False
        If ``True``, include a continuous date range and fill missing dates
        using ``fill_value``.
    freq : str, default "D"
        Frequency used when ``include_missing_dates`` is ``True``.
    rename_date_to : str or None, default "date"
        If provided, rename the date column in the result to this value. If
        ``None``, keep ``date_col``.

    Returns
    -------
    pandas.DataFrame
        A wide-format dataframe with one row per date and columns for each
        ``{platform}_{metric}`` combination.
    """
    df_local, metrics = _normalize_and_validate_inputs(
        df=df,
        value_columns=value_columns,
        date_col=date_col,
        platform_col=platform_col,
    )

    wide = _aggregate_and_pivot(
        df_local,
        date_col=date_col,
        platform_col=platform_col,
        value_columns=metrics,
        agg=agg,
    )

    return _finalize_wide_output(
        wide,
        date_col=date_col,
        rename_date_to=rename_date_to,
        include_missing_dates=include_missing_dates,
        freq=freq,
        fill_value=fill_value,
    )
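

# A before/after sketch of ``process_fivetran_ad_reporting`` on hypothetical
# data. A long-format campaign report such as
#
#     date_day     platform      spend
#     2025-01-01   facebook_ads  120.0
#     2025-01-01   google_ads     80.0
#     2025-01-02   facebook_ads   95.0
#
# becomes, with ``value_columns="spend"`` and the default ``fill_value=0.0``:
#
#     date         facebook_ads_spend  google_ads_spend
#     2025-01-01   120.0               80.0
#     2025-01-02   95.0                0.0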


def process_fivetran_shopify_unique_orders(
    df: pd.DataFrame,
    *,
    date_col: str = "processed_timestamp",
    order_key_col: str = "orders_unique_key",
    rename_date_to: str = "date",
) -> pd.DataFrame:
    """Compute daily unique order counts from a (pre-filtered) Shopify dataset.

    This function targets data following the Fivetran Shopify orders schema
    (e.g., ``shopify__orders``). It assumes the input ``df`` is already
    filtered to the desired subset (e.g., non-canceled, US-delivery, new-only
    orders).

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe following the Shopify orders schema.
    date_col : str, default "processed_timestamp"
        Timestamp column from which the daily bucket is derived.
    order_key_col : str, default "orders_unique_key"
        Unique order identifier column.
    rename_date_to : str, default "date"
        Name of the date column in the result.

    Returns
    -------
    pandas.DataFrame
        A dataframe with two columns: ``rename_date_to`` and ``orders``, where
        ``orders`` is the unique order count per day.
    """
    # 1) Required columns
    missing = [c for c in (date_col, order_key_col) if c not in df.columns]
    if missing:
        raise ValueError(
            f"Missing required column(s): {missing}. Present: {list(df.columns)}"
        )

    # 2) Minimal projection + robust datetime parsing
    tmp = df[[order_key_col, date_col]].copy()
    tmp[date_col] = pd.to_datetime(tmp[date_col], errors="coerce")
    tmp = tmp.dropna(subset=[date_col])

    # 3) Daily bucket normalized to midnight, preserving datetime64[ns] dtype
    tmp["_date"] = tmp[date_col].dt.normalize()

    # 4) De-duplicate by (order, day) before counting
    tmp = tmp.drop_duplicates(subset=[order_key_col, "_date"])

    # 5) Count unique orders per day
    out = (
        tmp.groupby("_date", as_index=False)
        .agg(orders=(order_key_col, "nunique"))
        .rename(columns={"_date": rename_date_to})
        .sort_values(rename_date_to)
        .reset_index(drop=True)
    )
    return out
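

# A before/after sketch of ``process_fivetran_shopify_unique_orders`` on
# hypothetical data. Order rows with a key duplicated within a day, e.g.
#
#     orders_unique_key   processed_timestamp
#     a1                  2025-01-01 08:30:00
#     a1                  2025-01-01 09:00:00
#     b2                  2025-01-02 14:00:00
#
# collapse to one unique-order count per calendar day:
#
#     date         orders
#     2025-01-01   1
#     2025-01-02   1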


@pd.api.extensions.register_dataframe_accessor("fivetran")
class FivetranAccessor:
    """Accessor for Fivetran data processing functions."""

    def __init__(self, obj: pd.DataFrame) -> None:
        self._obj = obj

    @copy_docstring(process_fivetran_ad_reporting)
    def process_ad_reporting(  # noqa: D102
        self,
        value_columns: str | Sequence[str] = "impressions",
        *,
        date_col: str = "date_day",
        platform_col: str = "platform",
        agg: str = "sum",
        fill_value: float | None = 0.0,
        include_missing_dates: bool = False,
        freq: str = "D",
        rename_date_to: str | None = "date",
    ) -> pd.DataFrame:
        return process_fivetran_ad_reporting(
            self._obj,
            value_columns=value_columns,
            date_col=date_col,
            platform_col=platform_col,
            agg=agg,
            fill_value=fill_value,
            include_missing_dates=include_missing_dates,
            freq=freq,
            rename_date_to=rename_date_to,
        )

    @copy_docstring(process_fivetran_shopify_unique_orders)
    def process_shopify_unique_orders(  # noqa: D102
        self,
        *,
        date_col: str = "processed_timestamp",
        order_key_col: str = "orders_unique_key",
        rename_date_to: str = "date",
    ) -> pd.DataFrame:
        return process_fivetran_shopify_unique_orders(
            self._obj,
            date_col=date_col,
            order_key_col=order_key_col,
            rename_date_to=rename_date_to,
        )
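

# A minimal end-to-end smoke test on hypothetical data (a sketch, not part of
# the library API). It exercises both public functions through the
# ``fivetran`` accessor registered above, and runs only when this module is
# executed directly, e.g. ``python -m pymc_marketing.data.fivetran``.
if __name__ == "__main__":
    campaign_df = pd.DataFrame(
        {
            "date_day": ["2025-01-01", "2025-01-01", "2025-01-02"],
            "platform": ["facebook_ads", "google_ads", "facebook_ads"],
            "spend": [120.0, 80.0, 95.0],
        }
    )
    orders_df = pd.DataFrame(
        {
            "orders_unique_key": ["a1", "a1", "b2"],
            "processed_timestamp": [
                "2025-01-01 08:30:00",
                "2025-01-01 09:00:00",
                "2025-01-02 14:00:00",
            ],
        }
    )

    # Wide media features: date | facebook_ads_spend | google_ads_spend
    X = campaign_df.fivetran.process_ad_reporting(value_columns="spend")
    # Daily unique orders: date | orders
    y = orders_df.fivetran.process_shopify_unique_orders()

    print(X)
    print(y)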