Datamap
Reusable in-memory data store.
Datamap is an in-memory container that maps each DataInfo to its fetched pl.DataFrame. After initialisation,
it acts as the single source of truth that your DataProcessor and Alpha read from.
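The idea of "one table per DataInfo" can be pictured with a small toy model. This is only a conceptual sketch, not the library's implementation: `ToyInfo` and `ToyDatamap` are hypothetical stand-ins for DataInfo and Datamap, and plain lists of rows stand in for pl.DataFrame so the snippet runs without any dependencies.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)  # frozen makes instances hashable, so they can be dict keys
class ToyInfo:
    """Hypothetical stand-in for DataInfo: identifies one data topic."""
    topic: str


class ToyDatamap:
    """Hypothetical stand-in for Datamap: holds one table per requested info."""

    def __init__(self, infos: list) -> None:
        self._expected = set(infos)   # infos we were asked to hold
        self._frames: dict = {}       # info -> rows (stand-in for a DataFrame)

    def load(self, info: ToyInfo, rows: list) -> None:
        # The toy is filled by hand; it does not fetch anything itself.
        self._frames[info] = rows

    def is_ready(self) -> bool:
        # Ready once every expected info has a table.
        return self._expected.issubset(self._frames)

    def get(self, info: ToyInfo) -> Optional[list]:
        return self._frames.get(info)

    def items(self):
        return self._frames.items()


price = ToyInfo(topic="btc-price")
funding = ToyInfo(topic="btc-funding")

toy = ToyDatamap([price, funding])
toy.load(price, [{"start_time": 0, "close": 100.0}])
ready_before = toy.is_ready()  # funding has not been loaded yet
toy.load(funding, [{"start_time": 0, "rate": 0.0001}])
ready_after = toy.is_ready()   # both infos now have a table
```

Because the key is a hashable value object, any component that holds the same info can look up the same table, which is what makes a single shared store workable.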
Initialisation
from datetime import datetime, timedelta
from adrs import DataLoader
from adrs.data import DataInfo, DataColumn, Datamap

# dataloader, alpha, start_time and end_time are assumed to be defined earlier.
datamap = Datamap()
await datamap.init(
    dataloader=dataloader,
    infos=alpha.data_infos,
    start_time=start_time,
    end_time=end_time,
)

If your Evaluator needs its own price data, you must load it into the same Datamap too, typically with a small
time extension so that price data is available for the last candle's close:
await datamap.init(
    dataloader=dataloader,
    infos=list(evaluator.assets.values()),
    start_time=start_time,
    end_time=end_time + timedelta(days=1),  # +1 day so the evaluator has a price for every signal
)

make_datamap (convenience helper)
For multi-asset workflows, make_datamap handles all of the above in one call:
from adrs.data import make_datamap

datamap = await make_datamap(
    dataloader=dataloader,
    data_infos=alpha.data_infos,
    start_time=start_time,
    end_time=end_time,
    evaluator=evaluator,  # optional; also loads the evaluator's price data
    evaluator_offset=timedelta(days=1),  # default
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| dataloader | DataLoader | - | Data source |
| data_infos | list[DataInfo] | - | Topics to load |
| start_time | datetime | - | Range start |
| end_time | datetime | - | Range end |
| evaluator | Evaluator \| None | None | If provided, loads evaluator price data automatically |
| evaluator_offset | timedelta | timedelta(days=1) | Extra time appended to end_time for evaluator data |
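To see why the offset matters, the following sketch uses plain datetime arithmetic only. The dates, the one-hour candle length, and the `covers` helper are illustrative assumptions and not part of adrs; it also assumes a signal stamped at end_time is priced at the close of a candle that ends one interval later.

```python
from datetime import datetime, timedelta

start_time = datetime(2024, 1, 1)
end_time = datetime(2024, 1, 31)
evaluator_offset = timedelta(days=1)  # same value as the documented default
interval = timedelta(hours=1)         # assumed candle length

# Assumption: the last signal is stamped at end_time and its candle
# closes one interval later.
last_signal_time = end_time
last_close_time = last_signal_time + interval


def covers(range_start: datetime, range_end: datetime, t: datetime) -> bool:
    """Hypothetical helper: is timestamp t inside the loaded range?"""
    return range_start <= t <= range_end


# Without the offset, the price range stops before the last close.
without_offset = covers(start_time, end_time, last_close_time)

# With the offset, the price range extends far enough to include it.
with_offset = covers(start_time, end_time + evaluator_offset, last_close_time)
```

Under these assumptions the last close falls outside the unextended range and inside the extended one, which is the gap the offset is meant to close.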
Dict-like interface
Datamap exposes a dict-like API for direct access:
df = datamap.get(some_data_info)  # retrieve the DataFrame for a DataInfo
datamap.is_ready()  # True once all infos are loaded

# iterate over (DataInfo, DataFrame) pairs
for info, df in datamap.items():
    print(info.topic, df.shape)

Serialisation
Datamaps can be serialised to bytes (via Apache Arrow IPC) for caching or sharing between processes:
raw: bytes = datamap.write_ipc()
# later / elsewhere
restored: Datamap = Datamap.read_ipc(raw)
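Since the serialised form is just bytes, it can be cached on disk like any other binary payload. The sketch below shows one possible pattern using only the standard library; `save_cache` and `load_cache` are hypothetical helpers, and a literal byte string stands in for the output of datamap.write_ipc() so the snippet runs on its own.

```python
import tempfile
from pathlib import Path
from typing import Optional


def save_cache(raw: bytes, path: Path) -> None:
    """Hypothetical helper: persist serialised bytes to disk."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(raw)


def load_cache(path: Path) -> Optional[bytes]:
    """Hypothetical helper: return cached bytes, or None on a cache miss."""
    return path.read_bytes() if path.exists() else None


cache_file = Path(tempfile.mkdtemp()) / "cache" / "datamap.arrow"

# Stand-in payload; with a real Datamap this would be datamap.write_ipc().
payload = b"stand-in for serialised Datamap bytes"

miss = load_cache(cache_file)   # nothing cached yet
save_cache(payload, cache_file)
hit = load_cache(cache_file)    # same bytes come back
```

With a real Datamap, a cache hit would be passed to Datamap.read_ipc(...) and a miss would fall back to loading the data again before saving the fresh bytes.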
Balaena Quant