#!/usr/bin/env python3 """ Proxy STDIO vers HTTP pour Claude Desktop Transfert simple des messages MCP STDIO vers le serveur HTTP Carto-SI """ import json import logging import os import requests import sys # Configuration BASE_URL = os.getenv('MCP_BASE_URL', 'http://localhost:9327/api/mcp') AUTH_TOKEN = os.getenv('MCP_AUTH_TOKEN') TENANT = os.getenv('MCP_TENANT') # Logging vers stderr logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr ) logger = logging.getLogger(__name__) def create_headers(): """Créer les headers HTTP avec authentification Carto-SI""" headers = { 'Content-Type': 'application/json', 'User-Agent': 'Claude-Desktop-MCP-Proxy/1.0' } if AUTH_TOKEN: auth_payload = { "token": AUTH_TOKEN, "myTenant": {"id": TENANT} } headers['Authorization'] = f'Bearer {json.dumps(auth_payload)}' return headers def route_request(request): """Router la requête vers le bon endpoint""" method = request.get('method', '') # Map des méthodes vers les endpoints routes = { 'initialize': '/initialize', 'tools/list': '/tools/list', 'tools/call': '/tools/call', 'resources/list': '/resources/list', 'resources/read': '/resources/read', 'resources/templates/list': '/resources/templates/list', 'prompts/list': '/prompts/list', 'prompts/get': '/prompts/get', } # Notifications (pas de réponse) if method in ['notifications/initialized', 'initialized']: return None endpoint = routes.get(method) if not endpoint: logger.warning(f"Unknown method: {method}") return None return endpoint def main(): """Boucle principale du proxy""" logger.info(f"MCP Proxy started - Forwarding to {BASE_URL}") try: for line in sys.stdin: line = line.strip() if not line: continue # Parser la requête JSON-RPC request = json.loads(line) # Router vers le bon endpoint endpoint = route_request(request) if endpoint is None: continue # Transférer la requête au serveur response = requests.post( f"{BASE_URL}{endpoint}", json=request, headers=create_headers(), timeout=60 ) # Retourner la réponse telle quelle print(response.text) sys.stdout.flush() except KeyboardInterrupt: logger.info("Stopped") except Exception as e: logger.error(f"Error: {e}") sys.exit(1) if __name__ == '__main__': main()