How to Automate Cloudflare Usage Monitoring

Deepanshu Bhalla

If you are using Cloudflare products like Workers, R2, or D1, you have probably checked the dashboard more times than you would like.

It works, but it is manual, repetitive, and not very scalable. What if you could just run a script and instantly see your usage? That is exactly what we will do here.

Why automate Cloudflare usage tracking?

Cloudflare gives you powerful analytics, but:

  • You have to open the dashboard every time
  • You cannot easily combine data across services
  • Tracking trends daily or hourly becomes tedious

The following Python script solves this by automating:

  • Workers usage: requests, errors, CPU time
  • R2 usage: reads, writes, total requests per bucket
  • D1 usage: queries, rows read and written per database

Steps to Automate Tracking for Workers, R2 & D1

1. Install dependency

pip install requests

2. Get your API credentials

From your Cloudflare dashboard:

  • Create an API token with Account Analytics Read permission (adding D1 Read as well lets the script resolve database names via the REST API)
  • Copy your CF_API_TOKEN and CF_ACCOUNT_ID

3. Run Python Code

You can either set environment variables for the API token and account ID as shown below, or paste them directly inside the script if you prefer.

export CF_API_TOKEN="your_token_here"
export CF_ACCOUNT_ID="your_account_id"
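If you are on Windows, the PowerShell equivalent of the export commands above is:

```shell
# PowerShell (Windows) equivalent of the export commands above
$env:CF_API_TOKEN = "your_token_here"
$env:CF_ACCOUNT_ID = "your_account_id"
```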
 
import os
import requests
from datetime import datetime, timezone, timedelta

# ─── CONFIG ──────────────────────────────────────────────────────────────────
CF_API_TOKEN  = os.getenv("CF_API_TOKEN",  "xxxxxxxxxxxxxxxxxxx")
CF_ACCOUNT_ID = os.getenv("CF_ACCOUNT_ID", "xxxxxxxxxxxxxxxxxxx")

GRAPHQL_URL = "https://api.cloudflare.com/client/v4/graphql"
REST_BASE   = "https://api.cloudflare.com/client/v4"

HEADERS = {
    "Authorization": f"Bearer {CF_API_TOKEN}",
    "Content-Type":  "application/json",
}

# ─── HELPERS ─────────────────────────────────────────────────────────────────

def iso(dt: datetime) -> str:
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

def fmt_bytes(b: int) -> str:
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(b) < 1024:
            return f"{b:.1f} {unit}"
        b /= 1024
    return f"{b:.1f} PB"

def print_table(title: str, rows: list[dict], keys: list[str] | None = None):
    print(f"\n{'─'*60}")
    print(f"  {title}")
    print(f"{'─'*60}")
    if not rows:
        print("  (no data)")
        return
    keys = keys or list(rows[0].keys())
    header = "  " + " | ".join(f"{k:<20}" for k in keys)
    print(header)
    print("  " + "-" * (len(header) - 2))
    for row in rows:
        print("  " + " | ".join(f"{str(row.get(k,'')):<20}" for k in keys))

def gql(query: str, variables: dict | None = None) -> dict:
    payload = {"query": query}
    if variables:
        payload["variables"] = variables
    r = requests.post(GRAPHQL_URL, headers=HEADERS, json=payload, timeout=30)
    r.raise_for_status()
    data = r.json()
    if errors := data.get("errors"):
        raise RuntimeError(f"GraphQL errors: {errors}")
    return data["data"]

def rest_get(path: str, params: dict | None = None) -> dict:
    r = requests.get(f"{REST_BASE}{path}", headers=HEADERS, params=params, timeout=30)
    r.raise_for_status()
    return r.json()


# ─── WORKERS USAGE (GraphQL) ─────────────────────────────────────────────────

def get_workers_usage(days: int = 30):
    print(f"\nWORKERS USAGE  (last {days} days)")
    now   = datetime.now(timezone.utc)
    start = now - timedelta(days=days)

    query = """
    query WorkersUsage($accountId: String!, $start: String!, $end: String!) {
      viewer {
        accounts(filter: {accountTag: $accountId}) {
          workersInvocationsAdaptive(
            limit: 10000
            filter: {datetime_geq: $start, datetime_leq: $end}
            ) {
              sum {
                requests
                errors
                subrequests
              }
              quantiles {
                cpuTimeP50
                cpuTimeP99
              }
              dimensions {
                scriptName
              }
            }
        }
      }
    }
    """
    try:
        data = gql(query, {"accountId": CF_ACCOUNT_ID,
                            "start": iso(start), "end": iso(now)})
        records = (data["viewer"]["accounts"][0]
                       .get("workersInvocationsAdaptive", []))
        rows = []
        for rec in records:
            s = rec.get("sum", {})
            q = rec.get("quantiles", {})
            rows.append({
                "Script":       rec["dimensions"].get("scriptName", "(unnamed)"),
                "Requests":     f"{s.get('requests', 0):,}",
                "Errors":       f"{s.get('errors', 0):,}",
                "CPU ms":       f"{(q.get('cpuTimeP50') or 0) / 1_000:.1f}",
                "CPU P99 ms":   f"{(q.get('cpuTimeP99') or 0) / 1_000:.1f}",
            })
        rows.sort(key=lambda r: int(r["Requests"].replace(",", "")), reverse=True)
        print_table("Workers Invocations by Script",
                    rows, ["Script", "Requests", "Errors", "CPU ms", "CPU P99 ms"])
    except Exception as e:
        print(f"  Workers data unavailable: {e}")

# ─── R2 STORAGE & OPERATIONS (GraphQL) ───────────────────────────────────────

def get_r2_usage(days: int = 30):
    print(f"\nR2 STORAGE & OPERATIONS  (last {days} days)")
    now   = datetime.now(timezone.utc)
    start = now - timedelta(days=days)

    query = """
    query R2Usage($accountId: String!, $start: String!, $end: String!) {
      viewer {
        accounts(filter: {accountTag: $accountId}) {
          r2OperationsAdaptiveGroups(
            limit: 10000
            filter: {datetime_geq: $start, datetime_leq: $end}
          ) {
            sum {
              requests
            }
            dimensions {
              bucketName
              actionType
            }
          }
        }
      }
    }
  """
    try:
        data = gql(query, {"accountId": CF_ACCOUNT_ID,
                            "start": iso(start), "end": iso(now)})
        records = (data["viewer"]["accounts"][0]
                       .get("r2OperationsAdaptiveGroups", []))

        # Aggregate per bucket
        buckets: dict[str, dict] = {}
        for rec in records:
            name = rec["dimensions"].get("bucketName", "(unknown)")
            action = (rec["dimensions"].get("actionType", "") or "").lower()
            s    = rec.get("sum", {})
            if name not in buckets:
                buckets[name] = {"requests": 0, "reads": 0, "writes": 0}
            buckets[name]["requests"] += s.get("requests", 0)

            # Classify operations into reads vs writes based on common R2 action types
            if any(k in action for k in ("get", "read", "head", "list")):
                buckets[name]["reads"] += s.get("requests", 0)
            if any(k in action for k in ("put", "post", "write", "delete")):
                buckets[name]["writes"] += s.get("requests", 0)

        rows = [{"Bucket": k,
                 "Requests": f"{v['requests']:,}",
                 "Reads": f"{v['reads']:,}",
                 "Writes": f"{v['writes']:,}"}
                for k, v in sorted(buckets.items(),
                                   key=lambda x: x[1]["requests"], reverse=True)]
        print_table("R2 Usage by Bucket", rows, ["Bucket", "Requests", "Reads", "Writes"])
    except Exception as e:
        print(f"  R2 data unavailable: {e}")

# ─── D1 DATABASE USAGE (GraphQL) ─────────────────────────────────────────────

def get_d1_usage(days: int = 30):
    print(f"\nD1 DATABASE USAGE  (last {days} days)")
    now   = datetime.now(timezone.utc)
    start = now - timedelta(days=days)

    # 1. Fetch database names via REST API to map IDs to friendly names
    db_names = {}
    try:
        res = rest_get(f"/accounts/{CF_ACCOUNT_ID}/d1/database")
        db_names = {db["uuid"]: db["name"] for db in res.get("result", []) if "uuid" in db}
    except Exception:
        pass

    # 2. Query GraphQL with datetime filters to match dashboard's precise windows
    query = """
    query D1Usage($accountId: String!, $start: String!, $end: String!) {
      viewer {
        accounts(filter: {accountTag: $accountId}) {
          d1AnalyticsAdaptiveGroups(
            limit: 1000
            filter: {datetime_geq: $start, datetime_leq: $end}
          ) {
            sum {
              readQueries
              writeQueries
              rowsRead
              rowsWritten
            }
            dimensions {
              databaseId
            }
          }
        }
      }
    }
    """
    try:
        data = gql(query, {"accountId": CF_ACCOUNT_ID,
                            "start": iso(start), "end": iso(now)})
        records = (data["viewer"]["accounts"][0]
                       .get("d1AnalyticsAdaptiveGroups", []))

        db_totals: dict[str, dict] = {}
        for rec in records:
            db_id = rec["dimensions"].get("databaseId", "?")
            s = rec.get("sum", {})
            if db_id not in db_totals:
                db_totals[db_id] = {"reads": 0, "writes": 0, "rowsRead": 0, "rowsWritten": 0}
            
            db_totals[db_id]["reads"] += (s.get("readQueries") or 0)
            db_totals[db_id]["writes"] += (s.get("writeQueries") or 0)
            db_totals[db_id]["rowsRead"] += (s.get("rowsRead") or 0)
            db_totals[db_id]["rowsWritten"] += (s.get("rowsWritten") or 0)

        rows = [{
            "Database":    db_names.get(db_id, db_id[:18]),
            "Read Rows":   f"{totals['rowsRead']:,}",
            "Write Rows":  f"{totals['rowsWritten']:,}",
            "Queries":     f"{totals['reads'] + totals['writes']:,}",
        } for db_id, totals in sorted(db_totals.items(),
                                      key=lambda item: item[1]["rowsRead"] + item[1]["rowsWritten"],
                                      reverse=True)]

        print_table("D1 Usage by Database (Billable Metrics)",
                    rows, ["Database", "Read Rows", "Write Rows", "Queries"])
    except Exception as e:
        print(f"  D1 data unavailable: {e}")


# ─── MAIN ─────────────────────────────────────────────────────────────────────

def main():
    print("=" * 60)
    print("  Cloudflare Billing & Usage Tracker")
    print(f"  Account: {CF_ACCOUNT_ID[:8]}…")
    print(f"  Run at:  {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    print("=" * 60)

    get_workers_usage(days=1)
    get_r2_usage(days=1)
    get_d1_usage(days=1)

if __name__ == "__main__":
    main()

After running the code above, you will see output like this:

Output
============================================================
  Cloudflare Billing & Usage Tracker
  Account: exxxxxxxxxx…
  Run at:  2026-05-05 13:35 UTC
============================================================

WORKERS USAGE  (last 1 days)

────────────────────────────────────────────────────────────
  Workers Invocations by Script
────────────────────────────────────────────────────────────
  Script               | Requests             | Errors               | CPU ms               | CPU P99 ms          
  ----------------------------------------------------------------------------------------------------------------
  t-analysis           | 36,780               | 0                    | 17.1                 | 137.8               
  notify---purge       | 46                   | 0                    | 0.6                  | 11.7                
  d1-example           | 35                   | 0                    | 2.0                  | 11.9                

R2 STORAGE & OPERATIONS  (last 1 days)

────────────────────────────────────────────────────────────
  R2 Usage by Bucket
────────────────────────────────────────────────────────────
  Bucket               | Requests             | Reads                | Writes              
  -----------------------------------------------------------------------------------------
  stocks               | 38,746               | 29,987               | 8,759               
  tradedata            | 6,875                | 6,875                | 0                   
  etfs                 | 1,612                | 1,127                | 485                 
  logos                | 620                  | 620                  | 0                   
                       | 8                    | 8                    | 0                   

D1 DATABASE USAGE  (last 1 days)

────────────────────────────────────────────────────────────
  D1 Usage by Database (Billable Metrics)
────────────────────────────────────────────────────────────
  Database             | Read Rows            | Write Rows           | Queries             
  -----------------------------------------------------------------------------------------
  cxxxx-xxx-xxxxxxxx   | 28,166,287           | 3,715                | 8,372               
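If you want trends rather than one-off snapshots, you can append each run's totals to a CSV file and chart them later. Below is a minimal sketch; `append_usage_csv` and `cf_usage.csv` are hypothetical names, not part of the script above, and the sample numbers are taken from the Workers output shown earlier.

```python
import csv
import os
from datetime import datetime, timezone

def append_usage_csv(path: str, service: str, metrics: dict) -> None:
    """Append one timestamped usage row per run so trends can be charted later.

    Hypothetical helper - not part of the tracker script above.
    """
    new_file = not os.path.exists(path)
    with open(path, "a", newline="") as f:
        writer = csv.writer(f)
        if new_file:
            # Write a header row the first time the file is created
            writer.writerow(["timestamp", "service", *metrics.keys()])
        writer.writerow([
            datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            service,
            *metrics.values(),
        ])

# Example: log the Workers totals from one run
append_usage_csv("cf_usage.csv", "workers", {"requests": 36780, "errors": 0})
```

Run this after each tracker invocation and you get a growing time series you can load into pandas or a spreadsheet.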

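Finally, to make this true monitoring rather than an on-demand check, schedule the script. A sample crontab entry is below; the paths and the script filename are placeholders you will need to adjust to your setup.

```shell
# Run the tracker daily at 08:00 (server time); paths below are placeholders
0 8 * * * CF_API_TOKEN="your_token_here" CF_ACCOUNT_ID="your_account_id" /usr/bin/python3 /path/to/cf_usage.py >> /path/to/cf_usage.log 2>&1
```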
