---
sidebar_position: 2
title: "Python Examples"
description: "Complete Python code examples"
---
# Python Examples
Complete, production-ready code examples for integrating the Kixago API into Python applications.
---
## Quick Start
### Installation
```bash
# Required: HTTP client
pip install requests
# Optional: For loading the API key from a .env file (used in Environment Setup)
pip install python-dotenv
# Optional: For type safety
pip install pydantic
# Optional: For async support
pip install aiohttp
# Optional: For caching
pip install redis
# Optional: For web API
pip install fastapi uvicorn
# Optional: For testing
pip install pytest pytest-asyncio
```

### Environment Setup

Store your API key in environment variables:

```bash
# .env
KIXAGO_API_KEY=kixakey_your_key_here
```

```python
import os

from dotenv import load_dotenv

# Pull the variables defined in the local .env file into the process environment
load_dotenv()

# None when the variable is not set
KIXAGO_API_KEY = os.environ.get('KIXAGO_API_KEY')
```

## Basic Examples

### Example 1: Simple Request

```python
# basic_example.py
import os
import requests
def get_risk_profile(wallet_address: str, timeout: float = 30.0) -> dict:
    """Fetch risk profile for a wallet address.

    Args:
        wallet_address: Wallet address; interpolated into the request path.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on an unresponsive host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    response = requests.get(
        f"https://api.kixago.com/v1/risk-profile/{wallet_address}",
        headers={
            'X-API-Key': os.getenv('KIXAGO_API_KEY')
        },
        timeout=timeout,
    )
    response.raise_for_status()  # Raises HTTPError for bad responses
    return response.json()
# Usage
if __name__ == '__main__':
    profile = get_risk_profile('0xf0bb20865277aBd641a307eCe5Ee04E79073416C')

    # defi_score is absent/null for wallets with no positions, so guard
    # before subscripting it.
    score = profile.get('defi_score')
    if score:
        print(f"DeFi Score: {score['defi_score']}")
        print(f"Risk Level: {score['risk_level']}")
    else:
        print("DeFi Score: not available for this wallet")
    print(f"Health Factor: {profile['global_health_factor']:.2f}")
```

### Example 2: Type-Safe Client with Pydantic

```python
# models.py
from typing import Optional, List, Literal
from pydantic import BaseModel, Field
from datetime import datetime
class TokenDetail(BaseModel):
    """One token entry, as used in a position's collateral/borrowed details."""

    token: str
    amount: float
    usd_value: float
    token_address: str
class LendingPosition(BaseModel):
    """A single lending position, one element of ``lending_positions``."""

    protocol: str
    protocol_version: str
    chain: str
    user_address: str
    collateral_usd: float
    borrowed_usd: float
    health_factor: float
    ltv_current: float
    is_at_risk: bool
    # Per-token breakdowns are optional and default to None when omitted
    collateral_details: Optional[List[TokenDetail]] = None
    borrowed_details: Optional[List[TokenDetail]] = None
    last_updated: str
class ComponentScore(BaseModel):
    """One scored component of the breakdown, with its weight and reasoning."""

    component_score: float
    weight: float
    weighted_contribution: float
    reasoning: str
class ScoreBreakdown(BaseModel):
    """The five component scores plus the total internal score."""

    health_factor_score: ComponentScore
    leverage_score: ComponentScore
    diversification_score: ComponentScore
    volatility_score: ComponentScore
    protocol_risk_score: ComponentScore
    total_internal_score: float
class RiskFactor(BaseModel):
    """A single risk factor attached to a score."""

    # Validation rejects any severity outside these four values
    severity: Literal['critical', 'high', 'medium', 'low']
    factor: str
    description: str
    impact_on_score: int
class Recommendations(BaseModel):
    """Recommendation texts grouped by time horizon."""

    immediate: List[str]
    short_term: List[str]
    long_term: List[str]
class LiquidationScenario(BaseModel):
    """One simulated event and the health factor/status it would lead to."""

    event: str
    new_health_factor: float
    # Validation rejects any status outside these three values
    status: Literal['Safe', 'Near liquidation', 'LIQUIDATED']
    time_estimate: Optional[str] = None
    estimated_loss: Optional[str] = None
class LiquidationSimulation(BaseModel):
    """Liquidation simulation: current state plus a list of scenarios."""

    current_health_factor: float
    liquidation_threshold: float
    buffer_percentage: float
    scenarios: List[LiquidationScenario]
class DeFiScore(BaseModel):
    """The DeFi score with its breakdown, risk factors and recommendations."""

    defi_score: int
    risk_level: str
    # Machine-readable category; validation rejects any other value
    risk_category: Literal[
        'VERY_LOW_RISK',
        'LOW_RISK',
        'MODERATE_RISK',
        'HIGH_RISK',
        'URGENT_ACTION_REQUIRED'
    ]
    color: Literal['green', 'blue', 'yellow', 'orange', 'red']
    score_breakdown: ScoreBreakdown
    risk_factors: List[RiskFactor]
    recommendations: Recommendations
    liquidation_simulation: LiquidationSimulation
    calculated_at: str
class RiskProfileResponse(BaseModel):
    """Top-level response model for the risk-profile endpoint."""

    wallet_address: str
    total_collateral_usd: float
    total_borrowed_usd: float
    global_health_factor: float
    global_ltv: float
    positions_at_risk_count: int
    last_updated: str
    aggregation_duration: str
    lending_positions: List[LendingPosition]
    # None for wallets with no positions - always check before dereferencing
    defi_score: Optional[DeFiScore] = None
    # Defaults to an empty dict when the field is omitted from the response
    aggregation_errors: Optional[dict] = Field(default_factory=dict)
# client.py
import requests
from models import RiskProfileResponse
class KixagoClient:
    """Type-safe Kixago API client.

    Holds one ``requests.Session`` carrying the API key header. Usable as a
    context manager so the session is closed automatically::

        with KixagoClient(api_key) as client:
            profile = client.get_risk_profile(address)
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = 'https://api.kixago.com',
        timeout: float = 30.0
    ):
        """
        Args:
            api_key: Value sent in the ``X-API-Key`` header.
            base_url: API root, without a trailing slash.
            timeout: Per-request timeout in seconds; without one a request
                to an unresponsive host would block indefinitely.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({'X-API-Key': api_key})

    def __enter__(self) -> 'KixagoClient':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def get_risk_profile(self, wallet_address: str) -> RiskProfileResponse:
        """
        Get complete risk profile for a wallet.

        Args:
            wallet_address: Ethereum address or ENS name

        Returns:
            RiskProfileResponse with full type safety

        Raises:
            requests.HTTPError: On HTTP errors
            requests.Timeout: When no response arrives within the timeout
            pydantic.ValidationError: On invalid response format
        """
        response = self.session.get(
            f"{self.base_url}/v1/risk-profile/{wallet_address}",
            timeout=self.timeout
        )
        response.raise_for_status()

        # Pydantic automatically validates the response
        return RiskProfileResponse(**response.json())

    def close(self):
        """Close the HTTP session."""
        self.session.close()
# Usage
if __name__ == '__main__':
    import os  # client.py does not import os at module level

    client = KixagoClient(os.getenv('KIXAGO_API_KEY'))
    try:
        profile = client.get_risk_profile('0xf0bb20865277aBd641a307eCe5Ee04E79073416C')

        # Type-safe access to all fields; defi_score is Optional on the
        # model, so check it before dereferencing.
        if profile.defi_score is not None:
            print(f"DeFi Score: {profile.defi_score.defi_score}")
            print(f"Risk Level: {profile.defi_score.risk_level}")
        print(f"Health Factor: {profile.global_health_factor:.2f}")

        # Iterate positions with type safety
        for position in profile.lending_positions:
            print(f"\n{position.protocol} on {position.chain}")
            print(f" Collateral: ${position.collateral_usd:,.2f}")
            print(f" Health Factor: {position.health_factor:.3f}")
    finally:
        client.close()
```

## Error Handling

### Example 3: Comprehensive Error Handling

```python
# error_handling.py
import requests
from typing import Optional
class KixagoAPIError(Exception):
    """Base exception for Kixago API errors.

    Attributes:
        status_code: HTTP status of the failed response, or None when the
            failure did not come with a status.
        response_data: Decoded error body, or None when not available.
    """

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        response_data: Optional[dict] = None
    ):
        super().__init__(message)
        self.status_code = status_code
        self.response_data = response_data
def get_risk_profile_safe(wallet_address: str) -> RiskProfileResponse:
    """
    Fetch risk profile with comprehensive error handling.

    Args:
        wallet_address: Wallet address; interpolated into the request path.

    Returns:
        The validated RiskProfileResponse. A warning is printed when the
        response reports ``aggregation_errors``.

    Raises:
        KixagoAPIError: On API errors with detailed information. HTTP
            failures carry ``status_code`` and ``response_data``; transport
            failures (timeout, connection) carry neither.
    """
    try:
        response = requests.get(
            f"https://api.kixago.com/v1/risk-profile/{wallet_address}",
            headers={'X-API-Key': os.getenv('KIXAGO_API_KEY')},
            timeout=30  # 30 second timeout
        )
    except requests.Timeout as e:
        raise KixagoAPIError("Request timed out after 30 seconds") from e
    except requests.ConnectionError as e:
        raise KixagoAPIError("Connection failed - check your network") from e
    except requests.RequestException as e:
        raise KixagoAPIError(f"Network error: {str(e)}") from e

    # Handle HTTP errors
    if not response.ok:
        # An error body is not guaranteed to be a JSON object (for example an
        # HTML page from a proxy). Decode defensively so the HTTP status is
        # never lost to a decoding error.
        try:
            error_data = response.json() if response.content else {}
        except ValueError:
            error_data = {}
        if not isinstance(error_data, dict):
            error_data = {}

        status = response.status_code
        if status == 400:
            message = f"Invalid wallet address: {error_data.get('error', 'Unknown error')}"
        elif status == 401:
            message = "Invalid API key - check your credentials"
        elif status == 429:
            retry_after = response.headers.get('Retry-After', '5')
            message = f"Rate limit exceeded - retry after {retry_after}s"
        elif status == 500:
            message = "API server error - try again later"
        else:
            message = error_data.get('error', f"HTTP {status}")

        raise KixagoAPIError(
            message,
            status_code=status,
            response_data=error_data
        )

    # Parse response
    try:
        data = response.json()
    except ValueError as e:
        raise KixagoAPIError(
            "API returned a response that is not valid JSON",
            status_code=response.status_code
        ) from e

    # Check for partial failures
    if data.get('aggregation_errors'):
        print(f"⚠️ Warning: Some protocols failed: {data['aggregation_errors']}")

    return RiskProfileResponse(**data)
# Usage
import time  # needed for the back-off below; not imported by this example otherwise

try:
    profile = get_risk_profile_safe('0xInvalidAddress')
    # defi_score is Optional on the model - check before dereferencing
    if profile.defi_score is not None:
        print(f"Score: {profile.defi_score.defi_score}")
except KixagoAPIError as e:
    print(f"API Error ({e.status_code}): {e}")

    if e.status_code == 429:
        # Handle rate limiting
        print("Waiting before retry...")
        time.sleep(5)
```

## Caching Implementation

### Example 4: Simple In-Memory Cache

```python
# simple_cache.py
import time
from typing import Optional, TypeVar, Generic
T = TypeVar('T')


class SimpleCache(Generic[T]):
    """Simple in-memory cache with TTL.

    Entry age is measured with ``time.monotonic()`` so that system clock
    adjustments can neither expire entries early nor keep them alive past
    their TTL.
    """

    def __init__(self, ttl_seconds: int = 30):
        self.ttl_seconds = ttl_seconds
        # key -> (value, monotonic timestamp at which it was stored)
        self._cache: dict[str, tuple[T, float]] = {}

    def get(self, key: str) -> Optional[T]:
        """Get cached value if not expired.

        Returns None for unknown keys and for entries older than the TTL;
        an expired entry is removed as a side effect.
        """
        entry = self._cache.get(key)
        if entry is None:
            return None

        value, stored_at = entry
        if time.monotonic() - stored_at > self.ttl_seconds:
            del self._cache[key]
            return None

        return value

    def set(self, key: str, value: T) -> None:
        """Cache a value with current timestamp."""
        self._cache[key] = (value, time.monotonic())

    def clear(self) -> None:
        """Clear all cached values."""
        self._cache.clear()
# Usage
cache = SimpleCache[RiskProfileResponse](ttl_seconds=30)


def get_cached_risk_profile(wallet_address: str) -> RiskProfileResponse:
    """Get risk profile with caching.

    Serves the profile from the module-level ``cache`` when present,
    otherwise fetches it with ``get_risk_profile_safe`` and stores it.

    Raises:
        KixagoAPIError: Propagated from ``get_risk_profile_safe`` on a miss.
    """
    # Check cache first; compare against None so the hit test does not
    # depend on the truthiness of the cached object.
    cached = cache.get(wallet_address)
    if cached is not None:
        print("✅ Cache HIT")
        return cached

    print("❌ Cache MISS - fetching from API")

    # Fetch from API
    fresh = get_risk_profile_safe(wallet_address)

    # Cache the result
    cache.set(wallet_address, fresh)
    return fresh


# Usage
profile1 = get_cached_risk_profile('0xf0bb...')  # API call
profile2 = get_cached_risk_profile('0xf0bb...')  # Cache hit (fast!)
```

### Example 5: Redis Cache (Production)

```python
# redis_cache.py
import json
import redis
from typing import Optional
class RedisCache:
    """Redis-backed cache for production use.

    Values are stored as JSON strings under ``kixago:profile:<key>`` and
    expire after ``ttl_seconds``.
    """

    def __init__(
        self,
        host: str = 'localhost',
        port: int = 6379,
        password: Optional[str] = None,
        ttl_seconds: int = 30
    ):
        self.ttl_seconds = ttl_seconds
        self.redis = redis.Redis(
            host=host,
            port=port,
            password=password,
            decode_responses=True  # return str rather than bytes
        )

    def get(self, key: str) -> Optional[dict]:
        """Get cached value from Redis.

        Returns the decoded dict, or None when the key is missing.
        """
        value = self.redis.get(f"kixago:profile:{key}")
        return json.loads(value) if value else None

    def set(self, key: str, value: dict) -> None:
        """Cache a value in Redis with TTL.

        ``value`` must be JSON-serializable.
        """
        self.redis.setex(
            f"kixago:profile:{key}",
            self.ttl_seconds,
            json.dumps(value)
        )
# Usage
import os  # redis_cache.py does not import os otherwise

redis_cache = RedisCache(
    host=os.getenv('REDIS_HOST', 'localhost'),
    port=int(os.getenv('REDIS_PORT', '6379')),
    password=os.getenv('REDIS_PASSWORD')
)


def get_cached_risk_profile_redis(wallet_address: str) -> RiskProfileResponse:
    """Get risk profile with Redis caching.

    Rebuilds the model from the cached dict on a hit; on a miss fetches via
    ``get_risk_profile_safe`` and stores the serialized profile.

    Raises:
        KixagoAPIError: Propagated from ``get_risk_profile_safe`` on a miss.
    """
    # Try cache first
    cached = redis_cache.get(wallet_address)
    if cached is not None:
        print("✅ Redis cache HIT")
        return RiskProfileResponse(**cached)

    print("❌ Redis cache MISS")

    # Fetch from API
    fresh = get_risk_profile_safe(wallet_address)

    # Cache the result. Pydantic v2 renamed .dict() to .model_dump() and
    # deprecated the old name; support both major versions.
    payload = fresh.model_dump() if hasattr(fresh, 'model_dump') else fresh.dict()
    redis_cache.set(wallet_address, payload)
    return fresh
```

## Real-World Use Cases

### Example 6: Credit Underwriting System

```python
# underwriting.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional, List
class Decision(Enum):
    """Possible outcomes of an underwriting run."""

    APPROVED = "APPROVED"
    DECLINED = "DECLINED"
    MANUAL_REVIEW = "MANUAL_REVIEW"
@dataclass
class UnderwritingDecision:
    """Result of an underwriting run: the outcome plus supporting details.

    Only ``decision`` and ``reason`` are required; the remaining fields stay
    None unless the rule that produced the decision fills them in.
    """

    decision: Decision
    reason: str
    defi_score: Optional[int] = None
    risk_category: Optional[str] = None
    max_loan_amount: Optional[float] = None
    conditions: Optional[List[str]] = None
def underwrite_borrower(
    wallet_address: str,
    requested_loan_amount: float
) -> UnderwritingDecision:
    """
    Underwrite a borrower based on their DeFi credit profile.

    Rules are applied in order and the first one that matches decides.

    Args:
        wallet_address: Ethereum address
        requested_loan_amount: Loan amount in USD; must be positive

    Returns:
        UnderwritingDecision with approval status and reasoning. A failure
        to fetch the profile yields MANUAL_REVIEW rather than an exception.
    """
    # A zero or negative request would pass every collateral check below
    # and end up APPROVED, so reject it before spending an API call.
    if requested_loan_amount <= 0:
        return UnderwritingDecision(
            decision=Decision.DECLINED,
            reason="Requested loan amount must be positive"
        )

    try:
        profile = get_cached_risk_profile(wallet_address)
    except KixagoAPIError as e:
        print(f"Underwriting error: {e}")
        return UnderwritingDecision(
            decision=Decision.MANUAL_REVIEW,
            reason="Error fetching DeFi profile - manual review required"
        )

    # Check if wallet has DeFi history
    if not profile.defi_score:
        return UnderwritingDecision(
            decision=Decision.DECLINED,
            reason="No DeFi lending history found"
        )

    defi_score = profile.defi_score.defi_score
    risk_category = profile.defi_score.risk_category
    health_factor = profile.global_health_factor
    collateral = profile.total_collateral_usd

    # Rule 1: Minimum credit score
    if defi_score < 550:
        return UnderwritingDecision(
            decision=Decision.DECLINED,
            reason=f"DeFi credit score too low: {defi_score}",
            defi_score=defi_score
        )

    # Rule 2: Health factor requirement
    # (a health factor of exactly 0 is deliberately not declined here)
    if 0 < health_factor < 1.5:
        return UnderwritingDecision(
            decision=Decision.DECLINED,
            reason=f"Health factor too low: {health_factor:.2f} (minimum 1.5)",
            defi_score=defi_score
        )

    # Rule 3: Collateral requirement (must have 2x loan amount)
    min_collateral = requested_loan_amount * 2
    if collateral < min_collateral:
        return UnderwritingDecision(
            decision=Decision.DECLINED,
            reason=f"Insufficient collateral. Need ${min_collateral:,.0f}, have ${collateral:,.0f}",
            defi_score=defi_score
        )

    # Rule 4: Risk category checks
    if risk_category == 'URGENT_ACTION_REQUIRED':
        return UnderwritingDecision(
            decision=Decision.DECLINED,
            reason="Critical risk factors detected - imminent liquidation risk",
            defi_score=defi_score,
            risk_category=risk_category
        )

    if risk_category == 'HIGH_RISK':
        return UnderwritingDecision(
            decision=Decision.MANUAL_REVIEW,
            reason="High risk category - requires underwriter review",
            defi_score=defi_score,
            risk_category=risk_category,
            conditions=profile.defi_score.recommendations.immediate
        )

    # Calculate max loan amount (50% of collateral)
    max_loan = collateral * 0.5

    # NOTE(review): Rule 3 already guarantees collateral >= 2 * requested,
    # i.e. requested <= max_loan, so this branch cannot currently be taken.
    # It is kept so the cap still applies if the Rule 3 multiple changes.
    if requested_loan_amount > max_loan:
        return UnderwritingDecision(
            decision=Decision.MANUAL_REVIEW,
            reason="Requested amount exceeds max (50% of collateral)",
            defi_score=defi_score,
            max_loan_amount=max_loan
        )

    # APPROVED!
    return UnderwritingDecision(
        decision=Decision.APPROVED,
        reason=f"Strong DeFi profile - Score {defi_score}, Risk: {risk_category}",
        defi_score=defi_score,
        risk_category=risk_category,
        max_loan_amount=max_loan,
        conditions=[]
    )
# Usage
decision = underwrite_borrower(
    '0xf0bb20865277aBd641a307eCe5Ee04E79073416C',
    500_000  # $500K loan request
)

print(f"Decision: {decision.decision.value}")
print(f"Reason: {decision.reason}")

# max_loan_amount is always set on an APPROVED decision
if decision.decision == Decision.APPROVED:
    print(f"Max loan amount: ${decision.max_loan_amount:,.0f}")
```

### Example 7: Liquidation Monitoring Bot

```python
# liquidation_monitor.py
from dataclasses import dataclass
from typing import List, Literal, Optional


@dataclass
class LiquidationAlert:
    """One liquidation-risk alert for a monitored wallet."""

    wallet: str
    collateral_usd: float
    health_factor: float
    buffer_percentage: float
    urgency: Literal['CRITICAL', 'HIGH', 'MEDIUM']
    message: str
    # Only filled in for alerts that have a loss estimate available
    estimated_loss: Optional[str] = None
def monitor_liquidation_risk(wallet_addresses: List[str]) -> List[LiquidationAlert]:
    """
    Monitor multiple wallets for liquidation risk.

    A wallet produces an alert when its simulated buffer is below 20%:
    below 5% is CRITICAL, below 10% is HIGH, below 20% is MEDIUM. Wallets
    without a score or collateral, and wallets whose profile cannot be
    fetched, are skipped.

    Args:
        wallet_addresses: List of Ethereum addresses to monitor

    Returns:
        List of alerts sorted by urgency (CRITICAL first)
    """
    alerts: List[LiquidationAlert] = []

    for address in wallet_addresses:
        try:
            profile = get_cached_risk_profile(address)
        except KixagoAPIError as e:
            # One failing wallet must not abort the whole monitoring run
            print(f"Error monitoring {address}: {e}")
            continue

        # Skip if no positions
        if not profile.defi_score or profile.total_collateral_usd == 0:
            continue

        sim = profile.defi_score.liquidation_simulation
        buffer = sim.buffer_percentage
        estimated_loss = None

        # Check for liquidation risk
        if buffer < 5:
            # CRITICAL: Liquidation imminent
            urgency = 'CRITICAL'
            message = f"🚨 CRITICAL: Only {buffer:.1f}% buffer remaining! Liquidation imminent!"
            # Take the loss estimate from the first LIQUIDATED scenario, if any
            liquidated = next(
                (s for s in sim.scenarios if s.status == 'LIQUIDATED'),
                None
            )
            if liquidated:
                estimated_loss = liquidated.estimated_loss
        elif buffer < 10:
            # HIGH: Vulnerable to normal volatility
            urgency = 'HIGH'
            message = f"⚠️ HIGH RISK: {buffer:.1f}% buffer - vulnerable to normal market volatility"
        elif buffer < 20:
            # MEDIUM: Monitor closely
            urgency = 'MEDIUM'
            message = f"⚡ WARNING: {buffer:.1f}% buffer - monitor closely during volatility"
        else:
            continue

        alerts.append(LiquidationAlert(
            wallet=address,
            collateral_usd=profile.total_collateral_usd,
            health_factor=sim.current_health_factor,
            buffer_percentage=buffer,
            urgency=urgency,
            message=message,
            estimated_loss=estimated_loss
        ))

    # Sort by urgency (stable: input order is kept within one urgency level)
    urgency_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2}
    alerts.sort(key=lambda a: urgency_order[a.urgency])

    return alerts
# Usage
def send_critical_alert(alert: LiquidationAlert) -> None:
    """Placeholder notification hook - replace with your SMS/email/pager integration."""
    print(f" [critical alert dispatched for {alert.wallet}]")


whale_wallets = [
    '0xf0bb20865277aBd641a307eCe5Ee04E79073416C',
    # NOTE(review): this sample address has 39 hex digits instead of 40 -
    # substitute a real address before running.
    '0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb',
    # ... more wallets
]

alerts = monitor_liquidation_risk(whale_wallets)

# Send notifications for critical alerts
for alert in alerts:
    print(alert.message)
    print(f" Wallet: {alert.wallet}")
    print(f" Collateral at risk: ${alert.collateral_usd:,.0f}")
    print(f" Health Factor: {alert.health_factor:.3f}")

    if alert.urgency == 'CRITICAL':
        # Send SMS/email alert
        send_critical_alert(alert)
```

### Example 8: Portfolio Dashboard

```python
# portfolio_dashboard.py
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class PortfolioSummary:
    """Flattened, display-ready summary of one wallet's risk profile."""

    wallet: str
    score: int
    risk_level: str
    total_aum: float
    net_worth: float
    health_factor: float
    ltv: float
    positions_count: int
    chains: List[str]
    protocols: List[str]
    at_risk: bool
    top_recommendations: List[str]
def generate_portfolio_summary(wallet_address: str) -> Optional[PortfolioSummary]:
    """
    Generate a summary dashboard for a wallet.

    Args:
        wallet_address: Ethereum address

    Returns:
        PortfolioSummary or None if no positions

    Raises:
        KixagoAPIError: Propagated from the profile lookup.
    """
    profile = get_cached_risk_profile(wallet_address)

    if not profile.defi_score:
        return None

    # Extract unique chains and protocols; sorted so the dashboard shows the
    # same order on every run (plain set iteration order is arbitrary).
    chains = sorted({p.chain for p in profile.lending_positions})
    protocols = sorted({p.protocol for p in profile.lending_positions})

    # Calculate net worth
    net_worth = profile.total_collateral_usd - profile.total_borrowed_usd

    return PortfolioSummary(
        wallet=wallet_address,
        score=profile.defi_score.defi_score,
        risk_level=profile.defi_score.risk_level,
        total_aum=profile.total_collateral_usd,
        net_worth=net_worth,
        health_factor=profile.global_health_factor,
        ltv=profile.global_ltv,
        positions_count=len(profile.lending_positions),
        chains=chains,
        protocols=protocols,
        at_risk=profile.positions_at_risk_count > 0,
        top_recommendations=profile.defi_score.recommendations.immediate
    )
# Usage
summary = generate_portfolio_summary('0xf0bb20865277aBd641a307eCe5Ee04E79073416C')

# None means the wallet has no score to summarize
if summary:
    print("=== PORTFOLIO SUMMARY ===")
    print(f"DeFi Score: {summary.score} ({summary.risk_level})")
    print(f"Total AUM: ${summary.total_aum:,.0f}")
    print(f"Net Worth: ${summary.net_worth:,.0f}")
    print(f"Health Factor: {summary.health_factor:.2f}")
    print(f"LTV: {summary.ltv:.1f}%")
    print(f"Positions: {summary.positions_count} across {', '.join(summary.chains)}")
    print(f"Protocols: {', '.join(summary.protocols)}")

    if summary.at_risk:
        print("\n⚠️ POSITIONS AT RISK:")
        for rec in summary.top_recommendations:
            print(f" - {rec}")
```

## Advanced Patterns

### Example 9: Retry Logic with Exponential Backoff

```python
# retry_logic.py
import time
from typing import TypeVar, Callable
from functools import wraps
T = TypeVar('T')


def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    backoff_factor: float = 2.0
):
    """
    Decorator for retrying functions with exponential backoff.

    Only ``KixagoAPIError`` triggers a retry. 4xx errors other than 429 are
    re-raised immediately because repeating the same request cannot succeed.

    Args:
        max_retries: Total number of attempts, including the first call.
            Values below 1 are treated as 1 so the function always runs.
        base_delay: Initial delay in seconds
        backoff_factor: Multiplier for each retry (exponential)
    """
    # Guarantee at least one attempt. With max_retries=0 the loop below would
    # otherwise never call the function and fall through to a bare Exception.
    attempts = max(1, max_retries)

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except KixagoAPIError as e:
                    status = e.status_code

                    # Don't retry on 4xx errors (except 429)
                    if status and 400 <= status < 500 and status != 429:
                        raise

                    if attempt == attempts - 1:
                        raise

                    # Exponential backoff: 1s, 2s, 4s, 8s...
                    delay = base_delay * (backoff_factor ** attempt)
                    print(f"Retry {attempt + 1}/{max_retries} after {delay}s...")
                    time.sleep(delay)

            # Not reachable: the final attempt either returns or re-raises
            raise Exception("Max retries exceeded")

        return wrapper
    return decorator
# Usage
@retry_with_backoff(max_retries=3, base_delay=1.0)
def get_risk_profile_with_retry(wallet_address: str) -> RiskProfileResponse:
    """Fetch a risk profile, retrying failures with exponential backoff."""
    return get_risk_profile_safe(wallet_address)


# Usage
profile = get_risk_profile_with_retry('0xf0bb...')
```

### Example 10: Async Batch Processing

```python
# async_batch.py
import asyncio
import aiohttp
from typing import List
from models import RiskProfileResponse
class AsyncKixagoClient:
    """Async client for concurrent requests."""

    def __init__(
        self,
        api_key: str,
        base_url: str = 'https://api.kixago.com',
        timeout: float = 30.0,
        max_concurrency: int = 10
    ):
        """
        Args:
            api_key: Value sent in the ``X-API-Key`` header.
            base_url: API root, without a trailing slash.
            timeout: Total time budget per request, in seconds.
            max_concurrency: Upper bound on requests in flight at once in
                ``get_many``, so a large batch does not hit the API all at
                the same moment.

        Raises:
            ValueError: If ``max_concurrency`` is smaller than 1.
        """
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be at least 1")
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.max_concurrency = max_concurrency

    async def get_risk_profile(
        self,
        session: aiohttp.ClientSession,
        wallet_address: str
    ) -> RiskProfileResponse:
        """Fetch risk profile asynchronously.

        Raises:
            aiohttp.ClientResponseError: On a 4xx/5xx response.
            pydantic.ValidationError: On invalid response format.
        """
        async with session.get(
            f"{self.base_url}/v1/risk-profile/{wallet_address}",
            headers={'X-API-Key': self.api_key}
        ) as response:
            response.raise_for_status()
            data = await response.json()
            return RiskProfileResponse(**data)

    async def get_many(self, wallet_addresses: List[str]) -> List[RiskProfileResponse]:
        """Fetch multiple profiles concurrently.

        Results are returned in input order. A failed lookup does not abort
        the batch: its slot holds the raised exception instead of a profile,
        so callers must check each element with ``isinstance``.
        """
        # Created here so the semaphore belongs to the running event loop
        limit = asyncio.Semaphore(self.max_concurrency)

        async def fetch(session: aiohttp.ClientSession, address: str):
            async with limit:
                return await self.get_risk_profile(session, address)

        client_timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=client_timeout) as session:
            tasks = [fetch(session, address) for address in wallet_addresses]
            return await asyncio.gather(*tasks, return_exceptions=True)
# Usage
async def main():
    """Fetch a batch of wallets concurrently and report successes and failures."""
    import os  # async_batch.py does not import os at module level

    client = AsyncKixagoClient(os.getenv('KIXAGO_API_KEY'))

    wallets = [
        '0xWallet1...',
        '0xWallet2...',
        # ... 100 wallets
    ]

    # Fetch all concurrently
    results = await client.get_many(wallets)

    # Filter out errors (failed lookups come back as exception objects)
    successful = [r for r in results if isinstance(r, RiskProfileResponse)]
    errors = [r for r in results if isinstance(r, Exception)]

    print(f"Successful: {len(successful)}")
    print(f"Errors: {len(errors)}")


# Run
if __name__ == '__main__':
    asyncio.run(main())
```

### Example 11: Rate-Limited Batch Processing

```python
# rate_limited_batch.py
import time
from typing import List, Callable, TypeVar
from queue import Queue
from threading import Thread
T = TypeVar('T')


class RateLimiter:
    """Rate limiter for API calls.

    Queued callables are executed one at a time by a background daemon
    thread that pauses ``1 / requests_per_second`` seconds after each call.
    Return values, or the exception a callable raised, are appended to
    ``results`` in execution order.
    """

    # Queue item that tells the worker thread to exit
    _STOP = object()

    def __init__(self, requests_per_second: int = 10):
        """
        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self.requests_per_second = requests_per_second
        self.delay = 1.0 / requests_per_second
        self.queue: Queue = Queue()
        self.results: List = []
        self.worker = Thread(target=self._process_queue, daemon=True)
        self.worker.start()

    def add(self, func: Callable[[], T]) -> None:
        """Add a function to the rate-limited queue."""
        self.queue.put(func)

    def _process_queue(self):
        """Process queued functions with rate limiting."""
        while True:
            func = self.queue.get()
            if func is self._STOP:
                self.queue.task_done()
                return
            try:
                self.results.append(func())
            except Exception as e:
                # Record the failure in place of a result and keep going
                self.results.append(e)
            finally:
                # Always mark the item done so wait() cannot block forever
                self.queue.task_done()
            time.sleep(self.delay)

    def wait(self):
        """Wait for all queued functions to complete."""
        self.queue.join()

    def close(self, timeout: Optional[float] = None) -> None:
        """Stop the worker thread once everything queued before it has run.

        Without this the worker stays blocked on the queue for the life of
        the process, one idle thread per RateLimiter ever created.

        Args:
            timeout: Maximum seconds to wait for the worker to exit;
                None waits until it has exited.
        """
        self.queue.put(self._STOP)
        self.worker.join(timeout)
# Usage
def batch_process_wallets(wallets: List[str]) -> List[RiskProfileResponse]:
    """Process multiple wallets with rate limiting.

    Returns only the profiles that were fetched successfully; failures are
    counted in the printed summary but not returned.
    """
    limiter = RateLimiter(requests_per_second=10)

    for wallet in wallets:
        # Bind the current wallet as a default argument; a plain closure
        # would see only the last value of the loop variable.
        limiter.add(lambda w=wallet: get_cached_risk_profile(w))

    limiter.wait()

    # Filter out errors
    results = [r for r in limiter.results if isinstance(r, RiskProfileResponse)]
    errors = [r for r in limiter.results if isinstance(r, Exception)]

    print(f"Processed {len(results)} wallets, {len(errors)} errors")
    return results


# Usage
wallets = ['0xWallet1...', '0xWallet2...']  # ... up to 100 wallets
results = batch_process_wallets(wallets)
```

## FastAPI Integration

### Example 12: Production FastAPI Server

```python
# main.py
import os
from typing import Optional, List

from fastapi import FastAPI, HTTPException, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Building blocks from the earlier examples in this guide
from client import KixagoClient
from error_handling import KixagoAPIError
from liquidation_monitor import monitor_liquidation_risk
from models import RiskProfileResponse
from redis_cache import RedisCache
from underwriting import underwrite_borrower

app = FastAPI(
    title="Kixago Proxy API",
    description="Production API for DeFi credit intelligence",
    version="1.0.0"
)

# Initialize client
kixago_client = KixagoClient(os.getenv('KIXAGO_API_KEY'))

# Same REDIS_* environment configuration as the Redis cache example;
# falls back to localhost:6379 without a password when unset.
redis_cache = RedisCache(
    host=os.getenv('REDIS_HOST', 'localhost'),
    port=int(os.getenv('REDIS_PORT', '6379')),
    password=os.getenv('REDIS_PASSWORD')
)
# Request/Response models
class UnderwriteRequest(BaseModel):
    """Request body of the underwrite endpoint."""

    wallet_address: str
    loan_amount: float
class MonitorRequest(BaseModel):
    """Request body of the monitor endpoint."""

    wallets: List[str]
# Endpoints
@app.get("/api/wallet/{address}", response_model=RiskProfileResponse)
def get_wallet_profile(address: str):
    """Get complete risk profile for a wallet.

    Declared with plain ``def`` because the Redis and HTTP clients used here
    are blocking; FastAPI runs such handlers in a worker thread instead of
    stalling the event loop.
    """
    # KixagoClient raises requests exceptions, not KixagoAPIError
    import requests

    try:
        # Try cache first
        cached = redis_cache.get(address)
        if cached:
            return JSONResponse(content={**cached, "cached": True})

        # Fetch from API
        profile = kixago_client.get_risk_profile(address)

        # Cache result (.model_dump() on Pydantic v2, .dict() on v1)
        payload = profile.model_dump() if hasattr(profile, 'model_dump') else profile.dict()
        redis_cache.set(address, payload)

        return profile
    except KixagoAPIError as e:
        raise HTTPException(status_code=e.status_code or 500, detail=str(e)) from e
    except requests.HTTPError as e:
        # Pass the upstream status on; 502 when no response is attached
        upstream_status = e.response.status_code if e.response is not None else 502
        raise HTTPException(status_code=upstream_status, detail=str(e)) from e
    except requests.RequestException as e:
        raise HTTPException(status_code=502, detail=str(e)) from e
@app.post("/api/underwrite")
def underwrite(request: UnderwriteRequest):
    """Underwrite a borrower.

    Declared with plain ``def`` because ``underwrite_borrower`` performs
    blocking I/O; FastAPI runs such handlers in a worker thread.
    """
    try:
        decision = underwrite_borrower(
            request.wallet_address,
            request.loan_amount
        )
        return decision
    except Exception as e:
        # Boundary handler: report any failure as a 500 with its message
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/monitor")
def monitor(request: MonitorRequest):
    """Monitor multiple wallets for liquidation risk.

    Declared with plain ``def`` because ``monitor_liquidation_risk`` performs
    blocking I/O; FastAPI runs such handlers in a worker thread.
    """
    try:
        alerts = monitor_liquidation_risk(request.wallets)
        return {"alerts": alerts, "count": len(alerts)}
    except Exception as e:
        # Boundary handler: report any failure as a 500 with its message
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "ok"}


# Run with: uvicorn main:app --reload
```

## Testing

### Example 13: Unit Tests with pytest

```python
# test_client.py
import pytest
from unittest.mock import Mock, patch
from models import RiskProfileResponse
from client import KixagoClient
@pytest.fixture
def mock_response():
    """Mock API response.

    A complete payload that validates against RiskProfileResponse: no
    lending positions, a score of 750 and no positions at risk.
    """
    return {
        "wallet_address": "0xTest...",
        "total_collateral_usd": 100000.0,
        "total_borrowed_usd": 30000.0,
        "global_health_factor": 3.2,
        "global_ltv": 30.0,
        "positions_at_risk_count": 0,
        "last_updated": "2025-01-15T12:00:00Z",
        "aggregation_duration": "1.234s",
        "lending_positions": [],
        "defi_score": {
            "defi_score": 750,
            "risk_level": "Very Low Risk",
            "risk_category": "VERY_LOW_RISK",
            "color": "green",
            "score_breakdown": {
                "health_factor_score": {"component_score": 100, "weight": 0.4, "weighted_contribution": 40.0, "reasoning": "Safe"},
                "leverage_score": {"component_score": 100, "weight": 0.3, "weighted_contribution": 30.0, "reasoning": "Safe"},
                "diversification_score": {"component_score": 20, "weight": 0.15, "weighted_contribution": 3.0, "reasoning": "Poor"},
                "volatility_score": {"component_score": 100, "weight": 0.1, "weighted_contribution": 10.0, "reasoning": "Safe"},
                "protocol_risk_score": {"component_score": 95, "weight": 0.05, "weighted_contribution": 4.75, "reasoning": "Safe"},
                "total_internal_score": 87.75
            },
            "risk_factors": [],
            "recommendations": {"immediate": [], "short_term": [], "long_term": []},
            "liquidation_simulation": {
                "current_health_factor": 3.2,
                "liquidation_threshold": 1.0,
                "buffer_percentage": 220.0,
                "scenarios": []
            },
            "calculated_at": "2025-01-15T12:00:00Z"
        }
    }
def test_get_risk_profile_success(mock_response):
    """Test successful risk profile fetch."""
    client = KixagoClient('test-api-key')

    # Patch at the Session class so no real HTTP request is made
    with patch('requests.Session.get') as mock_get:
        mock_get.return_value.ok = True
        mock_get.return_value.json.return_value = mock_response

        profile = client.get_risk_profile('0xTest...')

        assert profile.wallet_address == '0xTest...'
        assert profile.defi_score.defi_score == 750
        assert profile.global_health_factor == 3.2
def test_get_risk_profile_404():
    """Test 404 error handling."""
    import requests

    client = KixagoClient('test-api-key')

    with patch('requests.Session.get') as mock_get:
        mock_get.return_value.ok = False
        mock_get.return_value.status_code = 404
        mock_get.return_value.json.return_value = {"error": "Wallet not found"}
        # A Mock's raise_for_status() does nothing unless told to raise.
        # Without this side effect the test would pass only because the
        # error body fails model validation, not because of the 404.
        mock_get.return_value.raise_for_status.side_effect = requests.HTTPError(
            "404 Client Error: Not Found"
        )

        with pytest.raises(requests.HTTPError):
            client.get_risk_profile('0xInvalid...')
def test_underwriting_approved(mock_response):
    """Test underwriting approval."""
    from underwriting import Decision, underwrite_borrower

    # patch() needs a 'module.attribute' target: replace the name in the
    # module where underwrite_borrower looks it up.
    with patch('underwriting.get_cached_risk_profile') as mock_get:
        mock_get.return_value = RiskProfileResponse(**mock_response)

        decision = underwrite_borrower('0xTest...', 10000)

        assert decision.decision == Decision.APPROVED
        assert decision.defi_score == 750
def test_underwriting_declined_low_score(mock_response):
    """Test underwriting decline due to low score."""
    from underwriting import Decision, underwrite_borrower

    # Below the minimum score of 550 enforced by underwrite_borrower
    mock_response['defi_score']['defi_score'] = 400

    # patch() needs a 'module.attribute' target: replace the name in the
    # module where underwrite_borrower looks it up.
    with patch('underwriting.get_cached_risk_profile') as mock_get:
        mock_get.return_value = RiskProfileResponse(**mock_response)

        decision = underwrite_borrower('0xTest...', 10000)

        assert decision.decision == Decision.DECLINED
        assert 'too low' in decision.reason.lower()


# Run with: pytest test_client.py -v
```

## Best Practices

### ✅ DO

- Use type hints with Pydantic for data validation
- Cache responses for at least 30 seconds
- Handle errors gracefully with try/except blocks
- Implement retry logic for transient failures
- Use async/await for concurrent requests
- Check `aggregation_errors` for partial failures
- Validate inputs before sending requests
- Use environment variables for API keys
- Close sessions when done (or use context managers)

### ❌ DON'T

- Don't expose API keys in code repositories
- Don't spam the API - respect rate limits
- Don't ignore `defi_score: None` - handle wallets with no positions
- Don't assume all fields exist - use `Optional` types
- Don't refetch every second - use the 30s cache
- Don't hardcode URLs - use configuration

## Next Steps

## Need Help?

- Code not working? Email [email protected] with a code snippet
- Want a Python SDK? Coming Q2 2026 - watch our GitHub
- Found a bug? Report it
---