BioGuideMCP / server.py
stefanjwojcik's picture
Add setup script and comprehensive tests for Congressional Bioguide MCP Server
15de73a
#!/usr/bin/env python3
"""
MCP Server for Congressional Bioguide profiles.
Provides SQL queries and semantic search capabilities.
"""
import sys
import sqlite3
import json
import os
import warnings
from typing import List, Dict, Any, Optional
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import pickle
from pathlib import Path
from mcp.server import Server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
import mcp.server.stdio
# Suppress all warnings to prevent JSON protocol corruption
warnings.filterwarnings('ignore')
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
# Initialize global resources - use absolute paths
SCRIPT_DIR = Path(__file__).parent.absolute()
DB_PATH = str(SCRIPT_DIR / "congress.db")
FAISS_INDEX_PATH = str(SCRIPT_DIR / "congress_faiss.index")
BIO_IDS_PATH = str(SCRIPT_DIR / "congress_bio_ids.pkl")
# Load FAISS index and model
model = None
faiss_index = None
bio_id_mapping = None
def initialize_search_index():
"""Initialize the semantic search components."""
global model, faiss_index, bio_id_mapping
try:
if Path(FAISS_INDEX_PATH).exists() and Path(BIO_IDS_PATH).exists():
print(f"Loading FAISS index from: {FAISS_INDEX_PATH}", file=sys.stderr, flush=True)
model = SentenceTransformer('all-MiniLM-L6-v2')
faiss_index = faiss.read_index(FAISS_INDEX_PATH)
with open(BIO_IDS_PATH, "rb") as f:
bio_id_mapping = pickle.load(f)
print(f"✓ Loaded {faiss_index.ntotal} embeddings", file=sys.stderr, flush=True)
return True
else:
print(f"FAISS index not found at: {FAISS_INDEX_PATH}", file=sys.stderr, flush=True)
print(f"Bio IDs not found at: {BIO_IDS_PATH}", file=sys.stderr, flush=True)
return False
except Exception as e:
print(f"Error loading search index: {e}", file=sys.stderr, flush=True)
return False
def get_db_connection():
"""Get a database connection."""
return sqlite3.connect(DB_PATH)
def execute_query(query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""Execute a SQL query and return results as list of dicts."""
conn = get_db_connection()
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(query, params)
results = [dict(row) for row in cursor.fetchall()]
conn.close()
return results
def format_member_concise(member: Dict[str, Any]) -> Dict[str, Any]:
"""Format member data to concise output with only essential fields."""
return {
'bio_id': member.get('bio_id'),
'name': f"{member.get('given_name', '')} {member.get('middle_name', '') + ' ' if member.get('middle_name') else ''}{member.get('family_name', '')}".strip(),
'birth_date': member.get('birth_date'),
'death_date': member.get('death_date'),
'party': member.get('party'),
'state': member.get('region_code'),
'position': member.get('job_name'),
'congress': member.get('congress_number')
}
def get_member_profile(bio_id: str) -> Optional[Dict[str, Any]]:
"""Get complete profile for a member including all related data."""
conn = get_db_connection()
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Get member data
cursor.execute("SELECT * FROM members WHERE bio_id = ?", (bio_id,))
member = cursor.fetchone()
if not member:
conn.close()
return None
profile = dict(member)
# Get images
cursor.execute("SELECT * FROM images WHERE bio_id = ?", (bio_id,))
profile['images'] = [dict(row) for row in cursor.fetchall()]
# Get job positions
cursor.execute("SELECT * FROM job_positions WHERE bio_id = ? ORDER BY start_date", (bio_id,))
profile['job_positions'] = [dict(row) for row in cursor.fetchall()]
# Get relationships
cursor.execute("SELECT * FROM relationships WHERE bio_id = ?", (bio_id,))
profile['relationships'] = [dict(row) for row in cursor.fetchall()]
# Get creative works
cursor.execute("SELECT * FROM creative_works WHERE bio_id = ?", (bio_id,))
profile['creative_works'] = [dict(row) for row in cursor.fetchall()]
# Get assets
cursor.execute("SELECT * FROM assets WHERE bio_id = ?", (bio_id,))
profile['assets'] = [dict(row) for row in cursor.fetchall()]
conn.close()
return profile
def semantic_search(query_text: str, top_k: int = 10) -> List[str]:
"""Perform semantic search and return matching bio_ids."""
if not all([model, faiss_index, bio_id_mapping]):
raise ValueError("Search index not initialized. Run ingest_data.py first.")
# Encode query
query_embedding = model.encode([query_text])[0].astype('float32')
query_embedding = query_embedding.reshape(1, -1)
# Normalize for cosine similarity
faiss.normalize_L2(query_embedding)
# Search
scores, indices = faiss_index.search(query_embedding, top_k)
# Map indices to bio_ids
results = []
for idx, score in zip(indices[0], scores[0]):
if idx < len(bio_id_mapping):
results.append({
'bio_id': bio_id_mapping[idx],
'similarity_score': float(score)
})
return results
# Initialize MCP server
server = Server("congressional-bioguide")
@server.list_tools()
async def list_tools() -> List[Tool]:
"""List all available tools."""
return [
Tool(
name="search_by_name",
description="Search for Congressional members by name. Returns concise results (name, dates, party, congress) by default.",
inputSchema={
"type": "object",
"properties": {
"family_name": {
"type": "string",
"description": "Family/last name to search for (partial match)"
},
"given_name": {
"type": "string",
"description": "Given/first name to search for (partial match)"
},
"full_name": {
"type": "string",
"description": "Full name to search for (partial match in any name field)"
},
"limit": {
"type": "integer",
"description": "Maximum results to return (default: 50)",
"default": 50
},
"return_full_profile": {
"type": "boolean",
"description": "Return full profile data including biography (default: false)",
"default": False
}
}
}
),
Tool(
name="search_by_party",
description="Search for Congressional members by political party affiliation.",
inputSchema={
"type": "object",
"properties": {
"party": {
"type": "string",
"description": "Party name (e.g., 'Republican', 'Democrat', 'Whig')"
},
"congress_number": {
"type": "integer",
"description": "Optional: Filter by specific Congress number (e.g., 117)"
}
},
"required": ["party"]
}
),
Tool(
name="search_by_state",
description="Search for Congressional members by state or region they represented.",
inputSchema={
"type": "object",
"properties": {
"state_code": {
"type": "string",
"description": "State code (e.g., 'CA', 'NY', 'TX')"
},
"congress_number": {
"type": "integer",
"description": "Optional: Filter by specific Congress number"
}
},
"required": ["state_code"]
}
),
Tool(
name="search_by_congress",
description="Get all members who served in a specific Congress.",
inputSchema={
"type": "object",
"properties": {
"congress_number": {
"type": "integer",
"description": "Congress number (e.g., 117 for the 117th Congress)"
},
"chamber": {
"type": "string",
"description": "Optional: Filter by chamber ('Representative' or 'Senator')"
}
},
"required": ["congress_number"]
}
),
Tool(
name="search_by_date_range",
description="Search for members who served during a specific date range.",
inputSchema={
"type": "object",
"properties": {
"start_date": {
"type": "string",
"description": "Start date in YYYY-MM-DD format"
},
"end_date": {
"type": "string",
"description": "End date in YYYY-MM-DD format"
}
},
"required": ["start_date", "end_date"]
}
),
Tool(
name="semantic_search_biography",
description="Perform semantic search on member biographies. Use natural language to find members based on career details, accomplishments, background, etc.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language query to search biographies (e.g., 'lawyers who became judges', 'Civil War veterans')"
},
"top_k": {
"type": "integer",
"description": "Number of results to return (default: 10)",
"default": 5
}
},
"required": ["query"]
}
),
Tool(
name="get_member_profile",
description="Get complete profile information for a specific member by their Bioguide ID.",
inputSchema={
"type": "object",
"properties": {
"bio_id": {
"type": "string",
"description": "Bioguide ID (e.g., 'W000374', 'P000144')"
}
},
"required": ["bio_id"]
}
),
Tool(
name="execute_sql_query",
description="Execute a custom SQL query against the Congressional database. Use for complex queries not covered by other tools. READ-ONLY access.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SQL SELECT query to execute"
}
},
"required": ["query"]
}
),
Tool(
name="get_database_schema",
description="Get the database schema showing all tables and columns available for querying.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="search_by_relationship",
description="Find members who have family relationships with other members (e.g., father, son, spouse).",
inputSchema={
"type": "object",
"properties": {
"relationship_type": {
"type": "string",
"description": "Type of relationship (e.g., 'father', 'son', 'spouse', 'brother')"
}
}
}
),
Tool(
name="search_biography_regex",
description="Search member biographies using regex patterns. Returns concise member info (name, dates, party, state) for matches. Use filters to narrow results.",
inputSchema={
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Regex pattern to search for in biographies (e.g., 'Harvard', 'lawyer', 'served.*army', 'born in [0-9]{4}')"
},
"case_sensitive": {
"type": "boolean",
"description": "Whether search should be case-sensitive (default: false)",
"default": False
},
"limit": {
"type": "integer",
"description": "Maximum number of results to return (default: 5)",
"default": 5
},
"filter_party": {
"type": "string",
"description": "Optional: Filter results by party (e.g., 'Republican', 'Democrat')"
},
"filter_state": {
"type": "string",
"description": "Optional: Filter results by state code (e.g., 'CA', 'NY')"
},
"filter_congress": {
"type": "integer",
"description": "Optional: Filter results by Congress number (e.g., 117)"
},
"return_full_profile": {
"type": "boolean",
"description": "Return full profile including biography text (default: false)",
"default": False
}
},
"required": ["pattern"]
}
),
Tool(
name="count_members",
description="Count members matching specific criteria. Returns aggregated counts by party, state, position, or custom grouping. Much more efficient than returning full member lists.",
inputSchema={
"type": "object",
"properties": {
"group_by": {
"type": "string",
"description": "Field to group by: 'party', 'state', 'position', 'congress', or 'year'",
"enum": ["party", "state", "position", "congress", "year"]
},
"filter_party": {
"type": "string",
"description": "Optional: Filter by party name"
},
"filter_state": {
"type": "string",
"description": "Optional: Filter by state code"
},
"filter_congress": {
"type": "integer",
"description": "Optional: Filter by Congress number"
},
"filter_position": {
"type": "string",
"description": "Optional: Filter by position (Representative, Senator)"
},
"date_range_start": {
"type": "string",
"description": "Optional: Start date (YYYY-MM-DD)"
},
"date_range_end": {
"type": "string",
"description": "Optional: End date (YYYY-MM-DD)"
}
},
"required": ["group_by"]
}
),
Tool(
name="temporal_analysis",
description="Analyze member trends over time. Shows how membership changed across years, decades, or congresses. Perfect for historical analysis.",
inputSchema={
"type": "object",
"properties": {
"analysis_type": {
"type": "string",
"description": "Type of temporal analysis",
"enum": ["party_over_time", "state_representation", "position_counts", "demographics"]
},
"time_unit": {
"type": "string",
"description": "Time granularity: 'congress', 'year', 'decade'",
"enum": ["congress", "year", "decade"],
"default": "congress"
},
"start_date": {
"type": "string",
"description": "Optional: Start date (YYYY-MM-DD)"
},
"end_date": {
"type": "string",
"description": "Optional: End date (YYYY-MM-DD)"
},
"filter_party": {
"type": "string",
"description": "Optional: Filter to specific party"
},
"filter_state": {
"type": "string",
"description": "Optional: Filter to specific state"
}
},
"required": ["analysis_type"]
}
),
Tool(
name="count_by_biography_content",
description="Count members whose biographies mention specific keywords or phrases (e.g., 'Harvard', 'lawyer', 'Civil War'). Much more efficient than searching when you only need counts.",
inputSchema={
"type": "object",
"properties": {
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "List of keywords or phrases to search for (case-insensitive)"
},
"match_all": {
"type": "boolean",
"description": "If true, count members matching ALL keywords. If false, count members matching ANY keyword (default: false)",
"default": False
},
"breakdown_by": {
"type": "string",
"description": "Optional: Break down counts by party, state, position, or congress",
"enum": ["party", "state", "position", "congress", "none"],
"default": "none"
},
"filter_party": {
"type": "string",
"description": "Optional: Only count members from specific party"
},
"filter_state": {
"type": "string",
"description": "Optional: Only count members from specific state"
}
},
"required": ["keywords"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: Any) -> List[TextContent]:
"""Handle tool calls."""
try:
if name == "search_by_name":
family_name = arguments.get("family_name")
given_name = arguments.get("given_name")
full_name = arguments.get("full_name")
limit = arguments.get("limit", 50)
return_full = arguments.get("return_full_profile", False)
conditions = []
params = []
if family_name:
conditions.append("LOWER(m.unaccented_family_name) LIKE LOWER(?)")
params.append(f"%{family_name}%")
if given_name:
conditions.append("LOWER(m.unaccented_given_name) LIKE LOWER(?)")
params.append(f"%{given_name}%")
if full_name:
conditions.append("""(LOWER(m.unaccented_family_name) LIKE LOWER(?)
OR LOWER(m.unaccented_given_name) LIKE LOWER(?)
OR LOWER(m.unaccented_middle_name) LIKE LOWER(?))""")
params.extend([f"%{full_name}%"] * 3)
if not conditions:
return [TextContent(type="text", text="Please provide at least one name parameter.")]
if return_full:
query = f"SELECT * FROM members m WHERE {' AND '.join(conditions)} ORDER BY m.family_name, m.given_name LIMIT ?"
params.append(limit)
results = execute_query(query, tuple(params))
else:
# Return concise results with job info
query = f"""
SELECT DISTINCT m.bio_id, m.given_name, m.middle_name, m.family_name,
m.birth_date, m.death_date,
j.party, j.region_code, j.job_name, j.congress_number
FROM members m
LEFT JOIN job_positions j ON m.bio_id = j.bio_id
WHERE {' AND '.join(conditions)}
ORDER BY m.family_name, m.given_name
LIMIT ?
"""
params.append(limit)
results = execute_query(query, tuple(params))
results = [format_member_concise(r) for r in results]
response = {
"count": len(results),
"limit": limit,
"results": results
}
return [TextContent(type="text", text=json.dumps(response, indent=2))]
elif name == "search_by_party":
party = arguments["party"]
congress_number = arguments.get("congress_number")
if congress_number:
query = """
SELECT DISTINCT m.* FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
WHERE j.party = ? AND j.congress_number = ?
ORDER BY m.family_name, m.given_name
"""
results = execute_query(query, (party, congress_number))
else:
query = """
SELECT DISTINCT m.* FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
WHERE j.party = ?
ORDER BY m.family_name, m.given_name
"""
results = execute_query(query, (party,))
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "search_by_state":
state_code = arguments["state_code"].upper()
congress_number = arguments.get("congress_number")
if congress_number:
query = """
SELECT DISTINCT m.*, j.job_name, j.party, j.congress_number
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
WHERE j.region_code = ? AND j.congress_number = ?
ORDER BY m.family_name, m.given_name
"""
results = execute_query(query, (state_code, congress_number))
else:
query = """
SELECT DISTINCT m.*, j.job_name, j.party, j.congress_number
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
WHERE j.region_code = ?
ORDER BY m.family_name, m.given_name
"""
results = execute_query(query, (state_code,))
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "search_by_congress":
congress_number = arguments["congress_number"]
chamber = arguments.get("chamber")
if chamber:
query = """
SELECT DISTINCT m.*, j.job_name, j.party, j.region_code
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
WHERE j.congress_number = ? AND j.job_name = ?
ORDER BY m.family_name, m.given_name
"""
results = execute_query(query, (congress_number, chamber))
else:
query = """
SELECT DISTINCT m.*, j.job_name, j.party, j.region_code
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
WHERE j.congress_number = ?
ORDER BY m.family_name, m.given_name
"""
results = execute_query(query, (congress_number,))
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "search_by_date_range":
start_date = arguments["start_date"]
end_date = arguments["end_date"]
query = """
SELECT DISTINCT m.*, j.job_name, j.start_date, j.end_date
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
WHERE (j.start_date <= ? AND (j.end_date >= ? OR j.end_date IS NULL))
ORDER BY j.start_date, m.family_name, m.given_name
"""
results = execute_query(query, (end_date, start_date))
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "semantic_search_biography":
query_text = arguments["query"]
top_k = arguments.get("top_k", 10)
# Perform semantic search
search_results = semantic_search(query_text, top_k)
# Get full profiles for top results
profiles = []
for result in search_results:
profile = get_member_profile(result['bio_id'])
if profile:
profile['similarity_score'] = result['similarity_score']
profiles.append(profile)
return [TextContent(type="text", text=json.dumps(profiles, indent=2))]
elif name == "get_member_profile":
bio_id = arguments["bio_id"]
profile = get_member_profile(bio_id)
if profile:
return [TextContent(type="text", text=json.dumps(profile, indent=2))]
else:
return [TextContent(type="text", text=f"No profile found for bio_id: {bio_id}")]
elif name == "execute_sql_query":
query = arguments["query"]
# Basic security: only allow SELECT queries
if not query.strip().upper().startswith("SELECT"):
return [TextContent(type="text", text="Error: Only SELECT queries are allowed.")]
results = execute_query(query)
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "get_database_schema":
schema_info = {
"tables": {
"members": {
"description": "Main table with member biographical information",
"columns": [
"bio_id (PRIMARY KEY) - Bioguide ID",
"family_name - Last name",
"given_name - First name",
"middle_name - Middle name",
"honorific_prefix - Title (Mr., Mrs., etc.)",
"unaccented_family_name - Family name without accents",
"unaccented_given_name - Given name without accents",
"unaccented_middle_name - Middle name without accents",
"birth_date - Birth date (YYYY-MM-DD)",
"birth_circa - Whether birth date is approximate (0/1)",
"death_date - Death date (YYYY-MM-DD)",
"death_circa - Whether death date is approximate (0/1)",
"profile_text - Full biography text",
"full_name - Generated full name column"
]
},
"job_positions": {
"description": "Congressional positions held by members",
"columns": [
"id (PRIMARY KEY)",
"bio_id (FOREIGN KEY) - References members",
"job_name - Position title (Representative, Senator)",
"job_type - Type of position",
"start_date - Start date of position",
"start_circa - Whether start date is approximate (0/1)",
"end_date - End date of position",
"end_circa - Whether end date is approximate (0/1)",
"congress_number - Congress number (e.g., 117)",
"congress_name - Full Congress name",
"party - Party affiliation",
"caucus - Caucus affiliation",
"region_type - Type of region represented",
"region_code - State/region code (e.g., 'CA', 'NY')",
"note - Additional notes"
]
},
"images": {
"description": "Profile images",
"columns": ["id", "bio_id", "content_url", "caption"]
},
"relationships": {
"description": "Family relationships between members",
"columns": ["id", "bio_id", "related_bio_id", "relationship_type"]
},
"creative_works": {
"description": "Publications and creative works by members",
"columns": ["id", "bio_id", "citation_text"]
},
"assets": {
"description": "Additional assets (images, documents)",
"columns": ["id", "bio_id", "name", "asset_type", "content_url",
"credit_line", "accession_number", "upload_date"]
}
},
"indexes": [
"idx_family_name - Index on unaccented_family_name",
"idx_given_name - Index on unaccented_given_name",
"idx_birth_date - Index on birth_date",
"idx_death_date - Index on death_date",
"idx_job_congress - Index on congress_number",
"idx_job_party - Index on party",
"idx_job_region - Index on region_code",
"idx_job_type - Index on job_name"
]
}
return [TextContent(type="text", text=json.dumps(schema_info, indent=2))]
elif name == "search_by_relationship":
relationship_type = arguments.get("relationship_type")
if relationship_type:
query = """
SELECT m1.bio_id, m1.family_name, m1.given_name,
r.relationship_type, r.related_bio_id,
m2.family_name as related_family_name,
m2.given_name as related_given_name
FROM members m1
JOIN relationships r ON m1.bio_id = r.bio_id
JOIN members m2 ON r.related_bio_id = m2.bio_id
WHERE r.relationship_type = ?
ORDER BY m1.family_name, m1.given_name
"""
results = execute_query(query, (relationship_type,))
else:
query = """
SELECT m1.bio_id, m1.family_name, m1.given_name,
r.relationship_type, r.related_bio_id,
m2.family_name as related_family_name,
m2.given_name as related_given_name
FROM members m1
JOIN relationships r ON m1.bio_id = r.bio_id
JOIN members m2 ON r.related_bio_id = m2.bio_id
ORDER BY m1.family_name, m1.given_name
"""
results = execute_query(query)
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "search_biography_regex":
import re
pattern = arguments["pattern"]
case_sensitive = arguments.get("case_sensitive", False)
limit = arguments.get("limit", 5)
filter_party = arguments.get("filter_party")
filter_state = arguments.get("filter_state")
filter_congress = arguments.get("filter_congress")
return_full = arguments.get("return_full_profile", False)
try:
# Compile regex pattern
flags = 0 if case_sensitive else re.IGNORECASE
regex = re.compile(pattern, flags)
# Build query with optional filters
conn = get_db_connection()
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Base query - join with job_positions for filtering
query = """
SELECT DISTINCT m.bio_id, m.family_name, m.given_name, m.middle_name,
m.birth_date, m.death_date, m.profile_text,
j.party, j.region_code, j.job_name, j.congress_number
FROM members m
LEFT JOIN job_positions j ON m.bio_id = j.bio_id
WHERE m.profile_text IS NOT NULL
"""
where_conditions = []
params = []
if filter_party:
where_conditions.append("j.party = ?")
params.append(filter_party)
if filter_state:
where_conditions.append("j.region_code = ?")
params.append(filter_state)
if filter_congress:
where_conditions.append("j.congress_number = ?")
params.append(filter_congress)
if where_conditions:
query += " AND " + " AND ".join(where_conditions)
cursor.execute(query, tuple(params))
# Filter using regex
matches = []
for row in cursor:
if regex.search(row['profile_text']):
if return_full:
# Return full profile
matches.append(dict(row))
else:
# Return concise info only
match_result = {
"bio_id": row['bio_id'],
"name": f"{row['given_name']} {row['middle_name'] or ''} {row['family_name']}".strip(),
"birth_date": row['birth_date'],
"death_date": row['death_date'],
"party": row['party'],
"state": row['region_code'],
"position": row['job_name'],
"congress": row['congress_number']
}
matches.append(match_result)
if len(matches) >= limit:
break
conn.close()
result = {
"pattern": pattern,
"case_sensitive": case_sensitive,
"total_members_found": len(matches),
"limit": limit,
"filters_applied": {
"party": filter_party,
"state": filter_state,
"congress": filter_congress
},
"results": matches
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
except re.error as e:
return [TextContent(type="text", text=f"Invalid regex pattern: {str(e)}")]
elif name == "count_members":
group_by = arguments["group_by"]
filter_party = arguments.get("filter_party")
filter_state = arguments.get("filter_state")
filter_congress = arguments.get("filter_congress")
filter_position = arguments.get("filter_position")
date_start = arguments.get("date_range_start")
date_end = arguments.get("date_range_end")
# Build WHERE clause
where_conditions = []
params = []
if filter_party:
where_conditions.append("j.party = ?")
params.append(filter_party)
if filter_state:
where_conditions.append("j.region_code = ?")
params.append(filter_state)
if filter_congress:
where_conditions.append("j.congress_number = ?")
params.append(filter_congress)
if filter_position:
where_conditions.append("j.job_name = ?")
params.append(filter_position)
if date_start and date_end:
where_conditions.append("(j.start_date <= ? AND (j.end_date >= ? OR j.end_date IS NULL))")
params.extend([date_end, date_start])
where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""
# Build GROUP BY query
if group_by == "party":
query = f"""
SELECT j.party as group_key, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.party
ORDER BY count DESC
"""
elif group_by == "state":
query = f"""
SELECT j.region_code as group_key, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.region_code
ORDER BY count DESC
"""
elif group_by == "position":
query = f"""
SELECT j.job_name as group_key, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.job_name
ORDER BY count DESC
"""
elif group_by == "congress":
query = f"""
SELECT j.congress_number as group_key, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.congress_number
ORDER BY j.congress_number
"""
elif group_by == "year":
query = f"""
SELECT SUBSTR(j.start_date, 1, 4) as group_key, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY SUBSTR(j.start_date, 1, 4)
ORDER BY group_key
"""
results = execute_query(query, tuple(params))
total = sum(r['count'] for r in results)
response = {
"group_by": group_by,
"total_unique_members": total,
"groups": results,
"filters_applied": {
"party": filter_party,
"state": filter_state,
"congress": filter_congress,
"position": filter_position,
"date_range": [date_start, date_end] if date_start and date_end else None
}
}
return [TextContent(type="text", text=json.dumps(response, indent=2))]
elif name == "temporal_analysis":
analysis_type = arguments["analysis_type"]
time_unit = arguments.get("time_unit", "congress")
start_date = arguments.get("start_date")
end_date = arguments.get("end_date")
filter_party = arguments.get("filter_party")
filter_state = arguments.get("filter_state")
# Build WHERE clause
where_conditions = []
params = []
if start_date:
where_conditions.append("j.start_date >= ?")
params.append(start_date)
if end_date:
where_conditions.append("j.start_date <= ?")
params.append(end_date)
if filter_party:
where_conditions.append("j.party = ?")
params.append(filter_party)
if filter_state:
where_conditions.append("j.region_code = ?")
params.append(filter_state)
where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""
if analysis_type == "party_over_time":
if time_unit == "congress":
query = f"""
SELECT j.congress_number, j.party, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.congress_number, j.party
ORDER BY j.congress_number, j.party
"""
elif time_unit == "year":
query = f"""
SELECT SUBSTR(j.start_date, 1, 4) as year, j.party, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY year, j.party
ORDER BY year, j.party
"""
elif time_unit == "decade":
query = f"""
SELECT (CAST(SUBSTR(j.start_date, 1, 4) AS INTEGER) / 10) * 10 as decade,
j.party, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY decade, j.party
ORDER BY decade, j.party
"""
elif analysis_type == "state_representation":
if time_unit == "congress":
query = f"""
SELECT j.congress_number, j.region_code, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.congress_number, j.region_code
ORDER BY j.congress_number, count DESC
"""
else:
query = f"""
SELECT SUBSTR(j.start_date, 1, 4) as year, j.region_code, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY year, j.region_code
ORDER BY year, count DESC
"""
elif analysis_type == "position_counts":
query = f"""
SELECT j.congress_number, j.job_name, COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.congress_number, j.job_name
ORDER BY j.congress_number
"""
elif analysis_type == "demographics":
# Analyze birth year distribution over time
if time_unit == "congress":
query = f"""
SELECT j.congress_number,
AVG(CAST(SUBSTR(m.birth_date, 1, 4) AS INTEGER)) as avg_birth_year,
COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY j.congress_number
ORDER BY j.congress_number
"""
else:
query = f"""
SELECT SUBSTR(j.start_date, 1, 4) as year,
AVG(CAST(SUBSTR(m.birth_date, 1, 4) AS INTEGER)) as avg_birth_year,
COUNT(DISTINCT m.bio_id) as count
FROM members m
JOIN job_positions j ON m.bio_id = j.bio_id
{where_clause}
GROUP BY year
ORDER BY year
"""
results = execute_query(query, tuple(params))
response = {
"analysis_type": analysis_type,
"time_unit": time_unit,
"data_points": len(results),
"results": results,
"filters_applied": {
"start_date": start_date,
"end_date": end_date,
"party": filter_party,
"state": filter_state
}
}
return [TextContent(type="text", text=json.dumps(response, indent=2))]
elif name == "count_by_biography_content":
keywords = arguments["keywords"]
match_all = arguments.get("match_all", False)
breakdown_by = arguments.get("breakdown_by", "none")
filter_party = arguments.get("filter_party")
filter_state = arguments.get("filter_state")
# Build the query to find matching members
conn = get_db_connection()
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Get all members with their job info
base_query = """
SELECT DISTINCT m.bio_id, m.profile_text,
j.party, j.region_code, j.job_name, j.congress_number
FROM members m
LEFT JOIN job_positions j ON m.bio_id = j.bio_id
WHERE m.profile_text IS NOT NULL
"""
where_conditions = []
params = []
if filter_party:
where_conditions.append("j.party = ?")
params.append(filter_party)
if filter_state:
where_conditions.append("j.region_code = ?")
params.append(filter_state)
if where_conditions:
base_query += " AND " + " AND ".join(where_conditions)
cursor.execute(base_query, tuple(params))
all_members = cursor.fetchall()
# Filter members by keywords
matching_members = []
for member in all_members:
profile_text_lower = member['profile_text'].lower() if member['profile_text'] else ""
if match_all:
# ALL keywords must be present
if all(keyword.lower() in profile_text_lower for keyword in keywords):
matching_members.append(dict(member))
else:
# ANY keyword must be present
if any(keyword.lower() in profile_text_lower for keyword in keywords):
matching_members.append(dict(member))
conn.close()
# Count total unique members
unique_bio_ids = set(m['bio_id'] for m in matching_members)
total_count = len(unique_bio_ids)
# Breakdown if requested
breakdown = None
if breakdown_by != "none" and matching_members:
breakdown_counts = {}
for member in matching_members:
if breakdown_by == "party":
key = member.get('party', 'Unknown')
elif breakdown_by == "state":
key = member.get('region_code', 'Unknown')
elif breakdown_by == "position":
key = member.get('job_name', 'Unknown')
elif breakdown_by == "congress":
key = member.get('congress_number', 'Unknown')
else:
key = 'Unknown'
if key not in breakdown_counts:
breakdown_counts[key] = set()
breakdown_counts[key].add(member['bio_id'])
# Convert sets to counts
breakdown = [
{"group": k, "count": len(v)}
for k, v in sorted(breakdown_counts.items(), key=lambda x: len(x[1]), reverse=True)
]
response = {
"keywords": keywords,
"match_all": match_all,
"total_members_matching": total_count,
"breakdown_by": breakdown_by,
"breakdown": breakdown,
"filters_applied": {
"party": filter_party,
"state": filter_state
}
}
return [TextContent(type="text", text=json.dumps(response, indent=2))]
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
except Exception as e:
return [TextContent(type="text", text=f"Error executing tool {name}: {str(e)}")]
async def main():
"""Main entry point for the MCP server."""
# Initialize search index (log to stderr to not interfere with stdio JSON protocol)
if initialize_search_index():
print("Search index loaded successfully", file=sys.stderr, flush=True)
else:
print("Warning: Search index not found. Run ingest_data.py to create it.", file=sys.stderr, flush=True)
# Run the server
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())