You’ve read the Polymarket API Guide, you’ve set up your wallet with the Coinbase Quickstart, and you can place a manual trade. Now you want a bot that trades for you.
This guide builds a complete Polymarket trading bot from scratch in Python. By the end, you’ll have a working bot with market scanning, signal generation, order execution, risk controls, and a paper trading mode — ready to go live.
Prerequisites:
- Python 3.10+
py-clob-clientinstalled (pip install py-clob-client)- A funded Polygon wallet (see the Coinbase Quickstart or the Agentic Wallets Guide)
- Basic familiarity with the Polymarket API
Bot Architecture
Every production trading bot has the same four components, regardless of strategy. Separate them cleanly and your bot becomes modular — swap strategies without touching execution logic, or upgrade your risk rules without rewriting your scanner.
┌──────────────────────────────────────────────────┐
│ MAIN LOOP │
│ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ MARKET │───▶│ SIGNAL GENERATOR │ │
│ │ SCANNER │ │ (Strategy Logic) │ │
│ └─────────────┘ └──────────┬──────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ RISK │◀──│ EXECUTION ENGINE │ │
│ │ MANAGER │───▶│ (Order Management) │ │
│ └─────────────┘ └─────────────────────────┘ │
│ │
└──────────────────────────────────────────────────┘
- Market Scanner fetches and filters markets from Polymarket’s APIs
- Signal Generator applies your strategy logic to produce trade signals
- Execution Engine translates signals into orders and tracks fills
- Risk Manager enforces position limits, drawdown halts, and kill switches
Let’s build each one.
Component 1: Market Scanner
The scanner’s job is to find tradeable markets. Polymarket lists thousands of markets, but most are illiquid, resolved, or irrelevant. Your scanner filters them down to a watchlist.
import requests
import time
class MarketScanner:
"""Scans Polymarket's Gamma API for tradeable markets."""
GAMMA_URL = "https://gamma-api.polymarket.com"
def __init__(self, min_volume=10_000, min_liquidity=5_000, max_markets=50):
self.min_volume = min_volume
self.min_liquidity = min_liquidity
self.max_markets = max_markets
def fetch_active_markets(self):
"""Fetch all active, non-resolved markets."""
markets = []
offset = 0
limit = 100
while True:
resp = requests.get(
f"{self.GAMMA_URL}/markets",
params={
"closed": "false",
"limit": limit,
"offset": offset,
"order": "volume24hr",
"ascending": "false",
}
)
batch = resp.json()
if not batch:
break
markets.extend(batch)
offset += limit
time.sleep(0.5) # Respect rate limits
return markets
def filter_markets(self, markets):
"""Filter to markets worth trading."""
watchlist = []
for m in markets:
volume_24h = float(m.get("volume24hr", 0))
liquidity = float(m.get("liquidityClob", 0))
# Skip low-volume and illiquid markets
if volume_24h < self.min_volume:
continue
if liquidity < self.min_liquidity:
continue
# Skip markets too close to resolution (price > 0.95 or < 0.05)
tokens = m.get("clobTokenIds", [])
if not tokens:
continue
price = float(m.get("outcomePrices", "[0.5,0.5]")
.strip("[]").split(",")[0])
if price > 0.95 or price < 0.05:
continue
watchlist.append({
"condition_id": m["conditionId"],
"question": m["question"],
"token_ids": tokens,
"price": price,
"volume_24h": volume_24h,
"liquidity": liquidity,
"slug": m.get("slug", ""),
})
# Sort by volume, take top N
watchlist.sort(key=lambda x: x["volume_24h"], reverse=True)
return watchlist[:self.max_markets]
def scan(self):
"""Full scan: fetch, filter, return watchlist."""
raw = self.fetch_active_markets()
return self.filter_markets(raw)
Customizing your scanner:
- Lower
min_volumeto find less-traded markets (potentially more alpha, but harder to exit) - Add keyword filters to focus on specific topics (politics, crypto, sports)
- Add a time-to-resolution filter to avoid markets that expire in <24 hours
For the full Gamma API reference, see the Polymarket API Guide.
Component 2: Signal Generator
The signal generator takes your watchlist and produces trade signals. We’ll start with the simplest useful strategy: threshold deviation — buy when a market’s price diverges significantly from its recent average.
from collections import defaultdict
from dataclasses import dataclass
@dataclass
class Signal:
condition_id: str
token_id: str
side: str # "BUY" or "SELL"
strength: float # 0.0 to 1.0
reason: str
class ThresholdSignalGenerator:
"""Generates signals when price deviates from moving average."""
def __init__(self, lookback=20, buy_threshold=-0.05, sell_threshold=0.05):
self.lookback = lookback
self.buy_threshold = buy_threshold # Buy when price is 5%+ below avg
self.sell_threshold = sell_threshold # Sell when price is 5%+ above avg
self.price_history = defaultdict(list)
def update_price(self, condition_id, price):
"""Record a new price observation."""
history = self.price_history[condition_id]
history.append(price)
# Keep only lookback period
if len(history) > self.lookback * 2:
self.price_history[condition_id] = history[-self.lookback:]
def generate_signals(self, watchlist):
"""Evaluate each market and return signals."""
signals = []
for market in watchlist:
cid = market["condition_id"]
current_price = market["price"]
self.update_price(cid, current_price)
history = self.price_history[cid]
if len(history) < self.lookback:
continue # Not enough data yet
avg_price = sum(history[-self.lookback:]) / self.lookback
deviation = (current_price - avg_price) / avg_price
if deviation <= self.buy_threshold:
# Price dropped below average — potential buy
signals.append(Signal(
condition_id=cid,
token_id=market["token_ids"][0], # YES token
side="BUY",
strength=min(abs(deviation) / 0.10, 1.0),
reason=f"Price {current_price:.3f} is {deviation:.1%} "
f"below {self.lookback}-period avg {avg_price:.3f}"
))
elif deviation >= self.sell_threshold:
# Price spiked above average — potential sell
signals.append(Signal(
condition_id=cid,
token_id=market["token_ids"][0],
side="SELL",
strength=min(abs(deviation) / 0.10, 1.0),
reason=f"Price {current_price:.3f} is {deviation:.1%} "
f"above {self.lookback}-period avg {avg_price:.3f}"
))
return signals
This is intentionally simple. For LLM-powered signal generation (using GPT-4, Claude, or open-source models to analyze news and social sentiment), see the Agent Intelligence Guide.
Component 3: Execution Engine
The execution engine translates signals into actual orders on Polymarket. It handles order placement, fill tracking, and position management via py_clob_client.
from py_clob_client.client import ClobClient
from py_clob_client.clob_types import OrderArgs, OrderType
import logging
logger = logging.getLogger("execution")
class ExecutionEngine:
"""Places and manages orders on Polymarket."""
def __init__(self, client: ClobClient, default_size=5.0):
self.client = client
self.default_size = default_size # Default order size in USDC
self.open_orders = {}
self.positions = {}
def execute_signal(self, signal, size=None):
"""Convert a signal into a limit order."""
size = size or self.default_size
# Get current best price from order book
book = self.client.get_order_book(signal.token_id)
if signal.side == "BUY":
# Place limit order slightly below best ask
best_ask = float(book["asks"][0]["price"]) if book["asks"] else None
if best_ask is None:
logger.warning(f"No asks for {signal.token_id}, skipping")
return None
price = round(best_ask - 0.01, 2) # 1 cent below best ask
else:
# Place limit order slightly above best bid
best_bid = float(book["bids"][0]["price"]) if book["bids"] else None
if best_bid is None:
logger.warning(f"No bids for {signal.token_id}, skipping")
return None
price = round(best_bid + 0.01, 2) # 1 cent above best bid
# Calculate number of shares
num_shares = size / price if price > 0 else 0
try:
order = self.client.create_order(
OrderArgs(
token_id=signal.token_id,
price=price,
size=num_shares,
side=signal.side,
)
)
signed = self.client.post_order(order, OrderType.GTC)
order_id = signed.get("orderID")
logger.info(
f"Placed {signal.side} order: {num_shares:.1f} shares "
f"@ {price} (order {order_id})"
)
if order_id:
self.open_orders[order_id] = {
"signal": signal,
"price": price,
"size": num_shares,
"status": "open",
}
return order_id
except Exception as e:
logger.error(f"Order failed: {e}")
return None
def sync_positions(self):
"""Refresh positions from the API."""
try:
positions = self.client.get_positions()
self.positions = {
p["asset"]["token_id"]: {
"size": float(p["size"]),
"avg_price": float(p.get("avgPrice", 0)),
}
for p in positions
if float(p["size"]) > 0
}
except Exception as e:
logger.error(f"Position sync failed: {e}")
def cancel_stale_orders(self, max_age_seconds=300):
"""Cancel orders that haven't filled within max_age."""
# In production, track order timestamps and cancel old ones
try:
open_orders = self.client.get_orders(
params={"state": "open"}
)
for order in open_orders:
# Cancel if older than max_age
self.client.cancel(order["id"])
logger.info(f"Cancelled stale order {order['id']}")
except Exception as e:
logger.error(f"Cancel failed: {e}")
For the complete py_clob_client method reference (every parameter, return type, and edge case), see the py_clob_client Reference.
Component 4: Risk Manager
The risk manager is the most important component. It prevents your bot from losing more than you can afford. Every trade passes through the risk manager before execution.
import os
import sys
class RiskManager:
"""Enforces trading limits and monitors risk."""
KILL_SWITCH_FILE = "/tmp/polymarket_bot_kill"
def __init__(
self,
max_position_size=50.0, # Max USDC in any single market
max_total_exposure=200.0, # Max USDC across all positions
max_drawdown_pct=0.10, # Halt if portfolio drops 10%
max_trades_per_hour=20, # Rate limit on trade execution
):
self.max_position_size = max_position_size
self.max_total_exposure = max_total_exposure
self.max_drawdown_pct = max_drawdown_pct
self.max_trades_per_hour = max_trades_per_hour
self.starting_balance = None
self.trade_timestamps = []
def check_kill_switch(self):
"""Halt immediately if kill switch is active."""
if os.path.exists(self.KILL_SWITCH_FILE):
print("KILL SWITCH ACTIVATED — halting all trading")
sys.exit(1)
def set_starting_balance(self, balance):
"""Record starting balance for drawdown tracking."""
if self.starting_balance is None:
self.starting_balance = balance
def check_drawdown(self, current_balance):
"""Halt if drawdown exceeds threshold."""
if self.starting_balance is None:
return True
drawdown = (self.starting_balance - current_balance) / self.starting_balance
if drawdown >= self.max_drawdown_pct:
print(f"DRAWDOWN HALT: {drawdown:.1%} exceeds "
f"{self.max_drawdown_pct:.1%} limit")
return False
return True
def check_trade_rate(self):
"""Enforce maximum trades per hour."""
now = time.time()
cutoff = now - 3600
self.trade_timestamps = [t for t in self.trade_timestamps if t > cutoff]
if len(self.trade_timestamps) >= self.max_trades_per_hour:
return False
return True
def approve_trade(self, signal, proposed_size, positions, balance):
"""Gate every trade through risk checks."""
self.check_kill_switch()
# Check drawdown
if not self.check_drawdown(balance):
return False, "Drawdown limit reached"
# Check trade rate
if not self.check_trade_rate():
return False, "Trade rate limit reached"
# Check position size
token_id = signal.token_id
current_position = positions.get(token_id, {}).get("size", 0)
new_position = current_position + proposed_size
if new_position > self.max_position_size:
return False, f"Position would exceed {self.max_position_size} limit"
# Check total exposure
total = sum(p.get("size", 0) for p in positions.values())
if total + proposed_size > self.max_total_exposure:
return False, f"Total exposure would exceed {self.max_total_exposure} limit"
# Check minimum balance reserve (keep 10% as buffer)
if proposed_size > balance * 0.9:
return False, "Insufficient balance (10% reserve)"
self.trade_timestamps.append(time.time())
return True, "Approved"
The kill switch is non-negotiable. To activate it in an emergency:
touch /tmp/polymarket_bot_kill
Your bot will halt on its next iteration. To resume, remove the file. For a deeper dive on kill switches, monitoring, and prompt injection defense, see Security Best Practices.
Putting It All Together: The Main Loop
Here’s the complete bot, wiring all four components together:
import time
import logging
from py_clob_client.client import ClobClient
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(message)s")
logger = logging.getLogger("bot")
def run_bot():
# --- Configuration ---
PRIVATE_KEY = os.environ["POLYMARKET_PRIVATE_KEY"]
CHAIN_ID = 137 # Polygon mainnet
SCAN_INTERVAL = 300 # 5 minutes between scans
ORDER_SIZE = 5.0 # USDC per trade
# --- Initialize components ---
client = ClobClient(
host="https://clob.polymarket.com",
key=PRIVATE_KEY,
chain_id=CHAIN_ID,
)
# Derive and set API credentials
client.set_api_creds(client.create_or_derive_api_creds())
scanner = MarketScanner(min_volume=10_000, min_liquidity=5_000)
signal_gen = ThresholdSignalGenerator(lookback=20, buy_threshold=-0.05)
executor = ExecutionEngine(client, default_size=ORDER_SIZE)
risk = RiskManager(
max_position_size=50.0,
max_total_exposure=200.0,
max_drawdown_pct=0.10,
)
# --- Main loop ---
logger.info("Bot starting...")
balance = float(client.get_balance())
risk.set_starting_balance(balance)
logger.info(f"Starting balance: ${balance:.2f}")
while True:
try:
risk.check_kill_switch()
# 1. Scan markets
watchlist = scanner.scan()
logger.info(f"Watchlist: {len(watchlist)} markets")
# 2. Generate signals
signals = signal_gen.generate_signals(watchlist)
logger.info(f"Signals: {len(signals)} opportunities")
# 3. Sync current state
executor.sync_positions()
balance = float(client.get_balance())
# 4. Execute approved trades
for signal in signals:
approved, reason = risk.approve_trade(
signal, ORDER_SIZE, executor.positions, balance
)
if approved:
order_id = executor.execute_signal(signal, ORDER_SIZE)
if order_id:
logger.info(f"Executed: {signal.reason}")
else:
logger.info(f"Rejected ({reason}): {signal.reason}")
# 5. Clean up stale orders
executor.cancel_stale_orders(max_age_seconds=600)
# 6. Log status
total_positions = len(executor.positions)
logger.info(
f"Balance: ${balance:.2f} | "
f"Positions: {total_positions} | "
f"Next scan in {SCAN_INTERVAL}s"
)
time.sleep(SCAN_INTERVAL)
except KeyboardInterrupt:
logger.info("Shutting down gracefully...")
break
except Exception as e:
logger.error(f"Loop error: {e}", exc_info=True)
time.sleep(60) # Back off on errors
if __name__ == "__main__":
run_bot()
Paper Trading Mode
Before risking real money, run your bot in paper trading mode. This executes the entire pipeline — scanning, signal generation, risk checks — but logs trades instead of submitting them.
class PaperExecutionEngine:
"""Drop-in replacement for ExecutionEngine that simulates trades."""
def __init__(self, starting_balance=100.0):
self.balance = starting_balance
self.positions = {}
self.trade_log = []
def execute_signal(self, signal, size=5.0):
"""Simulate a trade without hitting the API."""
trade = {
"timestamp": time.time(),
"token_id": signal.token_id,
"side": signal.side,
"size": size,
"reason": signal.reason,
}
self.trade_log.append(trade)
# Simulate position tracking
tid = signal.token_id
if signal.side == "BUY":
current = self.positions.get(tid, {"size": 0, "avg_price": 0})
current["size"] += size
self.positions[tid] = current
self.balance -= size
else:
if tid in self.positions:
self.positions[tid]["size"] -= size
self.balance += size
logger.info(f"[PAPER] {signal.side} ${size:.2f} — {signal.reason}")
return f"paper-{len(self.trade_log)}"
def sync_positions(self):
pass # Already tracked locally
def cancel_stale_orders(self, max_age_seconds=300):
pass # No real orders to cancel
def summary(self):
"""Print paper trading results."""
print(f"\n--- Paper Trading Summary ---")
print(f"Total trades: {len(self.trade_log)}")
print(f"Final balance: ${self.balance:.2f}")
print(f"Open positions: {len(self.positions)}")
for tid, pos in self.positions.items():
print(f" {tid[:12]}... : {pos['size']:.2f} shares")
Swap ExecutionEngine for PaperExecutionEngine in your main loop. Run for at least a week before going live. If your paper P&L is consistently negative, your strategy needs work — not more capital.
Going Live
When your paper trading results look good, here’s the checklist for going live:
1. Fund Your Wallet
If you’re using Coinbase Agentic Wallets (recommended for the spending limit safety net):
# Set up your agentic wallet
npx awal setup
# Configure spending limits BEFORE funding
npx awal config set session-cap 50
npx awal config set tx-limit 10
# Fund the wallet
npx awal fund 50 # Start with $50
If you’re using a direct Polygon wallet, bridge USDC to Polygon and ensure your bot has the private key in an environment variable (never hardcoded). See the Coinbase Agentic Wallets Guide for the full setup.
2. Start Small
Begin with order sizes of $1-5. Your first live week is about verifying that orders execute correctly, fills are tracked, and the risk manager works — not about making money.
3. Monitor Actively
For your first 48 hours live, watch the bot’s logs in real time. After that, set up alerts:
# Simple Discord webhook alert for trades and errors
import requests
def alert(message, webhook_url):
requests.post(webhook_url, json={"content": message})
# In your main loop:
alert(f"Trade executed: {signal.side} ${ORDER_SIZE} — {signal.reason}", WEBHOOK)
4. Gradual Scale-Up
- Week 1: $50 total, $5 per trade, 10 markets
- Week 2: $100 total, $10 per trade, 20 markets (if Week 1 was profitable)
- Week 3: $200 total, $20 per trade, 30 markets (if cumulative P&L is positive)
- Never: Go all-in on a strategy you haven’t tested for at least 2 weeks
5 Starter Strategies
The threshold strategy above is a starting point. Here are five strategies to explore, each targeting a different market inefficiency.
1. Mean Reversion
Thesis: Markets overreact to news. When a price spikes or drops sharply, it often reverts toward its prior level.
Signal: Buy when price drops >7% in one hour. Sell when price rises >7% in one hour.
Best for: High-liquidity markets with frequent news events (elections, economic data).
2. Momentum
Thesis: Markets underreact to sustained trends. A market moving in one direction tends to continue.
Signal: Buy when price has increased for 3+ consecutive observation periods. Sell when direction reverses.
Best for: Markets with clear catalysts (polling data, earnings dates).
3. New Market Sniper
Thesis: Newly created markets are priced inefficiently because few traders have analyzed them yet.
Signal: When a new market appears (created within last 6 hours), compare its price to your model’s estimate. If the mispricing exceeds your threshold, trade.
Best for: Agents with strong intelligence layers (see Agent Intelligence Guide).
4. Liquidation Sweeper
Thesis: When a large holder exits a position quickly, they crash the price below fair value.
Signal: Monitor order book depth. When a large sell order clears multiple price levels, buy the temporary dip.
Best for: High-volume markets where whale movements are visible. Requires WebSocket data — see the Polymarket API Guide.
5. Event Calendar
Thesis: Market prices adjust predictably around scheduled events (elections, court rulings, data releases).
Signal: Maintain a calendar of events tied to markets. Position before the event when odds are mispriced based on your analysis.
Best for: Political and economic markets with known resolution dates.
For an in-depth treatment of each strategy with backtesting frameworks and implementation details, see the upcoming Bot Strategies guide.
Production Deployment
When you’re ready to run 24/7, containerize your bot:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY bot/ ./bot/
ENV POLYMARKET_PRIVATE_KEY=""
ENV DISCORD_WEBHOOK=""
# Health check: verify the bot process is running
HEALTHCHECK --interval=30s --timeout=5s \
CMD pgrep -f "python.*bot" || exit 1
CMD ["python", "-m", "bot.main"]
# Build and run
docker build -t polymarket-bot .
docker run -d \
--name polymarket-bot \
-e POLYMARKET_PRIVATE_KEY=$POLYMARKET_PRIVATE_KEY \
-e DISCORD_WEBHOOK=$DISCORD_WEBHOOK \
--restart unless-stopped \
polymarket-bot
Production monitoring checklist:
- Logs are persisted (Docker volume or log aggregation service)
- Discord/Telegram alerts on every trade and every error
- Balance check every hour with alert if unexpected change
- Kill switch file is accessible from outside the container
- Automatic restart on crash (Docker
--restart unless-stopped) - Daily P&L summary sent to your alerts channel
For the complete security checklist including prompt injection defense, key management, and sandboxing, see Security Best Practices.
Further Reading
- Polymarket API Tutorial & Developer Guide — Complete API documentation
- py_clob_client Reference — Every SDK method with examples
- Polymarket Rate Limits Guide — Avoiding 429 errors
- Agent Intelligence Guide — LLM-powered signal generation
- Coinbase Agentic Wallets Guide — Wallet setup with spending limits
- Security Best Practices — Kill switches, key management, monitoring
- Agent Wallet Comparison — Choosing the right wallet
- Cross-Market Arbitrage — Trading across Polymarket and sportsbooks