Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import json
import math
import os
import re
import uuid
from collections import defaultdict
from datetime import datetime
Expand Down Expand Up @@ -639,6 +640,123 @@ def _build_sample_payload(

return sample_payload

@staticmethod
def _clean_content_filter_response(content: Any) -> str:
"""If content looks like a raw content-filter API response, replace with friendly text.

Prefers structured JSON parsing over regex heuristics. Only content
that actually parses as a serialised API payload (or nested JSON
inside one) is rewritten; plain-text that merely *mentions*
``content_filter`` is returned unchanged.
"""
if not isinstance(content, str):
return str(content) if content is not None else ""
if not content:
return content

filter_details: List[str] = []
stripped = content.strip()

# --- Step 1: try to parse the whole content as JSON -----------------
if stripped.startswith(("{", "[")):
try:
parsed = json.loads(stripped)
filter_details = ResultProcessor._extract_filter_details_from_parsed(parsed)
if filter_details:
return f"[Response blocked by content filter: {', '.join(filter_details)}]"
# Parsed successfully and contains content_filter indicators → generic message
if ResultProcessor._has_content_filter_keys(parsed):
return "[Response blocked by Azure OpenAI content filter]"
except (json.JSONDecodeError, TypeError, ValueError):
pass

# --- Step 2: try to extract nested "message" JSON -------------------
if '"message":' in content:
try:
match = re.search(r'"message"\s*:\s*"((?:[^"\\]|\\.)*)"', content)
if match:
inner = match.group(1).replace('\\"', '"').replace("\\\\", "\\")
try:
inner_json = json.loads(inner)
filter_details = ResultProcessor._extract_filter_details_from_parsed(inner_json)
except (json.JSONDecodeError, TypeError, ValueError):
pass
except (re.error, AttributeError):
pass

if filter_details:
return f"[Response blocked by content filter: {', '.join(filter_details)}]"

# --- Step 3: regex fallback for non-JSON edge cases -----------------
# Only fire when the content actually starts with '{' / '[' (i.e. it
# looks like a payload but json.loads failed, e.g. truncated JSON).
if stripped.startswith(("{", "[")):
try:
for category in ["hate", "self_harm", "sexual", "violence"]:
pattern = f'"{category}".*?"filtered"\\s*:\\s*true'
if re.search(pattern, content, re.IGNORECASE):
sev_match = re.search(
f'"{category}".*?"severity"\\s*:\\s*"(\\w+)"',
content,
re.IGNORECASE,
)
severity = sev_match.group(1) if sev_match else "unknown"
filter_details.append(f"{category} (severity: {severity})")
except (re.error, AttributeError):
pass

Comment on lines +694 to +707
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parsing logic relies on regexes over the raw string and then wraps the whole extraction in a broad except Exception, which can mask unexpected issues and makes the behavior hard to reason about. Since the goal is to detect/clean serialized JSON payloads, it would be more robust to attempt json.loads(content) first (when content looks like JSON) and extract choices[*].content_filter_results directly, falling back to regex only if JSON parsing fails; if you keep the try/except, consider narrowing it to expected exception types.

Suggested change
try:
# Try to parse the nested JSON to extract filter details
if '"message":' in content:
match = re.search(r'"message"\s*:\s*"(.*?)"(?:\s*[,}])', content)
if match:
inner = match.group(1).replace('\\"', '"').replace("\\\\", "\\")
try:
inner_json = json.loads(inner)
for choice in inner_json.get("choices", []):
cfr = choice.get("content_filter_results", {})
for category, details in cfr.items():
if isinstance(details, dict) and details.get("filtered"):
severity = details.get("severity", "unknown")
filter_details.append(f"{category} (severity: {severity})")
except (json.JSONDecodeError, KeyError, TypeError):
pass
# Also check for the direct error format
if not filter_details and "content_filter" in content_lower:
for category in ["hate", "self_harm", "sexual", "violence"]:
pattern = f'"{category}".*?"filtered"\\s*:\\s*true'
if re.search(pattern, content, re.IGNORECASE):
sev_match = re.search(f'"{category}".*?"severity"\\s*:\\s*"(\\w+)"', content, re.IGNORECASE)
severity = sev_match.group(1) if sev_match else "unknown"
filter_details.append(f"{category} (severity: {severity})")
except Exception: # pylint: disable=broad-except
pass
# First, try to parse the entire content as JSON if it looks like JSON.
stripped = content.strip()
if stripped and stripped[0] in ("{", "["):
try:
parsed = json.loads(stripped)
except json.JSONDecodeError:
parsed = None
if isinstance(parsed, dict):
choices = parsed.get("choices", [])
if isinstance(choices, list):
for choice in choices:
if not isinstance(choice, dict):
continue
# Prefer content_filter_results, but also check content_filter for robustness.
cfr = choice.get("content_filter_results") or choice.get("content_filter", {})
if not isinstance(cfr, dict):
continue
for category, details in cfr.items():
if isinstance(details, dict) and details.get("filtered"):
severity = details.get("severity", "unknown")
filter_details.append(f"{category} (severity: {severity})")
# If we did not find details from the top-level JSON, try nested "message" JSON extraction.
if not filter_details and '"message":' in content:
match = re.search(r'"message"\s*:\s*"(.*?)"(?:\s*[,}])', content)
if match:
inner = match.group(1).replace('\\"', '"').replace("\\\\", "\\")
try:
inner_json = json.loads(inner)
for choice in inner_json.get("choices", []):
if not isinstance(choice, dict):
continue
cfr = choice.get("content_filter_results", {})
if not isinstance(cfr, dict):
continue
for category, details in cfr.items():
if isinstance(details, dict) and details.get("filtered"):
severity = details.get("severity", "unknown")
filter_details.append(f"{category} (severity: {severity})")
except (json.JSONDecodeError, KeyError, TypeError):
# If the nested JSON is malformed or has unexpected shape, ignore and fall back.
pass
# Also check for the direct error format via regex as a final fallback.
if not filter_details and "content_filter" in content_lower:
for category in ["hate", "self_harm", "sexual", "violence"]:
pattern = f'"{category}".*?"filtered"\\s*:\\s*true'
if re.search(pattern, content, re.IGNORECASE):
sev_match = re.search(
f'"{category}".*?"severity"\\s*:\\s*"(\\w+)"',
content,
re.IGNORECASE,
)
severity = sev_match.group(1) if sev_match else "unknown"
filter_details.append(f"{category} (severity: {severity})")

Copilot uses AI. Check for mistakes.
if filter_details:
return f"[Response blocked by content filter: {', '.join(filter_details)}]"
# Last resort: if it starts with JSON chars and mentions content_filter
content_lower = content.lower()
if "content_filter" in content_lower or '"finish_reason":"content_filter"' in content_lower:
return "[Response blocked by Azure OpenAI content filter]"

return content

@staticmethod
def _extract_filter_details_from_parsed(parsed: Any) -> List[str]:
"""Extract content-filter category details from a parsed JSON structure."""
details: List[str] = []
if not isinstance(parsed, dict):
return details
choices = parsed.get("choices", [])
if isinstance(choices, list):
for choice in choices:
if not isinstance(choice, dict):
continue
cfr = choice.get("content_filter_results", {})
if isinstance(cfr, dict):
for category, info in cfr.items():
if isinstance(info, dict) and info.get("filtered"):
severity = info.get("severity", "unknown")
details.append(f"{category} (severity: {severity})")
# Also handle top-level content_filter_results (non-choices wrapper)
cfr_top = parsed.get("content_filter_results", {})
if isinstance(cfr_top, dict) and not details:
for category, info in cfr_top.items():
if isinstance(info, dict) and info.get("filtered"):
severity = info.get("severity", "unknown")
details.append(f"{category} (severity: {severity})")
return details

@staticmethod
def _has_content_filter_keys(parsed: Any) -> bool:
"""Check whether a parsed JSON object contains content-filter indicator keys."""
if not isinstance(parsed, dict):
return False
if "content_filter_results" in parsed:
return True
if parsed.get("finish_reason") == "content_filter":
return True
for choice in parsed.get("choices", []):
if isinstance(choice, dict):
if "content_filter_results" in choice:
return True
if choice.get("finish_reason") == "content_filter":
return True
return False

@staticmethod
def _normalize_sample_message(message: Dict[str, Any]) -> Dict[str, Any]:
"""Return a shallow copy of a message limited to supported fields."""
Expand All @@ -657,6 +775,10 @@ def _normalize_sample_message(message: Dict[str, Any]) -> Dict[str, Any]:
if isinstance(tool_calls_value, list):
normalized["tool_calls"] = [call for call in tool_calls_value if isinstance(call, dict)]

# Clean raw content-filter API responses for assistant messages
if normalized.get("role") == "assistant":
normalized["content"] = ResultProcessor._clean_content_filter_response(normalized.get("content", ""))

Comment on lines +778 to +781
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change introduces new normalization behavior (rewriting assistant message content when it resembles a content-filter payload), but there are existing unit tests for red_team/ResultProcessor and none appear to cover this path. Adding a focused test for _normalize_sample_message/_clean_content_filter_response (both a positive case with a real content-filter payload and a negative case with normal text mentioning similar words) would help prevent regressions.

Copilot uses AI. Check for mistakes.
return normalized

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
"""Tests for ResultProcessor._clean_content_filter_response and helpers."""

import json

import pytest

from azure.ai.evaluation.red_team._result_processor import ResultProcessor


class TestCleanContentFilterResponse:
    """Behavioural tests for ``ResultProcessor._clean_content_filter_response``.

    Written in response to the review comments on PR #45528: real payloads must
    be rewritten, ordinary text must pass through untouched.
    """

    @staticmethod
    def _clean(value):
        """Run the cleaner under test on *value*."""
        return ResultProcessor._clean_content_filter_response(value)

    @staticmethod
    def _choices_payload(filter_results):
        """Serialise a single-choice payload carrying *filter_results*."""
        return json.dumps({"choices": [{"content_filter_results": filter_results}]})

    # -- positive: real content-filter JSON payload (choices structure) -------
    def test_json_payload_with_filtered_choices(self):
        serialized = self._choices_payload(
            {
                "hate": {"filtered": True, "severity": "high"},
                "violence": {"filtered": False, "severity": "safe"},
            }
        )
        cleaned = self._clean(serialized)
        assert "hate (severity: high)" in cleaned
        assert "violence" not in cleaned
        assert cleaned.startswith("[Response blocked by content filter:")

    def test_json_payload_multiple_categories_filtered(self):
        serialized = self._choices_payload(
            {
                "hate": {"filtered": True, "severity": "medium"},
                "sexual": {"filtered": True, "severity": "high"},
            }
        )
        cleaned = self._clean(serialized)
        for expected in ("hate (severity: medium)", "sexual (severity: high)"):
            assert expected in cleaned

    # -- positive: finish_reason content_filter (no detail extraction) -------
    def test_json_payload_finish_reason_content_filter(self):
        serialized = json.dumps({"choices": [{"finish_reason": "content_filter"}]})
        assert self._clean(serialized) == "[Response blocked by Azure OpenAI content filter]"

    # -- positive: nested "message" JSON format ------------------------------
    def test_nested_message_json(self):
        nested = self._choices_payload({"self_harm": {"filtered": True, "severity": "medium"}})
        wrapper = json.dumps({"error": {"message": nested}})
        assert "self_harm (severity: medium)" in self._clean(wrapper)

    # -- positive: top-level content_filter_results (no choices wrapper) -----
    def test_top_level_content_filter_results(self):
        serialized = json.dumps(
            {
                "content_filter_results": {
                    "violence": {"filtered": True, "severity": "high"},
                }
            }
        )
        assert "violence (severity: high)" in self._clean(serialized)

    # -- negative: normal text mentioning content_filter is NOT modified -----
    def test_plain_text_mentioning_content_filter_unchanged(self):
        sentence = "The content_filter module handles policy violations."
        assert self._clean(sentence) == sentence

    def test_plain_text_mentioning_content_management_policy_unchanged(self):
        sentence = "Our content management policy requires review of all outputs."
        assert self._clean(sentence) == sentence

    def test_normal_sentence_with_filter_word(self):
        sentence = 'The system said "content_filter_results are logged for auditing".'
        assert self._clean(sentence) == sentence

    # -- non-string inputs (Comment 3) --------------------------------------
    def test_non_string_int_returns_str(self):
        assert self._clean(42) == "42"

    def test_non_string_dict_returns_str(self):
        assert self._clean({"key": "value"}) == "{'key': 'value'}"

    def test_non_string_none_returns_empty(self):
        assert self._clean(None) == ""

    def test_non_string_list_returns_str(self):
        assert self._clean([1, 2, 3]) == "[1, 2, 3]"

    # -- empty / whitespace edge cases --------------------------------------
    def test_empty_string_returns_empty(self):
        assert self._clean("") == ""

    def test_whitespace_only_passthrough(self):
        assert self._clean(" ") == " "

    # -- regex fallback for truncated JSON -----------------------------------
    def test_truncated_json_with_filter_details_regex_fallback(self):
        # Opens like a JSON object but is cut short, so json.loads fails and
        # the regex fallback has to find the category.
        truncated = '{"choices":[{"hate":{"filtered": true, "severity":"high"}'
        assert "hate (severity: high)" in self._clean(truncated)

    # -- JSON that parses but has no filter indicators → passthrough ---------
    def test_json_without_filter_keys_passthrough(self):
        serialized = json.dumps({"choices": [{"text": "hello"}]})
        assert self._clean(serialized) == serialized


class TestExtractFilterDetailsFromParsed:
    """Checks for the helper that lists filtered categories in a parsed payload."""

    def test_choices_structure(self):
        violence = {"filtered": True, "severity": "high"}
        payload = {"choices": [{"content_filter_results": {"violence": violence}}]}
        assert ResultProcessor._extract_filter_details_from_parsed(payload) == ["violence (severity: high)"]

    def test_non_dict_input_returns_empty(self):
        # Anything that is not a dict yields no details.
        for bad_input in ("not a dict", None):
            assert ResultProcessor._extract_filter_details_from_parsed(bad_input) == []

    def test_top_level_cfr(self):
        hate = {"filtered": True, "severity": "low"}
        payload = {"content_filter_results": {"hate": hate}}
        assert ResultProcessor._extract_filter_details_from_parsed(payload) == ["hate (severity: low)"]


class TestHasContentFilterKeys:
    """Checks for ``ResultProcessor._has_content_filter_keys``."""

    @staticmethod
    def _detect(value):
        """Run the detector under test on *value*."""
        return ResultProcessor._has_content_filter_keys(value)

    def test_top_level_key(self):
        payload = {"content_filter_results": {}}
        assert self._detect(payload) is True

    def test_finish_reason(self):
        payload = {"finish_reason": "content_filter"}
        assert self._detect(payload) is True

    def test_choice_level_key(self):
        payload = {"choices": [{"content_filter_results": {}}]}
        assert self._detect(payload) is True

    def test_no_indicators(self):
        payload = {"choices": [{"text": "hi"}]}
        assert self._detect(payload) is False

    def test_non_dict(self):
        assert self._detect([1, 2]) is False