# DataProcessor

Custom processing logic for multi-source alphas.
`DataProcessor` is a base class whose job is to join multiple data sources from a `Datamap` into a single
`pl.DataFrame` suitable for passing to `Alpha.next()`.
The default implementation performs an inner join on `start_time` across all `DataInfo` entries and validates the
result. You can subclass it to add custom alignment, resampling, or feature engineering:
```python
from datetime import datetime

import polars as pl

from adrs.data import DataInfo, DataColumn, Datamap, DataProcessor


class MyProcessor(DataProcessor):
    def process(
        self,
        datamap: Datamap,
        last_closed_time: datetime | None = None,
    ) -> pl.DataFrame | None:
        # Fetch each raw DataFrame from the datamap
        df_a = datamap.get(self.data_infos[0])
        df_b = datamap.get(self.data_infos[1])

        # Join on timestamp, resample, feature-engineer, etc.
        merged = df_a.join(df_b, on="start_time", how="inner")
        return merged
```

Pass your processor when constructing the Alpha:
```python
class MyAlpha(Alpha):
    def __init__(self):
        super().__init__(
            id="my_alpha",
            data_infos=[info_a, info_b],
            data_processor=MyProcessor(data_infos=[info_a, info_b]),
        )
```

## Method reference
```python
def process(
    self,
    datamap: Datamap,
    last_closed_time: datetime | None = None,
) -> pl.DataFrame | None
```

| Parameter | Type | Description |
|---|---|---|
| `datamap` | `Datamap` | Populated data store |
| `last_closed_time` | `datetime \| None` | Latest closed candle time (used in live / streaming scenarios) |
Returns a `pl.DataFrame` ready for `Alpha.next()`, or `None` if processing fails.
> **Tip — vectorised joins**
> Polars join operations are fully vectorised. Prefer `join` over Python-level loops when merging DataFrames for maximum performance.