Skip to content

Core API

AlphaFlow

AlphaFlow

Event-driven backtesting engine for trading strategies.

Source code in alphaflow/__init__.py
class AlphaFlow:
    """Event-driven backtesting engine for trading strategies."""

    def __init__(self, *, on_missing_price: str = "raise") -> None:
        """Initialize the AlphaFlow backtest engine.

        Args:
            on_missing_price: Behavior when price data is missing. Options are:
                - "raise": Raise an error (default)
                - "warn": Log a warning and return zero price
                - "ignore": Silently return zero price

        """
        if on_missing_price not in ("raise", "warn", "ignore"):
            raise ValueError("on_missing_price must be 'raise', 'warn', or 'ignore'")
        self.on_missing_price = on_missing_price
        self.event_bus = EventBus()
        self.portfolio = Portfolio(self)
        self.strategies: list[Strategy] = []
        self.analyzers: list[Analyzer] = []
        self.universe: set[str] = set()
        self.data_feed: DataFeed | None = None
        self.broker: Broker | None = None
        self.benchmark: str | None = None
        self._data: dict[str, list[MarketDataEvent]] = defaultdict(list)
        self.data_start_timestamp: datetime | None = None
        self.backtest_start_timestamp: datetime | None = None
        self.backtest_end_timestamp: datetime | None = None
        for topic in self.portfolio.topic_subscriptions():
            self.event_bus.subscribe(topic, self.portfolio)

    def set_benchmark(self, symbol: str) -> None:
        """Set the benchmark symbol for performance comparison.

        Args:
            symbol: The ticker symbol to use as a benchmark (e.g., "SPY").

        """
        self.universe.add(symbol)
        self.benchmark = symbol

    def add_equity(self, symbol: str) -> None:
        """Add an equity symbol to the trading universe.

        Args:
            symbol: The ticker symbol to add (e.g., "AAPL").

        """
        self.universe.add(symbol)

    def set_data_feed(self, data_feed: DataFeed) -> None:
        """Set the data feed for retrieving market data.

        Args:
            data_feed: A DataFeed instance that will provide market data.

        """
        data_feed.set_alpha_flow(self)
        self.data_feed = data_feed

    def add_strategy(self, strategy: Strategy) -> None:
        """Add a trading strategy to the backtest.

        Args:
            strategy: A Strategy instance that will generate trading signals.

        """
        strategy.set_alpha_flow(self)
        for topic in strategy.topic_subscriptions():
            self.event_bus.subscribe(topic, strategy)
        self.strategies.append(strategy)

    def add_analyzer(self, analyzer: Analyzer) -> None:
        """Add an analyzer for performance metrics and visualization.

        Args:
            analyzer: An Analyzer instance for computing metrics and generating reports.

        """
        analyzer.set_alpha_flow(self)
        for topic in analyzer.topic_subscriptions():
            self.event_bus.subscribe(topic, analyzer)
        self.analyzers.append(analyzer)

    def set_broker(self, broker: Broker) -> None:
        """Set the broker for order execution simulation.

        Args:
            broker: A Broker instance that will simulate order execution.

        """
        broker.set_alpha_flow(self)
        for topic in broker.topic_subscriptions():
            self.event_bus.subscribe(topic, broker)
        self.broker = broker

    def set_cash(self, cash: float) -> None:
        """Set the initial cash balance for the portfolio.

        Args:
            cash: The initial cash amount in the portfolio currency.

        """
        self.portfolio.set_cash(cash)

    def set_data_start_timestamp(self, timestamp: datetime | str) -> None:
        """Set the start timestamp for loading data.

        Args:
            timestamp: Start datetime or ISO format string. Data will be loaded from this point.

        """
        if isinstance(timestamp, str):
            timestamp = datetime.fromisoformat(timestamp)
        self.data_start_timestamp = timestamp

    def set_backtest_start_timestamp(self, timestamp: datetime | str) -> None:
        """Set the start timestamp for the backtest period.

        Args:
            timestamp: Start datetime or ISO format string. Strategies will begin trading from this point.

        """
        if isinstance(timestamp, str):
            timestamp = datetime.fromisoformat(timestamp)
        self.backtest_start_timestamp = timestamp

    def set_backtest_end_timestamp(self, timestamp: datetime | str) -> None:
        """Set the end timestamp for the backtest period.

        Args:
            timestamp: End datetime or ISO format string. Backtest will stop at this point.

        """
        if isinstance(timestamp, str):
            timestamp = datetime.fromisoformat(timestamp)
        self.backtest_end_timestamp = timestamp

    def get_timestamps(self) -> list[datetime]:
        """Return all unique timestamps from loaded market data, sorted chronologically.

        Returns:
            Sorted list of datetime objects representing all data points in the backtest.

        """
        timestamps: set[datetime] = set()
        for events in self._data.values():
            timestamps.update(event.timestamp for event in events)
        return sorted(timestamps)

    def run(self, is_backtest: bool = True) -> None:
        """Run the backtest simulation.

        Load all market data for symbols in the universe, publishes events chronologically
        through the EventBus, and runs all analyzers after completion.

        Events are processed through a priority queue to ensure correct ordering:
        1. All MarketDataEvents are loaded and added to the queue
        2. The queue processes events in timestamp order
        3. When strategies generate OrderEvents, they're added to the queue
        4. When brokers generate FillEvents, they're added to the queue
        5. All events at timestamp T are fully processed before moving to T+1

        Args:
            is_backtest: Whether to run in backtest mode. Live trading not yet implemented.

        Raises:
            NotImplementedError: If is_backtest is False (live trading not supported).
            ValueError: If data_feed is not set before running.

        """
        if is_backtest:
            if self.data_feed is None:
                raise ValueError("Data feed must be set before running backtest")

            # Enable queue mode for proper event ordering
            self.event_bus.enable_queue_mode()

            # Load all market data events
            events: list[MarketDataEvent] = []
            for symbol in self.universe:
                events.extend(
                    list(
                        self.data_feed.run(
                            symbol,
                            self.data_start_timestamp or self.backtest_start_timestamp,
                            self.backtest_end_timestamp,
                        )
                    )
                )

            # Sort and store events for price lookups
            events = sorted(events)
            for event in events:
                self._data[event.symbol].append(event)

            # Add all market data events to the queue
            for event in events:
                self.event_bus.publish(Topic.MARKET_DATA, event)

            # Process all events in chronological order
            # This will handle market data, orders, and fills in the correct sequence
            self.event_bus.process_queue()

            # Disable queue mode after backtest
            self.event_bus.disable_queue_mode()

            # Run analyzers after backtest completion
            for analyzer in self.analyzers:
                logger.info("Running analyzer %s", analyzer)
                analyzer.run()
        else:
            raise NotImplementedError

    def get_price(self, symbol: str, timestamp: datetime) -> float:
        """Get the closing price for a symbol at or after a specific timestamp.

        Args:
            symbol: The ticker symbol.
            timestamp: The timestamp to look up the price for.

        Returns:
            The closing price at or after the given timestamp.

        Raises:
            ValueError: If no price data exists after the timestamp.

        """
        for event in self._data[symbol]:
            if event.timestamp >= timestamp:
                return event.close
        if self.on_missing_price == "raise":
            raise ValueError(f"No price data for symbol {symbol} after timestamp {timestamp}")
        elif self.on_missing_price == "warn":
            logger.warning(f"No price data for symbol {symbol} after timestamp {timestamp}")
        return 0.0

__init__(*, on_missing_price='raise')

Initialize the AlphaFlow backtest engine.

Parameters:

Name Type Description Default
on_missing_price str

Behavior when price data is missing. Options are: - "raise": Raise an error (default) - "warn": Log a warning and return zero price - "ignore": Silently return zero price

'raise'
Source code in alphaflow/__init__.py
def __init__(self, *, on_missing_price: str = "raise") -> None:
    """Initialize the AlphaFlow backtest engine.

    Args:
        on_missing_price: Behavior when price data is missing. Options are:
            - "raise": Raise an error (default)
            - "warn": Log a warning and return zero price
            - "ignore": Silently return zero price

    """
    if on_missing_price not in ("raise", "warn", "ignore"):
        raise ValueError("on_missing_price must be 'raise', 'warn', or 'ignore'")
    self.on_missing_price = on_missing_price
    self.event_bus = EventBus()
    self.portfolio = Portfolio(self)
    self.strategies: list[Strategy] = []
    self.analyzers: list[Analyzer] = []
    self.universe: set[str] = set()
    self.data_feed: DataFeed | None = None
    self.broker: Broker | None = None
    self.benchmark: str | None = None
    self._data: dict[str, list[MarketDataEvent]] = defaultdict(list)
    self.data_start_timestamp: datetime | None = None
    self.backtest_start_timestamp: datetime | None = None
    self.backtest_end_timestamp: datetime | None = None
    for topic in self.portfolio.topic_subscriptions():
        self.event_bus.subscribe(topic, self.portfolio)

set_benchmark(symbol)

Set the benchmark symbol for performance comparison.

Parameters:

Name Type Description Default
symbol str

The ticker symbol to use as a benchmark (e.g., "SPY").

required
Source code in alphaflow/__init__.py
def set_benchmark(self, symbol: str) -> None:
    """Set the benchmark symbol for performance comparison.

    Args:
        symbol: The ticker symbol to use as a benchmark (e.g., "SPY").

    """
    self.universe.add(symbol)
    self.benchmark = symbol

add_equity(symbol)

Add an equity symbol to the trading universe.

Parameters:

Name Type Description Default
symbol str

The ticker symbol to add (e.g., "AAPL").

required
Source code in alphaflow/__init__.py
def add_equity(self, symbol: str) -> None:
    """Add an equity symbol to the trading universe.

    Args:
        symbol: The ticker symbol to add (e.g., "AAPL").

    """
    self.universe.add(symbol)

set_data_feed(data_feed)

Set the data feed for retrieving market data.

Parameters:

Name Type Description Default
data_feed DataFeed

A DataFeed instance that will provide market data.

required
Source code in alphaflow/__init__.py
def set_data_feed(self, data_feed: DataFeed) -> None:
    """Set the data feed for retrieving market data.

    Args:
        data_feed: A DataFeed instance that will provide market data.

    """
    data_feed.set_alpha_flow(self)
    self.data_feed = data_feed

add_strategy(strategy)

Add a trading strategy to the backtest.

Parameters:

Name Type Description Default
strategy Strategy

A Strategy instance that will generate trading signals.

required
Source code in alphaflow/__init__.py
def add_strategy(self, strategy: Strategy) -> None:
    """Add a trading strategy to the backtest.

    Args:
        strategy: A Strategy instance that will generate trading signals.

    """
    strategy.set_alpha_flow(self)
    for topic in strategy.topic_subscriptions():
        self.event_bus.subscribe(topic, strategy)
    self.strategies.append(strategy)

add_analyzer(analyzer)

Add an analyzer for performance metrics and visualization.

Parameters:

Name Type Description Default
analyzer Analyzer

An Analyzer instance for computing metrics and generating reports.

required
Source code in alphaflow/__init__.py
def add_analyzer(self, analyzer: Analyzer) -> None:
    """Add an analyzer for performance metrics and visualization.

    Args:
        analyzer: An Analyzer instance for computing metrics and generating reports.

    """
    analyzer.set_alpha_flow(self)
    for topic in analyzer.topic_subscriptions():
        self.event_bus.subscribe(topic, analyzer)
    self.analyzers.append(analyzer)

set_broker(broker)

Set the broker for order execution simulation.

Parameters:

Name Type Description Default
broker Broker

A Broker instance that will simulate order execution.

required
Source code in alphaflow/__init__.py
def set_broker(self, broker: Broker) -> None:
    """Set the broker for order execution simulation.

    Args:
        broker: A Broker instance that will simulate order execution.

    """
    broker.set_alpha_flow(self)
    for topic in broker.topic_subscriptions():
        self.event_bus.subscribe(topic, broker)
    self.broker = broker

set_cash(cash)

Set the initial cash balance for the portfolio.

Parameters:

Name Type Description Default
cash float

The initial cash amount in the portfolio currency.

required
Source code in alphaflow/__init__.py
def set_cash(self, cash: float) -> None:
    """Set the initial cash balance for the portfolio.

    Args:
        cash: The initial cash amount in the portfolio currency.

    """
    self.portfolio.set_cash(cash)

set_data_start_timestamp(timestamp)

Set the start timestamp for loading data.

Parameters:

Name Type Description Default
timestamp datetime | str

Start datetime or ISO format string. Data will be loaded from this point.

required
Source code in alphaflow/__init__.py
def set_data_start_timestamp(self, timestamp: datetime | str) -> None:
    """Set the start timestamp for loading data.

    Args:
        timestamp: Start datetime or ISO format string. Data will be loaded from this point.

    """
    if isinstance(timestamp, str):
        timestamp = datetime.fromisoformat(timestamp)
    self.data_start_timestamp = timestamp

set_backtest_start_timestamp(timestamp)

Set the start timestamp for the backtest period.

Parameters:

Name Type Description Default
timestamp datetime | str

Start datetime or ISO format string. Strategies will begin trading from this point.

required
Source code in alphaflow/__init__.py
def set_backtest_start_timestamp(self, timestamp: datetime | str) -> None:
    """Set the start timestamp for the backtest period.

    Args:
        timestamp: Start datetime or ISO format string. Strategies will begin trading from this point.

    """
    if isinstance(timestamp, str):
        timestamp = datetime.fromisoformat(timestamp)
    self.backtest_start_timestamp = timestamp

set_backtest_end_timestamp(timestamp)

Set the end timestamp for the backtest period.

Parameters:

Name Type Description Default
timestamp datetime | str

End datetime or ISO format string. Backtest will stop at this point.

required
Source code in alphaflow/__init__.py
def set_backtest_end_timestamp(self, timestamp: datetime | str) -> None:
    """Set the end timestamp for the backtest period.

    Args:
        timestamp: End datetime or ISO format string. Backtest will stop at this point.

    """
    if isinstance(timestamp, str):
        timestamp = datetime.fromisoformat(timestamp)
    self.backtest_end_timestamp = timestamp

get_timestamps()

Return all unique timestamps from loaded market data, sorted chronologically.

Returns:

Type Description
list[datetime]

Sorted list of datetime objects representing all data points in the backtest.

Source code in alphaflow/__init__.py
def get_timestamps(self) -> list[datetime]:
    """Return all unique timestamps from loaded market data, sorted chronologically.

    Returns:
        Sorted list of datetime objects representing all data points in the backtest.

    """
    timestamps: set[datetime] = set()
    for events in self._data.values():
        timestamps.update(event.timestamp for event in events)
    return sorted(timestamps)

run(is_backtest=True)

Run the backtest simulation.

Load all market data for symbols in the universe, publishes events chronologically through the EventBus, and runs all analyzers after completion.

Events are processed through a priority queue to ensure correct ordering: 1. All MarketDataEvents are loaded and added to the queue 2. The queue processes events in timestamp order 3. When strategies generate OrderEvents, they're added to the queue 4. When brokers generate FillEvents, they're added to the queue 5. All events at timestamp T are fully processed before moving to T+1

Parameters:

Name Type Description Default
is_backtest bool

Whether to run in backtest mode. Live trading not yet implemented.

True

Raises:

Type Description
NotImplementedError

If is_backtest is False (live trading not supported).

ValueError

If data_feed is not set before running.

Source code in alphaflow/__init__.py
def run(self, is_backtest: bool = True) -> None:
    """Run the backtest simulation.

    Load all market data for symbols in the universe, publishes events chronologically
    through the EventBus, and runs all analyzers after completion.

    Events are processed through a priority queue to ensure correct ordering:
    1. All MarketDataEvents are loaded and added to the queue
    2. The queue processes events in timestamp order
    3. When strategies generate OrderEvents, they're added to the queue
    4. When brokers generate FillEvents, they're added to the queue
    5. All events at timestamp T are fully processed before moving to T+1

    Args:
        is_backtest: Whether to run in backtest mode. Live trading not yet implemented.

    Raises:
        NotImplementedError: If is_backtest is False (live trading not supported).
        ValueError: If data_feed is not set before running.

    """
    if is_backtest:
        if self.data_feed is None:
            raise ValueError("Data feed must be set before running backtest")

        # Enable queue mode for proper event ordering
        self.event_bus.enable_queue_mode()

        # Load all market data events
        events: list[MarketDataEvent] = []
        for symbol in self.universe:
            events.extend(
                list(
                    self.data_feed.run(
                        symbol,
                        self.data_start_timestamp or self.backtest_start_timestamp,
                        self.backtest_end_timestamp,
                    )
                )
            )

        # Sort and store events for price lookups
        events = sorted(events)
        for event in events:
            self._data[event.symbol].append(event)

        # Add all market data events to the queue
        for event in events:
            self.event_bus.publish(Topic.MARKET_DATA, event)

        # Process all events in chronological order
        # This will handle market data, orders, and fills in the correct sequence
        self.event_bus.process_queue()

        # Disable queue mode after backtest
        self.event_bus.disable_queue_mode()

        # Run analyzers after backtest completion
        for analyzer in self.analyzers:
            logger.info("Running analyzer %s", analyzer)
            analyzer.run()
    else:
        raise NotImplementedError

get_price(symbol, timestamp)

Get the closing price for a symbol at or after a specific timestamp.

Parameters:

Name Type Description Default
symbol str

The ticker symbol.

required
timestamp datetime

The timestamp to look up the price for.

required

Returns:

Type Description
float

The closing price at or after the given timestamp.

Raises:

Type Description
ValueError

If no price data exists after the timestamp.

Source code in alphaflow/__init__.py
def get_price(self, symbol: str, timestamp: datetime) -> float:
    """Get the closing price for a symbol at or after a specific timestamp.

    Args:
        symbol: The ticker symbol.
        timestamp: The timestamp to look up the price for.

    Returns:
        The closing price at or after the given timestamp.

    Raises:
        ValueError: If no price data exists after the timestamp.

    """
    for event in self._data[symbol]:
        if event.timestamp >= timestamp:
            return event.close
    if self.on_missing_price == "raise":
        raise ValueError(f"No price data for symbol {symbol} after timestamp {timestamp}")
    elif self.on_missing_price == "warn":
        logger.warning(f"No price data for symbol {symbol} after timestamp {timestamp}")
    return 0.0

Strategy

Strategy

Bases: Subscriber

Defines the interface for strategies.

Source code in alphaflow/__init__.py
class Strategy(Subscriber):
    """Defines the interface for strategies."""

    def topic_subscriptions(self) -> list[Topic]:
        """Return the topics to subscribe to."""
        raise NotImplementedError

    def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
        """Set the AlphaFlow instance for this strategy.

        Args:
            alpha_flow: The AlphaFlow backtest engine instance.

        """
        self._alpha_flow = alpha_flow

    def read_event(self, event: Event) -> None:
        """Read and process the event."""
        raise NotImplementedError

topic_subscriptions()

Return the topics to subscribe to.

Source code in alphaflow/__init__.py
def topic_subscriptions(self) -> list[Topic]:
    """Return the topics to subscribe to."""
    raise NotImplementedError

set_alpha_flow(alpha_flow)

Set the AlphaFlow instance for this strategy.

Parameters:

Name Type Description Default
alpha_flow AlphaFlow

The AlphaFlow backtest engine instance.

required
Source code in alphaflow/__init__.py
def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
    """Set the AlphaFlow instance for this strategy.

    Args:
        alpha_flow: The AlphaFlow backtest engine instance.

    """
    self._alpha_flow = alpha_flow

read_event(event)

Read and process the event.

Source code in alphaflow/__init__.py
def read_event(self, event: Event) -> None:
    """Read and process the event."""
    raise NotImplementedError

Broker

Broker

Bases: Subscriber

Defines the interface for brokers.

Source code in alphaflow/__init__.py
class Broker(Subscriber):
    """Defines the interface for brokers."""

    def topic_subscriptions(self) -> list[Topic]:
        """Return the topics to subscribe to."""
        return [Topic.ORDER]

    def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
        """Set the AlphaFlow instance for this broker.

        Args:
            alpha_flow: The AlphaFlow backtest engine instance.

        """
        self._alpha_flow = alpha_flow

    def read_event(self, event: Event) -> None:
        """Read and process the event."""
        raise NotImplementedError

topic_subscriptions()

Return the topics to subscribe to.

Source code in alphaflow/__init__.py
def topic_subscriptions(self) -> list[Topic]:
    """Return the topics to subscribe to."""
    return [Topic.ORDER]

set_alpha_flow(alpha_flow)

Set the AlphaFlow instance for this broker.

Parameters:

Name Type Description Default
alpha_flow AlphaFlow

The AlphaFlow backtest engine instance.

required
Source code in alphaflow/__init__.py
def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
    """Set the AlphaFlow instance for this broker.

    Args:
        alpha_flow: The AlphaFlow backtest engine instance.

    """
    self._alpha_flow = alpha_flow

read_event(event)

Read and process the event.

Source code in alphaflow/__init__.py
def read_event(self, event: Event) -> None:
    """Read and process the event."""
    raise NotImplementedError

DataFeed

DataFeed

Defines the interface for data feeds.

Source code in alphaflow/__init__.py
class DataFeed:
    """Defines the interface for data feeds."""

    def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
        """Set the AlphaFlow instance for this data feed.

        Args:
            alpha_flow: The AlphaFlow backtest engine instance.

        """
        self._alpha_flow = alpha_flow

    def run(
        self,
        symbol: str,
        start_timestamp: datetime | None,
        end_timestamp: datetime | None,
    ) -> Generator[MarketDataEvent, None, None]:
        """Run the data feed."""
        raise NotImplementedError

set_alpha_flow(alpha_flow)

Set the AlphaFlow instance for this data feed.

Parameters:

Name Type Description Default
alpha_flow AlphaFlow

The AlphaFlow backtest engine instance.

required
Source code in alphaflow/__init__.py
def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
    """Set the AlphaFlow instance for this data feed.

    Args:
        alpha_flow: The AlphaFlow backtest engine instance.

    """
    self._alpha_flow = alpha_flow

run(symbol, start_timestamp, end_timestamp)

Run the data feed.

Source code in alphaflow/__init__.py
def run(
    self,
    symbol: str,
    start_timestamp: datetime | None,
    end_timestamp: datetime | None,
) -> Generator[MarketDataEvent, None, None]:
    """Run the data feed."""
    raise NotImplementedError

Portfolio

Portfolio

Bases: Subscriber

Manages portfolio state including cash, positions, and performance calculations.

Source code in alphaflow/__init__.py
class Portfolio(Subscriber):
    """Manages portfolio state including cash, positions, and performance calculations."""

    def __init__(self, alpha_flow: AlphaFlow):
        """Initialize the portfolio.

        Args:
            alpha_flow: The AlphaFlow backtest engine instance.

        """
        self._alpha_flow = alpha_flow
        self._cash: float = 0.0
        self.positions: dict[str, float] = {}

    def topic_subscriptions(self) -> list[Topic]:
        """Return the topics to subscribe to."""
        return [Topic.FILL]

    def set_cash(self, cash: float) -> None:
        """Set the cash balance.

        Args:
            cash: The cash amount to set.

        """
        self._cash = cash

    def get_cash(self) -> float:
        """Get the current cash balance.

        Returns:
            The current cash balance.

        """
        return self._cash

    def get_position(self, symbol: str) -> float:
        """Get the current position quantity for a symbol.

        Args:
            symbol: The ticker symbol.

        Returns:
            The number of shares held (0 if no position).

        """
        return self.positions.get(symbol, 0.0)

    def update_cash(self, amount: float) -> None:
        """Update the cash balance by adding an amount.

        Args:
            amount: The amount to add (can be negative).

        """
        self._cash += amount

    def update_position(self, symbol: str, qty: float) -> None:
        """Update the position quantity for a symbol.

        Args:
            symbol: The ticker symbol.
            qty: The quantity to add (can be negative for sells).

        """
        self.positions[symbol] = self.get_position(symbol) + qty

    def get_position_value(self, symbol: str, timestamp: datetime) -> float:
        """Get the market value of a position at a specific timestamp.

        Args:
            symbol: The ticker symbol.
            timestamp: The timestamp for price lookup.

        Returns:
            The position value (shares * price).

        """
        return self.get_position(symbol) * self._alpha_flow.get_price(symbol, timestamp)

    def get_positions_value(self, timestamp: datetime) -> float:
        """Get the total market value of all positions at a specific timestamp.

        Args:
            timestamp: The timestamp for price lookup.

        Returns:
            The total value of all positions.

        """
        return sum(self.get_position_value(symbol, timestamp) for symbol in self.positions)

    def get_portfolio_value(self, timestamp: datetime) -> float:
        """Get the total portfolio value (cash + positions) at a specific timestamp.

        Args:
            timestamp: The timestamp for price lookup.

        Returns:
            The total portfolio value.

        """
        return self._cash + self.get_positions_value(timestamp)

    def get_buying_power(self, margin: float, timestamp: datetime) -> float:
        """Calculate available buying power with margin.

        Args:
            margin: The margin multiplier (e.g., 2.0 for 2x margin).
            timestamp: The timestamp for price lookup.

        Returns:
            The available buying power.

        """
        return self.get_portfolio_value(timestamp) * margin - self.get_positions_value(timestamp)

    def get_benchmark_values(self) -> dict[datetime, float]:
        """Get benchmark prices for all timestamps in the backtest.

        Returns:
            Dictionary mapping timestamps to benchmark prices.

        """
        if self._alpha_flow.benchmark is None:
            return {}
        return {
            timestamp: self._alpha_flow.get_price(self._alpha_flow.benchmark, timestamp)
            for timestamp in self._alpha_flow.get_timestamps()
        }

    def read_event(self, event: Event) -> None:
        """Read and process the event."""
        if not isinstance(event, FillEvent):
            return

        cost = event.fill_price * event.fill_qty  # Can be positive or negative
        self.update_cash(-cost - event.commission)  # Deduct commission on all trades
        self.update_position(event.symbol, event.fill_qty)

__init__(alpha_flow)

Initialize the portfolio.

Parameters:

Name Type Description Default
alpha_flow AlphaFlow

The AlphaFlow backtest engine instance.

required
Source code in alphaflow/__init__.py
def __init__(self, alpha_flow: AlphaFlow):
    """Initialize the portfolio.

    Args:
        alpha_flow: The AlphaFlow backtest engine instance.

    """
    self._alpha_flow = alpha_flow
    self._cash: float = 0.0
    self.positions: dict[str, float] = {}

topic_subscriptions()

Return the topics to subscribe to.

Source code in alphaflow/__init__.py
def topic_subscriptions(self) -> list[Topic]:
    """Return the topics to subscribe to."""
    return [Topic.FILL]

set_cash(cash)

Set the cash balance.

Parameters:

Name Type Description Default
cash float

The cash amount to set.

required
Source code in alphaflow/__init__.py
def set_cash(self, cash: float) -> None:
    """Set the cash balance.

    Args:
        cash: The cash amount to set.

    """
    self._cash = cash

get_cash()

Get the current cash balance.

Returns:

Type Description
float

The current cash balance.

Source code in alphaflow/__init__.py
def get_cash(self) -> float:
    """Get the current cash balance.

    Returns:
        The current cash balance.

    """
    return self._cash

get_position(symbol)

Get the current position quantity for a symbol.

Parameters:

Name Type Description Default
symbol str

The ticker symbol.

required

Returns:

Type Description
float

The number of shares held (0 if no position).

Source code in alphaflow/__init__.py
def get_position(self, symbol: str) -> float:
    """Get the current position quantity for a symbol.

    Args:
        symbol: The ticker symbol.

    Returns:
        The number of shares held (0 if no position).

    """
    return self.positions.get(symbol, 0.0)

update_cash(amount)

Update the cash balance by adding an amount.

Parameters:

Name Type Description Default
amount float

The amount to add (can be negative).

required
Source code in alphaflow/__init__.py
def update_cash(self, amount: float) -> None:
    """Update the cash balance by adding an amount.

    Args:
        amount: The amount to add (can be negative).

    """
    self._cash += amount

update_position(symbol, qty)

Update the position quantity for a symbol.

Parameters:

Name Type Description Default
symbol str

The ticker symbol.

required
qty float

The quantity to add (can be negative for sells).

required
Source code in alphaflow/__init__.py
def update_position(self, symbol: str, qty: float) -> None:
    """Update the position quantity for a symbol.

    Args:
        symbol: The ticker symbol.
        qty: The quantity to add (can be negative for sells).

    """
    self.positions[symbol] = self.get_position(symbol) + qty

get_position_value(symbol, timestamp)

Get the market value of a position at a specific timestamp.

Parameters:

Name Type Description Default
symbol str

The ticker symbol.

required
timestamp datetime

The timestamp for price lookup.

required

Returns:

Type Description
float

The position value (shares * price).

Source code in alphaflow/__init__.py
def get_position_value(self, symbol: str, timestamp: datetime) -> float:
    """Get the market value of a position at a specific timestamp.

    Args:
        symbol: The ticker symbol.
        timestamp: The timestamp for price lookup.

    Returns:
        The position value (shares * price).

    """
    return self.get_position(symbol) * self._alpha_flow.get_price(symbol, timestamp)

get_positions_value(timestamp)

Get the total market value of all positions at a specific timestamp.

Parameters:

Name Type Description Default
timestamp datetime

The timestamp for price lookup.

required

Returns:

Type Description
float

The total value of all positions.

Source code in alphaflow/__init__.py
def get_positions_value(self, timestamp: datetime) -> float:
    """Get the total market value of all positions at a specific timestamp.

    Args:
        timestamp: The timestamp for price lookup.

    Returns:
        The total value of all positions.

    """
    return sum(self.get_position_value(symbol, timestamp) for symbol in self.positions)

get_portfolio_value(timestamp)

Get the total portfolio value (cash + positions) at a specific timestamp.

Parameters:

Name Type Description Default
timestamp datetime

The timestamp for price lookup.

required

Returns:

Type Description
float

The total portfolio value.

Source code in alphaflow/__init__.py
def get_portfolio_value(self, timestamp: datetime) -> float:
    """Get the total portfolio value (cash + positions) at a specific timestamp.

    Args:
        timestamp: The timestamp for price lookup.

    Returns:
        The total portfolio value.

    """
    return self._cash + self.get_positions_value(timestamp)

get_buying_power(margin, timestamp)

Calculate available buying power with margin.

Parameters:

Name Type Description Default
margin float

The margin multiplier (e.g., 2.0 for 2x margin).

required
timestamp datetime

The timestamp for price lookup.

required

Returns:

Type Description
float

The available buying power.

Source code in alphaflow/__init__.py
def get_buying_power(self, margin: float, timestamp: datetime) -> float:
    """Calculate available buying power with margin.

    Args:
        margin: The margin multiplier (e.g., 2.0 for 2x margin).
        timestamp: The timestamp for price lookup.

    Returns:
        The available buying power.

    """
    return self.get_portfolio_value(timestamp) * margin - self.get_positions_value(timestamp)

get_benchmark_values()

Get benchmark prices for all timestamps in the backtest.

Returns:

Type Description
dict[datetime, float]

Dictionary mapping timestamps to benchmark prices.

Source code in alphaflow/__init__.py
def get_benchmark_values(self) -> dict[datetime, float]:
    """Get benchmark prices for all timestamps in the backtest.

    Returns:
        Dictionary mapping timestamps to benchmark prices.

    """
    if self._alpha_flow.benchmark is None:
        return {}
    return {
        timestamp: self._alpha_flow.get_price(self._alpha_flow.benchmark, timestamp)
        for timestamp in self._alpha_flow.get_timestamps()
    }

read_event(event)

Read and process the event.

Source code in alphaflow/__init__.py
def read_event(self, event: Event) -> None:
    """Read and process the event."""
    if not isinstance(event, FillEvent):
        return

    cost = event.fill_price * event.fill_qty  # Can be positive or negative
    self.update_cash(-cost - event.commission)  # Deduct commission on all trades
    self.update_position(event.symbol, event.fill_qty)

Analyzer

Analyzer

Bases: Subscriber

Defines the interface for analyzers.

Source code in alphaflow/__init__.py
class Analyzer(Subscriber):
    """Defines the interface for analyzers."""

    def topic_subscriptions(self) -> list[Topic]:
        """Return the topics to subscribe to."""
        raise NotImplementedError

    def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
        """Set the AlphaFlow instance for this analyzer.

        Args:
            alpha_flow: The AlphaFlow backtest engine instance.

        """
        self._alpha_flow = alpha_flow

    def read_event(self, event: Event) -> None:
        """Read and process the event."""
        raise NotImplementedError

    def run(self) -> None:
        """Run the analyzer."""
        raise NotImplementedError

topic_subscriptions()

Return the topics to subscribe to.

Source code in alphaflow/__init__.py
def topic_subscriptions(self) -> list[Topic]:
    """Return the topics to subscribe to."""
    raise NotImplementedError

set_alpha_flow(alpha_flow)

Set the AlphaFlow instance for this analyzer.

Parameters:

Name Type Description Default
alpha_flow AlphaFlow

The AlphaFlow backtest engine instance.

required
Source code in alphaflow/__init__.py
def set_alpha_flow(self, alpha_flow: AlphaFlow) -> None:
    """Set the AlphaFlow instance for this analyzer.

    Args:
        alpha_flow: The AlphaFlow backtest engine instance.

    """
    self._alpha_flow = alpha_flow

read_event(event)

Read and process the event.

Source code in alphaflow/__init__.py
def read_event(self, event: Event) -> None:
    """Read and process the event."""
    raise NotImplementedError

run()

Run the analyzer.

Source code in alphaflow/__init__.py
def run(self) -> None:
    """Run the analyzer."""
    raise NotImplementedError