Skip to content

Commit 9d494cd

Browse files
committed
feat: store backends (HDF5/Parquet/Zarr), store_format in manifest, migration (v1.1.0)
- io/backends: backend registry (hdf5, parquet, zarr), get_backend, register_backend - Survey: zarr_file_path, fill_store/get_values for zarr; build-collection --zarr - Table: delegate write/read to backends via _get_store_path_and_format - Manifest: store_format (hdf5|parquet|zarr) at dataset level; load applies it and sets store paths - Migration script: infer store_format from legacy JSON and write in manifest - Docs: ZARR-BACKEND.md, RFC-002 store_format example and migration note - Changelog 1.1.0, pyproject 1.1.0 Made-with: Cursor
1 parent daa5978 commit 9d494cd

File tree

14 files changed

+905
-35
lines changed

14 files changed

+905
-35
lines changed

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
# Changelog
22

3+
# 1.1.0
4+
5+
* **Store backends** (choix du format de stockage des tables)
6+
- **io/backends**: Backends HDF5, Parquet et Zarr (abstraction `StoreBackend`) ; `get_backend(name)`, `get_available_backend_names()`, `register_backend()` pour étendre.
7+
- **Zarr** : backend optionnel (`pip install openfisca-survey-manager[zarr]`) ; une table = un groupe zarr dans un répertoire `.zarr` par survey.
8+
- **Survey** : attribut `zarr_file_path` ; `fill_store(store_format="zarr")` et lecture via `get_values` pour zarr.
9+
- **Table** : écriture/lecture et `_is_stored` délégués aux backends ; `_get_store_path_and_format()` unifie les chemins.
10+
- **build-collection** : option `--zarr` en plus de `--parquet` ; défaut HDF5 avec avertissement.
11+
- **Docs** : `docs/ZARR-BACKEND.md` (utilisation Zarr, compression, parallélisation).
12+
13+
* **Manifest (RFC-002) : store_format**
14+
- **manifest.yaml** : clé optionnelle `store_format` (hdf5, parquet, zarr) au niveau dataset ; par défaut `parquet` au chargement.
15+
- **SurveyCollection.load** : depuis un manifest, applique `store_format` et déduit les chemins de store (`hdf5_file_path`, `parquet_file_path`, `zarr_file_path`) à partir de `default_output_dir`.
16+
- **Script de migration** : infère `store_format` depuis le JSON legacy (`parquet_file_path` / `zarr_file_path` / `hdf5_file_path`) et l’écrit dans le manifest généré.
17+
- **RFC-002** : exemple de manifest avec `store_format` ; section 3.5 et 4.2 mises à jour.
18+
319
# 1.0.0
420

521
* **Breaking**: Version 1.0 — retrait des ré-exports et des DeprecationWarning

docs/RFC-002-METADATA-AND-CONFIG.md

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ collections_dir/
115115
name: erfs
116116
label: "Enquête Revenus Fiscaux et Sociaux"
117117

118+
# Backend de stockage des tables (hdf5, parquet, zarr) ; par défaut parquet
119+
store_format: parquet
120+
118121
# Par survey : sources brutes (remplace raw_data.ini + informations)
119122
surveys:
120123
erfs_2019:
@@ -147,7 +150,22 @@ On **ne** résout plus le répertoire en fonction de la présence de `taxipp` ou
147150
- soit définir `OPENFISCA_SURVEY_CONFIG_DIR` vers leur répertoire,
148151
- soit passer le chemin de config à chaque appel.
149152

150-
### 3.5 API cible (alignement RFC-001)
153+
### 3.5 Backends de stockage (store)
154+
155+
Le stockage des tables d’enquête peut s’effectuer via différents **backends** (choix au build / `fill_store`) :
156+
157+
| Backend | Format | Usage |
158+
|----------|---------------------|--------------------------------------------|
159+
| **hdf5** | Un fichier .h5 | Historique (déprécié à terme) |
160+
| **parquet** | Répertoire, un .parquet par table | Recommandé (interop, colonnes) |
161+
| **zarr** | Répertoire .zarr, un groupe par table | Optionnel (dépendance `[zarr]`) |
162+
163+
- **API** : `io.backends.get_backend(name)`, `get_available_backend_names()`, `register_backend(name, backend)` pour étendre.
164+
- **CLI** : `build-collection --parquet` ou `build-collection --zarr` ; par défaut HDF5 (avec avertissement).
165+
- **Survey** : `store_format`, `hdf5_file_path` / `parquet_file_path` / `zarr_file_path` selon le backend.
166+
- **Zarr (compression, parallélisation)** : voir [docs/ZARR-BACKEND.md](ZARR-BACKEND.md).
167+
168+
### 3.6 API cible (alignement RFC-001)
151169

152170
- Charger un dataset par nom : `DataManager.load("erfs", config_dir=...)` → lit `collections_dir/erfs/manifest.yaml` et les données associées.
153171
- Accès aux métadonnées : `dataset.metadata` (provenant du manifest), `dataset.schema` (si on l’expose), chemins dérivés déterministes à partir de `collections_dir` + `name` + `output_subdir`.
@@ -173,7 +191,7 @@ Un script permet de migrer l’existant vers la nouvelle structure :
173191
```bash
174192
python -m openfisca_survey_manager.scripts.migrate_config_to_rfc002 [--config-dir PATH] [--dry-run] [-v]
175193
```
176-
- **Comportement** : lit `config.ini` ([collections] + [data]) et, si présent, `raw_data.ini` ; pour chaque collection, charge le JSON, déduit `source.format` et `source.path` à partir de `informations` (csv_files, sas_files, etc.) ou de la section correspondante de raw_data.ini ; crée `config.yaml` et `collections_dir/<name>/manifest.yaml` pour chaque collection. Avec `--dry-run`, n’écrit aucun fichier.
194+
- **Comportement** : lit `config.ini` ([collections] + [data]) et, si présent, `raw_data.ini` ; pour chaque collection, charge le JSON, déduit `source.format` et `source.path` à partir de `informations` (csv_files, sas_files, etc.) ou de la section correspondante de raw_data.ini ; **infère `store_format`** (parquet, hdf5 ou zarr) à partir des champs `parquet_file_path` / `zarr_file_path` / `hdf5_file_path` des surveys du JSON legacy, et l’écrit dans le manifest ; crée `config.yaml` et `collections_dir/<name>/manifest.yaml` pour chaque collection. Avec `--dry-run`, n’écrit aucun fichier.
177195
- **Répertoire de config par défaut** : celui retourné par `get_config_dir()` (env `OPENFISCA_SURVEY_CONFIG_DIR` ou XDG). On peut imposer un répertoire avec `--config-dir`.
178196

179197
### 4.3 Dépréciation

docs/ZARR-BACKEND.md

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Utiliser Zarr avec OpenFisca Survey Manager
2+
3+
Ce document explique **si et comment** utiliser le backend Zarr pour stocker les enquêtes, et ce qu’il en est de la **compression** et de la **parallélisation** en lecture/écriture.
4+
5+
---
6+
7+
## 1. Utiliser Zarr avec OpenFisca
8+
9+
### Oui, c’est possible
10+
11+
Le backend **zarr** est disponible dans `openfisca-survey-manager` à condition d’installer la dépendance optionnelle :
12+
13+
```bash
14+
pip install openfisca-survey-manager[zarr]
15+
# ou
16+
pip install openfisca-survey-manager zarr numcodecs
17+
```
18+
19+
(pandas 2.x utilise `to_zarr` / `read_zarr` ; le package **zarr** est requis.)
20+
21+
### En ligne de commande (build-collection)
22+
23+
Pour construire une collection en stockant les tables au format Zarr :
24+
25+
```bash
26+
build-collection -c ma_collection --zarr
27+
```
28+
29+
Sans `--zarr`, le format par défaut reste HDF5 (avec avertissement) ou vous pouvez utiliser `--parquet`.
30+
31+
### En Python (fill_store)
32+
33+
```python
34+
from openfisca_survey_manager.core.dataset import SurveyCollection
35+
36+
col = SurveyCollection.load(collection="ma_collection", config_files_directory="...")
37+
col.fill_store(
38+
source_format="sas", # ou csv, parquet, etc.
39+
store_format="zarr",
40+
)
41+
```
42+
43+
Après cela, chaque survey a un répertoire `{output}/{survey.name}.zarr`, et chaque table est un **groupe zarr** (sous-répertoire) dans ce store. La lecture se fait comme d’habitude avec `survey.get_values(table=..., variables=...)` ; le code utilise automatiquement le backend zarr si `store_format == "zarr"`.
44+
45+
### Vérifier que Zarr est disponible
46+
47+
```python
48+
from openfisca_survey_manager.io.backends import get_available_backend_names, get_backend
49+
50+
print(get_available_backend_names()) # doit contenir "zarr" si le package est installé
51+
backend = get_backend("zarr") # lève ValueError si zarr absent
52+
```
53+
54+
---
55+
56+
## 2. Compression
57+
58+
### Comportement actuel
59+
60+
Dans l’implémentation actuelle, l’écriture Zarr passe par `pandas.DataFrame.to_zarr(path, mode="w")` **sans options de compression explicites**. Zarr/pandas peuvent donc utiliser un comportement par défaut (par ex. compression légère ou aucune selon les versions).
61+
62+
### Ce que Zarr permet en général
63+
64+
Zarr gère la compression **par blocs (chunks)** via **numcodecs**. On peut utiliser par exemple :
65+
66+
- **Blosc** (LZ4, Zstd, Zlib) : bon compromis vitesse / ratio, très utilisé
67+
- **Zstd** : bon ratio, décompression rapide
68+
- **LZ4** : très rapide, ratio moindre
69+
- **Gzip** : standard, plus lent
70+
71+
Ces options se configurent au moment de la **création** du tableau zarr (compressor, chunks). Avec **pandas** :
72+
73+
- `df.to_zarr(path, ...)` peut accepter des arguments supplémentaires passés au store zarr sous-jacent (selon la version de pandas).
74+
- Pour un contrôle fin (compression, chunking), on peut créer soi‑même un store zarr avec le bon `compressor` puis y écrire les colonnes, ou étendre le backend (voir ci‑dessous).
75+
76+
### Évolution possible dans le survey-manager
77+
78+
On peut faire évoluer le backend Zarr pour accepter des options (compression, chunks) soit :
79+
80+
- via des **kwargs** dans `fill_store(..., store_format="zarr", **zarr_options)` transmis à `to_zarr`,
81+
- soit via la **config** (manifest ou config.yaml) pour définir un compressor par défaut pour le format zarr.
82+
83+
Aujourd’hui, si vous avez besoin d’une compression précise, vous pouvez :
84+
85+
1. **Enregistrer un backend personnalisé** (`register_backend`) qui appelle `to_zarr` avec le `compressor` (et éventuellement les chunks) de votre choix.
86+
2. Ou **post‑traiter** les répertoires `.zarr` générés (ré‑écriture avec d’autres options zarr) en dehors du survey-manager.
87+
88+
---
89+
90+
## 3. Parallélisation lecture / écriture
91+
92+
### Zarr en général
93+
94+
- **Parallélisme par blocs** : Zarr est conçu pour que des **chunks différents** puissent être lus ou écrits en parallèle sans verrou global (chaque chunk est indépendant).
95+
- **En Python** : le **GIL** limite le gain avec des threads pour la partie compression/décompression ; le parallélisme efficace passe souvent par **multi‑processus** ou des runtimes qui libèrent le GIL (Cython, C extensions utilisées par numcodecs/blosc).
96+
- **Goulot d’étranglement** : en pratique, la **compression/décompression** peut saturer le CPU (~1 GB/s) alors que le disque ou le réseau peuvent aller plus vite ; des évolutions (batch encode/decode, GPU) sont en cours dans l’écosystème zarr.
97+
98+
### Dans le survey-manager aujourd’hui
99+
100+
- **Écriture** : `fill_store(store_format="zarr")` appelle `to_zarr` pour chaque table, de façon **séquentielle** (une table après l’autre, pas de parallélisation interne exposée).
101+
- **Lecture** : `get_values()` utilise `read_zarr` pour une table donnée, également de façon **séquentielle** par appel.
102+
103+
Donc **par défaut** : pas de parallélisation multi‑tables ni multi‑chunks exposée dans l’API actuelle.
104+
105+
### Comment paralléliser quand même
106+
107+
1. **Plusieurs tables / plusieurs surveys**
108+
Vous pouvez paralléliser vous‑même au niveau applicatif : lancer plusieurs processus ou threads qui appellent `fill_store` (ou `get_values`) sur des collections/surveys/tables différents ; chaque processus écrira/lira ses propres fichiers ou groupes zarr sans conflit.
109+
110+
2. **Dask**
111+
Pour des tableaux zarr, **Dask** (dask.array, ou chargement des zarr en Dask) gère le chargement parallèle par chunks. Cela ne passe pas directement par l’API Survey/SurveyCollection actuelle : il faudrait soit exporter les chemins `.zarr` puis les ouvrir avec Dask, soit ajouter une couche d’intégration (p.ex. une fonction qui retourne un Dask DataFrame à partir d’un survey zarr).
112+
113+
3. **Évolution du backend**
114+
On pourrait ajouter plus tard un mode « écriture parallèle par table » (threads/processes) ou une option de lecture qui retourne un objet Dask pour exploiter le parallélisme par chunks côté zarr.
115+
116+
---
117+
118+
## 4. Résumé pratique
119+
120+
| Question | Réponse |
121+
|----------|--------|
122+
| **Utiliser Zarr avec OpenFisca ?** | Oui : `pip install openfisca-survey-manager[zarr]`, puis `build-collection --zarr` ou `fill_store(store_format="zarr")`. |
123+
| **Compression ?** | Par défaut : comportement zarr/pandas (souvent léger). Pour plus de contrôle : backend personnalisé avec `to_zarr(..., compressor=...)` ou post‑traitement des stores zarr. |
124+
| **Parallélisation lecture/écriture ?** | Pas exposée dans l’API actuelle (une table à la fois). Parallélisme possible : vous-même sur plusieurs tables/surveys, ou en utilisant Dask sur les chemins zarr générés. |
125+
126+
Si vous voulez, on peut détailler une proposition d’API pour passer des options de compression (et éventuellement de chunking) au backend Zarr dans `fill_store` ou dans la config.

openfisca_survey_manager/configuration/config_loader.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def manifest_survey_to_json(survey_name: str, entry: dict[str, Any]) -> dict[str
9191
"label": entry.get("label", survey_name),
9292
"hdf5_file_path": None,
9393
"parquet_file_path": None,
94+
"zarr_file_path": None,
9495
"tables": entry.get("tables"),
9596
"informations": informations,
9697
}

openfisca_survey_manager/core/dataset.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,20 @@ def load(
151151
self.config = None
152152
self.output_directory = str(new_cfg["default_output_dir"])
153153
self.surveys = []
154+
store_format = manifest.get("store_format", "parquet")
155+
output_dir = Path(self.output_directory)
154156
for survey_name, entry in manifest.get("surveys", {}).items():
155157
survey_json = manifest_survey_to_json(survey_name, entry)
156158
survey = Survey(name=survey_name)
157159
survey = survey.create_from_json(survey_json)
158160
survey.survey_collection = self
161+
survey.store_format = store_format
162+
if store_format == "hdf5":
163+
survey.hdf5_file_path = str(output_dir / (survey.name + ".h5"))
164+
elif store_format == "parquet":
165+
survey.parquet_file_path = str(output_dir / survey.name)
166+
elif store_format == "zarr":
167+
survey.zarr_file_path = str(output_dir / (survey.name + ".zarr"))
159168
self.surveys.append(survey)
160169
return self
161170

openfisca_survey_manager/core/survey.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from openfisca_survey_manager.core.table import Table
1717
from openfisca_survey_manager.exceptions import SurveyIOError, SurveyManagerError
18+
from openfisca_survey_manager.io.backends import get_backend
1819
from openfisca_survey_manager.io.hdf import hdf5_safe_key
1920
from openfisca_survey_manager.processing.harmonization import harmonize_data_frame_columns
2021

@@ -46,6 +47,7 @@ class Survey:
4647

4748
hdf5_file_path: Optional[str] = None
4849
parquet_file_path: Optional[str] = None
50+
zarr_file_path: Optional[str] = None
4951
label: Optional[str] = None
5052
name: Optional[str] = None
5153
survey_collection: Optional[SurveyCollection] = None
@@ -89,12 +91,16 @@ def __repr__(self) -> str:
8991

9092
@classmethod
9193
def create_from_json(cls, survey_json: dict) -> Survey:
94+
# Top-level store paths; exclude from informations to avoid duplicate kwargs
95+
store_path_keys = {"hdf5_file_path", "parquet_file_path", "zarr_file_path"}
96+
infos = {k: v for k, v in survey_json.get("informations", {}).items() if k not in store_path_keys}
9297
self = cls(
9398
name=survey_json.get("name"),
9499
label=survey_json.get("label"),
95100
hdf5_file_path=survey_json.get("hdf5_file_path"),
96101
parquet_file_path=survey_json.get("parquet_file_path"),
97-
**survey_json.get("informations", {}),
102+
zarr_file_path=survey_json.get("zarr_file_path"),
103+
**infos,
98104
)
99105
self.tables = survey_json.get("tables")
100106
return self
@@ -137,6 +143,9 @@ def fill_store(
137143
if store_format == "parquet" and survey.parquet_file_path is None:
138144
survey.parquet_file_path = str(Path(directory_path) / survey.name)
139145

146+
if store_format == "zarr" and survey.zarr_file_path is None:
147+
survey.zarr_file_path = str(Path(directory_path) / (survey.name + ".zarr"))
148+
140149
self.store_format = store_format
141150

142151
if source_format is not None:
@@ -276,6 +285,23 @@ def _get_values_from_parquet(
276285
return pq.ParquetDataset(parquet_file).read(columns=variables).to_pandas()
277286
raise SurveyIOError(f"No table {table} found in {self.parquet_file_path}")
278287

288+
def _get_values_from_zarr(
289+
self,
290+
table: str,
291+
variables: Optional[List[str]] = None,
292+
**kwargs: Any,
293+
) -> pandas.DataFrame:
294+
"""Read table from zarr store."""
295+
if self.zarr_file_path is None:
296+
raise SurveyIOError("No zarr store path for survey")
297+
backend = get_backend("zarr")
298+
return backend.read_table(
299+
self.zarr_file_path,
300+
table,
301+
variables=variables,
302+
**kwargs,
303+
)
304+
279305
def get_values(
280306
self,
281307
variables: Optional[List[str]] = None,
@@ -287,9 +313,11 @@ def get_values(
287313
batch_index: int = 0,
288314
filter_by: Optional[List[tuple]] = None,
289315
) -> pandas.DataFrame:
290-
if self.parquet_file_path is None and self.hdf5_file_path is None:
316+
if self.parquet_file_path is None and self.hdf5_file_path is None and self.zarr_file_path is None:
291317
raise SurveyIOError(f"No data file found for survey {self.name}")
292-
if self.hdf5_file_path is not None:
318+
if self.store_format == "zarr" and self.zarr_file_path is not None:
319+
df = self._get_values_from_zarr(table or "", variables=variables)
320+
elif self.hdf5_file_path is not None:
293321
df, _ = self._get_values_from_hdf5(table or "", ignorecase=ignorecase)
294322
else:
295323
df = self._get_values_from_parquet(table, variables, filter_by, batch_size, batch_index)

0 commit comments

Comments
 (0)