Le blog technique

Toutes les astuces #tech des collaborateurs de PI Services.

#openblogPI

Retrouvez les articles à la une

Python – Rundeck – Script pour Lister les jobs

Présentation du Script query_rundeck_pwd_auth.py

Ce script Python utilise la bibliothèque requests pour interagir avec l’API REST de Rundeck en suivant l’approche suivante :

  1. Authentification par Mot de Passe : Le script gère l’authentification en envoyant les identifiants (j_username et j_password) au point de terminaison /j_security_check de Rundeck. Cela établit une session, et le mot de passe est saisi de manière sécurisée (non affiché) grâce au module getpass.
  2. Collecte des Données :
    • Il commence par lister tous les projets disponibles.
    • Pour chaque projet, il liste les jobs.
    • Pour chaque job, une requête API est faite pour récupérer la dernière exécution en utilisant le paramètre max: 1. Le script parse le statut, les dates de début et de fin de cette dernière exécution.
  3. Sortie Standard et Exportation CSV :
    • Les données sont affichées dans la console pour une vérification immédiate.
    • Un fichier rundeck_jobs.csv est généré, fournissant un enregistrement structuré et facilement utilisable pour l’analyse ou l’audit. Le format de sortie est en semicolon-separated values (CSV) : EXPORT_DATE;PROJET;JOB_ID;JOB_NAME;DERNIERE_EXECUTION.

Utilisation

L’exécution du script se fait en fournissant l’URL du site Rundeck, l’URL de l’API et le nom d’utilisateur comme arguments :

python query_rundeck_pwd_auth.py site_url api_url user
python query_rundeck_pwd_auth.py ‘http://myrundeck.mydomain:4440’ ‘http://myrundeck.mydomain:4440/api/14’ myaccount

import requests
import json
from contextlib import redirect_stdout
import sys
import getpass
import datetime



if len(sys.argv) < 4:
    print('Usage: query_rundeck_pwd_auth.py <site_url> <api url> <user>')
    sys.exit()


objdate = datetime.datetime
# site url
rundeck_site_url = (sys.argv[1])
# api url
rundeck_api_url = (sys.argv[2])
# user
user = (sys.argv[3])
# password demandé de façon sécurisée (non affiché)
password = getpass.getpass(prompt='Mot de passe: ')


# Open session with user/password authentication
url = f'{rundeck_site_url}/j_security_check'
session = requests.session()
response = session.post(url, data={"j_username": user, "j_password": password})

# debug minimal : afficher erreur si auth KO
if response.status_code != 200:
    print(f"Auth error: {response.status_code}")
    try:
        print(response.text)
    except Exception:
        pass


# Fonction pour lister les projets
def lister_projects():
    url = f'{rundeck_api_url}/projects'
    headers = {
        'Accept': 'application/json'
            }
    response = session.get(url,headers = headers)

    allprojects=[]
    myproject={
          'project': None
          }

    if response.status_code == 200:
        projects = response.json()
        for project in projects:
            project_name=project['name']
            myproject={
                'project_name': project_name,
                 }
            allprojects.append(myproject)
        return allprojects    
    else:
            print(f"Erreur: {response.status_code}")
            try:
                print(response.text)
            except Exception:
                pass



# Fonction pour lister les jobs d'un projet
def lister_jobs(projet):
    url = f'{rundeck_api_url}/project/{projet}/jobs'
    headers = {
        'Accept': 'application/json'
        
    }
    response = session.get(url,headers = headers)

    alljobs=[]
    myjob={
          'export_date': None,
          'projet': None,
          'job_id': None,
          'job_name': None,
          'last_result': None
          }

    if response.status_code == 200:
        jobs = response.json()
        for job in jobs:
            job_id=job['id']
            job_name=job['name']
            last_result=dernier_etat_execution(job['id'])
            myjob={
                'export_date': objdate.now().strftime("%d/%m/%Y"),
                'projet': projet,
                'job_id': job_id,
                'job_name': job_name,
                'last_result': last_result
            }
            alljobs.append(myjob)
        return alljobs    
    else:
            print(f"Erreur: {response.status_code}")
            try:
                print(response.text)
            except Exception:
                pass


# Fonction pour obtenir le dernier état d'exécution d'un job
def dernier_etat_execution(job_id):
    url = f'{rundeck_api_url}/job/{job_id}/executions'
    headers = {
        'Accept': 'application/json'
        
    }
    params = {
        'max': 1,  # Limite à une exécution pour obtenir la plus récente
        'offset': 0
    }
    response = session.get(url, headers=headers, params=params)
    
    allexec=[]
    myexec={
          'id': None,
          'statut': None,
          'start': None,
          'end': None
          }



    if response.status_code == 200:
        executions = response.json()
        if executions['executions']:
            derniere_execution = executions['executions'][0]
            exec_id=derniere_execution['id']
            exec_statut=derniere_execution['status']
            exec_start=objdate.fromisoformat(derniere_execution['date-started']['date'].replace('Z','')).strftime("%d/%m/%Y, %H:%M:%S")
            # Si 'date-started' n'est pas trouvé dans la derniere execution on positionne exec_end='null'
            if 'date-ended' in derniere_execution: 
               exec_end=objdate.fromisoformat(derniere_execution['date-ended']['date'].replace('Z','')).strftime("%d/%m/%Y, %H:%M:%S")
            else:
               exec_end='null'
            myexec={
                'id': exec_id,
                'statut': exec_statut,
                'start': exec_start,
                'end': exec_end
            }
            allexec.append(myexec)
            return allexec
        else:
            return("Aucune exécution trouvée pour ce job.")
    else:
        return(f"Erreur: {response.status_code}")





# All projects
allproj=lister_projects()   

# Transforme la liste allproj en dictionnaire
dicallproj=dict(enumerate(allproj))



# Pour chacun des projets, recuperation de la liste des jobs et de l'état de la derniere execution
dicallprojjobs={}
for i in sorted(dicallproj.keys()):
    dicallprojjobs[i]=lister_jobs(dicallproj[i]['project_name'])



# Affichage du dictionnaire imbriqué
print("ALL PROJETS - ALL LAST JOBS:")
print("EXPORT_DATE;PROJET;JOB_ID;JOB_NAME;DERNIERE_EXECUTION")
for i in sorted(dicallprojjobs.items()):
    for x in i[1]:
        print("{export_date};{projet};{job_id};{job_name};{last_result}".format(export_date=x['export_date'],projet=x['projet'],job_id=x['job_id'], job_name=x['job_name'], last_result=x['last_result']))
    


# Export vers rundeck_jobs.csv du dictionnaire dicallprojjobs 
with open('rundeck_jobs.csv', 'w') as f:
    with redirect_stdout(f):
    
      print("EXPORT_DATE;PROJET;JOB_ID;JOB_NAME;DERNIERE_EXECUTION")
      for i in sorted(dicallprojjobs.items()):
        for x in i[1]:
          print("{export_date};{projet};{job_id};{job_name};{last_result}".format(export_date=x['export_date'],projet=x['projet'],job_id=x['job_id'], job_name=x['job_name'], last_result=x['last_result']))
 

print("\nresultat exporté dans rundeck_jobs.csv")

[Powershell] – Les dates au format Iso 8601

J’ai récemment eu besoin dans un script Powershell de construire une requête HTTP pour Microsoft API Graph, cela m’a permis de d’utiliser la norme Iso 8601 pour réaliser mes filtres.

Le format Iso 8601

La norme Iso 8601 spécifie la représentation numérique de la date et de l’heure, respectivement basées sur le calendrier grégorien et le système horaire sur 24 heures.

Voici la mise en forme de cette norme.

AAAA-MM-JJTHH:MM:SS,ss-/+FF:ff

AAAA: Représente l’année sur 4 chiffres
MM: représente le mois sur 2 chiffres (de 01 à 12)
JJ: représente le jour sur 2 chiffres (de 01 à 31)
T: Indique le temps (Time)
HH: représente l’heure sur 2 chiffres (de 00 à 24)
MM: représente les minutes sur 2 chiffres (de 00 à 59)
SS: représente les secondes sur 2 chiffres (de 00 à 59)
ss: représente la fraction de secondes sur 1 ou 2 chiffres
+/-: représente le fuseau horaire, où ‘+’ indique une avance sur UTC et ‘-‘ un retard sur UTC
FF: représente le nombre d’heure d’avance ou de retard sur UTC
ff: représente le nombre de minute d’avance ou de retard sur UTC
Z: indique qu’il s’agit de l’heure au format UTC (pour méridien Zéro)

Comment l’utiliser avec Powershell

Donc pour formater la date et l’heure à la norme Iso 8601, il suffit d’utiliser la commande suivante :

(Get-Date).ToString("yyyy-MM-ddTHH:mm:ssZ")

Comme vous pourrez le constater, dans mon cas j’utilise la lettre « Z » pour obtenir directement le format UTC.

Python – Script pour lister les proprietes des blobs Azure

Le script python suivant se connecte a un compte de stockage Azure avec une cle de stockage et liste les proprietes des blobs d’un container, avec la possibilité d’exporter le resultat en csv.

Le parametre MaxResult est important et est directement lié au fait que la requete vers le container (voir fonction « list_all_blobs_in_container_by_lot ») s’effectue en generant un iterator (generator) qui recupere par lot (MaxResult) la liste des blobs.

Le parametre export_all_blobs_list est a False par defaut pour eviter de generer un fichier csv si le nombre de blobs est tres important (1 ligne/blob)

### azure_list_blobs_properties.py ###
##### Script Python pour lister les proprietes de blobs dans un container Azure Blob Storage #####

# PARAMETERES D'ENTREE DU SCRIPT :
# --storage_account_name : Nom du compte de stockage Azure (default: mystorageaccount)
# --Cledestockage : Clé de stockage Azure
# --nomcontainer : Nom du container de stockage Azure (default: mycontainer)
# --MaxResults : Nombre maximum de blobs à récupérer par lot (default: 10000)
# --export_all_blobs_list : Exporter la liste de tous les blobs dans un fichier CSV (True/False) (default: False)
# --exportfolder : Chemin du dossier pour les fichiers CSV (default: .)
# --fichierrapport : Chemin du fichier de rapport (default: myreport.txt)
# --help : Afficher l'aide du script

# USAGE :
# WARNING: Ne pas pas positionner a True le parametre export_all_blobs_list si le container cible contiens beaucoup de blobs. Le fichier csv generé sera très volumineux (1 ligne / blob).
# python.exe .\azure_list_blobs_properties.py --storage_account_name <nom_du_compte_de_stockage> --Cledestockage <clé_de_stockage> --nomcontainer <nom_du_container> --export_all_blobs_list True/False



import os
import argparse
from datetime import datetime, timezone
from azure.storage.blob import BlobServiceClient
import csv


if __name__ == '__main__':

        ###########################################
        # Arguments du script
        ###########################################
        
        parser = argparse.ArgumentParser()

        parser.add_argument(
        '--storage_account_name',
        type=str,
        default='mystorageaccount',
        help='Nom du compte de stockage Azure'
        )


        parser.add_argument(
        '--Cledestockage',
        type=str,
        help='Clé de stockage Azure'
        )

        parser.add_argument(
        '--nomcontainer',
        type=str,
        default='mycontainer',
        help='nom du container de stockage Azure'
        )

        parser.add_argument(
        '--MaxResults',
        type=int,
        default=10000,
        help='Nombre maximum de blobs à récupérer par lot (par défaut: 10000)'
        )

        parser.add_argument(
        '--export_all_blobs_list',
        choices=['True', 'False'],
        default='False',
        help='Exporter la liste de tous les blobs dans un fichier CSV'
        )

        # export folder
        parser.add_argument(
        '--exportfolder',
        type=str,
        default='.',
        help='Chemin du dossier pour les fichiers CSV'
        )

        # fichier de rapport
        parser.add_argument(
        '--fichierrapport',
        type=str,
        default='myreport.txt',
        help='Nom du fichier de rapport'
        )

        # On parse les arguments
        args = parser.parse_args()

        # AUTRES PARAMETRES
        # URL du compte de stockage Azure
        account_url = f"https://{args.storage_account_name}.blob.core.windows.net"

        # On signifie a Python que l'on utilise PAS de proxy pour les connexions vers Azure
        os.environ['NO_PROXY'] = '.blob.core.windows.net,login.microsoftonline.com,management.azure.com'

        
        

        ### FONCTION POUR ECRIRE UN MESSAGE DANS UN FICHIER DE RAPPORT
        def write_to_report_file(message, filename):
            """
            Écrit un message dans un fichier de rapport.

            :param message: Message à écrire dans le fichier.
            :param
            filename: Nom du fichier de rapport.
            """
            try:
                with open(args.fichierrapport, 'a', encoding='utf-8') as fichier:
                    fichier.write(message + '\n')
                print(f"Message ecrit dans le fichier de rapport '{args.fichierrapport}'.")
            except Exception as e:
                print(f"Erreur lors de l'ecriture dans le fichier {args.fichierrapport}: {e}")
        # Fin de la fonction write_to_report_file



         ## FONCTION POUR TESTER L'ACCES AU CONTAINER DE STOCKAGE AZURE
        def test_access_to_azure_storage_container(account_url, credential, container_name):
            """
            Teste l'accès à un container de stockage Azure Blob.
            
            :param account_url: URL du compte de stockage Azure.
            :param credential: Clé d'accès ou SAS token pour le compte de stockage.
            :param container_name: Nom du container à tester.
            """
            try:
                blob_service_client = BlobServiceClient(account_url=account_url, credential=credential)
                container_client = blob_service_client.get_container_client(container_name)
                
                # Vérification de la connexion
                container_properties = container_client.get_container_properties()
                print(f"Connexion reussie au container '{container_name}'.")
                
                return True
            except Exception as e:
                print(f"Erreur lors de l'accès au container '{container_name}' : {e}")
                return False
        # Fin de la fonction test_access_to_azure_storage_container
        


        ## FONCTION POUR LISTER TOUT LES BLOBS DANS UN CONTAINER

        def list_all_blobs_in_container_by_lot(container_client, max_results):
            """
            Liste les blobs dans un conteneur Azure Blob Storage par lot avec une limite.

            :param container_name: Nom du conteneur.
            :param max_results: Nombre maximum de blobs à récupérer par lot.
            :return: Un générateur qui retourne les blobs par lot.
            """
            try:
                

                # Liste les blobs par lot
                blob_iterator = container_client.list_blobs(results_per_page=max_results).by_page()
                for blob_page in blob_iterator:
                    yield list(blob_page)  # Retourne un lot de blobs sous forme de liste

            except Exception as e:
                print(f"Erreur lors de la récupération des blobs : {e}")





        # FONCTION POUR ECRIRE UN CSV
        def write_csv(data, filename):
            """
            Écrit les données dans un fichier CSV.

            :param data: Liste de dictionnaires contenant les données à écrire.
                        Chaque dictionnaire représente une ligne, avec les clés comme en-têtes.
            :param filename: Nom du fichier CSV à créer.
            """
            try:
                # Vérifier si la liste de données n'est pas vide
                if not data:
                    print("Aucune donnee a ecrire dans le fichier CSV.")
                    return

                # Ouvrir le fichier en mode écriture
                with open(filename, mode='w', newline='', encoding='utf-8') as csvfile:
                    # Créer un objet writer
                    writer = csv.DictWriter(csvfile, fieldnames=data[0].keys())

                    # Écrire l'en-tête
                    writer.writeheader()

                    # Écrire les lignes
                    writer.writerows(data)

                print(f"Les donnees ont ete ecrites avec succes dans le fichier '{filename}'.")
            except Exception as e:
                print(f"Erreur lors de l'ecriture dans le fichier CSV : {e}")

    

        #Debut du traitement
        print(f"Debut du traitement pour le container {args.nomcontainer}.")



        # TEST DE L'ACCES AU CONTAINER DE STOCKAGE AZURE
        print("Test de l'acces au container de stockage Azure...")
        test_access_to_azure_storage_container(
            account_url=account_url,
            credential=args.Cledestockage,
            container_name=args.nomcontainer
        )
        # Fin du test de l'accès au container de stockage Azure
        # #################################################
  
        # On recupere la date d'aujourd'hui
        today = datetime.now().replace(tzinfo=timezone.utc)

        # Instanciation du client de service Blob
        container = BlobServiceClient(
            account_url=account_url,
            credential=args.Cledestockage
        ).get_container_client(args.nomcontainer)



        all_blobs_generator = list_all_blobs_in_container_by_lot(
            container_client=container,
            max_results=args.MaxResults # Nombre maximum de blobs à récupérer par lot
        )

        # On verifie que le generator n'est pas vide
        if not all_blobs_generator:
            print(f"Aucun blob trouve dans le container '{args.nomcontainer}'.")
            exit(1)  # Sortie du script si aucun blob n'est trouvé
        # Fin de la récupération des blobs dans le container      



        # Initialisation de la liste pour stocker les infos de tout les blobs (utilisé uniquement si l'option export_all_blobs_list est a True)
        all_blobs_info = []
        
         

        # On parcourt tous les blobs du container
        print("\n"*2)
        print("#"* 80)
        print(f"Analyse et traitement des blobs dans le container {args.nomcontainer}...")
        print("#"* 80)
       
        # initialisation de l'index pour afficher le numero du lot et le nombre total de lot a la fin du traitement
        lotcount = 0
        # initialisation de l'index pour incrementer le nombre de blobs
        blobcount = 0

        try:
            
                for blob_lot in all_blobs_generator:
                    lotcount += 1
                    print(f"Nombre de blobs dans le lot {lotcount}: {len(blob_lot)}")
                    for blob in blob_lot:
                            # On incremente le nombre de blobs
                            blobcount += 1
                            # On recupere le client du blob
                            blob_client = container.get_blob_client(blob.name)                        
                            # Récupération des propriétés du blob
                            blob_properties = blob_client.get_blob_properties()

                            
                            # On alimente all_blobs_info
                            all_blobs_info.append({
                                    'Container': blob.container,
                                    'Blob_Name': blob.name,
                                    'Blob_Size': blob.size,
                                    'is_deleted': blob.deleted,
                                    'remaining_retention_days': blob.remaining_retention_days,
                                    'is_current_version': blob.is_current_version,
                                    'last_modified': blob.last_modified,
                                    'expiration': blob_properties.immutability_policy.expiry_time if blob_properties.immutability_policy else None
                                })
            

        except StopIteration:
                print("Fin de l'iterateur, tous les lots de blobs ont ete analyses et traites.")
        except Exception as e:
                print(f"Erreur lors de la recuperation des blobs : {e}")
            # Fin de la récupération des blobs dans le container


        # affichage du nombre lots analyses
        print("\n")
        print("#"* 80)
        print(f"Nombre total de lots de blobs analyses: {lotcount}")
        print("#"* 80)



        # STATISTIQUES SUR LES BLOBS
        print("\n")
        print("#"* 80)
        print(f"Nombre total de blobs dans le container {args.nomcontainer}: {blobcount}")
        print("#"* 80)


        # On affiche certaines proprietes de all_blobs_info
        print("\n")
        print("#"* 80)
        print(f"Contenu de la liste de tous les blobs du container {args.nomcontainer}:")
        for blob_info in all_blobs_info:
            print(f"Blob Name: {blob_info['Blob_Name']}, Size: {blob_info['Blob_Size']} octets, Expiration: {blob_info['expiration']}, Last Modified: {blob_info['last_modified']}")
        print("#"* 80)


        # ECRIRE LE CONTENU DE all_blobs_info DANS UN CSV (Si l'option export_all_blobs_list est a True)
        if args.export_all_blobs_list == 'True':
            print("\n")
            print("#"* 80)
            print(f"Ecriture de la liste de tout les blobs du container dans le fichier CSV avec la date d'expiration...")
            csv_filepath = f"{args.exportfolder}\\{args.nomcontainer}_all_blobs_info.csv"
            write_csv(data=all_blobs_info, filename=csv_filepath)
            print("#"* 80)

        


        # On ajoute un message de fin dans le fichier de rapport avec toutes les statistiques
        print("\n")
        print("#"* 80)
        msg = f"Fin du traitement pour le container {args.nomcontainer}\nLe traitement s'est opere en {lotcount} lots\nNombre total de blobs: {blobcount}\nFichier CSV des blobs: {csv_filepath if args.export_all_blobs_list == 'True' else 'Non exporté'}"
        print(msg)

        # Ecriture dans le fichier de rapport
        write_to_report_file(
        message = f"\n### {today.isoformat()} ###\n{msg}\n#########################################",
        filename=f"{args.exportfolder}\\{args.fichierrapport}"
        )
        

        print("#"* 80)
        print("\n")
        # Fin du script
        print("#"* 40 + " FIN DU SCRIPT " + "#"*40)