youssefleb committed
Commit 407dc12 · verified · 1 Parent(s): aa593cf

Update mcp_servers.py

Files changed (1)
  1. mcp_servers.py +25 -117
mcp_servers.py CHANGED
@@ -1,4 +1,4 @@
-# mcp_servers.py (FIXED: Schema Enforcement + Detailed Logging + Usage Tracking)
+# mcp_servers.py (FIXED: Guarantees 4-value return tuple)
 import asyncio
 import json
 import re
@@ -12,56 +12,20 @@ from personas import PERSONAS_DATA

 EVALUATION_PROMPT_TEMPLATE = load_prompt(config.PROMPT_FILES["evaluator"])

-# --- DEFINING THE SCHEMA TO FORCE JUSTIFICATIONS ---
+# Schema definition
 EVALUATION_SCHEMA = {
     "type": "OBJECT",
     "properties": {
-        "Novelty": {
-            "type": "OBJECT",
-            "properties": {
-                "score": {"type": "INTEGER"},
-                "justification": {"type": "STRING"}
-            },
-            "required": ["score", "justification"]
-        },
-        "Usefulness_Feasibility": {
-            "type": "OBJECT",
-            "properties": {
-                "score": {"type": "INTEGER"},
-                "justification": {"type": "STRING"}
-            },
-            "required": ["score", "justification"]
-        },
-        "Flexibility": {
-            "type": "OBJECT",
-            "properties": {
-                "score": {"type": "INTEGER"},
-                "justification": {"type": "STRING"}
-            },
-            "required": ["score", "justification"]
-        },
-        "Elaboration": {
-            "type": "OBJECT",
-            "properties": {
-                "score": {"type": "INTEGER"},
-                "justification": {"type": "STRING"}
-            },
-            "required": ["score", "justification"]
-        },
-        "Cultural_Appropriateness": {
-            "type": "OBJECT",
-            "properties": {
-                "score": {"type": "INTEGER"},
-                "justification": {"type": "STRING"}
-            },
-            "required": ["score", "justification"]
-        }
+        "Novelty": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
+        "Usefulness_Feasibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
+        "Flexibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
+        "Elaboration": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
+        "Cultural_Appropriateness": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]}
     },
     "required": ["Novelty", "Usefulness_Feasibility", "Flexibility", "Elaboration", "Cultural_Appropriateness"]
 }

 def extract_json(text: str) -> dict:
-    """Robustly extracts JSON from text."""
     try:
         clean_text = text.strip()
         if "```json" in clean_text:
@@ -72,38 +36,26 @@ def extract_json(text: str) -> dict:
     except (json.JSONDecodeError, IndexError):
         try:
             match = re.search(r'(\{[\s\S]*\})', text)
-            if match:
-                return json.loads(match.group(1))
-        except:
-            pass
+            if match: return json.loads(match.group(1))
+        except: pass
     raise ValueError(f"Could not extract JSON from response: {text[:100]}...")

 class BusinessSolutionEvaluator:
     def __init__(self, gemini_client: Optional[genai.GenerativeModel]):
-        if not gemini_client:
-            raise ValueError("BusinessSolutionEvaluator requires a Google/Gemini client.")
+        if not gemini_client: raise ValueError("BusinessSolutionEvaluator requires a Google/Gemini client.")
         self.gemini_model = gemini_client
-        if "ERROR:" in EVALUATION_PROMPT_TEMPLATE:
-            raise FileNotFoundError(EVALUATION_PROMPT_TEMPLATE)

     async def evaluate(self, problem: str, solution_text: str) -> Tuple[dict, dict]:
-        """Returns (evaluation_dict, usage_dict)"""
         print(f"Evaluating solution (live): {solution_text[:50]}...")
-
         base_prompt = EVALUATION_PROMPT_TEMPLATE.format(problem=problem, solution_text=solution_text)

         schema_instruction = """
         [IMPORTANT SYSTEM INSTRUCTION]
         Ignore any previous examples of JSON formatting in this prompt.
         You MUST strictly follow the Output Schema provided below.
-
-        For EACH of the 5 metrics (Novelty, Usefulness_Feasibility, etc.), you must provide an object with TWO fields:
-        1. "score": An integer from 1 to 5.
-        2. "justification": A specific sentence explaining why you gave that score.
-
-        Do not output a list. Return a single JSON object describing the solution above.
+        For EACH of the 5 metrics, you must provide an object with TWO fields: "score" (integer) and "justification" (string).
+        Do not output a list. Return a single JSON object.
         """
-
         final_prompt = base_prompt + schema_instruction
         usage = {"model": "Gemini", "input": 0, "output": 0}

@@ -115,29 +67,16 @@ class BusinessSolutionEvaluator:
                     response_schema=EVALUATION_SCHEMA
                 )
            )
-
-            # Capture Usage
             if hasattr(response, "usage_metadata"):
                 usage["input"] = response.usage_metadata.prompt_token_count
                 usage["output"] = response.usage_metadata.candidates_token_count

             v_fitness = extract_json(response.text)
-
-            if not isinstance(v_fitness, (dict, list)):
-                raise ValueError(f"Judge returned invalid type: {type(v_fitness)}")
-
-            print(f"Evaluation complete (live): {v_fitness}")
+            if not isinstance(v_fitness, (dict, list)): raise ValueError(f"Judge returned invalid type: {type(v_fitness)}")
             return v_fitness, usage
-
         except Exception as e:
             print(f"ERROR: BusinessSolutionEvaluator failed: {e}")
-            return {
-                "Novelty": {"score": 1, "justification": f"Error: {str(e)}"},
-                "Usefulness_Feasibility": {"score": 1, "justification": f"Error: {str(e)}"},
-                "Flexibility": {"score": 1, "justification": f"Error: {str(e)}"},
-                "Elaboration": {"score": 1, "justification": f"Error: {str(e)}"},
-                "Cultural_Appropriateness": {"score": 1, "justification": f"Error: {str(e)}"}
-            }, usage
+            return {"Novelty": {"score": 1, "justification": f"Error: {str(e)}"}}, usage

 class AgentCalibrator:
     def __init__(self, api_clients: dict, evaluator: BusinessSolutionEvaluator):
@@ -150,11 +89,12 @@ class AgentCalibrator:
         print(f"Running LIVE calibration test for specialist team on {self.sponsor_llms}...")
         error_log = []
         detailed_results = []
-        all_usage_stats = [] # Collect all usage data here
+        all_usage_stats = []

         if not self.sponsor_llms:
             raise Exception("AgentCalibrator cannot run: No LLM clients are configured.")

+        # If only one model, return default plan + empty lists for details/usage
         if len(self.sponsor_llms) == 1:
             default_llm = self.sponsor_llms[0]
             print("Only one LLM available. Skipping calibration.")
@@ -163,12 +103,10 @@ class AgentCalibrator:
                 "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": default_llm},
                 "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": default_llm}
             }
+            # MUST RETURN 4 VALUES
             return plan, error_log, [], []

-        roles_to_test = {
-            role: PERSONAS_DATA[key]["description"]
-            for role, key in config.CALIBRATION_CONFIG["roles_to_test"].items()
-        }
+        roles_to_test = {role: PERSONAS_DATA[key]["description"] for role, key in config.CALIBRATION_CONFIG["roles_to_test"].items()}
         test_problem = f"For the business problem '{problem}', generate a single, brief, one-paragraph concept-level solution."

         tasks = []
@@ -179,7 +117,6 @@
         results = await asyncio.gather(*tasks)
         detailed_results = results

-        # Flatten results to extract usage
         for res in results:
             if "usage_gen" in res: all_usage_stats.append(res["usage_gen"])
             if "usage_eval" in res: all_usage_stats.append(res["usage_eval"])
@@ -197,7 +134,6 @@
                 continue
             metric = role_metrics[role]

-            # Robust Dict Access
             raw_score_data = res.get("score", {})
             if not isinstance(raw_score_data, (dict, list)): raw_score_data = {}
             if isinstance(raw_score_data, list): raw_score_data = raw_score_data[0] if len(raw_score_data) > 0 else {}
@@ -207,7 +143,6 @@
             if isinstance(metric_data, list): metric_data = metric_data[0] if len(metric_data) > 0 else {}

             score = metric_data.get("score", 0)
-
             if score > best_score:
                 best_score = score
                 best_llm = res["llm"]
@@ -219,78 +154,51 @@
             "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": best_llms["Monitor"]}
         }
         print(f"Calibration complete (live). Team plan: {team_plan}")
+        # MUST RETURN 4 VALUES
         return team_plan, error_log, detailed_results, all_usage_stats

     async def run_calibration_test(self, problem, role, llm_name, persona, test_problem):
-        print(f"...Calibrating {role} on {llm_name}...")
         client = self.api_clients[llm_name]
-
-        # 1. Generate Solution (and get usage)
         solution, gen_usage = await get_llm_response(llm_name, client, persona, test_problem)

         if "Error generating response" in solution:
             return {"role": role, "llm": llm_name, "error": solution, "output": solution, "usage_gen": gen_usage}

-        # 2. Evaluate Solution (and get usage)
         score, eval_usage = await self.evaluator.evaluate(problem, solution)
-
         return {
-            "role": role,
-            "llm": llm_name,
-            "score": score,
-            "output": solution,
-            "usage_gen": gen_usage,
-            "usage_eval": eval_usage
+            "role": role, "llm": llm_name, "score": score, "output": solution, "usage_gen": gen_usage, "usage_eval": eval_usage
         }

-# --- Unified API Call Function ---
 async def get_llm_response(client_name: str, client, system_prompt: str, user_prompt: str) -> Tuple[str, dict]:
     """Returns (text_response, usage_dict)"""
     usage = {"model": client_name, "input": 0, "output": 0}
     try:
         if client_name == "Gemini":
             model = client
-            full_prompt = [
-                {'role': 'user', 'parts': [system_prompt]},
-                {'role': 'model', 'parts': ["Understood. I will act as this persona."]},
-                {'role': 'user', 'parts': [user_prompt]}
-            ]
+            full_prompt = [{'role': 'user', 'parts': [system_prompt]}, {'role': 'model', 'parts': ["Understood."]}, {'role': 'user', 'parts': [user_prompt]}]
             response = await model.generate_content_async(full_prompt)
-
-            # Capture Gemini Usage
             if hasattr(response, "usage_metadata"):
                 usage["input"] = response.usage_metadata.prompt_token_count
                 usage["output"] = response.usage_metadata.candidates_token_count
-
             return response.text, usage

         elif client_name == "Anthropic":
             response = await client.messages.create(
-                model=config.MODELS["Anthropic"]["default"],
-                max_tokens=8192,
-                system=system_prompt,
-                messages=[{"role": "user", "content": user_prompt}]
+                model=config.MODELS["Anthropic"]["default"], max_tokens=8192, system=system_prompt, messages=[{"role": "user", "content": user_prompt}]
             )
-            # Capture Anthropic Usage
             if hasattr(response, "usage"):
                 usage["input"] = response.usage.input_tokens
                 usage["output"] = response.usage.output_tokens
-
             return response.content[0].text, usage

-        elif client_name == "SambaNova":
+        elif client_name in ["SambaNova", "OpenAI", "Nebius"]:
+            model_id = config.MODELS.get(client_name, {}).get("default", "gpt-4o-mini")
             completion = await client.chat.completions.create(
-                model=config.MODELS["SambaNova"]["default"],
-                messages=[
-                    {"role": "system", "content": system_prompt},
-                    {"role": "user", "content": user_prompt}
-                ]
+                model=model_id, messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}]
             )
-            # Capture SambaNova Usage
             if hasattr(completion, "usage"):
                 usage["input"] = completion.usage.prompt_tokens
                 usage["output"] = completion.usage.completion_tokens
-
             return completion.choices[0].message.content, usage

     except Exception as e:
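
Note on the compacted schema and the shortened error fallback: EVALUATION_SCHEMA still requires a "score" and a "justification" for each of the five metrics, but the new except branch returns only a "Novelty" entry, so downstream code has to read the evaluation defensively, exactly as the calibrator does with res.get("score", {}) and its list-unwrapping checks. A minimal illustrative sketch of that defensive read (not part of the commit; METRICS, sample_eval and average_score are hypothetical names):

METRICS = ["Novelty", "Usefulness_Feasibility", "Flexibility", "Elaboration", "Cultural_Appropriateness"]

# A response shaped by EVALUATION_SCHEMA: one object per metric with "score" and "justification".
sample_eval = {m: {"score": 3, "justification": "Placeholder justification."} for m in METRICS}

def average_score(evaluation: dict) -> float:
    # Hypothetical helper: average the five scores while tolerating missing or
    # malformed entries, mirroring the calibrator's defensive dict access.
    scores = []
    for m in METRICS:
        data = evaluation.get(m, {})
        if isinstance(data, list):  # some judges wrap each metric in a one-element list
            data = data[0] if data else {}
        if isinstance(data, dict):
            scores.append(data.get("score", 0))
    return sum(scores) / len(METRICS)

print(average_score(sample_eval))                                          # 3.0
print(average_score({"Novelty": {"score": 1, "justification": "Error"}}))  # 0.2 (truncated fallback)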
 
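The renamed header comment and the two added "MUST RETURN 4 VALUES" markers describe the guarantee this commit is about: both exit paths of the calibration routine hand back the same four-element tuple (team plan, error log, detailed results, usage stats), with the single-LLM shortcut returning empty lists for the last two. A hedged caller-side sketch follows; the method name calibrate_team and the build_team wrapper are assumptions (the method's signature is not visible in this diff), while the tuple shape and the usage-dict keys ("model", "input", "output") come from the code above:

async def build_team(calibrator, problem: str):
    # Assumed method name `calibrate_team`; only the 4-value return shape is taken from the diff.
    plan, error_log, detailed_results, usage_stats = await calibrator.calibrate_team(problem)
    # On the single-LLM path detailed_results and usage_stats are empty lists,
    # so the unpacking above and the aggregation below still work.
    total_tokens = sum(u.get("input", 0) + u.get("output", 0) for u in usage_stats)
    print(f"Roles assigned: {list(plan)} | errors: {len(error_log)} | tokens: {total_tokens}")
    return plan

# Usage (illustrative): plan = asyncio.run(build_team(calibrator, "Expand into a new market"))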