---
title: Python Integration
lastUpdated: '2026-05-31'
nextjs:
  metadata:
    title: Python Integration - Hive Intelligence
    description: Use Hive Intelligence from Python scripts and services with requests, bearer auth, REST execution, tool discovery, and crypto data responses.
    alternates:
      canonical: https://www.hiveintelligence.xyz/sdk/python
---


Use Hive from scripts, notebooks, workers, and backend services with `requests`.

{% ai-doc-actions path="/sdk/python" title="Python Integration" /%}

---

## Installation

```bash
pip install requests
```

For the async examples later in this guide, also install `httpx`:

```bash
pip install httpx
```

---

## Quick Start

```python
import os
import requests

BASE_URL = "https://mcp.hiveintelligence.xyz"
API_KEY = os.environ["HIVE_API_KEY"]

def execute(tool: str, args: dict | None = None) -> dict:
    """Run one Hive tool through the REST execute endpoint.

    Posts ``{"tool": ..., "args": ...}`` to ``/api/v1/execute`` using the
    module-level ``BASE_URL`` and ``API_KEY`` and returns the decoded JSON
    envelope.

    Raises:
        requests.HTTPError: propagated from ``raise_for_status()`` when the
            server answers with a 4xx/5xx status.
        RuntimeError: the envelope does not carry a truthy ``ok`` flag.
    """
    endpoint = f"{BASE_URL}/api/v1/execute"
    auth_header = {"Authorization": f"Bearer {API_KEY}"}
    request_body = {"tool": tool, "args": args or {}}

    reply = requests.post(endpoint, headers=auth_header, json=request_body, timeout=30)
    reply.raise_for_status()

    envelope = reply.json()
    if envelope.get("ok", False):
        return envelope

    # Prefer the server-supplied message; fall back to a generic one.
    failure = envelope.get("error", {})
    raise RuntimeError(failure.get("message", "Hive execution failed"))

# Request Bitcoin's price quoted in USD.
price_args = {"ids": "bitcoin", "vs_currencies": "usd"}
price = execute("get_price", price_args)

# The quote is read from "data"; the fetch timestamp is read from "meta".
btc_quote = price["data"]["bitcoin"]
print(btc_quote["usd"])
print(price["meta"]["fetched_at"])
```

---

## Reusable Client

```python
from __future__ import annotations

import requests


class HiveClient:
    def __init__(self, api_key: str, base_url: str = "https://mcp.hiveintelligence.xyz") -> None:
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def execute(self, tool: str, args: dict | None = None) -> dict:
        response = self.session.post(
            f"{self.base_url}/api/v1/execute",
            json={"tool": tool, "args": args or {}},
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()
        if not payload.get("ok", False):
            raise RuntimeError(payload.get("error", {}).get("message", "Hive execution failed"))
        return payload

    def list_tools(self, limit: int = 50) -> dict:
        response = self.session.get(
            f"{self.base_url}/api/v1/tools",
            params={"limit": limit},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()


# Replace the placeholder with your real API key before running.
client = HiveClient(api_key="YOUR_HIVE_API_KEY")

# First five market entries, ordered by market_cap_desc and priced in USD.
market_args = {"vs_currency": "usd", "order": "market_cap_desc", "per_page": 5}
market = client.execute("get_coins_market_data", market_args)

# Token balances for one wallet ("0x1234..." is a placeholder address).
wallet_args = {"address": "0x1234...", "network": "eth-mainnet"}
wallet = client.execute("alchemy_get_token_balances_by_wallet", wallet_args)

for result in (market, wallet):
    print(result["data"])
```

---

## Common Patterns

### Security review

```python
# One contract address; note that chainId is passed as a string.
token_query = {
    "contract_addresses": "0x6982508145454Ce325dDbE47a25d4ec3d2311933",
    "chainId": "1",
}
security = client.execute("get_token_security", token_query)
```

### DeFi analytics

```python
# Restrict the yield-pool query to a single chain.
pool_filter = dict(chain="ethereum")
pools = client.execute("get_yield_pools", pool_filter)
```

### Wallet intelligence

```python
# "0x1234..." is a placeholder; substitute a full wallet address.
wallet_query = dict(address="0x1234...", chain="eth")
positions = client.execute("moralis_get_wallet_defi_positions", wallet_query)
```

### Prediction markets

```python
# networkId is sent as an integer here.
market_filter = {"networkId": 1}
markets = client.execute("codex_prediction_markets", market_filter)
```

---

## Error Handling

Hive returns a structured envelope on every request. Non-2xx responses carry a JSON body with an `error` object — parse it before retrying:

```python
import time
import requests


class HiveError(Exception):
    """Error raised for a failed Hive call.

    The constructor arguments are kept as the ``code``, ``message`` and
    ``http_status`` attributes, and ``str(exc)`` reads ``"<code>: <message>"``.
    """

    def __init__(self, code: str, message: str, http_status: int) -> None:
        self.code, self.message, self.http_status = code, message, http_status
        super().__init__(f"{code}: {message}")


def execute_with_retry(client: HiveClient, tool: str, args: dict, max_retries: int = 3) -> dict:
    """Execute a Hive tool, retrying only failures that can succeed later.

    Retried, up to ``max_retries`` attempts in total: ``requests`` transport
    errors, HTTP 429 and HTTP 5xx. Any other 4xx response (400, 401, ...) and
    any ``ok: false`` envelope raises immediately, because sending the same
    request again cannot fix it.

    Args:
        client: Client whose ``session`` and ``base_url`` are used.
        tool: Name of the tool to execute.
        args: Tool arguments; a falsy value is sent as ``{}``.
        max_retries: Maximum number of attempts.

    Returns:
        The decoded response envelope.

    Raises:
        HiveError: ``network_error`` when the last attempt fails at the
            transport level; ``exhausted_retries`` when every attempt ended
            in 429/5xx; otherwise the ``code``/``message`` from the response's
            ``error`` object together with the HTTP status.
    """
    last_status = 0
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        delay = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...

        # Only the network call sits inside the try, so HTTP-level errors are
        # never mistaken for transport failures.
        try:
            response = client.session.post(
                f"{client.base_url}/api/v1/execute",
                json={"tool": tool, "args": args or {}},
                timeout=30,
            )
        except requests.RequestException as exc:
            if is_last_attempt:
                raise HiveError("network_error", str(exc), 0) from exc
            time.sleep(delay)
            continue

        last_status = response.status_code
        if last_status == 429 or last_status >= 500:
            if is_last_attempt:
                break  # nothing left to wait for
            if last_status == 429:
                # Retry-After may be missing or an HTTP-date; keep the
                # exponential delay unless it is a plain number of seconds.
                try:
                    delay = max(0, int(response.headers["Retry-After"]))
                except (KeyError, ValueError):
                    pass
            time.sleep(delay)
            continue

        # Error responses carry a JSON body too, so parse before judging.
        try:
            payload = response.json()
        except ValueError:
            payload = None  # body was not JSON
        if not isinstance(payload, dict):
            payload = {}

        if last_status < 400 and payload.get("ok", False):
            return payload

        error = payload.get("error")
        if not isinstance(error, dict):
            error = {}
        raise HiveError(
            error.get("code", "execution_error"),
            error.get("message", "Hive execution failed"),
            last_status,
        )

    raise HiveError("exhausted_retries", "All retries failed", last_status)
```

Key HTTP codes to handle:

- **400** — invalid tool name or args. Check `GET /api/v1/tools` for the current tool names and argument schemas.
- **401** — missing or invalid API key. Rotate via the dashboard; don't retry.
- **429** — rate-limited. Honor `Retry-After`; back off exponentially otherwise.
- **502/503** — upstream provider failure. Exponential backoff.

---

## Async with httpx

For agent loops that fan out across many tools, switch to `httpx` with a shared `AsyncClient`:

```python
import asyncio
import httpx


class AsyncHiveClient:
    def __init__(self, api_key: str, base_url: str = "https://mcp.hiveintelligence.xyz") -> None:
        self.base_url = base_url
        self._client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=httpx.Timeout(30.0, connect=5.0),
            limits=httpx.Limits(max_connections=32),
        )

    async def execute(self, tool: str, args: dict | None = None) -> dict:
        response = await self._client.post(
            f"{self.base_url}/api/v1/execute",
            json={"tool": tool, "args": args or {}},
        )
        response.raise_for_status()
        payload = response.json()
        if not payload.get("ok", False):
            raise RuntimeError(payload.get("error", {}).get("message", "Hive execution failed"))
        return payload

    async def aclose(self) -> None:
        await self._client.aclose()


async def daily_brief(api_key: str) -> dict:
    """Fetch prices, protocol TVL and open interest concurrently.

    Returns a dict with the ``"data"`` portion of each response under the
    keys ``"prices"``, ``"tvl"`` and ``"oi"``. The client is always closed,
    whether or not a call fails.
    """
    hive = AsyncHiveClient(api_key)
    try:
        pending = (
            hive.execute("get_price", {"ids": "bitcoin,ethereum", "vs_currencies": "usd"}),
            hive.execute("get_protocol_tvl", {"protocol": "aave"}),
            hive.execute("get_open_interest", {"exchange": "binance", "symbol": "BTC/USDT:USDT"}),
        )
        # gather() yields results in the order the coroutines were passed in.
        price_reply, tvl_reply, oi_reply = await asyncio.gather(*pending)
        return {
            "prices": price_reply["data"],
            "tvl": tvl_reply["data"],
            "oi": oi_reply["data"],
        }
    finally:
        await hive.aclose()
```

One key insight: Hive counts one credit per tool call regardless of concurrency, so fanning out costs the same as making the calls one after another — which makes fan-out the preferred pattern for research agents.

---

## Tool Discovery

Don't hard-code tool schemas — fetch them at runtime so your agent picks up new tools without a redeploy:

```python
def discover_tools(client: HiveClient, search: str | None = None) -> list[dict]:
    """Return every tool definition from ``GET /api/v1/tools``.

    Requests pages of 200 items and follows the ``meta.cursor`` value of each
    page until the server stops returning one.

    Args:
        client: Client whose ``session`` and ``base_url`` are used.
        search: Optional search term sent as the ``search`` query parameter.

    Returns:
        The concatenated ``data`` entries of all pages.

    Raises:
        requests.HTTPError: propagated from ``raise_for_status()`` when a
            page request returns a 4xx/5xx status.
    """
    # Only the cursor changes between pages, so build the rest once.
    params: dict[str, object] = {"limit": 200}
    if search:
        params["search"] = search

    items: list[dict] = []
    while True:
        response = client.session.get(
            f"{client.base_url}/api/v1/tools",
            params=params,
            timeout=30,
        )
        # Fail loudly: an error response without a "data" key would otherwise
        # be indistinguishable from an empty catalog.
        response.raise_for_status()
        page = response.json()

        items.extend(page.get("data", []))
        cursor = page.get("meta", {}).get("cursor")
        if not cursor:
            return items
        params["cursor"] = cursor


wallet_tools = discover_tools(client, search="wallet")
```

---

## LangChain Integration

If your agent runs on LangChain, use `langchain-mcp-adapters` to expose Hive tools as native `BaseTool` objects — no manual schema wrapping:

```python
import asyncio
import os

from langchain.agents import create_agent
from langchain_mcp_adapters.client import MultiServerMCPClient

API_KEY = os.environ["HIVE_API_KEY"]

client = MultiServerMCPClient({
    "hive": {
        "transport": "http",
        "url": "https://mcp.hiveintelligence.xyz/mcp",
        "headers": {"Authorization": f"Bearer {API_KEY}"},
    }
})


async def build_agent():
    """Load the Hive MCP tools and build a LangChain agent around them."""
    # get_tools() must be awaited, and a bare top-level ``await`` is a
    # SyntaxError in a regular script, so the call lives in a coroutine.
    tools = await client.get_tools()
    return create_agent("your-provider:your-model", tools)


# In a notebook (event loop already running) use: agent = await build_agent()
agent = asyncio.run(build_agent())
```

---

## Notes

- The current REST payload keys are `tool` and `args`. Older `toolName` / `arguments` docs are legacy.
- Tool schemas are available from `GET /api/v1/tools` — always prefer live discovery over hardcoded definitions.
- Authenticated execution and discovery requests can count against quota once accepted by the backend. Cache discovery responses and validate inputs before sending requests.
- Every execution response includes a `fetched_at` timestamp under `meta` (`payload["meta"]["fetched_at"]`) so agents can reason about staleness.

---

## Related

- [Integration Libraries](/sdk)
- [API Integration](/api-integration)
- [Live Catalog](/tools/live-catalog)
