Skip to content

Data Feeds

PolarsDataFeed

PolarsDataFeed

Bases: DataFeed

Data feed that loads market data from Polars dataframes.

Source code in alphaflow/data_feeds/polars_data_feed.py
class PolarsDataFeed(DataFeed):
    """Data feed that loads market data from Polars dataframes.

    The source may be an in-memory ``pl.DataFrame``/``pl.LazyFrame`` or a path to a
    ``.parquet``/``.parq``/``.csv`` file. Matching rows are yielded as
    ``MarketDataEvent`` objects sorted by timestamp.
    """

    def __init__(
        self,
        df_or_file_path: Path | str | pl.DataFrame | pl.LazyFrame,
        *,
        col_timestamp: str = "Date",
        col_symbol: str = "Symbol",
        col_open: str = "Open",
        col_high: str = "High",
        col_low: str = "Low",
        col_close: str = "Close",
        col_volume: str = "Volume",
    ) -> None:
        """Initialize the Polars data feed.

        Args:
            df_or_file_path: Polars DataFrame/LazyFrame, or a path to a ``.parquet``,
                ``.parq`` or ``.csv`` file (extension matched case-insensitively)
                containing market data. Files are read on every call to ``run``.
            col_timestamp: Name of the timestamp column.
            col_symbol: Name of the symbol column.
            col_open: Name of the open price column.
            col_high: Name of the high price column.
            col_low: Name of the low price column.
            col_close: Name of the close price column.
            col_volume: Name of the volume column.

        """
        self.df_or_file_path = df_or_file_path
        self._col_timestamp = col_timestamp
        self._col_symbol = col_symbol
        self._col_open = col_open
        self._col_high = col_high
        self._col_low = col_low
        self._col_close = col_close
        self._col_volume = col_volume

    def run(
        self,
        symbol: str,
        start_timestamp: datetime | None,
        end_timestamp: datetime | None,
    ) -> Generator[MarketDataEvent, None, None]:
        """Load and yield market data events from the Polars dataframe.

        Args:
            symbol: The ticker symbol to load data for.
            start_timestamp: Optional inclusive lower bound on the timestamp.
            end_timestamp: Optional inclusive upper bound on the timestamp.

        Yields:
            MarketDataEvent objects containing OHLCV data, in ascending timestamp order.

        Raises:
            ValueError: If a file path has an unsupported extension, or if required
                columns are missing from the Polars dataframe.

        """
        source = self.df_or_file_path
        # Parquet timestamps are always cast to Datetime; other sources only when stored as Date.
        force_datetime_cast = False
        if isinstance(source, (str, Path)):
            df_path = Path(source)
            suffix = df_path.suffix.lower()
            if suffix in {".parquet", ".parq"}:
                df = pl.read_parquet(df_path)
                force_datetime_cast = True
            elif suffix == ".csv":
                df = pl.read_csv(df_path, try_parse_dates=True)
            else:
                raise ValueError(f"Unsupported file format: {df_path.suffix}")
        elif isinstance(source, pl.LazyFrame):
            df = source.collect()
        else:
            df = source

        required_cols = {
            self._col_timestamp,
            self._col_close,
            self._col_high,
            self._col_low,
            self._col_open,
            self._col_volume,
        }

        # Validate before touching any column, so a missing timestamp column surfaces
        # as this ValueError rather than as an error raised by Polars.
        missing_cols = required_cols.difference(df.columns)
        if missing_cols:
            raise ValueError(f"Missing columns: {missing_cols}")

        # Use Datetime (not Date) so the column compares against datetime bounds;
        # polars parses CSV dates as Date by default.
        if force_datetime_cast or df[self._col_timestamp].dtype == pl.Date:
            df = df.with_columns(pl.col(self._col_timestamp).cast(pl.Datetime))

        # The symbol column is optional: without it, every row is attributed to `symbol`.
        if self._col_symbol in df.columns:
            df = df.filter(pl.col(self._col_symbol) == symbol)

        # Inclusive timestamp bounds, applied in polars before materializing rows.
        if start_timestamp is not None:
            df = df.filter(pl.col(self._col_timestamp) >= start_timestamp)
        if end_timestamp is not None:
            df = df.filter(pl.col(self._col_timestamp) <= end_timestamp)

        # Convert to dicts once after all filtering
        for row in df.sort(by=self._col_timestamp).iter_rows(named=True):
            yield MarketDataEvent(
                timestamp=row[self._col_timestamp],
                symbol=symbol,
                open=row[self._col_open],
                high=row[self._col_high],
                low=row[self._col_low],
                close=row[self._col_close],
                volume=row[self._col_volume],
            )

__init__(df_or_file_path, *, col_timestamp='Date', col_symbol='Symbol', col_open='Open', col_high='High', col_low='Low', col_close='Close', col_volume='Volume')

Initialize the Polars data feed.

Parameters:

Name Type Description Default
df_or_file_path Path | str | DataFrame | LazyFrame

Polars DataFrame or LazyFrame, or a path to a .parquet/.parq/.csv file, containing market data.

required
col_timestamp str

Name of the timestamp column.

'Date'
col_symbol str

Name of the symbol column.

'Symbol'
col_open str

Name of the open price column.

'Open'
col_high str

Name of the high price column.

'High'
col_low str

Name of the low price column.

'Low'
col_close str

Name of the close price column.

'Close'
col_volume str

Name of the volume column.

'Volume'
Source code in alphaflow/data_feeds/polars_data_feed.py
def __init__(
    self,
    df_or_file_path: Path | str | pl.DataFrame | pl.LazyFrame,
    *,
    col_timestamp: str = "Date",
    col_symbol: str = "Symbol",
    col_open: str = "Open",
    col_high: str = "High",
    col_low: str = "Low",
    col_close: str = "Close",
    col_volume: str = "Volume",
) -> None:
    """Store the data source and the column-name mapping used by ``run``.

    Args:
        df_or_file_path: In-memory Polars frame, or a path to a file holding market data.
        col_timestamp: Column holding the bar timestamp.
        col_symbol: Column holding the ticker symbol.
        col_open: Column holding the open price.
        col_high: Column holding the high price.
        col_low: Column holding the low price.
        col_close: Column holding the close price.
        col_volume: Column holding the traded volume.

    """
    self.df_or_file_path = df_or_file_path
    # Identifying columns.
    self._col_timestamp, self._col_symbol = col_timestamp, col_symbol
    # OHLCV columns.
    self._col_open, self._col_high = col_open, col_high
    self._col_low, self._col_close = col_low, col_close
    self._col_volume = col_volume

run(symbol, start_timestamp, end_timestamp)

Load and yield market data events from the Polars dataframe.

Parameters:

Name Type Description Default
symbol str

The ticker symbol to load data for.

required
start_timestamp datetime | None

Optional start time for filtering data.

required
end_timestamp datetime | None

Optional end time for filtering data.

required

Yields:

Type Description
MarketDataEvent

MarketDataEvent objects containing OHLCV data.

Raises:

Type Description
ValueError

If required columns are missing from the Polars dataframe, or if a file path has an unsupported extension.

Source code in alphaflow/data_feeds/polars_data_feed.py
def run(
    self,
    symbol: str,
    start_timestamp: datetime | None,
    end_timestamp: datetime | None,
) -> Generator[MarketDataEvent, None, None]:
    """Load and yield market data events from the Polars dataframe.

    Args:
        symbol: The ticker symbol to load data for.
        start_timestamp: Optional inclusive lower bound on the timestamp.
        end_timestamp: Optional inclusive upper bound on the timestamp.

    Yields:
        MarketDataEvent objects containing OHLCV data, in ascending timestamp order.

    Raises:
        ValueError: If a file path has an unsupported extension, or if required
            columns are missing from the Polars dataframe.

    """
    source = self.df_or_file_path
    # Parquet timestamps are always cast to Datetime; other sources only when stored as Date.
    force_datetime_cast = False
    if isinstance(source, (str, Path)):
        df_path = Path(source)
        suffix = df_path.suffix.lower()
        if suffix in {".parquet", ".parq"}:
            df = pl.read_parquet(df_path)
            force_datetime_cast = True
        elif suffix == ".csv":
            df = pl.read_csv(df_path, try_parse_dates=True)
        else:
            raise ValueError(f"Unsupported file format: {df_path.suffix}")
    elif isinstance(source, pl.LazyFrame):
        df = source.collect()
    else:
        df = source

    required_cols = {
        self._col_timestamp,
        self._col_close,
        self._col_high,
        self._col_low,
        self._col_open,
        self._col_volume,
    }

    # Validate before touching any column, so a missing timestamp column surfaces
    # as this ValueError rather than as an error raised by Polars.
    missing_cols = required_cols.difference(df.columns)
    if missing_cols:
        raise ValueError(f"Missing columns: {missing_cols}")

    # Use Datetime (not Date) so the column compares against datetime bounds;
    # polars parses CSV dates as Date by default.
    if force_datetime_cast or df[self._col_timestamp].dtype == pl.Date:
        df = df.with_columns(pl.col(self._col_timestamp).cast(pl.Datetime))

    # The symbol column is optional: without it, every row is attributed to `symbol`.
    if self._col_symbol in df.columns:
        df = df.filter(pl.col(self._col_symbol) == symbol)

    # Inclusive timestamp bounds, applied in polars before materializing rows.
    if start_timestamp is not None:
        df = df.filter(pl.col(self._col_timestamp) >= start_timestamp)
    if end_timestamp is not None:
        df = df.filter(pl.col(self._col_timestamp) <= end_timestamp)

    # Convert to dicts once after all filtering
    for row in df.sort(by=self._col_timestamp).iter_rows(named=True):
        yield MarketDataEvent(
            timestamp=row[self._col_timestamp],
            symbol=symbol,
            open=row[self._col_open],
            high=row[self._col_high],
            low=row[self._col_low],
            close=row[self._col_close],
            volume=row[self._col_volume],
        )

AlphaVantageFeed

AlphaVantageFeed

Bases: DataFeed

Data feed that loads market data from the Alpha Vantage API.

Source code in alphaflow/data_feeds/alpha_vantage_data_feed.py
class AlphaVantageFeed(DataFeed):
    """Data feed that loads market data from Alpha Vantage API."""

    def __init__(
        self,
        use_cache: bool = False,
        api_key: str | None = None,
        rate_limit_retries: int = 3,
        rate_limit_backoff: float = 60.0,
        rate_limit_backoff_multiplier: float = 1.0,
    ) -> None:
        """Initialize the Alpha Vantage data feed.

        Args:
            use_cache: Whether to cache API responses (not yet implemented).
            api_key: Alpha Vantage API key. Falls back to ALPHA_VANTAGE_API_KEY env var.
            rate_limit_retries: Number of retry attempts for 429 rate limit errors (default: 3).
            rate_limit_backoff: Initial backoff delay in seconds for rate limit errors (default: 60).
            rate_limit_backoff_multiplier: Multiplier for exponential backoff (default: 1.0).

        """
        self._use_cache = use_cache
        self.__api_key = api_key or os.getenv("ALPHA_VANTAGE_API_KEY")
        self.rate_limit_retries = rate_limit_retries
        self.rate_limit_backoff = rate_limit_backoff
        self.rate_limit_backoff_multiplier = rate_limit_backoff_multiplier

    def run(
        self,
        symbol: str,
        start_timestamp: datetime | None,
        end_timestamp: datetime | None,
    ) -> Generator[MarketDataEvent, None, None]:
        """Load and yield market data events from Alpha Vantage API.

        Args:
            symbol: The ticker symbol to load data for.
            start_timestamp: Optional inclusive lower bound on the bar date.
            end_timestamp: Optional inclusive upper bound on the bar date.

        Yields:
            MarketDataEvent objects containing OHLCV data, oldest first.

        Raises:
            NotImplementedError: If the feed was created with ``use_cache=True``.
            ValueError: If the response has no "Time Series (Daily)" section.

        """
        if self._use_cache:
            raise NotImplementedError("Cache not implemented yet")

        # Let httpx build and percent-encode the query string (symbols may contain reserved characters).
        params: dict[str, str | None] = {
            "function": "TIME_SERIES_DAILY_ADJUSTED",
            "symbol": symbol,
            "apikey": self.__api_key,
            "outputsize": "full",
        }
        logger.debug(f"Fetching data for symbol '{symbol}' from Alpha Vantage endpoint.")
        data = http_request_with_backoff(
            request_func=lambda: httpx.get(
                "https://www.alphavantage.co/query",
                params=params,
                timeout=httpx.Timeout(30.0),
            ),
            retries=self.rate_limit_retries,
            backoff=self.rate_limit_backoff,
            backoff_multiplier=self.rate_limit_backoff_multiplier,
            error_message="Failed to fetch data from Alpha Vantage",
        )

        time_series = data.get("Time Series (Daily)")
        if time_series is None:
            # Presumably an error / rate-limit message instead of data; surface it instead of a KeyError.
            raise ValueError(f"Alpha Vantage response for '{symbol}' has no 'Time Series (Daily)' data: {data}")

        # Yield oldest first like the other feeds; don't rely on the payload's key order.
        # ISO "YYYY-MM-DD" keys sort chronologically as strings.
        for date_str in sorted(time_series):
            timestamp = datetime.strptime(date_str, "%Y-%m-%d")
            if start_timestamp is not None and timestamp < start_timestamp:
                continue
            if end_timestamp is not None and timestamp > end_timestamp:
                continue
            datum = time_series[date_str]
            # NOTE(review): close is the adjusted value while open/high/low are unadjusted,
            # so bars can be internally inconsistent across corporate actions — confirm intent.
            yield MarketDataEvent(
                timestamp=timestamp,
                symbol=symbol,
                open=float(datum["1. open"]),
                high=float(datum["2. high"]),
                low=float(datum["3. low"]),
                close=float(datum["5. adjusted close"]),
                volume=float(datum["6. volume"]),
            )

__init__(use_cache=False, api_key=None, rate_limit_retries=3, rate_limit_backoff=60.0, rate_limit_backoff_multiplier=1.0)

Initialize the Alpha Vantage data feed.

Parameters:

Name Type Description Default
use_cache bool

Whether to cache API responses (not yet implemented).

False
api_key str | None

Alpha Vantage API key. Falls back to ALPHA_VANTAGE_API_KEY env var.

None
rate_limit_retries int

Number of retry attempts for 429 rate limit errors (default: 3).

3
rate_limit_backoff float

Initial backoff delay in seconds for rate limit errors (default: 60).

60.0
rate_limit_backoff_multiplier float

Multiplier for exponential backoff (default: 1.0).

1.0
Source code in alphaflow/data_feeds/alpha_vantage_data_feed.py
def __init__(
    self,
    use_cache: bool = False,
    api_key: str | None = None,
    rate_limit_retries: int = 3,
    rate_limit_backoff: float = 60.0,
    rate_limit_backoff_multiplier: float = 1.0,
) -> None:
    """Initialize the Alpha Vantage data feed.

    Args:
        use_cache: Whether to cache API responses (not yet implemented).
        api_key: Alpha Vantage API key. Falls back to ALPHA_VANTAGE_API_KEY env var.
        rate_limit_retries: Number of retry attempts for 429 rate limit errors (default: 3).
        rate_limit_backoff: Initial backoff delay in seconds for rate limit errors (default: 60).
        rate_limit_backoff_multiplier: Multiplier for exponential backoff (default: 1.0).

    """
    self._use_cache = use_cache
    self.__api_key = api_key or os.getenv("ALPHA_VANTAGE_API_KEY")
    self.rate_limit_retries = rate_limit_retries
    self.rate_limit_backoff = rate_limit_backoff
    self.rate_limit_backoff_multiplier = rate_limit_backoff_multiplier

run(symbol, start_timestamp, end_timestamp)

Load and yield market data events from Alpha Vantage API.

Parameters:

Name Type Description Default
symbol str

The ticker symbol to load data for.

required
start_timestamp datetime | None

Optional start time for filtering data.

required
end_timestamp datetime | None

Optional end time for filtering data.

required

Yields:

Type Description
MarketDataEvent

MarketDataEvent objects containing OHLCV data.

Source code in alphaflow/data_feeds/alpha_vantage_data_feed.py
def run(
    self,
    symbol: str,
    start_timestamp: datetime | None,
    end_timestamp: datetime | None,
) -> Generator[MarketDataEvent, None, None]:
    """Load and yield market data events from Alpha Vantage API.

    Args:
        symbol: The ticker symbol to load data for.
        start_timestamp: Optional inclusive lower bound on the bar date.
        end_timestamp: Optional inclusive upper bound on the bar date.

    Yields:
        MarketDataEvent objects containing OHLCV data, oldest first.

    Raises:
        NotImplementedError: If the feed was created with ``use_cache=True``.
        ValueError: If the response has no "Time Series (Daily)" section.

    """
    if self._use_cache:
        raise NotImplementedError("Cache not implemented yet")

    # Let httpx build and percent-encode the query string (symbols may contain reserved characters).
    params: dict[str, str | None] = {
        "function": "TIME_SERIES_DAILY_ADJUSTED",
        "symbol": symbol,
        "apikey": self.__api_key,
        "outputsize": "full",
    }
    logger.debug(f"Fetching data for symbol '{symbol}' from Alpha Vantage endpoint.")
    data = http_request_with_backoff(
        request_func=lambda: httpx.get(
            "https://www.alphavantage.co/query",
            params=params,
            timeout=httpx.Timeout(30.0),
        ),
        retries=self.rate_limit_retries,
        backoff=self.rate_limit_backoff,
        backoff_multiplier=self.rate_limit_backoff_multiplier,
        error_message="Failed to fetch data from Alpha Vantage",
    )

    time_series = data.get("Time Series (Daily)")
    if time_series is None:
        # Presumably an error / rate-limit message instead of data; surface it instead of a KeyError.
        raise ValueError(f"Alpha Vantage response for '{symbol}' has no 'Time Series (Daily)' data: {data}")

    # Yield oldest first like the other feeds; don't rely on the payload's key order.
    # ISO "YYYY-MM-DD" keys sort chronologically as strings.
    for date_str in sorted(time_series):
        timestamp = datetime.strptime(date_str, "%Y-%m-%d")
        if start_timestamp is not None and timestamp < start_timestamp:
            continue
        if end_timestamp is not None and timestamp > end_timestamp:
            continue
        datum = time_series[date_str]
        # NOTE(review): close is the adjusted value while open/high/low are unadjusted,
        # so bars can be internally inconsistent across corporate actions — confirm intent.
        yield MarketDataEvent(
            timestamp=timestamp,
            symbol=symbol,
            open=float(datum["1. open"]),
            high=float(datum["2. high"]),
            low=float(datum["3. low"]),
            close=float(datum["5. adjusted close"]),
            volume=float(datum["6. volume"]),
        )

FMPDataFeed

FMPDataFeed

Bases: DataFeed

Data feed that loads market data from the Financial Modeling Prep API.

Source code in alphaflow/data_feeds/fmp_data_feed.py
class FMPDataFeed(DataFeed):
    """Data feed that loads market data from Financial Modeling Prep API."""

    def __init__(
        self,
        use_cache: bool = False,
        api_key: str | None = None,
        rate_limit_retries: int = 3,
        rate_limit_backoff: float = 60.0,
        rate_limit_backoff_multiplier: float = 1.0,
    ) -> None:
        """Initialize the FMP data feed.

        Args:
            use_cache: Whether to cache API responses (not yet implemented).
            api_key: FMP API key. Falls back to FMP_API_KEY env var.
            rate_limit_retries: Number of retry attempts for 429 rate limit errors (default: 3).
            rate_limit_backoff: Initial backoff delay in seconds for rate limit errors (default: 60).
            rate_limit_backoff_multiplier: Multiplier for exponential backoff (default: 1.0).

        """
        self._use_cache = use_cache
        self.__api_key = api_key or os.getenv("FMP_API_KEY")
        self.rate_limit_retries = rate_limit_retries
        self.rate_limit_backoff = rate_limit_backoff
        self.rate_limit_backoff_multiplier = rate_limit_backoff_multiplier

    def run(
        self,
        symbol: str,
        start_timestamp: datetime | None,
        end_timestamp: datetime | None,
    ) -> Generator[MarketDataEvent, None, None]:
        """Load and yield market data events from FMP API.

        Args:
            symbol: The ticker symbol to load data for.
            start_timestamp: Optional start bound; only its date is sent to the API (``from``).
            end_timestamp: Optional end bound; only its date is sent to the API (``to``).

        Yields:
            MarketDataEvent objects containing dividend-adjusted OHLCV data, oldest first.

        Raises:
            NotImplementedError: If the feed was created with ``use_cache=True``.
            ValueError: If the API returns a JSON object (presumably an error payload)
                instead of a list of bars.

        """
        if self._use_cache:
            raise NotImplementedError("Cache not implemented yet")

        # Let httpx build and percent-encode the query string (symbols may contain reserved characters).
        params: dict[str, str | None] = {"symbol": symbol, "apikey": self.__api_key}
        if start_timestamp is not None:
            params["from"] = start_timestamp.date().isoformat()
        if end_timestamp is not None:
            params["to"] = end_timestamp.date().isoformat()
        logger.debug(f"Fetching data for symbol '{symbol}' from FMP stable historical-price-eod endpoint")
        data = http_request_with_backoff(
            request_func=lambda: httpx.get(
                "https://financialmodelingprep.com/stable/historical-price-eod/dividend-adjusted",
                params=params,
                timeout=httpx.Timeout(30.0),
            ),
            retries=self.rate_limit_retries,
            backoff=self.rate_limit_backoff,
            backoff_multiplier=self.rate_limit_backoff_multiplier,
            error_message="Failed to fetch data from FMP",
        )

        # A mapping here is not bar data; iterating it would only yield its keys and
        # fail with an opaque TypeError inside sorted().
        if isinstance(data, dict):
            raise ValueError(f"Unexpected FMP response for '{symbol}': {data}")

        # Sort data by date to ensure chronological order (oldest first)
        for row in sorted(data, key=lambda x: x["date"]):
            yield MarketDataEvent(
                timestamp=datetime.strptime(row["date"], "%Y-%m-%d"),
                symbol=symbol,
                open=row["adjOpen"],
                high=row["adjHigh"],
                low=row["adjLow"],
                close=row["adjClose"],
                volume=row["volume"],
            )

__init__(use_cache=False, api_key=None, rate_limit_retries=3, rate_limit_backoff=60.0, rate_limit_backoff_multiplier=1.0)

Initialize the FMP data feed.

Parameters:

Name Type Description Default
use_cache bool

Whether to cache API responses (not yet implemented).

False
api_key str | None

FMP API key. Falls back to FMP_API_KEY env var.

None
rate_limit_retries int

Number of retry attempts for 429 rate limit errors (default: 3).

3
rate_limit_backoff float

Initial backoff delay in seconds for rate limit errors (default: 60).

60.0
rate_limit_backoff_multiplier float

Multiplier for exponential backoff (default: 1.0).

1.0
Source code in alphaflow/data_feeds/fmp_data_feed.py
def __init__(
    self,
    use_cache: bool = False,
    api_key: str | None = None,
    rate_limit_retries: int = 3,
    rate_limit_backoff: float = 60.0,
    rate_limit_backoff_multiplier: float = 1.0,
) -> None:
    """Initialize the FMP data feed.

    Args:
        use_cache: Whether to cache API responses (not yet implemented).
        api_key: FMP API key. Falls back to FMP_API_KEY env var.
        rate_limit_retries: Number of retry attempts for 429 rate limit errors (default: 3).
        rate_limit_backoff: Initial backoff delay in seconds for rate limit errors (default: 60).
        rate_limit_backoff_multiplier: Multiplier for exponential backoff (default: 1.0).

    """
    self._use_cache = use_cache
    self.__api_key = api_key or os.getenv("FMP_API_KEY")
    self.rate_limit_retries = rate_limit_retries
    self.rate_limit_backoff = rate_limit_backoff
    self.rate_limit_backoff_multiplier = rate_limit_backoff_multiplier

run(symbol, start_timestamp, end_timestamp)

Load and yield market data events from FMP API.

Parameters:

Name Type Description Default
symbol str

The ticker symbol to load data for.

required
start_timestamp datetime | None

Optional start time for filtering data.

required
end_timestamp datetime | None

Optional end time for filtering data.

required

Yields:

Type Description
MarketDataEvent

MarketDataEvent objects containing OHLCV data.

Source code in alphaflow/data_feeds/fmp_data_feed.py
def run(
    self,
    symbol: str,
    start_timestamp: datetime | None,
    end_timestamp: datetime | None,
) -> Generator[MarketDataEvent, None, None]:
    """Load and yield market data events from FMP API.

    Args:
        symbol: The ticker symbol to load data for.
        start_timestamp: Optional start bound; only its date is sent to the API (``from``).
        end_timestamp: Optional end bound; only its date is sent to the API (``to``).

    Yields:
        MarketDataEvent objects containing dividend-adjusted OHLCV data, oldest first.

    Raises:
        NotImplementedError: If the feed was created with ``use_cache=True``.
        ValueError: If the API returns a JSON object (presumably an error payload)
            instead of a list of bars.

    """
    if self._use_cache:
        raise NotImplementedError("Cache not implemented yet")

    # Let httpx build and percent-encode the query string (symbols may contain reserved characters).
    params: dict[str, str | None] = {"symbol": symbol, "apikey": self.__api_key}
    if start_timestamp is not None:
        params["from"] = start_timestamp.date().isoformat()
    if end_timestamp is not None:
        params["to"] = end_timestamp.date().isoformat()
    logger.debug(f"Fetching data for symbol '{symbol}' from FMP stable historical-price-eod endpoint")
    data = http_request_with_backoff(
        request_func=lambda: httpx.get(
            "https://financialmodelingprep.com/stable/historical-price-eod/dividend-adjusted",
            params=params,
            timeout=httpx.Timeout(30.0),
        ),
        retries=self.rate_limit_retries,
        backoff=self.rate_limit_backoff,
        backoff_multiplier=self.rate_limit_backoff_multiplier,
        error_message="Failed to fetch data from FMP",
    )

    # A mapping here is not bar data; iterating it would only yield its keys and
    # fail with an opaque TypeError inside sorted().
    if isinstance(data, dict):
        raise ValueError(f"Unexpected FMP response for '{symbol}': {data}")

    # Sort data by date to ensure chronological order (oldest first)
    for row in sorted(data, key=lambda x: x["date"]):
        yield MarketDataEvent(
            timestamp=datetime.strptime(row["date"], "%Y-%m-%d"),
            symbol=symbol,
            open=row["adjOpen"],
            high=row["adjHigh"],
            low=row["adjLow"],
            close=row["adjClose"],
            volume=row["volume"],
        )

PolygonDataFeed

PolygonDataFeed

Bases: DataFeed

Data feed that loads market data from Polygon.io API.

Supports daily and intraday OHLCV data. See https://polygon.io for API documentation and pricing.

Source code in alphaflow/data_feeds/polygon_data_feed.py
class PolygonDataFeed(DataFeed):
    """Data feed that loads market data from Polygon.io API.

    Supports daily and intraday OHLCV data.
    See https://polygon.io for API documentation and pricing.
    """

    BASE_URL = "https://api.polygon.io"

    def __init__(
        self,
        api_key: str | None = None,
        timeframe: str = "day",
        multiplier: int = 1,
        rate_limit_retries: int = 3,
        rate_limit_backoff: float = 60.0,
        rate_limit_backoff_multiplier: float = 1.0,
    ) -> None:
        """Initialize the Polygon.io data feed.

        Args:
            api_key: Polygon.io API key. Falls back to POLYGON_API_KEY env var.
            timeframe: Timeframe for bars ('minute', 'hour', 'day', 'week', 'month').
            multiplier: Multiplier for the timeframe (e.g., 5 for 5-minute bars).
            rate_limit_retries: Number of retry attempts for 429 rate limit errors (default: 3).
            rate_limit_backoff: Initial backoff delay in seconds for rate limit errors (default: 60).
            rate_limit_backoff_multiplier: Multiplier for exponential backoff (default: 1.0).

        Raises:
            ValueError: If no API key is given and POLYGON_API_KEY is unset or empty.

        """
        self.__api_key = api_key or os.getenv("POLYGON_API_KEY")
        if not self.__api_key:
            raise ValueError(
                "Polygon API key required. Provide via api_key parameter or POLYGON_API_KEY environment variable."
            )

        self.timeframe = timeframe
        self.multiplier = multiplier
        self.rate_limit_retries = rate_limit_retries
        self.rate_limit_backoff = rate_limit_backoff
        self.rate_limit_backoff_multiplier = rate_limit_backoff_multiplier

    @staticmethod
    def _bar_to_event(bar: dict[str, float], symbol: str) -> MarketDataEvent:
        """Convert one Polygon aggregate bar into a MarketDataEvent.

        ``bar["t"]`` is the epoch timestamp in milliseconds (UTC). It is converted to a
        naive UTC datetime for consistency with the other feeds.
        """
        timestamp = datetime.fromtimestamp(bar["t"] / 1000, tz=timezone.utc).replace(tzinfo=None)
        return MarketDataEvent(
            timestamp=timestamp,
            symbol=symbol,
            open=float(bar["o"]),
            high=float(bar["h"]),
            low=float(bar["l"]),
            close=float(bar["c"]),
            volume=int(bar["v"]),
            # Bid/ask will be None for now - requires separate API call
            # This could be extended in future to fetch NBBO data
        )

    def run(
        self,
        symbol: str,
        start_timestamp: datetime | None,
        end_timestamp: datetime | None,
    ) -> Generator[MarketDataEvent, None, None]:
        """Load and yield market data events from Polygon.io API.

        Follows ``next_url`` pagination. It stops when the API returns no further URL,
        a page is empty or has a non-"OK" status, a page request raises ``ValueError``,
        or a pagination URL repeats.

        Args:
            symbol: The ticker symbol to load data for.
            start_timestamp: Start time for data range (required); only its date is sent.
            end_timestamp: End time for data range (required); only its date is sent.

        Yields:
            MarketDataEvent objects containing OHLCV data, requested in ascending time order.

        Raises:
            ValueError: If start_timestamp or end_timestamp is None, or the initial API
                request fails or reports a non-"OK" status.

        """
        if start_timestamp is None or end_timestamp is None:
            raise ValueError("Polygon data feed requires start_timestamp and end_timestamp")

        # Format dates for API (YYYY-MM-DD)
        start_date = start_timestamp.strftime("%Y-%m-%d")
        end_date = end_timestamp.strftime("%Y-%m-%d")

        # Build aggregates URL
        url = (
            f"{self.BASE_URL}/v2/aggs/ticker/{symbol}/range/{self.multiplier}/{self.timeframe}/{start_date}/{end_date}"
        )

        params: dict[str, str | int | None] = {
            "apiKey": self.__api_key,
            "adjusted": "true",  # Use adjusted prices (splits, dividends)
            "sort": "asc",  # Chronological order
            "limit": 50000,  # Max results per request
        }

        logger.info(f"Fetching {self.multiplier}{self.timeframe} data for '{symbol}' from {start_date} to {end_date}")

        # Fetch initial page with retry/backoff
        data = http_request_with_backoff(
            request_func=lambda: httpx.get(url, params=params, timeout=httpx.Timeout(30.0)),
            retries=self.rate_limit_retries,
            backoff=self.rate_limit_backoff,
            backoff_multiplier=self.rate_limit_backoff_multiplier,
            error_message="Failed to fetch data from Polygon",
        )

        if data.get("status") != "OK":
            error_msg = data.get("error", "Unknown error")
            raise ValueError(f"Polygon API error: {error_msg}")

        results = data.get("results", [])
        if not results:
            logger.warning(f"No data returned for {symbol} in date range {start_date} to {end_date}")
            return

        total_bars = len(results)
        for bar in results:
            yield self._bar_to_event(bar, symbol)

        # Handle pagination - detect infinite loops by tracking seen URLs
        seen_urls = {url}
        page_count = 1

        while data.get("next_url"):
            next_url = cast(str, data["next_url"])

            # Detect circular pagination (same URL appearing again)
            if next_url in seen_urls:
                logger.warning(f"Detected circular pagination for {symbol} - same URL appeared twice")
                break
            seen_urls.add(next_url)

            # Only the HTTP call is guarded: errors while converting bars must not be
            # mislabeled as a page-fetch failure and silently truncate the stream.
            try:
                data = http_request_with_backoff(
                    # The key is sent as a query param above; next_url is not relied on to
                    # carry it, so re-attach it (httpx merges `params` into next_url's query).
                    request_func=partial(
                        httpx.get,
                        next_url,
                        params={"apiKey": self.__api_key},
                        timeout=httpx.Timeout(30.0),
                    ),
                    retries=self.rate_limit_retries,
                    backoff=self.rate_limit_backoff,
                    backoff_multiplier=self.rate_limit_backoff_multiplier,
                    error_message="Failed to fetch data from Polygon",
                )
            except ValueError as e:
                logger.warning(f"Failed to fetch page {page_count + 1} for {symbol}: {e}")
                break

            # Check status on paginated response
            if data.get("status") != "OK":
                logger.warning(f"Pagination request failed for {symbol}: {data.get('error', 'Unknown error')}")
                break

            page_results = data.get("results", [])
            if not page_results:
                break

            total_bars += len(page_results)
            for bar in page_results:
                yield self._bar_to_event(bar, symbol)

            page_count += 1

        # Log summary
        logger.info(f"Loaded {total_bars} bars across {page_count} page(s) for {symbol}")

__init__(api_key=None, timeframe='day', multiplier=1, rate_limit_retries=3, rate_limit_backoff=60.0, rate_limit_backoff_multiplier=1.0)

Initialize the Polygon.io data feed.

Parameters:

Name Type Description Default
api_key str | None

Polygon.io API key. Falls back to POLYGON_API_KEY env var.

None
timeframe str

Timeframe for bars ('minute', 'hour', 'day', 'week', 'month').

'day'
multiplier int

Multiplier for the timeframe (e.g., 5 for 5-minute bars).

1
rate_limit_retries int

Number of retry attempts for 429 rate limit errors (default: 3).

3
rate_limit_backoff float

Initial backoff delay in seconds for rate limit errors (default: 60).

60.0
rate_limit_backoff_multiplier float

Multiplier for exponential backoff (default: 1.0).

1.0
Source code in alphaflow/data_feeds/polygon_data_feed.py
def __init__(
    self,
    api_key: str | None = None,
    timeframe: str = "day",
    multiplier: int = 1,
    rate_limit_retries: int = 3,
    rate_limit_backoff: float = 60.0,
    rate_limit_backoff_multiplier: float = 1.0,
) -> None:
    """Initialize the Polygon.io data feed.

    Args:
        api_key: Polygon.io API key. Falls back to POLYGON_API_KEY env var.
        timeframe: Timeframe for bars ('minute', 'hour', 'day', 'week', 'month').
        multiplier: Multiplier for the timeframe (e.g., 5 for 5-minute bars).
        rate_limit_retries: Number of retry attempts for 429 rate limit errors (default: 3).
        rate_limit_backoff: Initial backoff delay in seconds for rate limit errors (default: 60).
        rate_limit_backoff_multiplier: Multiplier for exponential backoff (default: 1.0).

    """
    self.__api_key = api_key or os.getenv("POLYGON_API_KEY")
    if not self.__api_key:
        raise ValueError(
            "Polygon API key required. Provide via api_key parameter or POLYGON_API_KEY environment variable."
        )

    self.timeframe = timeframe
    self.multiplier = multiplier
    self.rate_limit_retries = rate_limit_retries
    self.rate_limit_backoff = rate_limit_backoff
    self.rate_limit_backoff_multiplier = rate_limit_backoff_multiplier

run(symbol, start_timestamp, end_timestamp)

Load and yield market data events from Polygon.io API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `symbol` | `str` | The ticker symbol to load data for. | required |
| `start_timestamp` | `datetime \| None` | Start time for data range (required). | required |
| `end_timestamp` | `datetime \| None` | End time for data range (required). | required |

Yields:

| Type | Description |
| --- | --- |
| `MarketDataEvent` | MarketDataEvent objects containing OHLCV data and optionally bid/ask. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `start_timestamp` or `end_timestamp` is None, or the initial API request fails. |

Source code in alphaflow/data_feeds/polygon_data_feed.py
def run(
    self,
    symbol: str,
    start_timestamp: datetime | None,
    end_timestamp: datetime | None,
) -> Generator[MarketDataEvent, None, None]:
    """Load and yield market data events from Polygon.io API.

    Requests aggregate bars for ``symbol`` between the two dates and then
    follows Polygon's ``next_url`` pagination. Pagination stops when:
    - no ``next_url`` is reported;
    - a ``next_url`` repeats;
    - a page is empty or has a non-"OK" status;
    - a page request fails.

    Args:
        symbol: The ticker symbol to load data for.
        start_timestamp: Start time for data range (required). Only the date
            part (YYYY-MM-DD) is sent to the API.
        end_timestamp: End time for data range (required). Only the date part
            (YYYY-MM-DD) is sent to the API.

    Yields:
        MarketDataEvent objects containing OHLCV data, in ascending time order.
        Bid/ask are not populated by this feed.

    Raises:
        ValueError: If start_timestamp or end_timestamp is None, or the
            initial response status is not "OK". Errors raised by
            ``http_request_with_backoff`` for the initial request propagate.
            A ``ValueError`` while fetching a later page is logged and ends
            iteration early instead.

    """
    if start_timestamp is None or end_timestamp is None:
        raise ValueError("Polygon data feed requires start_timestamp and end_timestamp")

    # Format dates for API (YYYY-MM-DD)
    start_date = start_timestamp.strftime("%Y-%m-%d")
    end_date = end_timestamp.strftime("%Y-%m-%d")

    # Build aggregates URL
    url = (
        f"{self.BASE_URL}/v2/aggs/ticker/{symbol}/range/{self.multiplier}/{self.timeframe}/{start_date}/{end_date}"
    )

    params: dict[str, str | int | None] = {
        "apiKey": self.__api_key,
        "adjusted": "true",  # Use adjusted prices (splits, dividends)
        "sort": "asc",  # Chronological order
        "limit": 50000,  # Max results per request
    }
    # Polygon's ``next_url`` does not carry the API key, so each pagination
    # request must re-attach it; httpx merges these params into next_url's query.
    auth_params = {"apiKey": self.__api_key}

    def _to_event(bar: dict) -> MarketDataEvent:
        """Convert one Polygon aggregate bar into a MarketDataEvent."""
        # Polygon returns UTC timestamps in milliseconds.
        # Convert to naive UTC datetime (timezone.utc then strip tzinfo for consistency).
        timestamp = datetime.fromtimestamp(bar["t"] / 1000, tz=timezone.utc).replace(tzinfo=None)
        return MarketDataEvent(
            timestamp=timestamp,
            symbol=symbol,
            open=float(bar["o"]),
            high=float(bar["h"]),
            low=float(bar["l"]),
            close=float(bar["c"]),
            volume=int(bar["v"]),
            # Bid/ask will be None for now - requires separate API call
            # This could be extended in future to fetch NBBO data
        )

    logger.info(f"Fetching {self.multiplier}{self.timeframe} data for '{symbol}' from {start_date} to {end_date}")

    # Fetch initial page with retry/backoff
    data = http_request_with_backoff(
        request_func=lambda: httpx.get(url, params=params, timeout=httpx.Timeout(30.0)),
        retries=self.rate_limit_retries,
        backoff=self.rate_limit_backoff,
        backoff_multiplier=self.rate_limit_backoff_multiplier,
        error_message="Failed to fetch data from Polygon",
    )

    # Check response status
    # NOTE(review): Polygon may report statuses other than "OK" (e.g. "DELAYED"
    # on delayed-data plans) for usable data — confirm whether those should pass.
    if data.get("status") != "OK":
        error_msg = data.get("error", "Unknown error")
        raise ValueError(f"Polygon API error: {error_msg}")

    # Check if we have results
    results = data.get("results", [])
    if not results:
        logger.warning(f"No data returned for {symbol} in date range {start_date} to {end_date}")
        return

    # Process and yield results from first page
    total_bars = len(results)
    for bar in results:
        yield _to_event(bar)

    # Handle pagination - detect infinite loops by tracking seen URLs
    seen_urls = {url}  # Track visited URLs to prevent circular pagination
    page_count = 1

    while data.get("next_url"):
        next_url = cast(str, data["next_url"])

        # Detect circular pagination (same URL appearing again)
        if next_url in seen_urls:
            logger.warning(f"Detected circular pagination for {symbol} - same URL appeared twice")
            break

        seen_urls.add(next_url)

        # Only the request is guarded: a fetch failure ends pagination
        # best-effort, while errors converting bars propagate exactly as they
        # do for the first page instead of being misreported as fetch errors.
        try:
            data = http_request_with_backoff(
                request_func=partial(httpx.get, next_url, params=auth_params, timeout=httpx.Timeout(30.0)),
                retries=self.rate_limit_retries,
                backoff=self.rate_limit_backoff,
                backoff_multiplier=self.rate_limit_backoff_multiplier,
                error_message="Failed to fetch data from Polygon",
            )
        except ValueError as e:
            logger.warning(f"Failed to fetch page {page_count + 1} for {symbol}: {e}")
            break

        # Check status on paginated response
        if data.get("status") != "OK":
            logger.warning(f"Pagination request failed for {symbol}: {data.get('error', 'Unknown error')}")
            break

        page_results = data.get("results", [])
        if not page_results:
            break

        total_bars += len(page_results)

        # Process and yield results from this page
        for bar in page_results:
            yield _to_event(bar)

        page_count += 1

    # Log summary
    logger.info(f"Loaded {total_bars} bars across {page_count} page(s) for {symbol}")