If you are using Cloudflare products like Workers, R2, or D1, you have probably checked the dashboard more times than you would like.
It works, but it is manual, repetitive, and not very scalable. What if you could just run a script and instantly see your usage? That is exactly what we will do here.
Why automate Cloudflare usage tracking?
Cloudflare gives you powerful analytics, but:
- You have to open the dashboard every time
- You cannot easily combine data across services
- Tracking trends daily or hourly becomes tedious
The following Python script solves this by automatically collecting:
- Workers usage: requests, errors, CPU time
- R2 usage: reads, writes, total requests per bucket
- D1 usage: queries, rows read and written per database
Steps to Automate Tracking for Workers, R2 & D1
1. Install dependency
pip install requests
2. Get your API credentials
From your Cloudflare dashboard:
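- Create an API token with the Account Analytics Read permission (also add D1 Read if you want the D1 report to show database names instead of IDs, because the script looks the names up through the REST API)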
- Copy your CF_API_TOKEN and CF_ACCOUNT_ID
3. Run Python Code
You can either set environment variables for the API token and account ID as shown below, or paste them directly into the script if you prefer.
export CF_API_TOKEN="your_token_here"
export CF_ACCOUNT_ID="your_account_id"
import os
import requests
from datetime import datetime, timezone, timedelta
HAS_TABULATE = False  # not referenced by any of the code shown in this script

# ─── CONFIG ──────────────────────────────────────────────────────────────────
# Credentials are read from the environment; the literal fallbacks are
# placeholders to be replaced by hand when environment variables are not used.
CF_API_TOKEN = os.environ.get("CF_API_TOKEN", "xxxxxxxxxxxxxxxxxxx")
CF_ACCOUNT_ID = os.environ.get("CF_ACCOUNT_ID", "xxxxxxxxxxxxxxxxxxx")

# Cloudflare API v4 endpoints; the GraphQL endpoint lives under the REST base.
REST_BASE = "https://api.cloudflare.com/client/v4"
GRAPHQL_URL = REST_BASE + "/graphql"

# Headers attached to every HTTP request this script sends.
HEADERS = {
    "Authorization": "Bearer " + CF_API_TOKEN,
    "Content-Type": "application/json",
}
# ─── HELPERS ─────────────────────────────────────────────────────────────────
def iso(dt: datetime) -> str:
    """Format *dt* as an ISO-8601 timestamp in UTC with a trailing ``Z``.

    Timezone-aware values are converted to UTC first, so the literal ``Z``
    suffix is always accurate. Naive values are formatted unchanged, i.e.
    they are taken to already be expressed in UTC.
    """
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def fmt_bytes(b: int) -> str:
    """Render a byte count with one decimal place and a binary unit (B … PB).

    The unit is chosen from the value *as it will be displayed*: a value that
    would round up to ``1024.0`` in one unit is shown as ``1.0`` of the next
    unit instead (e.g. 1048575 -> ``"1.0 MB"``, not ``"1024.0 KB"``).
    """
    value = float(b)  # work on a copy so the int parameter is not rebound to a float
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(round(value, 1)) < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} PB"
def print_table(title: str, rows: list[dict], keys: list[str] | None = None):
print(f"\n{'─'*60}")
print(f" {title}")
print(f"{'─'*60}")
if not rows:
print(" (no data)")
return
keys = keys or list(rows[0].keys())
header = " " + " | ".join(f"{k:<20}" for k in keys)
print(header)
print(" " + "-" * (len(header) - 2))
for row in rows:
print(" " + " | ".join(f"{str(row.get(k,'')):<20}" for k in keys))
def gql(query: str, variables: dict | None = None) -> dict:
    """POST a GraphQL query to ``GRAPHQL_URL`` and return its ``data`` payload.

    Args:
        query: The GraphQL document to execute.
        variables: Optional query variables; omitted from the request body
            when empty or ``None``.

    Returns:
        The ``data`` object of the JSON response.

    Raises:
        requests.HTTPError: The server answered with a non-2xx status.
        RuntimeError: The response carried GraphQL ``errors``, or had no
            ``data`` payload at all.
    """
    payload = {"query": query}
    if variables:
        payload["variables"] = variables
    r = requests.post(GRAPHQL_URL, headers=HEADERS, json=payload, timeout=30)
    r.raise_for_status()
    body = r.json()
    if errors := body.get("errors"):
        raise RuntimeError(f"GraphQL errors: {errors}")
    result = body.get("data")
    if result is None:
        # Fail here with a clear message rather than with a KeyError, or a
        # TypeError later in the caller when it indexes into None.
        raise RuntimeError("GraphQL response contained no data")
    return result
def rest_get(path: str, params: dict | None = None) -> dict:
    """GET ``REST_BASE + path`` and return the decoded JSON body.

    Args:
        path: Path appended verbatim to ``REST_BASE`` (should start with ``/``).
        params: Optional query-string parameters.

    Returns:
        The full JSON response body (not just its ``result`` field).

    Raises:
        requests.HTTPError: The server answered with a non-2xx status.
        RuntimeError: The body explicitly reports ``"success": false``.
    """
    r = requests.get(f"{REST_BASE}{path}", headers=HEADERS, params=params, timeout=30)
    r.raise_for_status()
    payload = r.json()
    # Only an explicit False is treated as failure; bodies without a
    # "success" field are passed through unchanged.
    if isinstance(payload, dict) and payload.get("success") is False:
        raise RuntimeError(f"Cloudflare API errors: {payload.get('errors')}")
    return payload
# ─── 3. WORKERS USAGE (GraphQL) ───────────────────────────────────────────────
def get_workers_usage(days: int = 30):
    """Print per-script Workers invocation statistics for the last *days* days.

    Queries ``workersInvocationsAdaptive`` through ``gql`` for the window
    ``[now - days, now]`` (UTC) and prints one table row per script, sorted by
    request count in descending order. CPU quantiles are divided by 1,000
    before display under the "ms" column labels.

    Any failure (HTTP error, GraphQL error, unexpected response shape) is
    reported on stdout as "Workers data unavailable" instead of being raised,
    so the remaining report sections still run.
    """
    print(f"\nWORKERS USAGE (last {days} days)")
    now = datetime.now(timezone.utc)
    start = now - timedelta(days=days)
    query = """
    query WorkersUsage($accountId: String!, $start: String!, $end: String!) {
      viewer {
        accounts(filter: {accountTag: $accountId}) {
          workersInvocationsAdaptive(
            limit: 10000
            filter: {datetime_geq: $start, datetime_leq: $end}
          ) {
            sum {
              requests
              errors
              subrequests
            }
            quantiles {
              cpuTimeP50
              cpuTimeP99
            }
            dimensions {
              scriptName
            }
          }
        }
      }
    }
    """
    try:
        data = gql(query, {"accountId": CF_ACCOUNT_ID,
                           "start": iso(start), "end": iso(now)})
        accounts = data["viewer"]["accounts"]
        if not accounts:
            # Clearer than the IndexError that indexing [0] would produce.
            raise RuntimeError("no account matched CF_ACCOUNT_ID")
        records = accounts[0].get("workersInvocationsAdaptive") or []

        # Collect raw numbers first so sorting never has to re-parse
        # thousands-separated strings. `or` guards cover JSON nulls.
        stats = []
        for rec in records:
            s = rec.get("sum") or {}
            q = rec.get("quantiles") or {}
            dims = rec.get("dimensions") or {}
            stats.append({
                "script": dims.get("scriptName") or "(unnamed)",
                "requests": s.get("requests") or 0,
                "errors": s.get("errors") or 0,
                "p50": q.get("cpuTimeP50") or 0,
                "p99": q.get("cpuTimeP99") or 0,
            })
        stats.sort(key=lambda item: item["requests"], reverse=True)

        rows = [{
            "Script": item["script"],
            "Requests": f"{item['requests']:,}",
            "Errors": f"{item['errors']:,}",
            "CPU ms": f"{item['p50'] / 1_000:.1f}",
            "CPU P99 ms": f"{item['p99'] / 1_000:.1f}",
        } for item in stats]
        print_table("Workers Invocations by Script",
                    rows, ["Script", "Requests", "Errors", "CPU ms", "CPU P99 ms"])
    except Exception as e:
        # Deliberate best-effort boundary: report and carry on.
        print(f" Workers data unavailable: {e}")
# ─── 4. R2 STORAGE & OPERATIONS (GraphQL) ────────────────────────────────────
# Substrings used to bucket an R2 action type into reads or writes.
# Reads are checked first, so e.g. "ListMultipartUploads" counts as a read.
_R2_READ_HINTS = ("get", "read", "head", "list")
_R2_WRITE_HINTS = ("put", "post", "write", "delete",
                   "upload", "copy", "create", "complete", "abort")


def _r2_action_kind(action: str) -> str | None:
    """Return "reads", "writes", or None for an R2 action type name."""
    lowered = (action or "").lower()
    if any(hint in lowered for hint in _R2_READ_HINTS):
        return "reads"
    if any(hint in lowered for hint in _R2_WRITE_HINTS):
        return "writes"
    return None


def get_r2_usage(days: int = 30):
    """Print per-bucket R2 request counts for the last *days* days.

    Queries ``r2OperationsAdaptiveGroups`` through ``gql`` for the window
    ``[now - days, now]`` (UTC), sums requests per bucket, and splits them
    into reads and writes by action-type name. Each operation is counted in
    at most one of the two columns; multipart-upload and copy operations are
    counted as writes. Operations matching neither category still count
    toward the "Requests" total. Rows are sorted by total requests, descending.

    Any failure is reported on stdout as "R2 data unavailable" instead of
    being raised, so the remaining report sections still run.
    """
    print(f"\nR2 STORAGE & OPERATIONS (last {days} days)")
    now = datetime.now(timezone.utc)
    start = now - timedelta(days=days)
    query = """
    query R2Usage($accountId: String!, $start: String!, $end: String!) {
      viewer {
        accounts(filter: {accountTag: $accountId}) {
          r2OperationsAdaptiveGroups(
            limit: 10000
            filter: {datetime_geq: $start, datetime_leq: $end}
          ) {
            sum {
              requests
            }
            dimensions {
              bucketName
              actionType
            }
          }
        }
      }
    }
    """
    try:
        data = gql(query, {"accountId": CF_ACCOUNT_ID,
                           "start": iso(start), "end": iso(now)})
        accounts = data["viewer"]["accounts"]
        if not accounts:
            # Clearer than the IndexError that indexing [0] would produce.
            raise RuntimeError("no account matched CF_ACCOUNT_ID")
        records = accounts[0].get("r2OperationsAdaptiveGroups") or []

        # Aggregate per bucket
        buckets: dict[str, dict] = {}
        for rec in records:
            dims = rec.get("dimensions") or {}
            # `or` (not a .get default) so null/empty names are labelled too.
            name = dims.get("bucketName") or "(unknown)"
            count = (rec.get("sum") or {}).get("requests") or 0
            totals = buckets.setdefault(name, {"requests": 0, "reads": 0, "writes": 0})
            totals["requests"] += count
            kind = _r2_action_kind(dims.get("actionType") or "")
            if kind is not None:
                totals[kind] += count

        rows = [{"Bucket": k,
                 "Requests": f"{v['requests']:,}",
                 "Reads": f"{v['reads']:,}",
                 "Writes": f"{v['writes']:,}"}
                for k, v in sorted(buckets.items(),
                                   key=lambda x: x[1]["requests"], reverse=True)]
        print_table("R2 Usage by Bucket", rows, ["Bucket", "Requests", "Reads", "Writes"])
    except Exception as e:
        # Deliberate best-effort boundary: report and carry on.
        print(f" R2 data unavailable: {e}")
# ─── 5. D1 DATABASE USAGE (GraphQL) ──────────────────────────────────────────
def get_d1_usage(days: int = 30):
    """Print per-database D1 query and row counts for the last *days* days.

    First tries to map database IDs to names through the REST endpoint
    ``/accounts/{id}/d1/database``; if that lookup fails, a note is printed
    and the report falls back to showing (truncated) database IDs. Then
    queries ``d1AnalyticsAdaptiveGroups`` through ``gql`` for the window
    ``[now - days, now]`` (UTC), sums the metrics per database, and prints
    them sorted by rows read + rows written, descending.

    A failure of the analytics query is reported on stdout as
    "D1 data unavailable" instead of being raised.
    """
    print(f"\nD1 DATABASE USAGE (last {days} days)")
    now = datetime.now(timezone.utc)
    start = now - timedelta(days=days)

    # 1. Fetch database names via REST API to map IDs to friendly names
    db_names = {}
    try:
        res = rest_get(f"/accounts/{CF_ACCOUNT_ID}/d1/database")
        db_names = {db["uuid"]: db["name"]
                    for db in (res.get("result") or [])
                    if db.get("uuid") and db.get("name")}
    except Exception as e:
        # Names are optional; say why they are missing rather than failing
        # silently, then continue with raw IDs.
        print(f" (D1 database names unavailable, showing IDs instead: {e})")

    # 2. Query GraphQL for the usage metrics over the same time window
    query = """
    query D1Usage($accountId: String!, $start: String!, $end: String!) {
      viewer {
        accounts(filter: {accountTag: $accountId}) {
          d1AnalyticsAdaptiveGroups(
            limit: 1000
            filter: {datetime_geq: $start, datetime_leq: $end}
          ) {
            sum {
              readQueries
              writeQueries
              rowsRead
              rowsWritten
            }
            dimensions {
              databaseId
            }
          }
        }
      }
    }
    """
    try:
        data = gql(query, {"accountId": CF_ACCOUNT_ID,
                           "start": iso(start), "end": iso(now)})
        accounts = data["viewer"]["accounts"]
        if not accounts:
            # Clearer than the IndexError that indexing [0] would produce.
            raise RuntimeError("no account matched CF_ACCOUNT_ID")
        records = accounts[0].get("d1AnalyticsAdaptiveGroups") or []

        db_totals: dict[str, dict] = {}
        for rec in records:
            # `or` so a null ID becomes "?" instead of breaking the slice below.
            db_id = (rec.get("dimensions") or {}).get("databaseId") or "?"
            s = rec.get("sum") or {}
            totals = db_totals.setdefault(
                db_id, {"reads": 0, "writes": 0, "rowsRead": 0, "rowsWritten": 0})
            totals["reads"] += (s.get("readQueries") or 0)
            totals["writes"] += (s.get("writeQueries") or 0)
            totals["rowsRead"] += (s.get("rowsRead") or 0)
            totals["rowsWritten"] += (s.get("rowsWritten") or 0)

        rows = [{
            # Unnamed databases show the first 18 characters of their ID.
            "Database": db_names.get(db_id, db_id[:18]),
            "Read Rows": f"{totals['rowsRead']:,}",
            "Write Rows": f"{totals['rowsWritten']:,}",
            "Queries": f"{totals['reads'] + totals['writes']:,}",
        } for db_id, totals in sorted(db_totals.items(),
                                      key=lambda item: item[1]["rowsRead"] + item[1]["rowsWritten"],
                                      reverse=True)]
        print_table("D1 Usage by Database (Billable Metrics)",
                    rows, ["Database", "Read Rows", "Write Rows", "Queries"])
    except Exception as e:
        # Deliberate best-effort boundary: report and carry on.
        print(f" D1 data unavailable: {e}")
# ─── 7. SPENDING SUMMARY ─────────────────────────────────────────────────────
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def main():
    """Print the report banner, then the Workers, R2 and D1 sections for the last day."""
    divider = "=" * 60
    run_stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
    banner = (
        divider,
        " Cloudflare Billing & Usage Tracker",
        # Only the first 8 characters of the account ID are shown.
        f" Account: {CF_ACCOUNT_ID[:8]}…",
        f" Run at: {run_stamp}",
        divider,
    )
    for line in banner:
        print(line)

    # Each section handles its own failures, so one outage does not stop the rest.
    for report in (get_workers_usage, get_r2_usage, get_d1_usage):
        report(days=1)


if __name__ == "__main__":
    main()
After running the code above, you will see output like this:
Output
============================================================
Cloudflare Billing & Usage Tracker
Account: exxxxxxxxxx…
Run at: 2026-05-05 13:35 UTC
============================================================
WORKERS USAGE (last 1 days)
────────────────────────────────────────────────────────────
Workers Invocations by Script
────────────────────────────────────────────────────────────
Script | Requests | Errors | CPU ms | CPU P99 ms
----------------------------------------------------------------------------------------------------------------
t-analysis | 36,780 | 0 | 17.1 | 137.8
notify---purge | 46 | 0 | 0.6 | 11.7
d1-example | 35 | 0 | 2.0 | 11.9
R2 STORAGE & OPERATIONS (last 1 days)
────────────────────────────────────────────────────────────
R2 Usage by Bucket
────────────────────────────────────────────────────────────
Bucket | Requests | Reads | Writes
-----------------------------------------------------------------------------------------
stocks | 38,746 | 29,987 | 8,759
tradedata | 6,875 | 6,875 | 0
etfs | 1,612 | 1,127 | 485
logos | 620 | 620 | 0
| 8 | 8 | 0
D1 DATABASE USAGE (last 1 days)
────────────────────────────────────────────────────────────
D1 Usage by Database (Billable Metrics)
────────────────────────────────────────────────────────────
Database | Read Rows | Write Rows | Queries
-----------------------------------------------------------------------------------------
cxxxx-xxx-xxxxxxxx | 28,166,287 | 3,715 | 8,372

Share Share Tweet