Data

The fundcloud.data package is the single entry point for loading and persisting market data. Every backend implements the same Backend protocol: reads always work, while writes are gated by a read_only constructor flag. The Catalog pairs source backends with a sink backend and drives watermark-based incremental refresh. For a task-first walkthrough, start with the Pulling and caching market data guide.

fundcloud.data

Market data — unified Backend abstraction + Catalog orchestrator.

Every data backend (network providers like YF, FMP, AV, Binance; local format backends like CSV, Parquet, DuckDB, Memory) implements the single Backend protocol. Reads always work; writes are gated by the read_only constructor flag and raise ReadOnlyError when locked.

Catalog binds named datasets to (source, sink) pairs and handles incremental refresh from sink watermarks via Backend.sync_to.

Network backends are lazy-imported via a module-level __getattr__, so installs without yfinance / ccxt / httpx keep working.
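
To see how the pieces compose, here is a minimal sketch (assuming the classes are importable from the package top level as the lazy-import note above implies; symbols, paths, and dates are illustrative):

from fundcloud.data import Catalog, Parquet, YF

source = YF(["AAPL", "MSFT"], interval="1d")   # network source (read-only)
store = Parquet("cache/parquet")               # local sink (writable by default)

cat = Catalog(store)
cat.register(
    "us_equities",
    source,
    refresh_kwargs={"start": "2020-01-01", "lookback": "3d"},
)
cat.refresh("us_equities")          # incremental upsert from the sink watermark
bars = cat.load("us_equities")      # served from the parquet cache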

WriteMode module-attribute

WriteMode = Literal[
    "overwrite", "append", "upsert", "error"
]

OHLCV_COLUMNS module-attribute

OHLCV_COLUMNS: tuple[str, ...] = (
    "open",
    "high",
    "low",
    "close",
    "volume",
)

Canonical order of the standard OHLCV fields.

Backend

Bases: Protocol

Unified protocol for any data backend.

Every backend is readable. Backends with read_only=False also accept write and delete. The key argument is the logical dataset name; single-key sources accept key=None.
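
For instance, a read-only Memory backend serves reads but rejects writes; a minimal sketch (the key and frame are illustrative):

import pandas as pd

from fundcloud.data import Memory, ReadOnlyError

df = pd.DataFrame(
    {"close": [101.0, 102.5]},
    index=pd.DatetimeIndex(["2024-01-02", "2024-01-03"]),
)
ro = Memory({"demo": df}, read_only=True)
print(ro.read("demo"))                  # reads always work

try:
    ro.write("demo", df, mode="upsert")
except ReadOnlyError:
    print("writes are gated by read_only=True")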

BaseBackend

Bases: ABC

Default implementations shared by every concrete backend.

Subclasses must implement read and set the name ClassVar. Writable backends override write and delete (and call _check_writable first).

sync_to

sync_to(
    sink: Backend,
    *,
    key: str | None = None,
    source_key: str | None = None,
    start: Timestamp | str | None = None,
    end: Timestamp | str | None = None,
    mode: WriteMode = "upsert",
) -> pd.DataFrame

Read from self and write the result to sink under key.

key is the sink key (where to land the data). source_key is the source key (where to read from); defaults to None so the source picks its own canonical frame (e.g. a network backend ignores the key, a single-frame format backend resolves to its lone entry).

Source code in python/fundcloud/data/_base.py
def sync_to(
    self,
    sink: Backend,
    *,
    key: str | None = None,
    source_key: str | None = None,
    start: pd.Timestamp | str | None = None,
    end: pd.Timestamp | str | None = None,
    mode: WriteMode = "upsert",
) -> pd.DataFrame:
    """Read from ``self`` and write the result to ``sink`` under ``key``.

    ``key`` is the *sink* key (where to land the data). ``source_key`` is
    the *source* key (where to read from); defaults to ``None`` so the
    source picks its own canonical frame (e.g. a network backend ignores
    the key, a single-frame format backend resolves to its lone entry).
    """
    df = self.read(source_key, start=start, end=end)
    target_key = (
        key
        if key is not None
        else (source_key if source_key is not None else self._default_key())
    )
    sink.write(target_key, df, mode=mode)
    return df
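
For example, landing a single-frame CSV source into a DuckDB sink; a sketch (paths and the sink key are illustrative):

from fundcloud.data import CSV, DuckDB

src = CSV("data/aapl.csv")               # read-only by default
sink = DuckDB("cache/market.duckdb")     # writable by default

# source_key=None lets the single-frame CSV resolve to its lone entry;
# key="aapl" is where the rows land in the sink.
df = src.sync_to(sink, key="aapl", mode="upsert")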

ReadOnlyError

Bases: RuntimeError

Raised when write / delete is called on a read-only backend.

YF

YF(
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    adjust: bool = True,
)

Bases: BaseBackend

Pull OHLCV bars from Yahoo Finance.

Free, keyless, and best-effort. Under rate-limit or outage, errors propagate with no automatic fallback — treat it as a local-dev convenience rather than a production data plane.

Parameters:

symbols (Sequence[str] | str, required)
    One ticker (string) or many (sequence).

interval (str, default '1d')
    Bar interval; one of 1m, 5m, 15m, 30m, 1h, 1d, 1wk, 1mo.

adjust (bool, default True)
    yfinance applies dividend + split adjustments to the close column. Set to False to get the raw, as-traded prices (the frame then includes a separate adj_close field).
Source code in python/fundcloud/data/yf.py
def __init__(
    self,
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    adjust: bool = True,
) -> None:
    self.symbols = [symbols] if isinstance(symbols, str) else list(symbols)
    if not self.symbols:
        raise ValueError("YF requires at least one symbol")
    if interval not in _YF_INTERVAL_MAP:
        msg = f"interval {interval!r} not supported by YF"
        raise ValueError(msg)
    self.interval = interval
    self.adjust = adjust
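
A quick pull; a sketch (ticker and dates are illustrative):

from fundcloud.data import YF

yf = YF("AAPL", interval="1d")
bars = yf.read(None, start="2024-01-01", end="2024-06-30")  # network backends ignore the key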

FMP

FMP(
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    adjust: bool = True,
    api_key: str | None = None,
    base_url: str = _FMP_BASE_URL,
)

Bases: BaseBackend

Pull OHLCV bars from the FinancialModelingPrep REST API.

Parameters:

symbols (Sequence[str] | str, required)
    One ticker (string) or many (sequence).

interval (str, default '1d')
    One of 1m, 5m, 15m, 30m, 1h, 4h, 1d.

adjust (bool, default True)
    The FMP daily endpoint returns both the raw close and a dividend/split-adjusted close (adjClose). When True, the adjusted value is published under the canonical close column. Set to False to keep the raw, as-traded prices.

api_key (str | None, default None)
    Falls back to the FMP_API_KEY env var.

base_url (str, default _FMP_BASE_URL)
    Override the default FMP endpoint (useful for tests).
Source code in python/fundcloud/data/fmp.py
def __init__(
    self,
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    adjust: bool = True,
    api_key: str | None = None,
    base_url: str = _FMP_BASE_URL,
) -> None:
    self.symbols = [symbols] if isinstance(symbols, str) else list(symbols)
    if not self.symbols:
        raise ValueError("FMP requires at least one symbol")
    if interval not in _FMP_INTERVAL_MAP:
        msg = f"interval {interval!r} not supported by FMP"
        raise ValueError(msg)
    self.interval = interval
    self.adjust = adjust
    self._api_key = api_key or os.environ.get("FMP_API_KEY")
    if not self._api_key:
        msg = (
            "FMP requires an API key. Pass `api_key=` or set the "
            "FMP_API_KEY environment variable."
        )
        raise ValueError(msg)
    self._base_url = base_url
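
With FMP_API_KEY exported in the environment, no explicit key is needed; a sketch (ticker and dates are illustrative):

from fundcloud.data import FMP

fmp = FMP("AAPL", interval="4h")             # api_key falls back to FMP_API_KEY
bars = fmp.read(None, start="2024-01-01")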

AV

AV(
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    adjust: bool = True,
    api_key: str | None = None,
    base_url: str = _AV_BASE_URL,
)

Bases: BaseBackend

Pull daily / weekly / monthly OHLCV bars from Alpha Vantage.

Parameters:

symbols (Sequence[str] | str, required)
    One ticker (string) or many (sequence).

interval (str, default '1d')
    One of 1d, 1wk, 1mo.

adjust (bool, default True)
    Hits the TIME_SERIES_*_ADJUSTED endpoint and publishes the dividend/split-adjusted close under the canonical close column. Set to False for raw, as-traded prices via the unadjusted endpoint family (TIME_SERIES_DAILY etc., free-tier friendly).

api_key (str | None, default None)
    Falls back to the ALPHAVANTAGE_API_KEY or ALPHA_VANTAGE_API_KEY env var.

base_url (str, default _AV_BASE_URL)
    Override the default endpoint (useful for tests).
Source code in python/fundcloud/data/av.py
def __init__(
    self,
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    adjust: bool = True,
    api_key: str | None = None,
    base_url: str = _AV_BASE_URL,
) -> None:
    self.symbols = [symbols] if isinstance(symbols, str) else list(symbols)
    if not self.symbols:
        raise ValueError("AV requires at least one symbol")
    if (interval, True) not in _AV_FUNCTION_MAP:
        msg = f"interval {interval!r} not supported by AV"
        raise ValueError(msg)
    self.interval = interval
    self.adjust = adjust
    self._api_key = (
        api_key
        or os.environ.get("ALPHAVANTAGE_API_KEY")
        or os.environ.get("ALPHA_VANTAGE_API_KEY")
    )
    if not self._api_key:
        msg = (
            "AV requires an API key. Pass `api_key=` or set the "
            "ALPHAVANTAGE_API_KEY (or ALPHA_VANTAGE_API_KEY) environment variable."
        )
        raise ValueError(msg)
    self._base_url = base_url
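
Setting adjust=False keeps AV on the free-tier unadjusted endpoints; a sketch (ticker and key are illustrative):

from fundcloud.data import AV

av = AV("IBM", interval="1d", adjust=False, api_key="YOUR_KEY")  # TIME_SERIES_DAILY family
bars = av.read(None, start="2024-01-01")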

Binance

Binance(
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    limit: int = 1000,
    sandbox: bool = False,
)

Bases: BaseBackend

Fetch spot OHLCV bars from Binance via ccxt.

Source code in python/fundcloud/data/binance.py
def __init__(
    self,
    symbols: Sequence[str] | str,
    *,
    interval: str = "1d",
    limit: int = 1000,
    sandbox: bool = False,
) -> None:
    self.symbols = [symbols] if isinstance(symbols, str) else list(symbols)
    if not self.symbols:
        raise ValueError("Binance requires at least one symbol")
    if interval not in _CCXT_INTERVAL_MAP:
        msg = f"interval {interval!r} not supported by Binance"
        raise ValueError(msg)
    self.interval = interval
    self.limit = int(limit)
    self.sandbox = bool(sandbox)
    self._client: Any | None = None
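
A sketch (the BASE/QUOTE symbol spelling follows ccxt convention and is an assumption here):

from fundcloud.data import Binance

btc = Binance("BTC/USDT", interval="1h", limit=500)
bars = btc.read(None)   # network backends ignore the key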

CSV

CSV(
    path: str | Path,
    *,
    symbols: Sequence[str] | None = None,
    date_col: str = "date",
    read_only: bool = True,
)

Bases: BaseBackend

Read one or many CSV files into a single frame.

Source code in python/fundcloud/data/csv.py
def __init__(
    self,
    path: str | Path,
    *,
    symbols: Sequence[str] | None = None,
    date_col: str = "date",
    read_only: bool = True,
) -> None:
    self.path = Path(path)
    self.symbols = list(symbols) if symbols is not None else []
    self.date_col = date_col
    self.read_only = read_only
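
Note that CSV defaults to read_only=True, unlike the other format backends; opt in to writes explicitly. A sketch (the path is illustrative):

from fundcloud.data import CSV

src = CSV("data/bars.csv")                     # reads only
dst = CSV("data/bars.csv", read_only=False)    # write / delete enabled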

Parquet

Parquet(root: str | Path, *, read_only: bool = False)

Bases: BaseBackend

Per-key parquet files under a root directory.

Source code in python/fundcloud/data/parquet.py
def __init__(self, root: str | Path, *, read_only: bool = False) -> None:
    self.root = Path(root)
    self.read_only = read_only
    self.root.mkdir(parents=True, exist_ok=True)
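
A write/read round trip; a sketch (root, key, and frame are illustrative):

import pandas as pd

from fundcloud.data import Parquet

store = Parquet("cache/parquet")
df = pd.DataFrame(
    {"close": [10.0, 10.5]},
    index=pd.DatetimeIndex(["2024-01-02", "2024-01-03"]),
)
store.write("demo", df, mode="overwrite")
back = store.read("demo", start="2024-01-02")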

DuckDB

DuckDB(path: str | Path, *, read_only: bool = False)

Bases: BaseBackend

Persist frames as DuckDB tables inside a single database file.

Source code in python/fundcloud/data/duckdb.py
def __init__(self, path: str | Path, *, read_only: bool = False) -> None:
    self.path = Path(path)
    self.read_only = read_only
    self.path.parent.mkdir(parents=True, exist_ok=True)
    self._con = duckdb.connect(str(self.path), read_only=read_only)

Memory

Memory(
    initial: Mapping[str, DataFrame] | None = None,
    *,
    read_only: bool = False,
)

Bases: BaseBackend

Dict-backed fundcloud.data.Backend implementation.

Source code in python/fundcloud/data/memory.py
def __init__(
    self,
    initial: Mapping[str, pd.DataFrame] | None = None,
    *,
    read_only: bool = False,
) -> None:
    self.read_only = read_only
    self._data: dict[str, pd.DataFrame] = {}
    if initial:
        for key, df in initial.items():
            if not isinstance(df.index, pd.DatetimeIndex):
                msg = f"frame for key {key!r} must have a DatetimeIndex"
                raise TypeError(msg)
            self._data[key] = df.sort_index().copy()

Catalog

Catalog(store: Backend)

A named collection of datasets sharing a single sink Backend.

Source code in python/fundcloud/data/catalog.py
def __init__(self, store: Backend) -> None:
    self._store = store
    self._specs: dict[str, DatasetSpec] = {}

describe

describe(name: str | None = None) -> pd.DataFrame

Produce a one-row-per-dataset summary frame.

Source code in python/fundcloud/data/catalog.py
def describe(self, name: str | None = None) -> pd.DataFrame:
    """Produce a one-row-per-dataset summary frame."""
    specs = [self._specs[name]] if name is not None else list(self._specs.values())
    rows = []
    now = datetime.now(timezone.utc)
    for spec in specs:
        last = self._store.last_index(spec.store_key)
        if last is not None:
            ts = pd.Timestamp(last)
            utc_ts = ts.tz_convert("UTC") if ts.tzinfo is not None else ts.tz_localize("UTC")
            lag = now - utc_ts
        else:
            lag = None
        rows.append({
            "name": spec.name,
            "source": _qualname(spec.source),
            "symbols": getattr(spec.source, "symbols", []),
            "interval": getattr(spec.source, "interval", None),
            "store_key": spec.store_key,
            "last_index": last,
            "lag": lag,
            "tags": list(spec.tags),
        })
    return pd.DataFrame(rows)

from_spec classmethod

from_spec(
    store: Backend, spec: Mapping[str, Mapping[str, Any]]
) -> Catalog

Build a catalog from a mapping of {name: {source, source_kwargs, ...}}.

Source code in python/fundcloud/data/catalog.py
@classmethod
def from_spec(cls, store: Backend, spec: Mapping[str, Mapping[str, Any]]) -> Catalog:
    """Build a catalog from a mapping of ``{name: {source, source_kwargs, ...}}``."""
    cat = cls(store=store)
    for name, row in spec.items():
        source_cls = _import_dotted(row["source"])
        source = source_cls(**row.get("source_kwargs", {}))
        cat.register(
            name,
            source,
            store_key=row.get("store_key"),
            refresh_kwargs=row.get("refresh_kwargs", {}),
            tags=tuple(row.get("tags", [])),
        )
    return cat
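
The spec mapping mirrors what to_spec emits; a sketch (the dotted path fundcloud.data.yf.YF matches the source file shown above; the kwargs are illustrative):

from fundcloud.data import Catalog, Parquet

spec = {
    "us_equities": {
        "source": "fundcloud.data.yf.YF",
        "source_kwargs": {"symbols": ["AAPL", "MSFT"], "interval": "1d"},
        "store_key": "us_equities",
        "refresh_kwargs": {"start": "2020-01-01", "lookback": "3d"},
        "tags": ["equity"],
    },
}
cat = Catalog.from_spec(Parquet("cache/parquet"), spec)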

load

load(
    name: str,
    *,
    start: Timestamp | str | None = None,
    end: Timestamp | str | None = None,
    prefer_store: bool = True,
) -> pd.DataFrame

Return rows for name.

When prefer_store=True and the sink already has the dataset, the sink is the source of truth (callers who want fresh data should call refresh first). Otherwise the source is pulled and the result is persisted before being returned.

Call-site start / end win over refresh_kwargs.start / refresh_kwargs.end; both are forwarded to the underlying Backend.read.

Source code in python/fundcloud/data/catalog.py
def load(
    self,
    name: str,
    *,
    start: pd.Timestamp | str | None = None,
    end: pd.Timestamp | str | None = None,
    prefer_store: bool = True,
) -> pd.DataFrame:
    """Return rows for ``name``.

    When ``prefer_store=True`` and the sink already has the dataset, the
    sink is the source of truth (callers who want fresh data should call
    :meth:`refresh` first). Otherwise the source is pulled and the result
    is persisted before being returned.

    Call-site ``start`` / ``end`` win over ``refresh_kwargs.start`` /
    ``refresh_kwargs.end``; both are forwarded to the underlying
    :meth:`Backend.read`.
    """
    spec = self.spec(name)
    kw = spec.refresh_kwargs
    eff_start = start if start is not None else kw.get("start")
    eff_end = end if end is not None else kw.get("end")

    if prefer_store and self._store.exists(spec.store_key):
        return self._store.read(spec.store_key, start=eff_start, end=eff_end)
    df = spec.source.read(start=eff_start, end=eff_end)
    if start is None and end is None:
        self._store.write(spec.store_key, df, mode="overwrite")
    return df
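
A sketch of the precedence (symbols, paths, and dates are illustrative):

from fundcloud.data import Catalog, Parquet, YF

cat = Catalog(Parquet("cache/parquet"))
cat.register("us_equities", YF("AAPL"), refresh_kwargs={"start": "2020-01-01"})

cat.refresh("us_equities")                          # make the cache current first
df = cat.load("us_equities", start="2023-01-01")    # call-site start wins over refresh_kwargs["start"]
full = cat.load("us_equities", prefer_store=False)  # bypass the cache and re-pull the source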

refresh

refresh(
    name: str, *, end: Timestamp | str | None = None
) -> pd.DataFrame

Pull incremental rows for name and upsert them into the sink.

The sink watermark (last_index) is the default start. If DatasetSpec.refresh_kwargs carries a lookback window, it is subtracted from the watermark so recently-corrected rows get re-pulled and deduplicated by mode='upsert'.

Source code in python/fundcloud/data/catalog.py
def refresh(
    self,
    name: str,
    *,
    end: pd.Timestamp | str | None = None,
) -> pd.DataFrame:
    """Pull incremental rows for ``name`` and upsert them into the sink.

    The sink watermark (``last_index``) is the default ``start``. If
    :attr:`DatasetSpec.refresh_kwargs` carries a ``lookback`` window, it
    is subtracted from the watermark so recently-corrected rows get
    re-pulled and deduplicated by ``mode='upsert'``.
    """
    spec = self.spec(name)
    last = self._store.last_index(spec.store_key)
    kw = spec.refresh_kwargs

    if last is not None:
        lookback = pd.Timedelta(kw.get("lookback", 0))
        start: pd.Timestamp | str | None = last - lookback
    else:
        start = kw.get("start")

    eff_end = end if end is not None else kw.get("end")
    return spec.source.sync_to(
        self._store,
        key=spec.store_key,
        start=start,
        end=eff_end,
        mode="upsert",
    )
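
A worked example of the watermark arithmetic (dates are illustrative): with last_index at 2024-06-28 and refresh_kwargs={"lookback": "3d"}, the next pull starts three days back, and the overlapping rows dedup on upsert.

import pandas as pd

start = pd.Timestamp("2024-06-28") - pd.Timedelta("3d")
print(start)  # 2024-06-25 00:00:00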

refresh_all

refresh_all(
    *,
    end: Timestamp | str | None = None,
    tags: tuple[str, ...] | None = None,
) -> dict[str, pd.DataFrame]

Refresh every (optionally tag-filtered) dataset.

Source code in python/fundcloud/data/catalog.py
def refresh_all(
    self,
    *,
    end: pd.Timestamp | str | None = None,
    tags: tuple[str, ...] | None = None,
) -> dict[str, pd.DataFrame]:
    """Refresh every (optionally tag-filtered) dataset."""
    out: dict[str, pd.DataFrame] = {}
    for name, spec in self._specs.items():
        if tags is not None and not set(tags).issubset(set(spec.tags)):
            continue
        out[name] = self.refresh(name, end=end)
    return out

register

register(
    name: str,
    source: Backend,
    *,
    store_key: str | None = None,
    refresh_kwargs: Mapping[str, Any] | None = None,
    tags: tuple[str, ...] = (),
) -> DatasetSpec

Register a dataset. Returns the resulting DatasetSpec.

Source code in python/fundcloud/data/catalog.py
def register(
    self,
    name: str,
    source: Backend,
    *,
    store_key: str | None = None,
    refresh_kwargs: Mapping[str, Any] | None = None,
    tags: tuple[str, ...] = (),
) -> DatasetSpec:
    """Register a dataset. Returns the resulting :class:`DatasetSpec`."""
    if name in self._specs:
        msg = f"dataset {name!r} already registered"
        raise ValueError(msg)
    spec = DatasetSpec(
        name=name,
        source=source,
        store_key=store_key or name,
        refresh_kwargs=dict(refresh_kwargs or {}),
        tags=tuple(tags),
    )
    self._specs[name] = spec
    return spec

to_spec

to_spec() -> dict[str, dict[str, Any]]

Serialise the catalog into a dict that from_spec can read.

Sources are referenced by their fully-qualified class name plus their constructor kwargs. Callers are responsible for making sure the referenced modules are importable when from_spec runs.

Source code in python/fundcloud/data/catalog.py
def to_spec(self) -> dict[str, dict[str, Any]]:
    """Serialise the catalog into a dict that ``from_spec`` can read.

    Sources are referenced by their fully-qualified class name plus their
    constructor kwargs. Callers are responsible for making sure the
    referenced modules are importable when ``from_spec`` runs.
    """
    out: dict[str, dict[str, Any]] = {}
    for name, spec in self._specs.items():
        out[name] = {
            "source": _qualname(spec.source),
            "source_kwargs": _source_kwargs(spec.source),
            "store_key": spec.store_key,
            "refresh_kwargs": dict(spec.refresh_kwargs),
            "tags": list(spec.tags),
        }
    return out

DatasetSpec dataclass

DatasetSpec(
    name: str,
    source: Backend,
    store_key: str,
    refresh_kwargs: dict[str, Any] = dict(),
    tags: tuple[str, ...] = (),
)

Declarative binding for a single dataset.

normalize_field

normalize_field(name: str) -> str

Coerce a column field name to lowercase snake_case.

Examples:

>>> normalize_field("Open")
'open'
>>> normalize_field("CLOSE")
'close'
>>> normalize_field("AdjClose")
'adj_close'
>>> normalize_field("Adj Close")
'adj_close'
>>> normalize_field("VWAP")
'vwap'
>>> normalize_field("HTTPRequest")
'http_request'
Source code in python/fundcloud/data/_columns.py
def normalize_field(name: str) -> str:
    """Coerce a column field name to lowercase snake_case.

    Examples
    --------
    >>> normalize_field("Open")
    'open'
    >>> normalize_field("CLOSE")
    'close'
    >>> normalize_field("AdjClose")
    'adj_close'
    >>> normalize_field("Adj Close")
    'adj_close'
    >>> normalize_field("VWAP")
    'vwap'
    >>> normalize_field("HTTPRequest")
    'http_request'
    """
    s = str(name).strip()
    s = _ACRONYM_RE.sub(r"\1_\2", s)
    s = _CAMEL_RE.sub(r"\1_\2", s)
    s = s.lower().replace(" ", "_").replace("-", "_")
    s = _REPEAT_RE.sub("_", s).strip("_")
    return s

normalize_ohlcv_columns

normalize_ohlcv_columns(df: DataFrame) -> pd.DataFrame

Lowercase + snake_case every column field name.

Handles both flat columns and the canonical (field, symbol) MultiIndex layout. Returns the same frame (column relabel is in place); pass a copy in if the caller cares about identity.

Source code in python/fundcloud/data/_columns.py
def normalize_ohlcv_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Lowercase + snake_case every column field name.

    Handles both flat columns and the canonical ``(field, symbol)``
    MultiIndex layout. Returns the same frame (column relabel is in
    place); pass a copy in if the caller cares about identity.
    """
    if len(df.columns) == 0:
        return df
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = pd.MultiIndex.from_tuples([
            (normalize_field(field), sym) for field, sym in df.columns
        ])
    else:
        df.columns = [normalize_field(c) for c in df.columns]
    return df
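
A sketch of the in-place relabel (the frame is illustrative):

import pandas as pd

from fundcloud.data import normalize_ohlcv_columns

df = pd.DataFrame(
    [[189.2, 190.1]],
    columns=["Adj Close", "Open"],
    index=pd.DatetimeIndex(["2024-01-02"]),
)
normalize_ohlcv_columns(df)
print(list(df.columns))  # ['adj_close', 'open']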

canonicalize_ohlcv_order

canonicalize_ohlcv_order(df: DataFrame) -> pd.DataFrame

Reorder columns so OHLCV fields appear in canonical order.

Non-OHLCV fields are kept after the canonical ones, in the order they already appear. Works for both flat and (field, symbol) MultiIndex layouts. Returns the reordered frame.

Source code in python/fundcloud/data/_columns.py
def canonicalize_ohlcv_order(df: pd.DataFrame) -> pd.DataFrame:
    """Reorder columns so OHLCV fields appear in canonical order.

    Non-OHLCV fields are kept after the canonical ones, in the order they
    already appear. Works for both flat and ``(field, symbol)`` MultiIndex
    layouts. Returns the reordered frame.
    """
    if len(df.columns) == 0:
        return df
    if isinstance(df.columns, pd.MultiIndex):
        symbols: list[str] = list(dict.fromkeys(sym for _, sym in df.columns))
        present_fields = list(dict.fromkeys(field for field, _ in df.columns))
        ordered_fields = [f for f in OHLCV_COLUMNS if f in present_fields]
        ordered_fields += [f for f in present_fields if f not in OHLCV_COLUMNS]
        new_cols = [
            (field, sym)
            for field in ordered_fields
            for sym in symbols
            if (field, sym) in df.columns
        ]
        return df.reindex(columns=pd.MultiIndex.from_tuples(new_cols))
    present = list(df.columns)
    ordered = [c for c in OHLCV_COLUMNS if c in present]
    ordered += [c for c in present if c not in OHLCV_COLUMNS]
    return df.reindex(columns=ordered)

fundcloud.data.bars

OHLCV (Bars) utilities — conversion, alignment, resampling.

These are free functions over plain pandas structures. They encode the canonical data shapes (Bars: DatetimeIndex + top-level OHLCV column labels, optionally a second-level per-asset index) that the rest of the library relies on.

align

align(
    *frames: DataFrame,
    how: Literal["inner", "outer"] = "inner",
) -> list[pd.DataFrame]

Align multiple wide frames onto the same index (and columns).

Useful for combining prices + factors + signals before optimisation.

Source code in python/fundcloud/data/bars.py
def align(*frames: pd.DataFrame, how: Literal["inner", "outer"] = "inner") -> list[pd.DataFrame]:
    """Align multiple wide frames onto the same index (and columns).

    Useful for combining prices + factors + signals before optimisation.
    """
    if not frames:
        return []
    aligned_index = frames[0].index
    aligned_cols = frames[0].columns
    # Re-derive the set operation from the *current* accumulator on each pass;
    # binding the method once would pin it to frames[0] and silently ignore
    # the intermediate frames when three or more are passed.
    for f in frames[1:]:
        if how == "inner":
            aligned_index = aligned_index.intersection(f.index)
            aligned_cols = aligned_cols.intersection(f.columns)
        else:
            aligned_index = aligned_index.union(f.index)
            aligned_cols = aligned_cols.union(f.columns)
    return [f.reindex(index=aligned_index, columns=aligned_cols) for f in frames]

as_long

as_long(
    wide: DataFrame, *, value_name: str = "value"
) -> pd.DataFrame

Melt a wide (date × asset) frame to long (date, asset, value).

Source code in python/fundcloud/data/bars.py
def as_long(wide: pd.DataFrame, *, value_name: str = "value") -> pd.DataFrame:
    """Melt a wide (date × asset) frame to long (date, asset, value)."""
    _require_datetime_index(wide, "wide")
    out = wide.stack(future_stack=True).rename(value_name).reset_index()
    out.columns = ["ts", "asset", value_name]
    return out

as_wide

as_wide(
    long: DataFrame,
    *,
    ts: str = "ts",
    asset: str = "asset",
    value: str = "value",
) -> pd.DataFrame

Pivot a long (ts, asset, value) frame to wide (date × asset).

Source code in python/fundcloud/data/bars.py
def as_wide(
    long: pd.DataFrame,
    *,
    ts: str = "ts",
    asset: str = "asset",
    value: str = "value",
) -> pd.DataFrame:
    """Pivot a long (ts, asset, value) frame to wide (date × asset)."""
    wide = long.pivot(index=ts, columns=asset, values=value)
    wide.index = pd.DatetimeIndex(wide.index)
    wide.columns.name = None
    return cast(pd.DataFrame, wide.sort_index())
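
as_long and as_wide invert each other on a tidy panel; a quick sketch (the frame is illustrative):

import pandas as pd

from fundcloud.data.bars import as_long, as_wide

wide = pd.DataFrame(
    {"AAPL": [189.2, 190.1], "MSFT": [402.0, 404.3]},
    index=pd.DatetimeIndex(["2024-01-02", "2024-01-03"]),
)
long = as_long(wide)    # columns: ts, asset, value
wide2 = as_wide(long)   # back to date × asset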

resample

resample(
    bars: DataFrame,
    rule: str,
    *,
    agg: dict[str, str] | None = None,
) -> pd.DataFrame

Resample a Bars frame to a coarser frequency.

Defaults apply the standard OHLCV aggregation: first/max/min/last/sum for open/high/low/close/volume, and last for any other column.

Source code in python/fundcloud/data/bars.py
def resample(
    bars: pd.DataFrame,
    rule: str,
    *,
    agg: dict[str, str] | None = None,
) -> pd.DataFrame:
    """Resample a ``Bars`` frame to a coarser frequency.

    Defaults apply the standard OHLCV aggregation: first/max/min/last/sum for
    open/high/low/close/volume, and last for any other column.
    """
    _require_datetime_index(bars, "bars")
    ohlcv_agg = {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}

    if isinstance(bars.columns, pd.MultiIndex):
        # MultiIndex (field, symbol): aggregate per column, keyed by level-0 field.
        if agg is None:
            per_col = {c: ohlcv_agg.get(str(c[0]).lower(), "last") for c in bars.columns}
        else:
            per_col = {c: agg.get(str(c[0]).lower(), "last") for c in bars.columns}
        return bars.resample(rule).agg(per_col).dropna(how="all")

    if agg is None:
        agg = {c: ohlcv_agg.get(str(c).lower(), "last") for c in bars.columns}
    return bars.resample(rule).agg(agg).dropna(how="all")  # type: ignore[arg-type]
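
A sketch (the frame is illustrative; the rule is any pandas offset alias):

import pandas as pd

from fundcloud.data.bars import resample

bars = pd.DataFrame(
    {"open": 1.0, "high": 2.0, "low": 0.5, "close": 1.5, "volume": 100.0},
    index=pd.date_range("2024-01-01", periods=10, freq="D"),
)
weekly = resample(bars, "W")                              # first/max/min/last/sum
weekly_avg = resample(bars, "W", agg={"close": "mean"})   # per-field override; others fall back to "last"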

to_log_returns

to_log_returns(
    prices_or_bars: DataFrame | Series,
    *,
    field: PriceField = "close",
    dropna: bool = True,
) -> pd.DataFrame | pd.Series

Convenience alias for to_returns(..., method='log').

Source code in python/fundcloud/data/bars.py
def to_log_returns(
    prices_or_bars: pd.DataFrame | pd.Series,
    *,
    field: PriceField = "close",
    dropna: bool = True,
) -> pd.DataFrame | pd.Series:
    """Convenience alias for ``to_returns(..., method='log')``."""
    return to_returns(prices_or_bars, field=field, method="log", dropna=dropna)

to_prices

to_prices(
    bars: DataFrame, field: PriceField = "close"
) -> pd.DataFrame

Extract a wide per-asset price panel from a Bars frame.

Parameters:

bars (DataFrame, required)
    Either a wide frame whose columns are asset names (then returned as is, cast to float), or a frame with a two-level column index where the top level is the OHLCV field.

field (PriceField, default 'close')
    Which field to pull when bars has a MultiIndex on the columns.
Source code in python/fundcloud/data/bars.py
def to_prices(bars: pd.DataFrame, field: PriceField = "close") -> pd.DataFrame:
    """Extract a wide per-asset price panel from a ``Bars`` frame.

    Parameters
    ----------
    bars
        Either a wide frame whose columns *are* asset names (then returned as
        is, cast to float), or a frame with a two-level column index where the
        top level is the OHLCV field.
    field
        Which field to pull when ``bars`` has a MultiIndex on the columns.
    """
    _require_datetime_index(bars, "bars")

    if isinstance(bars.columns, pd.MultiIndex):
        if field not in bars.columns.get_level_values(0):
            msg = f"field '{field}' not found in columns {list(bars.columns.levels[0])}"
            raise KeyError(msg)
        prices = cast(pd.DataFrame, bars.xs(field, axis=1, level=0))
    else:
        prices = bars

    return cast(pd.DataFrame, prices.astype(float).sort_index())

to_returns

to_returns(
    prices_or_bars: DataFrame | Series,
    *,
    field: PriceField = "close",
    method: Literal["simple", "log"] = "simple",
    dropna: bool = True,
) -> pd.DataFrame | pd.Series

Convert prices to period returns.

Accepts either a wide price panel, a Bars DataFrame, or a single price Series. Returns have the same shape and index as the input, minus the first row if dropna is True.

Source code in python/fundcloud/data/bars.py
def to_returns(
    prices_or_bars: pd.DataFrame | pd.Series,
    *,
    field: PriceField = "close",
    method: Literal["simple", "log"] = "simple",
    dropna: bool = True,
) -> pd.DataFrame | pd.Series:
    """Convert prices to period returns.

    Accepts either a wide price panel, a ``Bars`` DataFrame, or a single price
    ``Series``. Returns have the same shape and index as the input, minus the
    first row if ``dropna`` is True.
    """
    if isinstance(prices_or_bars, pd.Series):
        _require_datetime_index(prices_or_bars, "prices")
        prices_s = prices_or_bars.astype(float).sort_index()
        if method == "log":
            raw = np.log(prices_s / prices_s.shift(1))
        else:
            # Explicit ``fill_method=None`` prevents pandas' forward-fill default
            # (deprecated in pandas 2.1) from fabricating zero-returns on NaN
            # bars — essential when the panel mixes 5-day equities with 7-day
            # crypto, where NaN weekends must stay NaN, not become 0%.
            raw = prices_s.pct_change(fill_method=None)
        r = pd.Series(raw, index=prices_s.index, name=prices_s.name)
        return r.dropna() if dropna else r

    prices = to_prices(prices_or_bars, field=field)
    if method == "log":
        ret = np.log(prices / prices.shift(1))
    else:
        ret = prices.pct_change(fill_method=None)
    return ret.dropna(how="all") if dropna else ret
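
A sketch on a wide price panel (values are illustrative):

import pandas as pd

from fundcloud.data.bars import to_log_returns, to_returns

prices = pd.DataFrame(
    {"AAPL": [100.0, 101.0, 99.5, 102.0]},
    index=pd.date_range("2024-01-01", periods=4, freq="D"),
)
simple = to_returns(prices)                   # period-over-period simple returns
logr = to_log_returns(prices, dropna=False)   # keeps the leading NaN row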

fundcloud.data.catalog

Catalog — name → (source Backend, sink Backend key, refresh policy).

A Catalog is the orchestrator that binds a user-facing dataset name to a read backend (the source) and a key inside the catalog's sink backend (the cache). Refreshes call Backend.sync_to with mode='upsert' so overlapping rows from re-pulls dedup on the timestamp index.

Per-dataset overrides are persisted in DatasetSpec.refresh_kwargs with these recognised keys:

  • start: minimum date to pull on initial load.
  • end: maximum date (rare; usually omitted).
  • lookback: pd.Timedelta-compatible window subtracted from the sink watermark on refresh. Used to re-pull recent rows that upstream may correct (corporate actions, restatements, exchange revisions).

The spec format is a plain Python dict — YAML is out of scope on purpose (callers do yaml.safe_load(path.read_text()) and pass the dict in). See Catalog.to_spec / Catalog.from_spec for round-trippable serialisation.
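
For example (a sketch; the file path is illustrative):

from pathlib import Path

import yaml

from fundcloud.data import Catalog, DuckDB

spec = yaml.safe_load(Path("catalog.yml").read_text())
cat = Catalog.from_spec(DuckDB("cache/market.duckdb"), spec)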

Catalog, its methods (describe, from_spec, load, refresh, refresh_all, register, to_spec), and the DatasetSpec dataclass are defined in this module and documented in full above under fundcloud.data.