DataProcessor

Custom processing logic for multi-source alphas.

DataProcessor is a base class whose job is to join multiple data sources from a Datamap into a single pl.DataFrame suitable for passing to Alpha.next().

The default implementation performs an inner join on start_time across all DataInfo entries and validates the result. You can subclass it to add custom alignment, resampling, or feature engineering:

from datetime import datetime

from adrs.data import DataInfo, DataColumn, Datamap, DataProcessor
import polars as pl

class MyProcessor(DataProcessor):
    def process(
        self,
        datamap: Datamap,
        last_closed_time: datetime | None = None,
    ) -> pl.DataFrame | None:
        # Fetch each raw DataFrame from the datamap
        df_a = datamap.get(self.data_infos[0])
        df_b = datamap.get(self.data_infos[1])

        # Join on timestamp, resample, feature-engineer, etc.
        merged = df_a.join(df_b, on="start_time", how="inner")
        return merged

Pass your processor when constructing the Alpha:

# info_a and info_b are the DataInfo entries for the two sources, defined elsewhere.
class MyAlpha(Alpha):
    def __init__(self):
        super().__init__(
            id="my_alpha",
            data_infos=[info_a, info_b],
            data_processor=MyProcessor(data_infos=[info_a, info_b]),
        )

Method reference

def process(
    self,
    datamap: Datamap,
    last_closed_time: datetime | None = None,
) -> pl.DataFrame | None
Parameter          Type              Description
datamap            Datamap           Populated data store
last_closed_time   datetime | None   Latest closed candle time (used in live / streaming scenarios)

Returns a pl.DataFrame ready for Alpha.next(), or None if processing fails.

Tip — vectorised joins

Polars join operations are fully vectorised. Prefer join over Python-level loops when merging DataFrames for maximum performance.
