Adding New Data Sources#
This guide explains how to add a new data source to the finm.data module following
the standardized pattern.
File Structure#
Each data source should have the following structure:
src/finm/data/
└── your_source/
    ├── __init__.py      # Public API: pull(), load(), to_long_format()
    ├── _constants.py    # URLs, file names, metadata
    ├── _pull.py         # Download logic
    ├── _load.py         # Load from cache logic
    └── _transform.py    # Wide <-> Long format conversion
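Callers should only ever import the three public names that __init__.py re-exports:

from finm.data.your_source import load, pull, to_long_format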
Step 1: Create Constants#
Create _constants.py with metadata about your data source:
"""Constants for your data source."""
from typing import Final
DATASET_NAME: Final[str] = "your_source"
DISPLAY_NAME: Final[str] = "Your Data Source Name"
# Data source URL(s)
DATA_URL: Final[str] = "https://example.com/data.csv"
# Parquet file names
PARQUET_FILE: Final[str] = "your_data.parquet"
# Column names for long format (if applicable)
VALUE_COLUMNS: Final[list[str]] = ["col1", "col2", "col3"]
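Downstream modules import these constants rather than hard-coding values:

>>> from finm.data.your_source._constants import DATASET_NAME, PARQUET_FILE
>>> DATASET_NAME
'your_source'
>>> PARQUET_FILE
'your_data.parquet'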
Step 2: Implement Pull#
Create _pull.py with download logic. Because the public pull() forwards accept_license (see Step 5), the private function takes the same flag:
"""Pull functions for your data source."""
from __future__ import annotations
from pathlib import Path
import pandas as pd
import requests
from finm.data.your_source._constants import DATA_URL, PARQUET_FILE
def pull_data(data_dir: Path | str) -> pd.DataFrame:
"""Download data from source and save to parquet.
Parameters
----------
data_dir : Path or str
Directory to save the data.
Returns
-------
pd.DataFrame
Downloaded data.
"""
data_dir = Path(data_dir)
data_dir.mkdir(parents=True, exist_ok=True)
# Download data
response = requests.get(DATA_URL)
response.raise_for_status()
# Parse and process
df = pd.read_csv(BytesIO(response.content))
# Save to parquet
df.to_parquet(data_dir / PARQUET_FILE)
return df
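To exercise this during development (the directory below is just an example, and the call hits the network):

from pathlib import Path

from finm.data.your_source._pull import pull_data

df = pull_data(Path.home() / "finm-data" / "your_source", accept_license=True)
print(df.shape)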
Step 3: Implement Load#
Create _load.py with cache loading logic:
"""Load functions for your data source."""
from __future__ import annotations
from pathlib import Path
import pandas as pd
from finm.data.your_source._constants import PARQUET_FILE
def load_data(data_dir: Path | str) -> pd.DataFrame:
"""Load data from parquet cache.
Parameters
----------
data_dir : Path or str
Directory containing the parquet file.
Returns
-------
pd.DataFrame
Loaded data.
"""
data_dir = Path(data_dir)
return pd.read_parquet(data_dir / PARQUET_FILE)
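Loading is a purely local read; it raises FileNotFoundError if the cache has not been pulled yet (again, the directory is just an example):

from pathlib import Path

from finm.data.your_source._load import load_data

df = load_data(Path.home() / "finm-data" / "your_source")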
Step 4: Implement Transform#
Create _transform.py with format conversion:
"""Transform functions for your data source."""
from __future__ import annotations
import pandas as pd
from finm.data.your_source._constants import VALUE_COLUMNS
def to_long_format(df: pd.DataFrame) -> pd.DataFrame:
"""Convert from wide to long format.
Parameters
----------
df : pd.DataFrame
Wide-format DataFrame.
Returns
-------
pd.DataFrame
Long-format DataFrame with columns [unique_id, ds, y].
"""
# Reset index to make date a column
df_reset = df.reset_index()
date_col = df_reset.columns[0]
# Melt from wide to long
long_df = df_reset.melt(
id_vars=[date_col],
value_vars=VALUE_COLUMNS,
var_name="unique_id",
value_name="y",
)
# Rename date column to ds
long_df = long_df.rename(columns={date_col: "ds"})
# Reorder columns
long_df = long_df[["unique_id", "ds", "y"]]
# Drop NaN values
long_df = long_df.dropna(subset=["y"])
return long_df.reset_index(drop=True)
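Concretely, with VALUE_COLUMNS = ["col1", "col2", "col3"] from Step 1, a small wide frame melts like this (note the NaN row is dropped):

import pandas as pd

wide = pd.DataFrame(
    {"col1": [1.0, 2.0], "col2": [3.0, None], "col3": [5.0, 6.0]},
    index=pd.to_datetime(["2024-01-31", "2024-02-29"]),
)
print(to_long_format(wide))
#   unique_id         ds    y
# 0      col1 2024-01-31  1.0
# 1      col1 2024-02-29  2.0
# 2      col2 2024-01-31  3.0
# 3      col3 2024-01-31  5.0
# 4      col3 2024-02-29  6.0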
Step 5: Create Public Interface#
Create __init__.py with the standard interface. Note that all load() functions
should return polars DataFrames by default and support the pull_if_not_found
and lazy parameters:
"""Your data source module.
Standard interface:
- pull(data_dir, accept_license): Download data from source
- load(data_dir, format, pull_if_not_found, lazy): Load cached data (returns polars)
- to_long_format(df): Convert to long format
"""
from __future__ import annotations
from pathlib import Path
from typing import Literal, Union
import pandas as pd
import polars as pl
from finm.data.your_source._constants import PARQUET_FILE
from finm.data.your_source._load import load_data
from finm.data.your_source._pull import pull_data
from finm.data.your_source._transform import to_long_format
FormatType = Literal["wide", "long"]
def pull(
data_dir: Path | str,
accept_license: bool = False,
) -> pd.DataFrame:
"""Download data from source.
Parameters
----------
data_dir : Path or str
Directory to save downloaded data.
accept_license : bool, default False
Must be True to acknowledge the data provider's license terms.
Returns
-------
pd.DataFrame
Downloaded data.
"""
return pull_data(data_dir=data_dir, accept_license=accept_license)
def load(
data_dir: Path | str,
format: FormatType = "wide",
pull_if_not_found: bool = False,
accept_license: bool = False,
lazy: bool = False,
) -> Union[pl.DataFrame, pl.LazyFrame]:
"""Load data from cache.
Parameters
----------
data_dir : Path or str
Directory containing the parquet file.
format : {"wide", "long"}, default "wide"
Output format.
pull_if_not_found : bool, default False
If True and data doesn't exist locally, pull from source.
Requires accept_license=True.
accept_license : bool, default False
Must be True when pull_if_not_found=True.
lazy : bool, default False
If True, return a polars LazyFrame instead of DataFrame.
Returns
-------
pl.DataFrame or pl.LazyFrame
Loaded data as polars DataFrame (default) or LazyFrame.
"""
from finm.data._utils import pandas_to_polars
data_path = Path(data_dir)
# Handle pull_if_not_found
if pull_if_not_found:
if not accept_license:
raise ValueError(
"When pull_if_not_found=True, accept_license must also be True."
)
if not (data_path / PARQUET_FILE).exists():
pull_data(data_dir=data_dir, accept_license=True)
# Load data (internally uses pandas)
df = load_data(data_dir=data_dir)
if format == "long":
df = to_long_format(df)
# Convert to polars (always)
return pandas_to_polars(df, lazy=lazy)
__all__ = ["pull", "load", "to_long_format"]
Step 6: Register in Data Module#
Add your source to src/finm/data/__init__.py:
# Add import
from finm.data import your_source


# Add wrapper functions if desired
def pull_your_source(data_dir: Path | str, accept_license: bool = False) -> pd.DataFrame:
    """Download your data."""
    return your_source.pull(data_dir=data_dir, accept_license=accept_license)


def load_your_source(data_dir: Path | str, format: FormatType = "wide") -> pl.DataFrame:
    """Load your data."""
    return your_source.load(data_dir=data_dir, format=format)


# Add to __all__
__all__ = [
    # ...
    "your_source",
    "pull_your_source",
    "load_your_source",
]
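Once registered, both entry points are available from the top-level package (the directory below is just an example):

import finm.data as fd

fd.pull_your_source("data/your_source", accept_license=True)
df = fd.load_your_source("data/your_source", format="long")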
Step 7: Add CLI Support (Optional)#
Update src/finm/data/_cli.py to include your dataset:
class Dataset(str, Enum):
    # ...
    your_source = "your_source"

# In pull command
elif dataset == Dataset.your_source:
    from finm.data import your_source

    your_source.pull(data_dir=resolved_data_dir)
Checklist#
- Create _constants.py with URLs and file names
- Create _pull.py with download logic
- Create _load.py with cache loading
- Create _transform.py with long format conversion
- Create __init__.py with standard interface
- Register in finm.data.__init__.py
- Add CLI support if needed
- Add tests (see the sketch below)
- Update documentation
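For the test suite, the transform step can be covered without network access. A minimal pytest sketch, assuming VALUE_COLUMNS from Step 1:

import pandas as pd

from finm.data.your_source import to_long_format


def test_to_long_format_schema():
    wide = pd.DataFrame(
        {"col1": [1.0], "col2": [2.0], "col3": [3.0]},
        index=pd.to_datetime(["2024-01-31"]),
    )
    long_df = to_long_format(wide)
    assert list(long_df.columns) == ["unique_id", "ds", "y"]
    assert len(long_df) == 3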