DrishtiSharma committed
Commit 8155d66 · verified · 1 Parent(s): 7417989

Update app.py

Files changed (1)
  1. app.py +30 -12
app.py CHANGED
@@ -71,14 +71,20 @@ async def stream_or_async_response(messages: Union[Iterable[ResponseChunk], Asyn
     layer_outputs = {}
 
     async def process_message(message):
-        # Store intermediate messages in `layer_outputs`
-        if message['response_type'] == 'intermediate':
-            layer = message['metadata']['layer']
+        """
+        Process an individual message and return the content for streaming.
+        Returns an empty string if the message type is intermediate or invalid.
+        """
+        # Debug: Log the incoming message
+        st.write(f"Processing message: {message}")
+
+        if message.get('response_type') == 'intermediate':
+            layer = message['metadata'].get('layer', "Unknown")
             if layer not in layer_outputs:
                 layer_outputs[layer] = []
-            layer_outputs[layer].append(message['delta'])
+            layer_outputs[layer].append(message.get('delta', ""))
             return "" # Intermediate messages don't contribute to the final response
-        elif message['response_type'] == 'final':
+        elif message.get('response_type') == 'final':
             # Final message processing
             for layer, outputs in layer_outputs.items():
                 st.write(f"Layer {layer}")
@@ -87,22 +93,25 @@ async def stream_or_async_response(messages: Union[Iterable[ResponseChunk], Asyn
                     with cols[i]:
                         st.expander(label=f"Agent {i + 1}", expanded=False).write(output)
             layer_outputs.clear()
-            return message['delta'] or "" # Ensure no None value is returned
+            return message.get('delta', "") # Ensure no None value is returned
         else:
-            return "" # Default to empty string if no valid response type
+            # Debug: Log unexpected message types
+            st.write(f"Unexpected message type: {message}")
+            return "" # Default to empty string for invalid types
 
     # Check if the input is an async or sync iterable
     if hasattr(messages, "__aiter__"): # Asynchronous iterable
         async for message in messages:
             resolved_message = await process_message(message)
-            yield resolved_message
+            yield resolved_message or "" # Yield empty string if None
     elif hasattr(messages, "__iter__"): # Synchronous iterable
         for message in messages:
            resolved_message = process_message(message) # Do not `await` sync messages
-            yield resolved_message
+            yield resolved_message or "" # Yield empty string if None
     else:
         raise TypeError("'messages' must be an Iterable or AsyncIterable.")
 
+
 # Set up the MOAgent
 def set_moa_agent(
     main_model: str = default_config['main_model'],
@@ -267,20 +276,28 @@ if query := st.chat_input("Ask a question"):
 
         # Get messages from MOAgent (supports async streaming)
         messages = moa_agent.chat(query, output_format='json')
-
+
         try:
             # Stream and display responses from `stream_or_async_response`
             final_response = ""
             async for response in stream_or_async_response(messages):
                 # Skip None or empty responses
-                if response:
+                if response is not None and response.strip():
                     final_response += response
                     message_placeholder.markdown(final_response)
+                else:
+                    # Debug: Log skipped empty/None responses
+                    st.write(f"Skipped empty or None response: {response}")
 
             # Save the final response to session state
-            st.session_state.messages.append({"role": "assistant", "content": final_response})
+            if final_response.strip(): # Only save non-empty responses
+                st.session_state.messages.append({"role": "assistant", "content": final_response})
+            else:
+                st.error("Received no valid response from the assistant.")
         except Exception as e:
             st.error(f"Error processing response: {e}")
+            # Debug: Log the exception
+            st.write(f"Exception details: {e}")
 
     # Run the asynchronous handle_query function
     asyncio.run(handle_query())
@@ -288,6 +305,7 @@ if query := st.chat_input("Ask a question"):
 
 
 
+
 # Add acknowledgment at the bottom
 st.markdown("---")
 st.markdown("""
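
The substance of the change is defensive message handling in stream_or_async_response: dictionary lookups go through .get() with defaults, and both the synchronous and asynchronous branches yield resolved_message or "" so the consumer never receives None. Below is a minimal, self-contained sketch of that dispatch pattern. It is illustrative only: the message dicts and the extract helper are stand-ins for the MOAgent output, and extract is deliberately a plain function so the synchronous branch needs no await.

# Minimal sketch of the hardened dispatch pattern (stand-in data, not the app's agent).
import asyncio
from typing import AsyncIterable, Iterable, Union

def extract(message: dict) -> str:
    # Mirrors the patch: .get() with defaults instead of direct indexing,
    # so a malformed chunk produces "" instead of raising KeyError.
    if message.get("response_type") == "final":
        return message.get("delta", "") or ""
    return ""  # intermediate/unknown chunks contribute nothing

async def stream_chunks(messages: Union[Iterable[dict], AsyncIterable[dict]]):
    if hasattr(messages, "__aiter__"):      # asynchronous source
        async for message in messages:
            yield extract(message) or ""    # never yield None
    elif hasattr(messages, "__iter__"):     # synchronous source
        for message in messages:
            yield extract(message) or ""
    else:
        raise TypeError("'messages' must be an Iterable or AsyncIterable.")

async def _demo():
    sync_msgs = [
        {"response_type": "intermediate", "delta": "draft", "metadata": {"layer": 1}},
        {"response_type": "final", "delta": "Hello"},
    ]

    async def async_msgs():
        for m in sync_msgs:
            yield m

    print([chunk async for chunk in stream_chunks(sync_msgs)])     # ['', 'Hello']
    print([chunk async for chunk in stream_chunks(async_msgs())])  # ['', 'Hello']

if __name__ == "__main__":
    asyncio.run(_demo())

A plain list exposes __iter__ but not __aiter__, while an async generator exposes __aiter__, so the same async generator serves both kinds of source.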
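
On the UI side, the patch skips empty or whitespace-only chunks while streaming and only persists the assistant turn to st.session_state.messages when the accumulated reply is non-empty. Here is a simplified, hypothetical fragment of that pattern (run with streamlit run; fake_stream is a stand-in for stream_or_async_response(moa_agent.chat(...))):

# Simplified sketch of the guarded streaming/save pattern; fake_stream is a stand-in.
import asyncio
import streamlit as st

if "messages" not in st.session_state:
    st.session_state.messages = []

async def fake_stream():
    # Yields a mix of empty and real chunks, like the hardened generator above.
    for chunk in ["", "Hel", "", "lo!"]:
        await asyncio.sleep(0)
        yield chunk

if query := st.chat_input("Ask a question"):
    st.session_state.messages.append({"role": "user", "content": query})

    async def handle_query():
        with st.chat_message("assistant"):
            placeholder = st.empty()
            final_response = ""
            async for response in fake_stream():
                if response is not None and response.strip():  # skip empty/None chunks
                    final_response += response
                    placeholder.markdown(final_response)
            if final_response.strip():  # persist only non-empty replies
                st.session_state.messages.append(
                    {"role": "assistant", "content": final_response}
                )
            else:
                st.error("Received no valid response from the assistant.")

    asyncio.run(handle_query())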