Le blog technique

Toutes les astuces #tech des collaborateurs de PI Services.

#openblogPI

Retrouvez les articles à la une

Script Python pour manipuler les csv avec duckdb

Vous travaillez régulièrement avec des ensembles de données répartis sur plusieurs fichiers CSV ? Vous aimeriez pouvoir les interroger facilement avec la puissance du langage SQL, sans avoir à les importer dans une base de données lourde ?

Le script Python query_csvs.py transforme instantanément un dossier de fichiers CSV en un ensemble de tables SQL interrogeables grâce à la combinaison de pandas et de DuckDB.

Comment ça marche ?

Le script parcourt un dossier spécifié (par défaut, MyFolder dans le répertoire du script) et charge chaque fichier CSV dans la mémoire. Chaque CSV devient une « table » dans un moteur DuckDB léger.

Une fonctionnalité clé est la normalisation automatique des noms de fichiers en noms de tables SQL conviviaux.

🚀 Mise en route rapide

1. Prérequis

Assurez-vous d’avoir Python et les bibliothèques nécessaires installées :

Bash

pip install pandas duckdb

2. Placez vos données

Mettez vos fichiers CSV dans le dossier par défaut MyFolder ou préparez un autre dossier contenant vos données.

3. Les commandes essentielles

Voici les commandes principales pour commencer à explorer vos données :

Lister les tables (fichiers CSV détectés)

Pour voir quels fichiers ont été chargés et comment ils ont été nommés :

Bash

python query_csvs.py list

Afficher le schéma d’une table

Comprendre la structure et les types de colonnes d’un fichier :

Bash

python query_csvs.py schema nom_de_votre_fichier_csv

Exécuter une requête SQL

C’est là que la magie opère. Vous pouvez joindre des tables, filtrer, agréger, et plus encore !

Bash

python query_csvs.py query "SELECT * FROM ma_table_csv WHERE colonne_prix > 100 LIMIT 5"

Le Shell Interactif

Pour des explorations plus poussées, ouvrez le shell interactif DuckDB :

Bash

python query_csvs.py shell

Dans le shell, vous pouvez taper des requêtes SQL directement, ou utiliser des commandes comme .tables et .schema <table>.

✨ Options Avancées

  • Dossier de données spécifique : Utilisez --data-dir /chemin/vers/mon/dossier pour pointer vers un autre répertoire.
  • Fichier unique : Utilisez --data-file /chemin/vers/un/fichier.csv pour charger un seul fichier spécifique.
  • Exporter les résultats : Ajoutez --to-csv /chemin/vers/resultat.csv à votre commande query pour sauvegarder la sortie complète de votre requête dans un nouveau fichier.

Fini les boucles for et les fusions manuelles dans pandas pour des requêtes simples ! Adoptez la puissance de SQL pour interroger vos CSV en quelques secondes.

/

#!/usr/bin/env python3
"""
Script : query_csvs.py
======================

Ce script permet d'interroger dynamiquement des fichiers CSV présents dans un dossier (par defaut MyFolder) comme s'il s'agissait de tables SQL, en utilisant pandas et DuckDB.

Fonctionnalités principales :
----------------------------
- Recherche automatique de tous les fichiers CSV dans le dossier spécifié (et sous-dossiers)
- Chargement de chaque CSV comme une table DuckDB (nom normalisé automatiquement)
- Possibilité de lister les tables, afficher le schéma d'une table, exécuter des requêtes SQL, ou ouvrir un shell interactif SQL
- Export possible du résultat d'une requête SQL vers un fichier CSV

Utilisation :
-------------
python query_csvs.py [--data-dir CHEMIN] <commande> [options]

Commandes disponibles :
  list                       Lister les tables chargées
  schema <table>             Afficher le schéma d'une table
  query "<SQL>" [options]    Exécuter une requête SQL sur les tables
  shell                      Ouvrir un shell interactif SQL

Options de la commande query :
  --no-truncate              Afficher tous les résultats (pas de troncature)
  --max-rows N               Nombre max de lignes affichées (défaut: 100)
  --max-columns N            Nombre max de colonnes affichées (défaut: 100)
  --width N                  Largeur d'affichage (défaut: 200)
  --to-csv CHEMIN            Exporter le résultat complet dans un fichier CSV

Exemples :
----------
Lister les tables détectées :
    python query_csvs.py list

Afficher le schéma d'une table :
    python query_csvs.py schema nom_de_table

Exécuter une requête SQL :
    python query_csvs.py query "SELECT * FROM nom_de_table LIMIT 10"

Indiquer un dossier de données spécifique :
    python query_csvs.py --data-dir /chemin/vers/dossier query "SELECT * FROM nom_de_table"

Indiquer un fichier CSV spécifique à charger :
    python query_csvs.py --data-file /chemin/vers/fichier.csv query "SELECT * FROM nom_de_table LIMIT 10"

Ouvrir le shell interactif :
    python query_csvs.py shell

Dépendances :
-------------
- pandas
- duckdb

Installer les dépendances :
    pip install pandas duckdb

"""
import argparse
import os
import re
import sys
import csv
from pathlib import Path
from typing import Dict, List, Tuple

import pandas as pd
import duckdb

BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR / 'MyFolder'



def escape_ident(name: str) -> str:
    """Escape an identifier for DuckDB SQL by double-quoting and doubling internal quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def normalize_table_name(csv_path: Path, root: Path) -> str:
    rel = csv_path.relative_to(root)
    # Build name from parts without extension
    parts = list(rel.parts)
    if parts:
        parts[-1] = Path(parts[-1]).stem
    name = '_'.join(parts)
    # Normalize: lowercase, replace spaces and non-alnum with underscore, collapse repeats
    name = name.lower()
    name = re.sub(r"[^a-z0-9_]+", "_", name)
    name = re.sub(r"_+", "_", name).strip('_')
    # Ensure starts with a letter for SQL friendliness
    if not name or not name[0].isalpha():
        name = f"t_{name}" if name else "t_csv"
    return name


def sniff_dialect_and_encoding(path: Path) -> Tuple[str, str]:
    # Try encodings in order
    encodings = ['utf-8-sig', 'utf-8', "cp1252", 'latin-1']
    for enc in encodings:
        try:
            with open(path, 'r', newline='', encoding=enc, errors='strict') as f:
                sample = f.read(4096)
                if not sample:
                    return enc, ','
                try:
                    dialect = csv.Sniffer().sniff(sample, delimiters=[',', ';', '\t', '|'])
                    return enc, dialect.delimiter
                except csv.Error:
                    # Fallback: guess common separators by count
                    counts = {sep: sample.count(sep) for sep in [',', ';', '\t', '|']}
                    delim = max(counts, key=counts.get)
                    return enc, delim
        except UnicodeDecodeError:
            continue
        except Exception:
            continue
    # Final fallback
    return 'utf-8', ','


def find_csv_files(root: Path) -> List[Path]:
    return [p for p in root.rglob('*.csv') if p.is_file()]


def load_csv_to_df(path: Path) -> pd.DataFrame:
    enc, sep = sniff_dialect_and_encoding(path)
    try:
        df = pd.read_csv(path, sep=sep, encoding=enc, engine='python')
    except Exception:
        # Retry with no header if it fails
        df = pd.read_csv(path, sep=sep, encoding=enc, engine='python', header=None)
        # Create default column names
        df.columns = [f"col_{i+1}" for i in range(len(df.columns))]
    # Trim column names
    df.columns = [str(c).strip() for c in df.columns]
    return df


def register_dataframes(con: duckdb.DuckDBPyConnection, root: Path, specific_file: Path = None) -> Dict[str, pd.DataFrame]:
    """
    Register CSV files as DuckDB tables.
    - If specific_file is provided, only that file is loaded (and its parent is used to build the table name).
    - Otherwise all CSVs under root are found and loaded.
    """
    if specific_file:
        specific_file = Path(specific_file).resolve()
        if not specific_file.exists() or not specific_file.is_file():
            print(f"Aucun fichier CSV trouvé: {specific_file}")
            return {}
        csvs = [specific_file]
        root_for_naming = specific_file.parent
    else:
        csvs = find_csv_files(root)
        root_for_naming = root

    if not csvs:
        print(f"Aucun fichier CSV trouvé dans {root if not specific_file else specific_file}")
    name_to_df: Dict[str, pd.DataFrame] = {}
    used_names = set()
    for csv_path in sorted(csvs):
        base_name = normalize_table_name(csv_path, root_for_naming)
        i = 2
        orig_base = base_name
        while base_name in used_names:
            # If there is a collision, suffix the preferred/base name
            base_name = f"{orig_base}_{i}"
            i += 1
        try:
            df = load_csv_to_df(csv_path)
        except Exception as e:
            print(f"[WARN] Échec de chargement: {csv_path} -> {e}")
            continue
        con.register(base_name, df)
        name_to_df[base_name] = df
        used_names.add(base_name)
    return name_to_df


def list_tables(con: duckdb.DuckDBPyConnection) -> List[str]:
    rows = con.sql("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main' ORDER BY table_name").fetchall()
    return [r[0] for r in rows]


def show_schema(con: duckdb.DuckDBPyConnection, table: str) -> pd.DataFrame:
    return con.sql(f"DESCRIBE {escape_ident(table)}").fetchdf()


def run_query(con: duckdb.DuckDBPyConnection, sql: str) -> pd.DataFrame:
    return con.sql(sql).fetchdf()


def interactive_shell(con: duckdb.DuckDBPyConnection):
    print("DuckDB SQL shell (tables chargées depuis CSV). Tapez .help pour l'aide, .tables pour lister, .schema <table> pour le schéma, .exit pour quitter.")
    while True:
        try:
            line = input('sql> ').strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if not line:
            continue
        if line in ('.exit', '.quit', '\\q'):
            break
        if line == '.help':
            print("Commandes:\n  .tables           Lister les tables\n  .schema <table>   Afficher le schéma\n  .exit             Quitter")
            continue
        if line == '.tables':
            for t in list_tables(con):
                print(t)
            continue
        if line.startswith('.schema'):
            parts = line.split(maxsplit=1)
            if len(parts) == 2:
                tbl = parts[1].strip()
                try:
                    df = show_schema(con, tbl)
                    print(df.to_string(index=False))
                except Exception as e:
                    print(f"Erreur: {e}")
            else:
                print("Usage: .schema <table>")
            continue
        # Otherwise treat as SQL
        try:
            df = run_query(con, line)
            if df.empty:
                print("(résultat vide)")
            else:
                # Limit display width/rows to keep readable
                with pd.option_context('display.max_rows', 50, 'display.max_columns', 50, 'display.width', 200):
                    print(df)
        except Exception as e:
            print(f"Erreur SQL: {e}")


def main(argv: List[str]):
    parser = argparse.ArgumentParser(description='Interroger des CSV comme des tables SQL (pandas + DuckDB)')
    parser.add_argument('--data-dir', default=str(DATA_DIR), help='Dossier racine contenant les CSV (défaut: Data_Sources_Export/)')
    parser.add_argument('--data-file', default=None, help='Chemin vers un fichier CSV spécifique à charger (si fourni, --data-dir est ignoré)')

    sub = parser.add_subparsers(dest='cmd', required=True)

    sub.add_parser('list', help='Lister les tables chargées')

    p_schema = sub.add_parser('schema', help='Afficher le schéma d\'une table')
    p_schema.add_argument('table', help='Nom de la table')

    p_query = sub.add_parser('query', help='Exécuter une requête SQL')
    p_query.add_argument('sql', help='Requête SQL entre guillemets')

    p_qfile = sub.add_parser('query-file', help='Exécuter une requête SQL depuis un fichier')
    p_qfile.add_argument('file', type=argparse.FileType('r'), help='Fichier contenant la requête SQL')

    # Output control: add to both query and query-file
    for p in (p_query, p_qfile):
        p.add_argument('--no-truncate', action='store_true', help="Afficher tous les résultats (désactive la troncature d'affichage)")
        p.add_argument('--max-rows', type=int, default=100, help='Nombre max de lignes affichées (défaut: 100)')
        p.add_argument('--max-columns', type=int, default=100, help='Nombre max de colonnes affichées (défaut: 100)')
        p.add_argument('--width', type=int, default=200, help="Largeur d'affichage (défaut: 200)")
        p.add_argument('--to-csv', type=str, default=None, help='Chemin de fichier pour sauvegarder TOUT le résultat en CSV')

    sub.add_parser('shell', help='Ouvrir un mini-shell interactif SQL')

    args = parser.parse_args(argv)

    data_dir = Path(args.data_dir)
    specific = Path(args.data_file) if getattr(args, 'data_file', None) else None

    if specific:
        if not specific.exists() or not specific.is_file():
            print(f"Le fichier spécifié n'existe pas: {specific}")
            return 2
    else:
        if not data_dir.exists():
            print(f"Le dossier de données n'existe pas: {data_dir}")
            return 2

    con = duckdb.connect(database=':memory:')
    register_dataframes(con, data_dir, specific_file=specific)

    if args.cmd == 'list':
        for t in list_tables(con):
            print(t)
        return 0

    if args.cmd == 'schema':
        df = show_schema(con, args.table)
        print(df.to_string(index=False))
        return 0

    if args.cmd == 'query':
        # Lecture de la requête SQL depuis un fichier si précisé
        sql = args.sql
        df = run_query(con, sql)
        # Optional: save full result to CSV
        if getattr(args, 'to_csv', None):
            out_path = Path(args.to_csv)
            df.to_csv(out_path, index=False)
            print(f"Résultat complet sauvegardé dans {out_path}")
        # Configure display
        if getattr(args, 'no_truncate', False):
            rows, cols, width = None, None, 0
        else:
            rows = getattr(args, 'max_rows', 100)
            cols = getattr(args, 'max_columns', 100)
            width = getattr(args, 'width', 200)
        with pd.option_context('display.max_rows', rows, 'display.max_columns', cols, 'display.width', width):
            print(df)
        return 0

    if args.cmd == 'query-file':
        sql = args.file.read()
        df = run_query(con, sql)
        # Optional: save full result to CSV
        if getattr(args, 'to_csv', None):
            out_path = Path(args.to_csv)
            df.to_csv(out_path, index=False)
            print(f"Résultat complet sauvegardé dans {out_path}")
        # Configure display
        if getattr(args, 'no_truncate', False):
            rows, cols, width = None, None, 0
        else:
            rows = getattr(args, 'max_rows', 100)
            cols = getattr(args, 'max_columns', 100)
            width = getattr(args, 'width', 200)
        with pd.option_context('display.max_rows', rows, 'display.max_columns', cols, 'display.width', width):
            print(df)
        return 0

    if args.cmd == 'shell':
        interactive_shell(con)
        return 0

    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

🦆 DuckDB : Le couteau suisse de l’analytique embarquée !

Vous en avez assez de jongler avec des bases de données lourdes ou de devoir charger vos données dans un entrepôt distant pour la moindre analyse ? Il est temps de découvrir DuckDB, la base de données analytique embarquée qui révolutionne la manière dont les data scientists et les analystes travaillent avec leurs données locales.

Qu’est-ce que DuckDB ?

DuckDB n’est pas une base de données transactionnelle (comme PostgreSQL ou MySQL), mais une base de données orientée colonne (OLAP) conçue spécifiquement pour l’analyse et l’exécution rapide de requêtes complexes (agrégations, jointures, etc.).

Sa particularité majeure est d’être embarquée et sans serveur (serverless). Cela signifie :

  • Elle fonctionne dans le même processus que votre application (Python, R, Node.js, etc.).
  • Elle ne nécessite aucune installation, configuration ou gestion de serveur.
  • Elle peut lire les données directement à partir de fichiers locaux (CSV, Parquet, JSON) sans les importer au préalable.

🚀 L’intérêt majeur : La Vitesse et la Simplicité

  1. ⚡ Vitesse d’exécution : Grâce à son architecture orientée colonne et ses optimisations (vectorisation, compilation just-in-time), DuckDB est souvent plus rapide pour les requêtes analytiques que de nombreux outils traditionnels, y compris pour des jeux de données de plusieurs gigaoctets.
  2. 🔧 Simplicité d’utilisation : Elle utilise le standard SQL que vous connaissez, tout en étant incroyablement facile à intégrer.
  3. 🔗 Zéro ETL (Extract, Transform, Load) : C’est un Data Lakehouse dans une boîte ! Vous pouvez requêter directement des fichiers Parquet ou CSV stockés sur votre disque ou dans le cloud, ce qui élimine la phase fastidieuse de chargement.

🛠️ Exemples d’utilisation concrets avec Python

L’intégration la plus populaire de DuckDB se fait via sa librairie Python.

1. Interroger directement des fichiers (Zero ETL)

Imaginez que vous ayez un gros fichier Parquet nommé ventes_2024.parquet.

Python

import duckdb

# Requête SQL directement sur le fichier Parquet, sans l'importer !
resultat = duckdb.sql("""
    SELECT 
        region, 
        SUM(montant) AS total_ventes
    FROM 
        'ventes_2024.parquet' 
    WHERE 
        date >= '2024-06-01'
    GROUP BY 
        region
    ORDER BY 
        total_ventes DESC;
""")

print(resultat.df())

L’avantage ici est la rapidité : DuckDB ne lira que les colonnes (region, montant, date) et les lignes nécessaires pour la période spécifiée, optimisant l’accès au disque.

2. Remplacer Pandas pour les grandes agrégations

Lorsque votre DataFrame Pandas devient trop volumineux pour la mémoire (ou que les calculs deviennent lents), DuckDB peut prendre le relais sans changer votre flux de travail.

Python

import pandas as pd
import duckdb

# Création d'un DataFrame (peut être très grand)
data_df = pd.DataFrame({
    'cat': ['A', 'B', 'A', 'C', 'B'],
    'val': [10, 20, 15, 50, 30]
})

# Utiliser le DataFrame Pandas directement dans une requête SQL
resultat_sql = duckdb.sql("""
    SELECT 
        cat, 
        AVG(val) 
    FROM 
        data_df 
    GROUP BY 
        cat
""").df()

print(resultat_sql)
#   cat  avg(val)
# 0   A      12.5
# 1   B      25.0
# 2   C      50.0

DuckDB exécute la requête SQL très efficacement sur le DataFrame en mémoire, offrant une alternative rapide et familière à des opérations group by complexes en Pandas.


💡 Conclusion

DuckDB est l’outil parfait pour l’analyse interactive et les flux de travail de prototypage de données. Si vous travaillez régulièrement avec des ensembles de données de taille moyenne à grande (de quelques Mo à plusieurs Go) et que vous avez besoin de la puissance du SQL sans la complexité d’un SGBD serveur, DuckDB est fait pour vous. C’est un véritable « SQLite pour l’analytique » qui mérite sa place dans la boîte à outils de tout professionnel des données !

[Le saviez vous ?] – Trouver la date d’expiration du mot de passe d’un utilisateur

Trouver la date d’expiration du mot de passe dans l’Active Direcotry

Bien souvent j’entends cette question « Sais tu me dire si le mot de passe de l’utilisateur a expiré ?« .

Eh bien oui, il y a un moyen assez facile pour récupérer la date d’expiration du mot de passe d’un utilisateur, pour cela il suffit simplement de faire une requête Active Directory en demandant le retour de l’attribut ‘msDS-UserPasswordExpiryTimeComputed‘.
Voici donc une simple requête Powershell (nécessitant le module AD) pour retourner la date d’expiration du mot de passe d’un utilisateur:

# Samaccountname
$User = "toto"
# Return date
[Datetime]::FromFileTime((Get-ADUser $User -Properties "msDS-UserPasswordExpiryTimeComputed").'msDS-UserPasswordExpiryTimeComputed')