Le blog technique

Toutes les astuces #tech des collaborateurs de PI Services.

#openblogPI

Retrouvez les articles à la une

Python – VMWare Vcenter – Script d’inventaire

Le script suivant interroge un ou plusieurs Vcenter VMWare pour construire un inventaire et l’exporter en csv/json

Assurez-vous d’avoir Python installé, puis installez la dépendance nécessaire :

Bash

pip install pyvmomi

#!/usr/bin/env python3
"""
================================================================================
Script d'inventaire VMware vSphere Multi-vCenter
================================================================================

Description:
    Ce script se connecte à un ou plusieurs serveurs vCenter/ESXi pour récupérer
    l'inventaire complet des machines virtuelles et des clusters. Il peut exporter
    les données en JSON ou CSV. En cas d'échec de connexion à un vCenter, le script
    continue avec les autres vCenters sans interruption.

Fonctionnalités:
    - Connexion à un ou plusieurs vCenter/ESXi avec authentification
    - Gestion de la tolérance aux pannes (continue si un vCenter est indisponible)
    - Récupération des informations des VMs (nom, UUID, CPU, RAM, état, etc.)
    - Récupération des informations des clusters
    - Export en JSON et/ou CSV avec identification du vCenter source
    - Support des variables d'environnement pour les credentials
    - Rapport de connexion détaillé (succès/échecs)

Usage:
    
    # Un ou plusieurs vCenters avec même credentials
    python vsphere_inventory.py \
        --servers vcenter1.example.com vcenter2.example.com vcenter3.example.com \
        --user administrator@vsphere.local --password MonMotDePasse
    
    
    # Export en JSON et CSV
    python vsphere_inventory.py \
        --servers vcenter1.example.com vcenter2.example.com \
        --output-json all_vms.json --output-csv all_vms.csv
    

Arguments:
    --servers            Un ou Liste de serveurs vCenter/ESXi (espace ou virgule)
    --user, -u           Nom d'utilisateur vSphere (commun à tous)
    --password, -p       Mot de passe (demandé si non fourni)
    --port               Port de connexion (défaut: 443)
    --output-json        Fichier de sortie JSON (défaut: vsphere_inventory.json)
    --output-csv         Fichier de sortie CSV (optionnel)
    --no-verify-ssl      Désactiver la vérification SSL (non recommandé)
    --continue-on-error  Continuer même si tous les vCenters échouent (défaut: True)



Exemples d'utilisation:
    # Inventaire de plusieurs vCenters avec mêmes credentials
    python vsphere_inventory.py \
        --servers vcenter1.example.com vcenter2.example.com \
        --user admin@vsphere.local \
        --output-json multi_vcenter.json
    
    
    # Sans vérification SSL (environnement de test)
    python vsphere_inventory.py \
        --servers 192.168.1.10 192.168.1.11 \
        --user root \
        --no-verify-ssl

Dépendances:
    - pyvmomi>=8.0.0.1
    
    Installation:
    pip install pyvmomi

Notes:
    - La connexion utilise SSL par défaut (port 443)
    - Les mots de passe ne sont jamais affichés dans les logs
    - Le script gère automatiquement la déconnexion à la fin
    - Compatible avec vCenter 6.x, 7.x et 8.x
    - Si un vCenter est inaccessible, le script continue avec les autres
    - Les erreurs de connexion sont loguées mais n'interrompent pas le processus
    - Le résumé final indique le nombre de connexions réussies/échouées

================================================================================
"""

from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
import ssl
import atexit
import getpass
import argparse
import os
from datetime import datetime
import csv
import json


def get_obj(content, vimtype, name=None):
    """
    Récupère un objet vSphere par type et optionnellement par nom
    
    Args:
        content: ServiceInstance content
        vimtype: Type d'objet vim (ex: vim.VirtualMachine)
        name: Nom optionnel de l'objet
        
    Returns:
        obj: L'objet trouvé ou None
    """
    obj = None
    container = content.viewManager.CreateContainerView(
        content.rootFolder, [vimtype], True)
    
    if name:
        for c in container.view:
            if c.name == name:
                obj = c
                break
    else:
        obj = container.view
    
    container.Destroy()
    return obj


def get_vm_info(vm, vcenter_source=None):
    """
    Récupère les informations d'une VM
    
    Args:
        vm: Objet vim.VirtualMachine
        vcenter_source: Nom du vCenter source (optionnel)
        
    Returns:
        dict: Dictionnaire contenant les infos de la VM
    """
    summary = vm.summary
    config = vm.config
    guest = vm.guest
    runtime = vm.runtime
    
    # Récupérer le nom de l'hôte ESXi
    host_name = runtime.host.name if runtime.host else None
    
    # Récupérer le cluster
    cluster_name = None
    if runtime.host and runtime.host.parent:
        if isinstance(runtime.host.parent, vim.ClusterComputeResource):
            cluster_name = runtime.host.parent.name
    
    # Récupérer le datacenter
    datacenter_name = None
    obj = vm
    while obj:
        if isinstance(obj, vim.Datacenter):
            datacenter_name = obj.name
            break
        try:
            obj = obj.parent
        except:
            break
    
    vm_info = {
        'vcenter_source': vcenter_source,
        'name': vm.name,
        'uuid': config.uuid if config else None,
        'instance_uuid': config.instanceUuid if config else None,
        'power_state': summary.runtime.powerState,
        'num_cpu': config.hardware.numCPU if config else None,
        'memory_mb': config.hardware.memoryMB if config else None,
        'guest_os': config.guestFullName if config else None,
        'ip_address': guest.ipAddress if guest else None,
        'hostname': guest.hostName if guest else None,
        'tools_status': guest.toolsStatus if guest else None,
        'tools_version': guest.toolsVersion if guest else None,
        'host': host_name,
        'cluster': cluster_name,
        'datacenter': datacenter_name,
        'annotation': config.annotation if config else None,
        'num_disks': len(config.hardware.device) if config else 0,
        'num_nics': sum(1 for dev in config.hardware.device if isinstance(dev, vim.vm.device.VirtualEthernetCard)) if config else 0,
    }
    
    return vm_info


def get_cluster_info(cluster, vcenter_source=None):
    """
    Récupère les informations d'un cluster
    
    Args:
        cluster: Objet vim.ClusterComputeResource
        vcenter_source: Nom du vCenter source (optionnel)
        
    Returns:
        dict: Dictionnaire contenant les infos du cluster
    """
    # Récupérer le datacenter
    datacenter_name = None
    obj = cluster
    while obj:
        if isinstance(obj, vim.Datacenter):
            datacenter_name = obj.name
            break
        try:
            obj = obj.parent
        except:
            break
    
    summary = cluster.summary
    
    cluster_info = {
        'vcenter_source': vcenter_source,
        'name': cluster.name,
        'datacenter': datacenter_name,
        'num_hosts': summary.numHosts,
        'num_effective_hosts': summary.numEffectiveHosts,
        'total_cpu_mhz': summary.totalCpu,
        'total_memory_mb': summary.totalMemory / (1024 * 1024),
        'num_cpu_cores': summary.numCpuCores,
        'num_cpu_threads': summary.numCpuThreads,
        'effective_cpu_mhz': summary.effectiveCpu,
        'effective_memory_mb': summary.effectiveMemory,
        'overall_status': summary.overallStatus,
    }
    
    return cluster_info


def connect_to_vsphere(server, user, password, port=443, verify_ssl=True):
    """
    Se connecte à vSphere
    
    Args:
        server: Nom d'hôte ou IP du serveur vCenter/ESXi
        user: Nom d'utilisateur
        password: Mot de passe
        port: Port de connexion (défaut: 443)
        verify_ssl: Vérifier le certificat SSL (défaut: True)
        
    Returns:
        tuple: (ServiceInstance, error_message) - si erreur, si=None et message d'erreur
    """
    context = None
    if not verify_ssl:
        context = ssl._create_unverified_context()
    
    try:
        si = SmartConnect(
            host=server,
            user=user,
            pwd=password,
            port=int(port),
            sslContext=context
        )
        atexit.register(Disconnect, si)
        return si, None
    except Exception as e:
        return None, str(e)


def get_vcenter_inventory(si, vcenter_name):
    """
    Récupère l'inventaire complet d'un vCenter
    
    Args:
        si: ServiceInstance
        vcenter_name: Nom du vCenter
        
    Returns:
        dict: Inventaire avec VMs et clusters
    """
    content = si.RetrieveContent()
    
    # Récupérer les VMs
    vm_list = get_obj(content, vim.VirtualMachine)
    vms_info = []
    for vm in vm_list:
        vm_info = get_vm_info(vm, vcenter_source=vcenter_name)
        vms_info.append(vm_info)
    
    # Récupérer les clusters
    cluster_list = get_obj(content, vim.ClusterComputeResource)
    clusters_info = []
    for cluster in cluster_list:
        cluster_info = get_cluster_info(cluster, vcenter_source=vcenter_name)
        clusters_info.append(cluster_info)
    
    return {
        'status': 'success',
        'vms': vms_info,
        'clusters': clusters_info,
        'summary': {
            'total_vms': len(vms_info),
            'total_clusters': len(clusters_info),
            'vms_powered_on': sum(1 for vm in vms_info if vm['power_state'] == 'poweredOn'),
            'vms_powered_off': sum(1 for vm in vms_info if vm['power_state'] == 'poweredOff'),
        }
    }


def export_to_json(data, filename):
    """Exporte les données en JSON"""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"✓ Données exportées vers {filename}")


def export_to_csv(all_vms, filename):
    """Exporte toutes les VMs de tous les vCenters en CSV"""
    if not all_vms:
        print("Aucune VM à exporter")
        return
    
    fieldnames = all_vms[0].keys()
    
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_vms)
    
    print(f"✓ {len(all_vms)} VMs exportées vers {filename}")

def export_clusters_to_csv(all_clusters, filename):
    """Exporte tous les clusters de tous les vCenters en CSV"""
    if not all_clusters:
        print("Aucun cluster à exporter")
        return
    
    fieldnames = all_clusters[0].keys()
    
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_clusters)
    
    print(f"✓ {len(all_clusters)} clusters exportés vers {filename}")


def export_clusters_to_csv(all_clusters, filename):
    """Exporte tous les clusters de tous les vCenters en CSV"""
    if not all_clusters:
        print("Aucun cluster à exporter")
        return
    
    fieldnames = all_clusters[0].keys()
    
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_clusters)
    
    print(f"✓ {len(all_clusters)} clusters exportés vers {filename}")



def main():
    parser = argparse.ArgumentParser(
        description='Récupérer l\'inventaire VMware vSphere Multi-vCenter (VMs et Clusters)',
        formatter_class=argparse.RawDescriptionHelpFormatter
        
    )
    
    parser.add_argument('--servers', nargs='+',
                        help='Un ou Liste de serveurs vCenter/ESXi')
    parser.add_argument('--user', '-u',
                        default=os.environ.get('VSPHERE_USER'),
                        help='Nom d\'utilisateur')
    parser.add_argument('--password', '-p',
                        default=os.environ.get('VSPHERE_PASSWORD'),
                        help='Mot de passe (sera demandé si non fourni)')
    parser.add_argument('--port',
                        default=os.environ.get('VSPHERE_PORT', 443),
                        help='Port de connexion (défaut: 443)')
    parser.add_argument('--output-json',
                        default='vsphere_inventory.json',
                        help='Fichier de sortie JSON (défaut: vsphere_inventory.json)')
    parser.add_argument('--output-csv',
                        help='Fichier de sortie CSV pour les VMs (optionnel)')
    parser.add_argument('--output-clusters-csv',
                        help='Fichier de sortie CSV pour les clusters (optionnel)')
    parser.add_argument('--no-verify-ssl',
                        action='store_true',
                        help='Désactiver la vérification SSL')
    parser.add_argument('--continue-on-error',
                        action='store_true',
                        default=True,
                        help='Continuer même si connexion échoue (défaut: True)')
    
    args = parser.parse_args()
    
    # Déterminer la liste des vCenters
    vcenter_configs = []
    
    # Vérifier si --servers est fourni
    if not args.servers:
        # Essayer la variable d'environnement
        servers_env = os.environ.get('VSPHERE_SERVERS', os.environ.get('VSPHERE_SERVER'))
        if servers_env:
            servers = [s.strip() for s in servers_env.split(',')]
            print(f"ℹ Utilisation de VSPHERE_SERVERS depuis l'environnement: {', '.join(servers)}")
        else:
            print("❌ ERREUR: Aucun serveur vCenter spécifié")
            print("\nVeuillez fournir au moins un serveur vCenter via:")
            print("  1. Option --servers : python vsphere_inventory.py --servers vcenter1.example.com")
            print("  2. Variable d'environnement VSPHERE_SERVERS : export VSPHERE_SERVERS=vcenter1.example.com")
            print("\nExemple complet:")
            print("  python vsphere_inventory.py --servers vc1.example.com vc2.example.com --user admin@vsphere.local")
            parser.print_help()
            return 1
    else:
        servers = []
        for server_arg in args.servers:
            # Permettre séparation par virgule ou espace
            servers.extend([s.strip() for s in server_arg.split(',')])
    
    # Demander credentials si nécessaire
    user = args.user
    if not user:
        user = input("Nom d'utilisateur vSphere: ")
    
    password = args.password
    if not password:
        password = getpass.getpass("Mot de passe: ")
    
    for server in servers:
        vcenter_configs.append({
            'server': server,
            'user': user,
            'password': password,
            'port': args.port,
            'verify_ssl': not args.no_verify_ssl
        })
    
    
    print("="*80)
    print("INVENTAIRE VMWARE VSPHERE MULTI-VCENTER")
    print("="*80)
    print(f"Nombre de vCenter(s): {len(vcenter_configs)}")
    print(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()
    
    # Collecter les inventaires
    results = {}
    all_vms = []
    all_clusters = []
    successful_connections = 0
    failed_connections = 0
    
    for i, config in enumerate(vcenter_configs, 1):
        server = config['server']
        print(f"[{i}/{len(vcenter_configs)}] Connexion à {server}...")
        
        si, error = connect_to_vsphere(
            server=config['server'],
            user=config['user'],
            password=config['password'],
            port=config.get('port', 443),
            verify_ssl=config.get('verify_ssl', True)
        )
        
        if si:
            print(f"  ✓ Connecté à {server}")
            successful_connections += 1
            
            try:
                print(f"  → Récupération de l'inventaire...")
                inventory = get_vcenter_inventory(si, server)
                results[server] = inventory
                
                # Ajouter aux listes globales
                all_vms.extend(inventory['vms'])
                all_clusters.extend(inventory['clusters'])
                
                print(f"  ✓ {inventory['summary']['total_vms']} VMs, {inventory['summary']['total_clusters']} clusters")
            
            except Exception as e:
                print(f"  ✗ Erreur lors de la récupération: {e}")
                results[server] = {
                    'status': 'failed',
                    'error': f"Erreur lors de la récupération: {str(e)}"
                }
                failed_connections += 1
        else:
            print(f"  ✗ Échec de connexion: {error}")
            results[server] = {
                'status': 'failed',
                'error': error
            }
            failed_connections += 1
        
        print()
    
    # Vérifier si au moins une connexion a réussi
    if successful_connections == 0:
        print("✗ ERREUR: Aucune connexion réussie")
        if not args.continue_on_error:
            return 1
        print("  Mode continue-on-error activé, export des données disponibles...")
    
    # Préparer les données finales
    data = {
        'timestamp': datetime.now().isoformat(),
        'vcenters': results,
        'summary': {
            'total_vcenters': len(vcenter_configs),
            'successful_connections': successful_connections,
            'failed_connections': failed_connections,
            'total_vms': len(all_vms),
            'total_clusters': len(all_clusters),
            'vms_powered_on': sum(1 for vm in all_vms if vm.get('power_state') == 'poweredOn'),
            'vms_powered_off': sum(1 for vm in all_vms if vm.get('power_state') == 'poweredOff'),
        }
    }
    
    # Export
    print("="*80)
    print("EXPORT DES DONNÉES")
    print("="*80)
    
    export_to_json(data, args.output_json)
    
    if args.output_csv and all_vms:
        export_to_csv(all_vms, args.output_csv)
    
    if args.output_clusters_csv and all_clusters:
        export_clusters_to_csv(all_clusters, args.output_clusters_csv)
    
    if args.output_clusters_csv and all_clusters:
        export_clusters_to_csv(all_clusters, args.output_clusters_csv)
    
    # Résumé final
    print()
    print("="*80)
    print("RÉSUMÉ")
    print("="*80)
    print(f"vCenters interrogés: {len(vcenter_configs)}")
    print(f"  ✓ Connexions réussies: {successful_connections}")
    print(f"  ✗ Connexions échouées: {failed_connections}")
    print()
    print(f"VMs totales: {len(all_vms)}")
    print(f"  - Allumées: {data['summary']['vms_powered_on']}")
    print(f"  - Éteintes: {data['summary']['vms_powered_off']}")
    print(f"Clusters: {len(all_clusters)}")
    print()
    
    if failed_connections > 0:
        print("⚠ Serveurs en échec:")
        for server, result in results.items():
            if result['status'] == 'failed':
                print(f"  - {server}: {result['error']}")
        print()
    
    print("✓ Terminé")
    
    return 0 if successful_connections > 0 else 1


if __name__ == '__main__':
    exit(main())

🐍 Interroger l’API de Rundeck avec Python : Un Guide Rapide

Rundeck est une plateforme pour l’automatisation des opérations et la gestion des exécutions. Bien que vous puissiez tout faire via l’interface web, la vraie puissance vient de son API, qui vous permet d’intégrer Rundeck dans vos scripts et vos systèmes existants.

Ce guide rapide vous montrera comment utiliser la bibliothèque standard de Python, requests, pour interagir avec l’API de Rundeck.


Prérequis

  • Un serveur Rundeck en cours d’exécution.
  • Un Jeton d’API (API Token) créé dans les paramètres de votre profil utilisateur Rundeck.
  • Python et la bibliothèque requests installés (pip install requests).

Étape 1 : Définir les Constantes de Connexion

Nous allons commencer par définir l’URL de base de votre instance Rundeck et l’en-tête d’autorisation qui contiendra votre Jeton d’API

import requests
import json

# Remplacez par l'URL de votre serveur Rundeck
RUNDECK_URL = "http://votreserveur:4440" 

# Remplacez par votre véritable Jeton d'API
API_TOKEN = "votre_jeton_api_secret"

HEADERS = {
    'X-Rundeck-Auth-Token': API_TOKEN,
    'Accept': 'application/json'  # Nous voulons une réponse au format JSON
}

Étape 2 : Interroger l’API

L’une des requêtes les plus courantes est de lister tous les projets. L’API de Rundeck a généralement un chemin qui inclut la version d’API que vous ciblez (souvent api/40 ou plus).

Pour lister les projets, le chemin d’accès est souvent /api/40/projects.

Exemple 1 : Lister les Projets

def lister_projets():
    # Chemin complet de l'API pour lister les projets
    endpoint = f"{RUNDECK_URL}/api/40/projects"
    
    print(f"-> Requête à : {endpoint}")
    
    try:
        # Envoi de la requête GET
        reponse = requests.get(endpoint, headers=HEADERS)
        reponse.raise_for_status()  # Lève une exception pour les codes d'erreur (4xx ou 5xx)

        # La réponse est une liste de projets au format JSON
        projets = reponse.json()
        
        print(f"\n✅ Nombre de projets trouvés : {len(projets)}")
        
        for projet in projets:
            print(f"- Nom : {projet['name']} (Description : {projet.get('description', 'N/A')})")
            
    except requests.exceptions.RequestException as e:
        print(f"❌ Erreur lors de la requête : {e}")

lister_projets()

Exemple 2 : Exécuter un Job

Pour une opération plus complexe comme l’exécution d’un job, vous utilisez généralement une méthode POST et vous devez cibler l’ID du job.

# Remplacez par l'ID de votre job
JOB_ID = "1a2b3c4d-5e6f-7890-abcd-ef0123456789" 

def executer_job():
    # Chemin de l'API pour l'exécution d'un job
    endpoint = f"{RUNDECK_URL}/api/40/job/{JOB_ID}/run"
    
    print(f"-> Exécution du job ID: {JOB_ID}")
    
    try:
        # Requête POST sans corps (par défaut, il n'y a pas d'options)
        reponse = requests.post(endpoint, headers=HEADERS)
        reponse.raise_for_status()

        resultat = reponse.json()
        
        # Le résultat contient l'ID de l'exécution
        execution_id = resultat['id'] 
        
        print(f"\n✅ Job lancé avec succès !")
        print(f"   ID d'exécution : {execution_id}")
        print(f"   URL de suivi : {resultat.get('permalink')}")
        
    except requests.exceptions.RequestException as e:
        print(f"❌ Erreur lors de l'exécution du job : {e}")

# executer_job() # Décommentez pour tester l'exécution

Conclusion

En utilisant la simple bibliothèque requests, vous pouvez facilement automatiser et interagir avec votre environnement Rundeck en quelques lignes de Python. Que ce soit pour récupérer des métriques, déclencher des workflows ou gérer des ressources.

Accorder un consentement pour une application au nom d’un utilisateur unique avec PowerShell

À quoi cela sert ?

Dans certaines organisations, il est parfois nécessaire d’accorder à une application des permissions pour accéder à une API uniquement au nom d’un utilisateur précis, sans étendre ce consentement à l’ensemble du tenant. Cette approche est particulièrement utile pour des scénarios de test, des comptes dédiés, ou des usages restreints nécessitant un contrôle fin des accès. Microsoft Entra ID permet cela grâce à Microsoft Graph PowerShell, en créant une délégation ciblée de permissions associée à un utilisateur unique.

Script proposé par Microsoft

Microsoft fournit un script PowerShell destiné à appliquer, pour une application, un ensemble de permissions déléguées au nom d’un utilisateur précis.

Prérequis

  • Un compte avec un rôle suffisant : Application Administrator, Cloud Application Administrator ou Privileged Role Administrator.
  • L’ID de l’application cliente, l’ID de l’API et l’UPN ou ID de l’utilisateur ciblé.
  • Le module Microsoft Graph PowerShell installé.

Script

# The app for which consent is being granted.
$clientAppId = "de8bc8b5-d9f9-48b1-a8ad-b748da725064" # Microsoft Graph Explorer

# The API to which access will be granted. Microsoft Graph Explorer makes API 
# requests to the Microsoft Graph API, so we'll use that here.
$resourceAppId = "00000003-0000-0000-c000-000000000000" # Microsoft Graph API

# The permissions to grant. Here we're including "openid", "profile", "User.Read"
# and "offline_access" (for basic sign-in), as well as "User.ReadBasic.All" (for 
# reading other users' basic profile).
$permissions = @("openid", "profile", "offline_access", "User.Read", "User.ReadBasic.All")

# The user on behalf of whom access will be granted. The app will be able to access 
# the API on behalf of this user.
$userUpnOrId = "user@example.com"

# Step 0. Connect to Microsoft Graph PowerShell. We need User.ReadBasic.All to get
#    users' IDs, Application.ReadWrite.All to list and create service principals, 
#    DelegatedPermissionGrant.ReadWrite.All to create delegated permission grants, 
#    and AppRoleAssignment.ReadWrite.All to assign an app role.
#    WARNING: These are high-privilege permissions!
Connect-MgGraph -Scopes ("User.ReadBasic.All Application.ReadWrite.All " + "DelegatedPermissionGrant.ReadWrite.All " + "AppRoleAssignment.ReadWrite.All")

# Step 1. Check if a service principal exists for the client application. 
#     If one doesn't exist, create it.
$clientSp = Get-MgServicePrincipal -Filter "appId eq '$($clientAppId)'"
if (-not $clientSp) {
   $clientSp = New-MgServicePrincipal -AppId $clientAppId
}

# Step 2. Create a delegated permission that grants the client app access to the
#     API, on behalf of the user. (This example assumes that an existing delegated 
#     permission grant does not already exist, in which case it would be necessary 
#     to update the existing grant, rather than create a new one.)
$user = Get-MgUser -UserId $userUpnOrId
$resourceSp = Get-MgServicePrincipal -Filter "appId eq '$($resourceAppId)'"
$scopeToGrant = $permissions -join " "
$grant = New-MgOauth2PermissionGrant -ResourceId $resourceSp.Id -Scope $scopeToGrant -ClientId $clientSp.Id -ConsentType "Principal" -PrincipalId $user.Id

# Step 3. Assign the app to the user. This ensures that the user can sign in if assignment
#     is required, and ensures that the app shows up under the user's My Apps portal.
if ($clientSp.AppRoles | ? { $_.AllowedMemberTypes -contains "User" }) {
    Write-Warning ("A default app role assignment cannot be created because the " + "client application exposes user-assignable app roles. You must " + "assign the user a specific app role for the app to be listed " + "in the user's My Apps portal.")
} else {
    # The app role ID 00000000-0000-0000-0000-000000000000 is the default app role
    # indicating that the app is assigned to the user, but not for any specific 
    # app role.
    $assignment = New-MgServicePrincipalAppRoleAssignedTo -ServicePrincipalId $clientSp.Id -ResourceId $clientSp.Id -PrincipalId $user.Id -AppRoleId "00000000-0000-0000-0000-000000000000"
}

Conclusion

Bien que ce mécanisme soit efficace pour des besoins très ciblés, il doit être utilisé de manière prudente. Le consentement est appliqué immédiatement, sans validation de l’utilisateur, ce qui peut amplifier les risques en cas de permissions sensibles. Cette méthode n’est pas adaptée pour remplacer un consentement administrateur global dès que l’application s’adresse à un ensemble étendu d’utilisateur.