https://github.com/excalidraw/excalidraw-mcp

In this Demo i will be showing how to integrate Excalidraw MCP into Claude Code to generate beautiful architecture diagrams from current dir source codes

Install The MCP in claude Code

claude mcp add excalidraw -- npx -y excalidraw-mcp --stdio

image.png

once installed you can verify it via either

claude mcp list

image.png

Demo

my current dir contains a boto3 automation python script of bit of cross account logic in it

script.py

"""
AWS Organizations - IAM Role Tag Scanner
=========================================
Lists all active AWS accounts from AWS Organizations, assumes the role
'rene-demo-ca-engineer' in each account, and scans all IAM roles that have
the tag value "Rene Solis". Results are saved to a CSV file.

Requirements:
    pip install boto3

Permissions needed (on the caller's identity):
    - organizations:ListAccounts
    - sts:AssumeRole  (for role 'rene-demo-ca-engineer' in every target account)

Permissions needed (on 'rene-demo-ca-engineer' in each target account):
    - iam:ListRoles
    - iam:ListRoleTags
"""

import boto3
import csv
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from botocore.exceptions import ClientError, NoCredentialsError

# ─── Configuration ────────────────────────────────────────────────────────────
ASSUME_ROLE_NAME   = "rene-demo-ca-engineer"
TARGET_TAG_VALUE   = "Rene Solis"          # Tag value to search for (any key)
MAX_WORKERS        = 20                    # Threads (tune to your API limits)
SESSION_DURATION   = 3600                  # Seconds for assumed-role session
OUTPUT_FILE        = "iam_role_scan_findings.csv"
LOG_LEVEL          = logging.INFO
# ──────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# ─── Step 1: List all ACTIVE accounts from AWS Organizations ─────────────────

def list_active_accounts() -> list[dict]:
    """Return a list of active account dicts from AWS Organizations."""
    org_client = boto3.client("organizations")
    accounts = []
    paginator = org_client.get_paginator("list_accounts")

    logger.info("Fetching active accounts from AWS Organizations …")
    for page in paginator.paginate():
        for acct in page["Accounts"]:
            if acct["Status"] == "ACTIVE":
                accounts.append(
                    {
                        "AccountId": acct["Id"],
                        "AccountName": acct.get("Name", ""),
                        "Email": acct.get("Email", ""),
                    }
                )

    logger.info(f"Found {len(accounts)} active account(s).")
    return accounts

# ─── Step 2: Assume role in a target account ─────────────────────────────────

def assume_role(account_id: str, role_name: str) -> boto3.Session | None:
    """
    Assume 'role_name' in 'account_id' and return a boto3 Session,
    or None if the assumption fails.
    """
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    sts_client = boto3.client("sts")

    try:
        response = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName="IAMTagScanSession",
            DurationSeconds=SESSION_DURATION,
        )
        creds = response["Credentials"]
        session = boto3.Session(
            aws_access_key_id=creds["AccessKeyId"],
            aws_secret_access_key=creds["SecretAccessKey"],
            aws_session_token=creds["SessionToken"],
        )
        return session

    except ClientError as exc:
        error_code = exc.response["Error"]["Code"]
        logger.warning(
            f"[{account_id}] Could not assume role '{role_name}': "
            f"{error_code} – {exc.response['Error']['Message']}"
        )
        return None

# ─── Step 3: Scan IAM roles in an account for the target tag value ────────────

def get_role_tags(iam_client, role_name: str) -> list[dict]:
    """Return all tags attached to an IAM role."""
    tags = []
    paginator = iam_client.get_paginator("list_role_tags")
    for page in paginator.paginate(RoleName=role_name):
        tags.extend(page.get("Tags", []))
    return tags

def scan_account(account: dict) -> list[dict]:
    """
    Assume the role in 'account', then list all IAM roles whose tags include
    a tag whose Value == TARGET_TAG_VALUE.

    Returns a list of finding dicts (one per matching role).
    """
    account_id   = account["AccountId"]
    account_name = account["AccountName"]
    findings     = []

    # Assume role
    session = assume_role(account_id, ASSUME_ROLE_NAME)
    if session is None:
        return [
            {
                "AccountId":   account_id,
                "AccountName": account_name,
                "RoleName":    "N/A",
                "RoleArn":     "N/A",
                "TagKey":      "N/A",
                "TagValue":    "N/A",
                "Status":      "ASSUME_ROLE_FAILED",
                "ScannedAt":   datetime.now(timezone.utc).isoformat(),
            }
        ]

    iam_client = session.client("iam")

    try:
        role_paginator = iam_client.get_paginator("list_roles")
        for page in role_paginator.paginate():
            for role in page["Roles"]:
                role_name = role["RoleName"]
                role_arn  = role["Arn"]

                try:
                    tags = get_role_tags(iam_client, role_name)
                except ClientError as exc:
                    logger.debug(
                        f"[{account_id}] Skipping role '{role_name}': "
                        f"{exc.response['Error']['Code']}"
                    )
                    continue

                # Check if any tag value matches TARGET_TAG_VALUE
                for tag in tags:
                    if tag.get("Value") == TARGET_TAG_VALUE:
                        findings.append(
                            {
                                "AccountId":   account_id,
                                "AccountName": account_name,
                                "RoleName":    role_name,
                                "RoleArn":     role_arn,
                                "TagKey":      tag["Key"],
                                "TagValue":    tag["Value"],
                                "Status":      "FOUND",
                                "ScannedAt":   datetime.now(timezone.utc).isoformat(),
                            }
                        )

    except ClientError as exc:
        logger.warning(
            f"[{account_id}] Error listing roles: "
            f"{exc.response['Error']['Code']} – {exc.response['Error']['Message']}"
        )
        return [
            {
                "AccountId":   account_id,
                "AccountName": account_name,
                "RoleName":    "N/A",
                "RoleArn":     "N/A",
                "TagKey":      "N/A",
                "TagValue":    "N/A",
                "Status":      "LIST_ROLES_FAILED",
                "ScannedAt":   datetime.now(timezone.utc).isoformat(),
            }
        ]

    logger.info(
        f"[{account_id}] {account_name}: {len(findings)} matching role(s) found."
    )
    return findings

# ─── Step 4: Multithreaded orchestration ─────────────────────────────────────

def run_scan(accounts: list[dict]) -> list[dict]:
    """Scan all accounts concurrently and aggregate findings."""
    all_findings = []
    total = len(accounts)

    logger.info(
        f"Starting scan across {total} account(s) with {MAX_WORKERS} worker thread(s) …"
    )

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_account = {
            executor.submit(scan_account, acct): acct for acct in accounts
        }

        completed = 0
        for future in as_completed(future_to_account):
            acct = future_to_account[future]
            completed += 1
            try:
                results = future.result()
                all_findings.extend(results)
            except Exception as exc:
                logger.error(
                    f"[{acct['AccountId']}] Unexpected error: {exc}", exc_info=True
                )
            finally:
                logger.info(f"Progress: {completed}/{total} accounts processed.")

    return all_findings

# ─── Step 5: Write CSV output ─────────────────────────────────────────────────

CSV_FIELDS = [
    "AccountId",
    "AccountName",
    "RoleName",
    "RoleArn",
    "TagKey",
    "TagValue",
    "Status",
    "ScannedAt",
]

def write_csv(findings: list[dict], output_path: str) -> None:
    """Write findings to a CSV file."""
    with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=CSV_FIELDS)
        writer.writeheader()
        writer.writerows(findings)
    logger.info(f"Results written to: {os.path.abspath(output_path)}")

# ─── Entry point ──────────────────────────────────────────────────────────────

def main():
    start_time = datetime.now()
    logger.info("=" * 60)
    logger.info("AWS IAM Role Tag Scanner")
    logger.info(f"  Role to assume : {ASSUME_ROLE_NAME}")
    logger.info(f"  Tag value      : {TARGET_TAG_VALUE}")
    logger.info(f"  Output file    : {OUTPUT_FILE}")
    logger.info(f"  Max threads    : {MAX_WORKERS}")
    logger.info("=" * 60)

    try:
        accounts = list_active_accounts()
    except (ClientError, NoCredentialsError) as exc:
        logger.error(f"Failed to list accounts from AWS Organizations: {exc}")
        raise SystemExit(1)

    if not accounts:
        logger.warning("No active accounts found. Exiting.")
        return

    findings = run_scan(accounts)

    # Summary
    found_count  = sum(1 for f in findings if f["Status"] == "FOUND")
    failed_count = sum(1 for f in findings if "FAILED" in f["Status"])
    logger.info("=" * 60)
    logger.info(f"Scan complete.")
    logger.info(f"  Accounts scanned    : {len(accounts)}")
    logger.info(f"  Matching roles found: {found_count}")
    logger.info(f"  Accounts with errors: {failed_count}")
    logger.info(f"  Elapsed time        : {datetime.now() - start_time}")
    logger.info("=" * 60)

    write_csv(findings, OUTPUT_FILE)

if __name__ == "__main__":
    main()

now lets say i want to have its architecture diagram what the script is doing and how its doing , then i can get the arch diagram generated by claude code itself via our excalidraw MCP