
General UDAF pushdown as scripts#5064

Open
songkant-aws wants to merge 15 commits into opensearch-project:main from songkant-aws:udaf-script-pushdown

Conversation

@songkant-aws
Contributor

@songkant-aws songkant-aws commented Jan 22, 2026

Description

Push down any UDAF to data nodes as scripts, allowing each shard to evaluate a partial aggregation result in parallel and then reduce those partials into a final aggregation result. We expect this to speed up complex commands such as patterns, as well as future UDAFs. Benchmark tests are pending.

Related Issues

Resolves #4354

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@coderabbitai
Contributor

coderabbitai bot commented Jan 22, 2026

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Optional pushdown of pattern aggregation to data nodes for distributed processing and improved performance
    • Option to show numbered tokens in pattern aggregation results
    • New cluster setting to enable/disable pattern aggregation pushdown
  • Documentation

    • Expanded docs explaining pushdown behavior, how to enable it, and memory/circuit-breaker considerations
  • Tests

    • Added integration and explain-plan tests covering pushdown and numbered-token scenarios


Walkthrough

Adds scripted-metric UDAF pushdown for pattern aggregation: introduces PatternAggregationHelpers, scripted-metric UDAF framework (UDAF interface, registry, per-phase factories, ScriptedMetricDataContext), Calcite/UDF integration, OpenSearch scripted-metric wiring, a runtime setting to toggle pushdown, tests, and docs/plan updates.

Changes

Cohort / File(s) Summary
Pattern Aggregation Helpers
common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java, core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java
New shared Map-based accumulator helpers for init/add/combine/result; LogPatternAggFunction refactored to delegate state updates and result production to helpers.
Settings & Runtime Flag
common/src/main/java/org/opensearch/sql/common/setting/Settings.java, opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java
Added CALCITE_UDAF_PUSHDOWN_ENABLED key and a dynamic node-scoped OpenSearch setting to enable/disable UDAF pushdown.
Calcite UDFs & Function Wiring
core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java, core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java, core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java, core/src/main/java/org/opensearch/sql/calcite/utils/UserDefinedFunctionUtils.java
New builtin function names for pattern phases and adapters wrapping PatternAggregationHelpers static methods into Calcite UDF operators; helper to adapt static methods as UDFs.
Pattern Parsing & Projection
core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java, core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java
Added flattenParsedPattern and buildEvalAggSamplesCall to project parsed pattern/tokens/sample_logs; new evalAggSamples path for aggregation-mode parsing including optional numbered tokens.
Scripted-metric UDAF Framework
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricUDAF.java, .../ScriptedMetricUDAFRegistry.java, .../ScriptedMetricDataContext.java, .../udaf/PatternScriptedMetricUDAF.java
New public interface for scripted-metric UDAFs (lifecycle methods), registry for UDAFs, DataContext implementations for phases, and PatternScriptedMetricUDAF that builds RexNode-based init/map/combine/reduce scripts.
Script Factories (per-phase)
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricInitScriptFactory.java, .../CalciteScriptedMetricMapScriptFactory.java, .../CalciteScriptedMetricCombineScriptFactory.java, .../CalciteScriptedMetricReduceScriptFactory.java
New factories that wrap compiled RexNode functions into ScriptedMetricAggContexts for init, map, combine, and reduce phases, binding DataContext/params/state/states.
Pushdown Integration & Request Analysis
opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java, opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java
AggregateAnalyzer gains udafPushdownEnabled guard and delegates INTERNAL_PATTERN pushdown to ScriptedMetricUDAFRegistry; CalciteLogicalIndexScan propagates the pushdown flag to the analyzer helper.
Script Engine & Parameter Handling
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java, opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/ScriptParameterHelper.java
Added scripted-metric script contexts to CalciteScriptEngine; ScriptParameterHelper gains addSpecialVariable to register special-variable params for scripted-metric bindings.
Scripted-metric Result Parsing
opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ScriptedMetricParser.java
New MetricParser implementation to parse scripted-metric aggregation results into List/Map structure expected by the engine.
OpenSearch value parsing
opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java
Enhanced array parsing flow with unified parseArray pre-check and improved handling of empty/mixed arrays and single-element arrays.
Tests & Expectations
integ-test/src/test/java/.../CalcitePPLPatternsIT.java, integ-test/src/test/java/.../ExplainIT.java, ppl/src/test/java/.../CalcitePPLPatternsTest.java, test resources under integ-test/src/test/resources/expectedOutput/..., opensearch/src/test/java/.../AggregateAnalyzerTest.java
Added integration tests toggling pushdown, updated explain-plan expected outputs for pushdown/no-pushdown plans, and adjusted unit tests for constructor signature changes.
Documentation
docs/user/ppl/cmd/patterns.md
Documented UDAF pushdown option, how to enable it, and operational cautions (memory/circuit-breaker).

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant Coord as Coordinator
    participant DN1 as DataNode1
    participant DN2 as DataNode2
    participant DN3 as DataNode3
    participant Reducer as Reducer

    Client->>Coord: Submit pattern aggregation query (udaf pushdown enabled)
    Coord->>DN1: InitScript (state init)
    DN1->>DN1: MapScript (per-doc add -> buffer/partial merge)
    DN1->>DN1: CombineScript (emit shard state)
    Coord->>DN2: InitScript
    DN2->>DN2: MapScript
    DN2->>DN2: CombineScript
    Coord->>DN3: InitScript
    DN3->>DN3: MapScript
    DN3->>DN3: CombineScript
    DN1-->>Reducer: Shard state 1 (map)
    DN2-->>Reducer: Shard state 2
    DN3-->>Reducer: Shard state 3
    Reducer->>Reducer: ReduceScript (merge shard states -> PatternAggregationHelpers.produce)
    Reducer-->>Client: Final pattern results (List<Map<String,Object>>)
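The init/map/combine/reduce lifecycle in the diagram above can be sketched with plain maps. This is a minimal illustration of the phase contract only — the class and method names are hypothetical, not the PR's actual `ScriptedMetricUDAF` API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of a scripted-metric lifecycle: each shard builds a
// partial state (init + map + combine), then the reducer merges all
// shard states. Counting documents per pattern key stands in for the
// real pattern accumulator. All names here are illustrative.
public class ScriptedMetricSketch {

  // init: fresh per-shard state
  static Map<String, Integer> init() {
    return new HashMap<>();
  }

  // map: fold one document into the shard state
  static void map(Map<String, Integer> state, String patternKey) {
    state.merge(patternKey, 1, Integer::sum);
  }

  // combine: emit the shard-local state (already compact here)
  static Map<String, Integer> combine(Map<String, Integer> state) {
    return state;
  }

  // reduce: merge all shard states into the final result
  static Map<String, Integer> reduce(List<Map<String, Integer>> states) {
    Map<String, Integer> result = new HashMap<>();
    for (Map<String, Integer> shardState : states) {
      shardState.forEach((k, v) -> result.merge(k, v, Integer::sum));
    }
    return result;
  }
}
```

The real implementation compiles each phase from a RexNode into a script, but the data flow between phases follows this shape.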

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~65 minutes

Possibly related PRs

  • #4868: Fixes LogPatternAggFunction buffer and return behavior — closely related to the refactor delegating accumulation/result to PatternAggregationHelpers.
  • #4914: Modifies ScriptParameterHelper and script pushdown infra — overlaps with added special-variable handling and scripted-metric integration.

Suggested labels

PPL, calcite

Suggested reviewers

  • LantaoJin
  • penghuo
  • ps48
  • kavithacm
  • derek-ho
  • joshuali925
  • anirudha
  • GumpacG
  • Swiddis
  • mengweieric
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 37.30% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'General UDAF pushdown as scripts' clearly and concisely describes the main objective of this pull request: implementing pushdown of UDAFs as scripted metric aggregations.
Description check ✅ Passed The description is directly related to the changeset, explaining the motivation for UDAF pushdown, linking to the corresponding issue, and documenting the testing and documentation status.
Linked Issues check ✅ Passed The PR implementation meets the core requirements from issue #4354: enabling UDAF pushdown via scripted metric aggregations (init/map/combine/reduce pipeline), with pattern aggregation as a primary use case and supporting infrastructure.
Out of Scope Changes check ✅ Passed All code changes are in scope with the linked issue's objectives: new UDAF scripting infrastructure, pattern aggregation helpers, registry/factory implementations, and integration tests directly support the scripted metric pushdown feature.



Signed-off-by: Songkan Tang <songkant@amazon.com>
@yuancu yuancu added enhancement New feature or request pushdown pushdown related issues labels Jan 23, 2026
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 12

🤖 Fix all issues with AI agents
In
`@common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java`:
- Around line 259-275: combinePatternAccumulators currently drops buffered
"logMessages" from acc1 and acc2; modify it to retrieve the "logMessages" lists
from both accumulators (cast to List<Map<String,Object>> or List<Object> as
appropriate), create a new ArrayList, addAll from acc1 then acc2 to preserve
order, and put that merged list into the result instead of a fresh empty list;
keep existing merging of "patternGroupMap" via PatternUtils.mergePatternGroups
and return the combined result so producePatternResult sees all buffered
messages.
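The merge the comment asks for can be sketched as follows. The `logMessages` key comes from the comment; the pattern-group merge step is stubbed out since `PatternUtils.mergePatternGroups` is not reproduced here:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the suggested fix: carry buffered logMessages from both
// accumulators into the combined one instead of dropping them.
// The existing patternGroupMap merge is elided (comment placeholder).
public class CombineSketch {

  @SuppressWarnings("unchecked")
  static Map<String, Object> combine(Map<String, Object> acc1, Map<String, Object> acc2) {
    Map<String, Object> combined = new HashMap<>();
    // Preserve order: acc1's buffered messages first, then acc2's.
    List<Object> merged = new ArrayList<>();
    merged.addAll((List<Object>) acc1.getOrDefault("logMessages", List.of()));
    merged.addAll((List<Object>) acc2.getOrDefault("logMessages", List.of()));
    combined.put("logMessages", merged);
    // ... patternGroupMap would still be merged via the existing helper ...
    return combined;
  }
}
```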

In `@core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java`:
- Around line 3393-3417: The new private helper buildEvalAggSamplesCall (and the
INTERNAL_PATTERN_PARSER / evalAggSamples flow it invokes) lacks test coverage;
add unit or integration tests that exercise the end-to-end pattern aggregation
path: call the public API or caller that triggers buildEvalAggSamplesCall, feed
patterns with wildcards and sample_logs, and assert that
evalAggSamples/INTERNAL_PATTERN_PARSER produces numbered tokens (verify
showNumberedToken output or resulting transformed pattern and token list);
include tests for edge cases (no samples, multiple samples, complex wildcards)
and for the alternate branch where sample_logs is varchar to ensure
explicitMapType handling is covered.

In
`@core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java`:
- Around line 170-175: The Accumulator contract is violated:
LogPatternAggFunction.value(...) currently throws UnsupportedOperationException;
instead implement value(...) on LogParserAccumulator to return the
aggregated/pattern result (or wrapper state) so callers can obtain the final
value via value(...) just like FirstAggFunction/LastAggFunction/etc.; then
refactor LogPatternAggFunction.result() to call LogParserAccumulator.value(...)
(or delegate to PatternAggregationHelpers.producePatternResult(...) from within
the accumulator's value method) and remove the direct throw in value() so the
interface contract is honored (references: LogPatternAggFunction,
LogParserAccumulator, value(), result(),
PatternAggregationHelpers.producePatternResult()).

In
`@core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java`:
- Around line 138-151: The code in PatternParserFunctionImpl uses the boxed
Boolean showNumberedToken in a direct if(check) which can NPE when null; update
the conditional to a null-safe boolean test (e.g.
Boolean.TRUE.equals(showNumberedToken) as used in evalAggSamples) so the
parse/transform/extract block (ParseResult parseResult =
PatternUtils.parsePattern(...), outputPattern =
parseResult.toTokenOrderString(...), PatternUtils.extractVariables(...)) only
runs when showNumberedToken is explicitly true.
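The null-safe test the comment recommends is the standard idiom for boxed `Boolean` values, since `if (flag)` auto-unboxes and throws a NullPointerException when `flag` is null:

```java
// Boolean.TRUE.equals(...) is null-safe: it returns false for null
// and for Boolean.FALSE, and true only when the flag is explicitly true.
public class NullSafeFlag {
  static boolean isExplicitlyTrue(Boolean flag) {
    return Boolean.TRUE.equals(flag);
  }
}
```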

In
`@core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java`:
- Around line 490-531: The PATTERN_* UDFs are registered with null operand type
information and lack tests; replace the null operand-type arguments in the
UserDefinedFunctionUtils.adaptStaticMethodToUDF calls for PATTERN_INIT_UDF,
PATTERN_ADD_UDF, PATTERN_COMBINE_UDF and PATTERN_RESULT_UDF with explicit
operand type signatures matching the actual method parameter types in
PatternAggregationHelpers (e.g., types corresponding to initPatternState,
addLogToPattern's 6 parameters, combinePatternAccumulators,
producePatternResultFromStates), using the appropriate SqlTypeName entries and
SqlTypeUtil helpers to build array/map/any types where needed, and add unit
tests that exercise initPatternState, addLogToPattern (processing a log),
combinePatternAccumulators, and producePatternResultFromStates to validate
operand validation, runtime behavior, and result shapes.

In `@docs/user/ppl/cmd/patterns.md`:
- Around line 87-105: Update the earlier note that currently states aggregation
is "not executed on data nodes" to make it conditional on the UDAF pushdown
setting: explicitly say that when plugins.calcite.udaf_pushdown.enabled is true
(used with the patterns command when mode=aggregation and method=brain) the
aggregation may be pushed down and executed on data nodes as a scripted metric
aggregation, otherwise it runs locally on the coordinator; include a
cross-reference or short parenthetical pointing to the new "Enabling UDAF
pushdown" section for more details and the exact setting name.

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java`:
- Around line 198-209: The array-handling path in OpenSearchExprValueFactory
routes unmapped arrays to parseArray even when supportArrays is false, and
parseArray currently calls content.array().next() which throws on empty arrays;
modify parseArray (or add a pre-check before calling it) to detect empty arrays
(e.g., !content.array().hasNext() or size==0) and short-circuit by returning an
empty array value (or ExprNullValue if project semantics prefer) instead of
attempting to read the first element; update parseArray and any callers
(referencing OpenSearchExprValueFactory and parseArray) to use this guard so
empty JSON arrays no longer cause NoSuchElementException.
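The suggested guard can be sketched with a plain iterator (the real code reads from `content.array()`; the types here are simplified stand-ins):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the empty-array guard: check hasNext() before reading the
// first element, so empty arrays return an empty value instead of
// throwing NoSuchElementException from Iterator.next().
public class ArrayGuard {
  static List<Object> parseArray(Iterator<Object> elements) {
    if (!elements.hasNext()) {
      return List.of(); // empty array: short-circuit
    }
    List<Object> values = new ArrayList<>();
    elements.forEachRemaining(values::add);
    return values;
  }
}
```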

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java`:
- Around line 618-635: The INTERNAL_PATTERN branch in AggregateAnalyzer
currently throws UnsupportedOperationException when helper.udafPushdownEnabled
is false, which causes an uncaught exception to propagate; change the logic in
AggregateAnalyzer (the branch handling INTERNAL_PATTERN inside the
analyze/aggregation builder) so that when helper.udafPushdownEnabled is false it
does NOT throw but instead falls back to the non-pushdown path (i.e., skip or
bypass ScriptedMetricUDAFRegistry.INSTANCE.lookup and let normal aggregation
handling continue), so pattern aggregation degrades gracefully; update the
INTERNAL_PATTERN handling to only use
ScriptedMetricUDAFRegistry.lookup(functionName).map(...).orElseThrow(...) when
helper.udafPushdownEnabled is true.
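The graceful-degradation shape suggested above can be sketched with `Optional`: consult the registry only when pushdown is enabled, and return empty otherwise so the caller continues on the normal aggregation path. All names here are illustrative, not the analyzer's actual signatures:

```java
import java.util.Optional;
import java.util.function.Function;

// Sketch of the fallback: when udafPushdownEnabled is false, do NOT
// throw — return empty so the non-pushdown aggregation path is taken.
// Only when pushdown is enabled does the registry lookup run.
public class PushdownFallbackSketch {
  static Optional<String> tryPushdown(
      boolean udafPushdownEnabled,
      String functionName,
      Function<String, Optional<String>> registryLookup) {
    if (!udafPushdownEnabled) {
      return Optional.empty(); // degrade gracefully
    }
    return registryLookup.apply(functionName);
  }
}
```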

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricCombineScriptFactory.java`:
- Around line 26-30: Add JavaDoc to the public factory method newInstance in
CalciteScriptedMetricCombineScriptFactory: document the parameters
(Map<String,Object> params, Map<String,Object> state) with `@param` tags and
describe what each represents, and add an `@return` tag describing that it returns
a ScriptedMetricAggContexts.CombineScript (specifically a new
CalciteScriptedMetricCombineScript configured with function and outputType).
Keep the JavaDoc concise and place it immediately above the newInstance method
declaration.

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java`:
- Around line 28-32: Add JavaDoc to the public method newFactory in
CalciteScriptedMetricMapScriptFactory: document each parameter with `@param` tags
for params, state, and lookup (describe their roles/types) and add an `@return`
tag that explains the returned ScriptedMetricAggContexts.MapScript.LeafFactory
(e.g., a CalciteMapScriptLeafFactory instance used to create leaf-level map
scripts). Keep the description concise and reference that the implementation
returns a new CalciteMapScriptLeafFactory constructed with function and
outputType.

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricDataContext.java`:
- Around line 69-80: The method parseDynamicParamIndex currently calls
Integer.parseInt(name.substring(1)) without handling NumberFormatException;
update parseDynamicParamIndex to catch NumberFormatException around the parse
call (inside parseDynamicParamIndex) and rethrow an IllegalArgumentException
that includes the original parameter name (name) and a clear message like
"Malformed parameter name, expected '?N' pattern" so callers get an informative
error; keep the existing checks (startsWith("?") and the sources.size() bounds
check) and reference the methods/variables parseDynamicParamIndex and sources
when making the change.
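The suggested wrapping can be sketched as below, keeping the existing `?`-prefix and bounds checks around the guarded parse:

```java
import java.util.List;

// Sketch of the suggested fix: wrap Integer.parseInt so a malformed
// "?N" parameter name produces an informative IllegalArgumentException
// that carries the original name, instead of a bare NumberFormatException.
public class ParamIndexSketch {
  static int parseDynamicParamIndex(String name, List<?> sources) {
    if (!name.startsWith("?")) {
      throw new IllegalArgumentException("Expected '?N' parameter name, got: " + name);
    }
    final int index;
    try {
      index = Integer.parseInt(name.substring(1));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Malformed parameter name, expected '?N' pattern: " + name, e);
    }
    if (index < 0 || index >= sources.size()) {
      throw new IllegalArgumentException("Parameter index out of range: " + name);
    }
    return index;
  }
}
```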

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricUDAF.java`:
- Around line 112-130: Add JavaDoc comments for the public accessors and the
class constructor in ScriptedMetricUDAF: document getRexBuilder(),
getParamHelper(), getCluster(), getRowType(), and getFieldTypes() with brief
descriptions and `@return` tags describing the returned type/meaning, and add a
short JavaDoc to the constructor describing its purpose and main parameters;
ensure each JavaDoc follows project style (one-line summary plus `@return` and
`@param` where applicable) and place them immediately above the corresponding
method/constructor declarations.
🧹 Nitpick comments (10)
opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java (1)

192-241: Consider splitting parse to reduce complexity.

parse(...) now exceeds 50 lines; extracting the array pre-check and type-dispatch branches into helpers would improve readability and maintainability. As per coding guidelines, consider refactoring.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/ScriptParameterHelper.java (1)

41-48: Document the new SPECIAL_VARIABLE source type.

The source types 0, 1, and 2 are documented in the comment block at lines 42-48, but the new SPECIAL_VARIABLE = 3 added at line 107 is not included in that documentation. Consider updating the comment block for consistency and maintainability.

📝 Suggested documentation update
   /**
    * Records the source of each parameter, it decides which kind of source to retrieve value.
    *
    * <p>0 stands for DOC_VALUE
    *
    * <p>1 stand for SOURCE
    *
    * <p>2 stands for LITERAL
+   *
+   * <p>3 stands for SPECIAL_VARIABLE (e.g., state, states in scripted metric aggregations)
    */
   List<Integer> sources;

Also applies to: 105-110

opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ScriptedMetricParser.java (1)

19-28: Consider using @Getter to eliminate boilerplate.

The explicit getName() method can be replaced with Lombok's @Getter annotation on the class or field, reducing boilerplate.

♻️ Suggested simplification
 `@EqualsAndHashCode`
 `@RequiredArgsConstructor`
+@Getter
 public class ScriptedMetricParser implements MetricParser {

   private final String name;
-
-  `@Override`
-  public String getName() {
-    return name;
-  }
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricDataContext.java (1)

138-158: Consider logging or documenting silent null returns for debugging.

The get() method returns null silently when sourceLookup is null (line 145) or when doc values are missing/empty (lines 149-157). While this may be intentional for optional fields, it could mask configuration issues during debugging. Consider adding debug-level logging or documenting this behavior explicitly.

opensearch/src/test/java/org/opensearch/sql/opensearch/request/AggregateAnalyzerTest.java (1)

706-707: Consider adding test coverage for udafPushdownEnabled=true scenario.

The existing tests all use udafPushdownEnabled=false. To ensure comprehensive coverage of the new UDAF pushdown feature, consider adding a test case that exercises the udafPushdownEnabled=true path with a pattern aggregation call.

Would you like me to help generate a test case for the udafPushdownEnabled=true scenario, or open an issue to track this as a follow-up task?

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java (1)

550-550: Remove debug print statements before merging.

System.out.println(result.toString()) appears to be debug code left over from development. These should be removed or replaced with proper logging if needed for debugging purposes.

-      System.out.println(result.toString());

Also applies to: 616-616

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricReduceScriptFactory.java (1)

37-38: Unused outputType field in inner class.

The outputType field is stored but never used in CalciteScriptedMetricReduceScript. This is consistent with the sibling factory classes (Combine, Map, Init), so it may be intentional for future extensibility or API consistency. Consider adding a brief comment explaining its purpose, or remove it if truly unused.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricInitScriptFactory.java (1)

49-68: Consider extracting result-to-state logic to reduce duplication.

The pattern of checking result != null && result.length > 0 and then either calling putAll for Map results or putting under "accumulator" key is duplicated across CalciteScriptedMetricInitScriptFactory and CalciteScriptedMetricMapScriptFactory. Consider extracting this into a shared utility method in a helper class.

♻️ Optional refactor example
// In a shared helper class like ScriptedMetricUtils:
public static void storeResultInState(Object[] result, Map<String, Object> state) {
    if (result != null && result.length > 0) {
        if (result[0] instanceof Map) {
            state.putAll((Map<String, Object>) result[0]);
        } else {
            state.put("accumulator", result[0]);
        }
    }
}
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricUDAFRegistry.java (1)

8-37: Consider a thread-safe registry map.
If register() can be called after startup, HashMap is not safe for concurrent access. Consider ConcurrentHashMap (or make the registry immutable after init).

♻️ Possible adjustment
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
@@
-    this.udafMap = new HashMap<>();
+    this.udafMap = new ConcurrentHashMap<>();
core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java (1)

3282-3383: Consider splitting flattenParsedPattern into smaller helpers.

This method mixes transformation logic and four projection variants in one block (~100+ LOC). Extracting helpers (pattern, count, tokens, sample_logs) would keep it under the 50-line guideline and make the aggregation vs. label paths easier to follow. As per coding guidelines, please keep methods under 50 lines.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java (1)

234-242: Preserve the specific SPECIAL_VARIABLE error detail.
The current throw is swallowed by the generic catch, losing the intended message.

🛠️ Suggested fix
-      } catch (Exception e) {
-        throw new IllegalStateException("Failed to get value for parameter " + name);
-      }
+      } catch (IllegalStateException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new IllegalStateException("Failed to get value for parameter " + name, e);
+      }
🤖 Fix all issues with AI agents
In
`@core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java`:
- Around line 196-235: The new public method
PatternParserFunctionImpl.evalAggSamples lacks unit tests; add JUnit tests for
it: (1) when showNumberedToken=true supply a wildcard pattern and sampleLogs and
assert the returned Map has PatternUtils.PATTERN transformed to numbered tokens
(use PatternUtils.TOKEN_PREFIX) and PatternUtils.TOKENS contains expected token
values extracted from sampleLogs; (2) when showNumberedToken=false assert the
returned pattern equals the original wildcard pattern and tokens map is empty;
and (3) when pattern is null/blank assert the method returns EMPTY_RESULT; call
PatternParserFunctionImpl.evalAggSamples directly and use assertions on the
returned Map/objects to validate behavior.

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java`:
- Around line 78-88: The catch in CalciteScriptEngine currently catches the
IllegalStateException raised for SPECIAL_VARIABLE and re-throws a new generic
IllegalStateException, losing the original variable name/type context; fix by
either (A) handling the SPECIAL_VARIABLE case before entering the try block in
the method that builds/creates scripts in CalciteScriptEngine (so it never gets
wrapped), or (B) preserve the original exception when propagating by re-throwing
the caught exception (throw e;) or wrapping it while including the original
message and cause (new IllegalStateException("SPECIAL_VARIABLE: " +
e.getMessage(), e)), so the SPECIAL_VARIABLE name/type details are preserved.

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricInitScriptFactory.java`:
- Around line 24-27: Add a JavaDoc comment to the public factory method
CalciteScriptedMetricInitScriptFactory.newInstance that documents the method
purpose, the parameters (Map<String, Object> params and Map<String, Object>
state) and what the method returns (a ScriptedMetricAggContexts.InitScript,
specifically a new CalciteScriptedMetricInitScript created with the factory's
function, params, and state); place the JavaDoc immediately above the
newInstance method and ensure it follows project style (brief description,
`@param` tags for params and state, and an `@return` tag describing the InitScript
instance).

In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricReduceScriptFactory.java`:
- Around line 25-28: Add a JavaDoc comment for the public factory method
newInstance in CalciteScriptedMetricReduceScriptFactory: document the params map
and states list parameters and describe the returned
ScriptedMetricAggContexts.ReduceScript instance; reference that the method
constructs and returns a new CalciteScriptedMetricReduceScript using the
factory's function field, mention that params is a Map<String,Object> of script
parameters and states is a List<Object> of intermediate aggregation states, and
state that the method returns the constructed ReduceScript.
♻️ Duplicate comments (2)
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricCombineScriptFactory.java (1)

24-27: Public newInstance JavaDoc still missing (already flagged).

As per coding guidelines, please add JavaDoc for params/return.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java (1)

26-29: Public newFactory JavaDoc still missing (already flagged).

As per coding guidelines, please add JavaDoc for params/return.

Comment on lines +78 to +88
import org.opensearch.script.ScriptedMetricAggContexts;
import org.opensearch.script.StringSortScript;
import org.opensearch.search.lookup.SourceLookup;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.opensearch.storage.script.aggregation.CalciteAggregationScriptFactory;
import org.opensearch.sql.opensearch.storage.script.field.CalciteFieldScriptFactory;
import org.opensearch.sql.opensearch.storage.script.filter.CalciteFilterScriptFactory;
import org.opensearch.sql.opensearch.storage.script.scriptedmetric.CalciteScriptedMetricCombineScriptFactory;
import org.opensearch.sql.opensearch.storage.script.scriptedmetric.CalciteScriptedMetricInitScriptFactory;
import org.opensearch.sql.opensearch.storage.script.scriptedmetric.CalciteScriptedMetricMapScriptFactory;
import org.opensearch.sql.opensearch.storage.script.scriptedmetric.CalciteScriptedMetricReduceScriptFactory;
Contributor


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's determine the minimum supported OpenSearch version from the project
fd "pom.xml" | head -1 | xargs grep -A 5 "opensearch.version" 2>/dev/null || grep -r "opensearch.version" . --include="*.xml" --include="*.gradle" | head -5

Repository: opensearch-project/sql

Length of output: 561


🏁 Script executed:

# Find the CalciteScriptEngine.java file and examine the relevant lines
fd "CalciteScriptEngine.java" --type f

Repository: opensearch-project/sql

Length of output: 160


🌐 Web query:

ScriptedMetricAggContexts OpenSearch version introduced when first available

💡 Result:

ScriptedMetricAggContexts is available in OpenSearch starting with OpenSearch 1.0.0 (GA release on July 12, 2021), since OpenSearch 1.0 was derived from Elasticsearch 7.10.2, which already includes that scripted-metric aggregation script context. [1][2][3]

Sources:
[1] OpenSearch docs – version history (shows 1.0.0 release date) (docs.opensearch.org)
[2] OpenSearch blog – “1.0 is released!” (GA announcement; derived from Elasticsearch 7.10.2) (opensearch.org)
[3] OpenSearch blog – 1.0.0 RC1 announcement (explicitly “derived from Elasticsearch 7.10.2”) (opensearch.org)



🏁 Script executed:

cat -n opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java | sed -n '70,145p'

Repository: opensearch-project/sql

Length of output: 4512


🏁 Script executed:

cat -n opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java | sed -n '230,275p'

Repository: opensearch-project/sql

Length of output: 2105


Exception handling masks the specific SPECIAL_VARIABLE error context.

The scripted-metric contexts (ScriptedMetricAggContexts) have been available since OpenSearch 1.0.0 and are compatible with this project (targeting 3.5.0).

However, at lines 234–242, the specific IllegalStateException thrown for SPECIAL_VARIABLE cases (line 237–238) is caught and re-thrown by the outer catch block (line 240–242) with a generic message that omits the variable name and type information. This masks the original error context and makes debugging harder.

Consider either handling the SPECIAL_VARIABLE case separately outside the try block, or re-throwing the original exception rather than wrapping it with a less informative message.

🤖 Prompt for AI Agents
In
`@opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java`
around lines 78 - 88, The catch in CalciteScriptEngine currently catches the
IllegalStateException raised for SPECIAL_VARIABLE and re-throws a new generic
IllegalStateException, losing the original variable name/type context; fix by
either (A) handling the SPECIAL_VARIABLE case before entering the try block in
the method that builds/creates scripts in CalciteScriptEngine (so it never gets
wrapped), or (B) preserve the original exception when propagating by re-throwing
the caught exception (throw e;) or wrapping it while including the original
message and cause (new IllegalStateException("SPECIAL_VARIABLE: " +
e.getMessage(), e)), so the SPECIAL_VARIABLE name/type details are preserved.
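Option (B) can be sketched in isolation. This is a minimal, self-contained illustration with hypothetical names (the real translation method and messages in CalciteScriptEngine differ); the point is only that wrapping with the original message and cause keeps the variable details recoverable by the caller.

```java
public class CauseDemo {
  // Hypothetical stand-in for the RexNode-to-script translation step;
  // throws for "special" variables, as the real engine does.
  static void translate(String variable) {
    if (variable.startsWith("_")) {
      throw new IllegalStateException(
          "Cannot translate special variable: " + variable + " (type: doc)");
    }
  }

  // Wrap while preserving the original message and cause, so the
  // variable name/type details survive to the caller.
  public static String compile(String variable) {
    try {
      translate(variable);
      return "compiled";
    } catch (IllegalStateException e) {
      throw new IllegalStateException("Failed to compile script: " + e.getMessage(), e);
    }
  }
}
```

With this shape, a caller catching the outer exception can still log both the contextual message and the original cause chain.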

Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
yuancu
yuancu previously approved these changes Jan 23, 2026
Collaborator @yuancu left a comment:

LGTM

Comment on lines +175 to +181
public static final Setting<?> CALCITE_UDAF_PUSHDOWN_ENABLED_SETTING =
Setting.boolSetting(
Key.CALCITE_UDAF_PUSHDOWN_ENABLED.getKeyValue(),
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

Collaborator:

Is this a feature flag (cluster setting) that gates UDAF pushdown on/off at runtime? Will this flag be deprecated without notice once push-down is stable?

Contributor Author:

Yes, it's a feature flag that gates the pushdown feature at runtime.

But I'm still cautious about deprecation. There is a comment left in OpenSearch ScriptedMetricAggregator: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/search/aggregations/metrics/ScriptedMetricAggregator.java#L65-L73. It bypasses the circuit breaker, so it is prudent to keep a kill switch.
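Since the setting is declared Dynamic with NodeScope, the kill switch can be flipped at runtime through the standard cluster settings API. A sketch (the key is the one used by this PR; choosing `transient` versus `persistent` is up to the operator):

```json
PUT _cluster/settings
{
  "transient": {
    "plugins.calcite.udaf_pushdown.enabled": true
  }
}
```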

"UDAF pushdown is disabled. Enable it via cluster setting"
+ " 'plugins.calcite.udaf_pushdown.enabled'");
}
yield ScriptedMetricUDAFRegistry.INSTANCE
Collaborator:

As this PR needs to be backported to 2.x, which uses JDK 11, can we avoid the yield keyword?

Contributor Author:

I heard we will no longer backport, so could we keep it here?

Comment on lines +630 to +634
.orElseThrow(
() ->
new AggregateAnalyzerException(
String.format(
"No scripted metric UDAF registered for %s", functionName)));
Collaborator:

When would this happen? Is it a bug in our system?

Contributor Author:

For now, it's not possible because the registry is hardcoded with only one supported UDAF (patterns) in this PR. It's a defensive check to provide more meaningful info in case it ever happens.

Comment on lines +170 to +177
ScriptParameterHelper initParamHelper =
new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
ScriptParameterHelper mapParamHelper =
new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
ScriptParameterHelper combineParamHelper =
new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
ScriptParameterHelper reduceParamHelper =
new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
Collaborator:

These 4 parameter helpers seem to be exactly the same. Do we really need all of them?

Collaborator:

Oh, I get it. There is actually state in each param helper. We would need to clear that state before reusing a single helper, otherwise it would get corrupted.

It looks fine then.

Contributor Author:

Actually, different phases of the UDAF script use different RexNodes to serialize the script. So yes, they will hold different sources and digests state in ScriptParameterHelper.

new ScriptContext(rexBuilder, combineParamHelper, cluster, rowType, fieldTypes);
ScriptContext reduceContext =
new ScriptContext(rexBuilder, reduceParamHelper, cluster, rowType, fieldTypes);

Collaborator:

Ditto. Since the param helpers are the same, the ScriptContexts should be the same as well.

Comment on lines +77 to +83
mapArgs.add(getArgOrDefault(args, 1, makeIntLiteral(rexBuilder, DEFAULT_MAX_SAMPLE_COUNT)));
mapArgs.add(getArgOrDefault(args, 2, makeIntLiteral(rexBuilder, DEFAULT_BUFFER_LIMIT)));
mapArgs.add(
getArgOrDefault(args, 5, makeIntLiteral(rexBuilder, DEFAULT_VARIABLE_COUNT_THRESHOLD)));
mapArgs.add(
getArgOrDefault(args, 4, makeDoubleLiteral(rexBuilder, DEFAULT_THRESHOLD_PERCENTAGE)));

Collaborator:

Shall these change to dynamic params? Otherwise, the script won't be reusable if these values differ.

Contributor Author:

In this part, we only define RexNodes. The assignment of dynamic params for literals is performed at serialization by RexStandardizer.standardizeRexNodeExpression.

Comment on lines +102 to +106
reduceArgs.add(getArgOrDefault(args, 1, makeIntLiteral(rexBuilder, DEFAULT_MAX_SAMPLE_COUNT)));

// Determine variableCountThreshold and thresholdPercentage
RexNode variableCountThreshold = makeIntLiteral(rexBuilder, DEFAULT_VARIABLE_COUNT_THRESHOLD);
RexNode thresholdPercentage = makeDoubleLiteral(rexBuilder, DEFAULT_THRESHOLD_PERCENTAGE);
Collaborator:

Ditto

Contributor Author:

Same as above response.

@opensearch-trigger-bot (Contributor):

This PR is stalled because it has been open for 2 weeks with no activity.

@songkant-aws (Contributor Author)

songkant-aws commented Mar 3, 2026

Updated the PR with a simple performance comparison between queries with pushdown disabled and pushdown enabled.

The test query was executed on a three-shard index containing approximately 2.7 million rows of log messages, with profiling enabled:

curl -s -X POST http://localhost:9200/_plugins/_ppl -H 'Content-Type: application/json' -d '{"query":"source=logs-181998 | patterns request method=brain mode=aggregation","profile":true}'

Baseline (pushdown disabled)

{
  "profile": {
    "summary": {
      "total_time_ms": 43977.34
    },
    "phases": {
      "analyze": {
        "time_ms": 4.17
      },
      "optimize": {
        "time_ms": 25.94
      },
      "execute": {
        "time_ms": 43947.1
      },
      "format": {
        "time_ms": 0.09
      }
    },
    "plan": {
      "node": "EnumerableLimit",
      "time_ms": 2.08,
      "rows": 29,
      "children": [
        {
          "node": "EnumerableCalc",
          "time_ms": 1.11,
          "rows": 29,
          "children": [
            {
              "node": "EnumerableCorrelate",
              "time_ms": 0.85,
              "rows": 29,
              "children": [
                {
                  "node": "EnumerableAggregate",
                  "time_ms": 0.0,
                  "rows": 1,
                  "children": [
                    {
                      "node": "EnumerableCalc",
                      "time_ms": 4304.11,
                      "rows": 2708746,
                      "children": [
                        {
                          "node": "CalciteEnumerableIndexScan",
                          "time_ms": 4150.36,
                          "rows": 2708746
                        }
                      ]
                    }
                  ]
                },
                {
                  "node": "EnumerableUncollect",
                  "time_ms": 0.22,
                  "rows": 29,
                  "children": [
                    {
                      "node": "EnumerableCalc",
                      "time_ms": 0.21,
                      "rows": 1,
                      "children": [
                        {
                          "node": "EnumerableValues",
                          "time_ms": 0.01,
                          "rows": 1
                        }
                      ]
                    }
                  ]
                }
              ]
            }
          ]
        }
      ]
    }
  },
  "schema": [
    {
      "name": "patterns_field",
      "type": "string"
    },
    {
      "name": "pattern_count",
      "type": "bigint"
    },
    {
      "name": "sample_logs",
      "type": "array"
    }
  ],
  "datarows": [
    [
      "GET <*> HTTP/<*>",
      2699196,
      [
        "GET /english/images/nav_tickets_off.gif HTTP/1.1",
        "GET /english/images/comp_bg2_hm.jpg HTTP/1.0",
        "GET /english/nav_inet.html HTTP/1.1",
        "GET /robots.txt HTTP/1.0",
        "GET /french/individuals/player2913.htm HTTP/1.0",
        "GET /english/splash_inet.html HTTP/1.1",
        "GET /english/images/comp_bu_stage1n.gif HTTP/1.0",
        "GET /images/arw_lk.gif HTTP/1.0",
        "GET /english/images/space.gif HTTP/1.0",
        "GET /images/s102381.gif HTTP/1.0"
      ]
    ],
    [
      "POST <*> HTTP/<*>",
      7764,
      [
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0"
      ]
    ],
    [
      "HEAD <*> HTTP/<*>",
      1537,
      [
        "HEAD /french/tickets/ang.zip HTTP/1.0",
        "HEAD /english/venues/cities/images/paris/paris_i.gif HTTP/1.0",
        "HEAD /images/home_bg_stars.gif HTTP/1.0",
        "HEAD /images/cal_mont.gif HTTP/1.0",
        "HEAD /images/s102336.gif HTTP/1.0",
        "HEAD /english/index.html HTTP/1.0",
        "HEAD /images/s140875.gif HTTP/1.0",
        "HEAD /english/news/3004bres.htm HTTP/1.0",
        "HEAD /english/history/history_of/images/cup/515txt.gif HTTP/1.0",
        "HEAD /english/venues/images/Venue_map_top_off.gif HTTP/1.0"
      ]
    ],
    [
      "GET <*> HTTP/X.X",
      182,
      [
        "GET /english/playing/download/media/screen/win_95/dance.zip HTTP/X.X",
        "GET /english/playing/download/media/screen/win_95/intro.zip HTTP/X.X",
        "GET /english/playing/download/media/screen/win_95/dance.zip HTTP/X.X",
        "GET /english/playing/download/media/screen/win_95/intro.zip HTTP/X.X",
        "GET /cnfl/ HTTP/X.X",
        "GET /english/news/><HR><H3>Transfer/ HTTP/X.X",
        "GET /english/playing/download/media/screen/win_95/dance.zip HTTP/X.X",
        "GET /english/news/><HR><H3>Transfer/ HTTP/X.X",
        "GET /english/member/body.html HTTP/X.X",
        "GET /english/playing/download/media/screen/win_95/intro.zip HTTP/X.X"
      ]
    ]
   ...

UDAF pushdown enabled

{
  "profile": {
    "summary": {
      "total_time_ms": 16231.11
    },
    "phases": {
      "analyze": {
        "time_ms": 3.29
      },
      "optimize": {
        "time_ms": 18.57
      },
      "execute": {
        "time_ms": 16209.09
      },
      "format": {
        "time_ms": 0.1
      }
    },
    "plan": {
      "node": "EnumerableLimit",
      "time_ms": 16207.37,
      "rows": 34,
      "children": [
        {
          "node": "EnumerableCalc",
          "time_ms": 16206.31,
          "rows": 34,
          "children": [
            {
              "node": "EnumerableCorrelate",
              "time_ms": 16206.04,
              "rows": 34,
              "children": [
                {
                  "node": "CalciteEnumerableIndexScan",
                  "time_ms": 16204.15,
                  "rows": 1
                },
                {
                  "node": "EnumerableUncollect",
                  "time_ms": 0.29,
                  "rows": 34,
                  "children": [
                    {
                      "node": "EnumerableCalc",
                      "time_ms": 0.27,
                      "rows": 1,
                      "children": [
                        {
                          "node": "EnumerableValues",
                          "time_ms": 0.01,
                          "rows": 1
                        }
                      ]
                    }
                  ]
                }
              ]
            }
          ]
        }
      ]
    }
  },
  "schema": [
    {
      "name": "patterns_field",
      "type": "string"
    },
    {
      "name": "pattern_count",
      "type": "bigint"
    },
    {
      "name": "sample_logs",
      "type": "array"
    }
  ],
  "datarows": [
    [
      "GET <*> HTTP/<*>",
      2699196,
      [
        "GET /english/images/nav_tickets_off.gif HTTP/1.1",
        "GET /robots.txt HTTP/1.0",
        "GET /english/images/comp_bu_stage1n.gif HTTP/1.0",
        "GET /images/s102381.gif HTTP/1.0",
        "GET /images/s102381.gif HTTP/1.0",
        "GET /images/s102324.gif HTTP/1.1",
        "GET /images/hm_brdl.gif HTTP/1.1",
        "GET /english/tickets/images/ticket_hm_header.gif HTTP/1.0",
        "GET /images/space.gif HTTP/1.0",
        "GET /french/news/l3304.htm HTTP/1.0"
      ]
    ],
    [
      "POST <*> HTTP/<*>",
      7764,
      [
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0",
        "POST /cgi-bin/trivia/Trivia.pl HTTP/1.0"
      ]
    ],
    [
      "HEAD <*> HTTP/<*>",
      1537,
      [
        "HEAD /images/s140875.gif HTTP/1.0",
        "HEAD /english/venues/images/Venue_map_top_off.gif HTTP/1.0",
        "HEAD /english/venues/cities/images/lens/21discov.gif HTTP/1.0",
        "HEAD /english/history/images/history_hm_nav.gif HTTP/1.0",
        "HEAD /french/index.html HTTP/1.0",
        "HEAD /english/playing/images/france98b.GIF HTTP/1.0",
        "HEAD /french/images/fpnewstop.gif HTTP/1.0",
        "HEAD / HTTP/1.0",
        "HEAD /english/images/nav_sitemap_off.gif HTTP/1.0",
        "HEAD /french/news/2304tix.htm HTTP/1.0"
      ]
    ],
    [
      "GET <*> HTTP/X.X",
      176,
      [
        "GET /english/playing/download/media/screen/win_95/dance.zip HTTP/X.X",
        "GET /cnfl/ HTTP/X.X",
        "GET /english/playing/download/media/screen/win_95/dance.zip HTTP/X.X",
        "GET /english/news/><HR><H3>Transfer/ HTTP/X.X",
        "GET / HTTP/X.X",
        "GET /images/calendarvf.swf HTTP/X.X",
        "GET /english/index.html/screen/ HTTP/X.X",
        "GET /images/hobutt.gif HTTP/X.X",
        "GET /images/hobutt.gif HTTP/X.X",
        "GET /images/hobutt.gif HTTP/X.X"
      ]
    ],
    ...

Results show that the query with pushdown enabled is 2.7 times faster than without pushdown when running across three shards. The performance improvement scales with the number of shards available for parallel processing.

@github-actions (Contributor)

github-actions bot commented Mar 3, 2026

PR Reviewer Guide 🔍

(Review updated until commit bf583fa)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Refactor LogPatternAggFunction to use shared PatternAggregationHelpers and update CalciteRelNodeVisitor for evalAggSamples

Relevant files:

  • common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java
  • core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java
  • core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java
  • core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
  • ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java

Sub-PR theme: Implement UDAF scripted metric pushdown infrastructure and PatternScriptedMetricUDAF

Relevant files:

  • opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricUDAF.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricDataContext.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/udaf/PatternScriptedMetricUDAF.java
  • core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java
  • core/src/main/java/org/opensearch/sql/calcite/utils/UserDefinedFunctionUtils.java
  • opensearch/src/test/java/org/opensearch/sql/opensearch/request/AggregateAnalyzerTest.java
  • integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java

Sub-PR theme: Fix array parsing in OpenSearchExprValueFactory for aggregation results

Relevant files:

  • opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java

⚡ Recommended focus areas for review

Thread Safety

The addLogToPattern method modifies the accumulator map in-place (adds to logMessages list, replaces patternGroupMap). If the same accumulator is accessed concurrently (e.g., in parallel stream processing), this could cause race conditions. The method is not synchronized and the underlying ArrayList and HashMap are not thread-safe.

public static Map<String, Object> addLogToPattern(
    Map<String, Object> acc,
    String logMessage,
    int maxSampleCount,
    int bufferLimit,
    int variableCountThreshold,
    double thresholdPercentage) {

  if (logMessage == null) {
    return acc;
  }

  List<String> logMessages = (List<String>) acc.get("logMessages");
  logMessages.add(logMessage);

  // Trigger partial merge when buffer reaches limit
  if (bufferLimit > 0 && logMessages.size() >= bufferLimit) {
    Map<String, Map<String, Object>> patternGroupMap =
        (Map<String, Map<String, Object>>) acc.get("patternGroupMap");

    BrainLogParser parser =
        new BrainLogParser(variableCountThreshold, (float) thresholdPercentage);
    Map<String, Map<String, Object>> partialPatterns =
        parser.parseAllLogPatterns(logMessages, maxSampleCount);

    patternGroupMap =
        PatternUtils.mergePatternGroups(patternGroupMap, partialPatterns, maxSampleCount);

    acc.put("patternGroupMap", patternGroupMap);
    logMessages.clear();
  }

  return acc;
}
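If the accumulator were ever shared across threads (the review's concern; with one map script per segment it may never be in practice), per-accumulator synchronization is one minimal mitigation. A simplified sketch covering only the buffering part, with hypothetical names — not the project's actual fix:

```java
import java.util.ArrayList;
import java.util.List;

public class SafeAccumulator {
  // Simplified stand-in for the pattern accumulator: only the log buffer.
  private final List<String> logMessages = new ArrayList<>();

  // Synchronizing on the accumulator serializes concurrent adds, so the
  // backing ArrayList is never mutated from two threads at once.
  public synchronized void add(String logMessage) {
    if (logMessage == null) {
      return; // mirror addLogToPattern's null handling
    }
    logMessages.add(logMessage);
  }

  public synchronized int bufferedCount() {
    return logMessages.size();
  }
}
```

Alternatively, documenting that each accumulator is confined to a single thread would make the current unsynchronized code acceptable without the locking overhead.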
Argument Index Assumption

The resolveBrainParams method assumes that optional brain parameters at args[4..5] are sorted alphabetically by the caller (frequency_threshold_percentage before variable_count_threshold). This implicit contract is fragile - if the caller changes the ordering, the parameters will be silently swapped, causing incorrect behavior. This assumption should be documented more prominently or enforced with an assertion/validation.

private static RexNode[] resolveBrainParams(List<RexNode> args, RexBuilder rexBuilder) {
  RexNode variableCountThreshold = makeIntLiteral(rexBuilder, DEFAULT_VARIABLE_COUNT_THRESHOLD);
  RexNode thresholdPercentage = makeDoubleLiteral(rexBuilder, DEFAULT_THRESHOLD_PERCENTAGE);

  if (args.size() > 5) {
    // Both present: alphabetical order means args[4]=frequency_threshold_percentage,
    // args[5]=variable_count_threshold
    thresholdPercentage = args.get(4);
    variableCountThreshold = args.get(5);
  } else if (args.size() > 4) {
    // Only one present: determine by type
    RexNode arg4 = args.get(4);
    SqlTypeName arg4Type = arg4.getType().getSqlTypeName();
    if (arg4Type == SqlTypeName.DOUBLE
        || arg4Type == SqlTypeName.DECIMAL
        || arg4Type == SqlTypeName.FLOAT) {
      thresholdPercentage = arg4;
    } else {
      variableCountThreshold = arg4;
    }
  }

  return new RexNode[] {variableCountThreshold, thresholdPercentage};
}
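The implicit ordering contract could be enforced rather than assumed. Below is a simplified, standalone sketch of the same resolution logic using plain Objects instead of RexNodes, with validation added for the two-argument case; the default constants here are placeholders, not the project's real defaults.

```java
import java.util.List;

public class BrainParams {
  // Placeholder defaults for illustration only.
  static final int DEFAULT_VARIABLE_COUNT_THRESHOLD = 5;
  static final double DEFAULT_THRESHOLD_PERCENTAGE = 0.3;

  public static Object[] resolve(List<?> args) {
    Object variableCountThreshold = DEFAULT_VARIABLE_COUNT_THRESHOLD;
    Object thresholdPercentage = DEFAULT_THRESHOLD_PERCENTAGE;
    if (args.size() > 5) {
      // Make the alphabetical-ordering contract explicit instead of silent:
      // args[4] must be the fractional threshold, args[5] the integer count.
      if (!(args.get(4) instanceof Double) || !(args.get(5) instanceof Integer)) {
        throw new IllegalArgumentException(
            "Expected (frequency_threshold_percentage: double,"
                + " variable_count_threshold: int) at positions 4 and 5"
                + " (alphabetical argument order)");
      }
      thresholdPercentage = args.get(4);
      variableCountThreshold = args.get(5);
    } else if (args.size() > 4) {
      // Only one optional param present: disambiguate by type.
      Object arg4 = args.get(4);
      if (arg4 instanceof Double) {
        thresholdPercentage = arg4;
      } else {
        variableCountThreshold = arg4;
      }
    }
    return new Object[] {variableCountThreshold, thresholdPercentage};
  }
}
```

A failed check then surfaces at plan time with an explicit message, rather than silently swapping the two parameters.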
Stale DataContext

The CalciteScriptedMetricMapScript creates a MapContext once in the constructor and reuses it for all documents. The comment states that "doc and sourceLookup are updated internally by OpenSearch before each execute() call", but the MapContext captures the doc reference at construction time via getDoc(). If OpenSearch replaces the doc map reference (rather than updating it in-place) between documents, the DataContext will hold a stale reference.

public CalciteScriptedMetricMapScript(
    Function1<DataContext, Object[]> function,
    Map<String, Object> params,
    Map<String, Object> state,
    SearchLookup lookup,
    LeafReaderContext leafContext) {
  super(params, state, lookup, leafContext);
  this.function = function;
  // Create DataContext once and reuse for all documents in this segment.
  // OpenSearch updates doc values and source lookup internally before each execute().
  this.dataContext =
      new ScriptedMetricDataContext.MapContext(
          params, state, getDoc(), lookup.getLeafSearchLookup(leafContext).source());
}

@Override
@SuppressWarnings("unchecked")
public void execute() {
  // Execute the compiled RexNode expression (reusing the same DataContext)
  Object[] result = function.apply(dataContext);
  ScriptedMetricDataContext.mergeResultIntoState(result, (Map<String, Object>) getState());
}
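The distinction this comment hinges on — in-place mutation of the captured map versus replacement of the map reference — can be shown with a toy context (hypothetical names, not the real MapContext):

```java
import java.util.HashMap;
import java.util.Map;

public class DocRefDemo {
  // Toy analogue of MapContext: captures the doc map reference once.
  public static class Ctx {
    private final Map<String, Object> doc;

    public Ctx(Map<String, Object> doc) {
      this.doc = doc;
    }

    public Object get(String field) {
      return doc.get(field);
    }
  }
}
```

If the engine mutates the captured map in place, the context sees every update; if it ever swaps in a fresh map, the context keeps reading the old one — which is exactly the staleness risk to verify against OpenSearch's actual behavior.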
Behavior Change

The new early array-check block (lines 202-209) changes the parsing behavior for all array content where fieldType is empty or supportArrays is true. This is a broader change than just supporting pattern aggregation results - it could affect other aggregation types or field types that previously relied on the original flow. The GeoPoint exclusion may not cover all special-case types that have array representations (e.g., geo_shape, dense_vector).

if (content.isArray()
    && (fieldType.isEmpty() || supportArrays)
    && !fieldType
        .map(t -> t.equals(OpenSearchDataType.of(OpenSearchDataType.MappingType.GeoPoint)))
        .orElse(false)) {
  ExprType type = fieldType.orElse(ARRAY);
  return parseArray(content, field, type, supportArrays);
}
TODO Comments

Multiple TODO comments indicate that proper operand type checking is not yet implemented for PATTERN_ADD_UDF, PATTERN_FLUSH_UDF, and PATTERN_RESULT_UDF. Without operand type checking, invalid arguments could be passed to these UDFs at query planning time without early validation, potentially causing runtime errors.

            null) // TODO: Add proper operand type checking
        .toUDF("PATTERN_ADD_UDF");

public static final SqlOperator PATTERN_FLUSH_UDF =
    UserDefinedFunctionUtils.adaptStaticMethodToUDF(
            PatternAggregationHelpers.class,
            "flushPatternAccumulator",
            ReturnTypes.explicit(SqlTypeName.ANY), // Returns Map<String, Object>
            NullPolicy.ANY,
            null) // TODO: Add proper operand type checking
        .toUDF("PATTERN_FLUSH_UDF");

public static final SqlOperator PATTERN_RESULT_UDF =
    UserDefinedFunctionUtils.adaptStaticMethodToUDF(
            PatternAggregationHelpers.class,
            "producePatternResultFromStates",
            opBinding -> {
              // Returns List<Map<String, Object>> - represented as ARRAY<ANY>
              RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
              RelDataType anyType = typeFactory.createSqlType(SqlTypeName.ANY);
              return SqlTypeUtil.createArrayType(typeFactory, anyType, true);
            },
            NullPolicy.ANY,
            null) // TODO: Add proper operand type checking

@github-actions (Contributor)

github-actions bot commented Mar 3, 2026

Persistent review updated to latest commit a4a6e4e

@github-actions (Contributor)

github-actions bot commented Mar 3, 2026

PR Code Suggestions ✨

Latest suggestions up to bf583fa

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Avoid mutating original state during reduction

The first state (states.get(0)) is used directly as the initial combined value and
then passed to combinePatternAccumulators, which may mutate it. This can corrupt the
original state object. Create a defensive copy or start combining from index 0
explicitly to avoid unintended side effects.

common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java [433-437]

-Map<String, Object> combined = (Map<String, Object>) states.get(0);
+Map<String, Object> combined = new HashMap<>((Map<String, Object>) states.get(0));
 for (int i = 1; i < states.size(); i++) {
   Map<String, Object> state = (Map<String, Object>) states.get(i);
   combined = combinePatternAccumulators(combined, state, maxSampleCount);
 }
Suggestion importance[1-10]: 5


Why: The first state is used directly as combined and then passed to combinePatternAccumulators, which creates a new HashMap result, so the original state is not mutated. The suggestion is valid as a defensive practice but the actual risk is low given the current implementation of combinePatternAccumulators.

Low
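The aliasing hazard behind this suggestion can be demonstrated with a deliberately mutating toy combiner (the real combinePatternAccumulators may not mutate its first argument, as the "Why" note observes); the defensive copy guarantees the caller's first state survives regardless:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DefensiveCopy {
  // Toy combiner that mutates its first argument in place — the worst
  // case the defensive copy guards against.
  static Map<String, Object> combineInPlace(Map<String, Object> a, Map<String, Object> b) {
    a.putAll(b);
    return a;
  }

  public static Map<String, Object> reduceSafely(List<Map<String, Object>> states) {
    // Copy the first state so the caller's object is never mutated,
    // even if the combiner writes into its first argument.
    Map<String, Object> combined = new HashMap<>(states.get(0));
    for (int i = 1; i < states.size(); i++) {
      combined = combineInPlace(combined, states.get(i));
    }
    return combined;
  }
}
```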
General
Ensure source lookup instance is correctly reused per document

The DataContext is created once in the constructor using getDoc(), which returns the
doc map reference at construction time. However, OpenSearch updates doc values per
document by updating the contents of the map object, so the reference should remain
valid. But lookup.getLeafSearchLookup(leafContext).source() creates a new
SourceLookup per call — verify that the source lookup returned here is the same
instance that gets updated per document, otherwise source-based field access will
always return the first document's data.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java [75-78]

+var leafSearchLookup = lookup.getLeafSearchLookup(leafContext);
 this.dataContext =
     new ScriptedMetricDataContext.MapContext(
-        params, state, getDoc(), lookup.getLeafSearchLookup(leafContext).source());
+        params, state, getDoc(), leafSearchLookup.source());
Suggestion importance[1-10]: 4


Why: The improved_code is functionally equivalent to the existing code — it just stores the leafSearchLookup in a local variable before calling .source(). The actual concern about whether the SourceLookup instance is updated per document is valid but the suggested fix doesn't actually address it.

Low
Reuse DataContext instance to avoid redundant allocations

The combine script creates a new DataContext on every execute() call, which includes
re-parsing SOURCES and DIGESTS from params each time. Since params don't change
between calls, the DataContext should be created once (similar to MapScript) and
reused to avoid redundant object allocation and parsing overhead.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricCombineScriptFactory.java [45-56]

+private final DataContext dataContext;
+
+public CalciteScriptedMetricCombineScript(
+    Function1<DataContext, Object[]> function,
+    Map<String, Object> params,
+    Map<String, Object> state) {
+  super(params, state);
+  this.function = function;
+  this.dataContext = new ScriptedMetricDataContext.CombineContext(params, state);
+}
+
 @Override
 public Object execute() {
-  // Create data context for combine script
-  @SuppressWarnings("unchecked")
-  Map<String, Object> state = (Map<String, Object>) getState();
-  DataContext dataContext = new ScriptedMetricDataContext.CombineContext(getParams(), state);
-
-  // Execute the compiled RexNode expression
   Object[] result = function.apply(dataContext);
-
-  // Return the combined result
   return (result != null && result.length > 0) ? result[0] : getState();
 }
Suggestion importance[1-10]: 4


Why: The combine script's execute() is typically called once per shard, so the performance benefit of caching the DataContext is minimal. The suggestion is valid for consistency with MapScript but has low practical impact.

Low
Clarify which circuit breakers are bypassed

The warning mentions "certain memory circuit breakers" without specifying which ones
are bypassed. This could be misleading to users who need to understand the exact
risk. Consider clarifying which specific circuit breakers are bypassed (e.g., the
request circuit breaker or the field data circuit breaker) to help users make an
informed decision.

docs/user/ppl/cmd/patterns.md [102]

-> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. This bypasses certain memory circuit breakers and may cause out-of-memory errors on nodes when processing very large datasets. Use with caution and monitor cluster resource usage.
+> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. Scripted metric aggregations bypass the request and field data memory circuit breakers, which may cause out-of-memory errors on data nodes when processing very large datasets. Use with caution and monitor cluster memory usage closely.
Suggestion importance[1-10]: 4

__

Why: The suggestion improves documentation clarity by specifying which circuit breakers are bypassed, but it's a minor documentation enhancement. The current text is not incorrect, just less specific.

Low
Validate list element types to prevent downstream ClassCastException

When result is an empty List, the code correctly wraps it. However, if the reduce
script returns a non-List, non-null value (e.g., a Map or scalar), the exception
message says "Expected List<Map<String, Object>>" but the actual contract should
also handle the case where result is a Map (e.g., a single result). More
importantly, the unchecked cast to List<Map<String, Object>> is implicit and could
cause a ClassCastException downstream — consider validating the list element types.

opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ScriptedMetricParser.java [43-49]

-if (result instanceof List) {
+if (result instanceof List<?> list) {
+  // Validate that list contains maps (if non-empty)
+  if (!list.isEmpty() && !(list.get(0) instanceof Map)) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Expected List<Map<String, Object>> from scripted metric but list contains %s",
+            list.get(0).getClass().getSimpleName()));
+  }
   return List.of(Map.of(name, result));
 }
 throw new IllegalArgumentException(
     String.format(
         "Expected List<Map<String, Object>> from scripted metric but got %s",
         result.getClass().getSimpleName()));
Suggestion importance[1-10]: 3

__

Why: The validation adds a runtime check for list element types, which is a defensive measure. However, since the reduce script is controlled code that always returns List<Map<String, Object>>, the risk of a ClassCastException is low in practice.

Low
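The defensive check discussed above can be sketched as a standalone helper. This is a minimal illustration, not the plugin's actual `ScriptedMetricParser`; the class and method names here are hypothetical:

```java
import java.util.List;
import java.util.Map;

public class ResultValidation {
    // Fail fast with a descriptive message instead of letting an implicit
    // unchecked cast surface later as a ClassCastException downstream.
    static List<?> requireListOfMaps(Object result) {
        if (!(result instanceof List<?> list)) {
            throw new IllegalArgumentException(
                "Expected List<Map<String, Object>> but got "
                    + (result == null ? "null" : result.getClass().getSimpleName()));
        }
        for (Object element : list) {
            if (!(element instanceof Map)) {
                throw new IllegalArgumentException(
                    "Expected list of maps but found element of type "
                        + element.getClass().getSimpleName());
            }
        }
        return list;
    }
}
```

Checking every element (rather than only the first) costs one linear pass but catches mixed-type lists that a spot check would miss.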
Fix incorrect code block language hint

The code block uses bash ignore as the language hint, but the content is a REST API
call (JSON body with HTTP method), not a bash script. This is inconsistent with the
earlier cluster settings example in the same file which uses json. Using json would
provide proper syntax highlighting for the JSON body.

docs/user/ppl/cmd/patterns.md [93-100]

-```bash ignore
+```json
 PUT _cluster/settings
 {
   "persistent": {
     "plugins.calcite.udaf_pushdown.enabled": true
   }
 }
Suggestion importance[1-10]: 3

__

Why: The `bash ignore` language hint is inconsistent with the REST API content, but this is a minor style issue. The earlier example in the same file uses `json`, so consistency would be improved by changing to `json`.


Low

___

#### Previous suggestions


Suggestions up to commit d815a87



Restrict early array detection to supportArrays context

The new early array-detection block uses `(fieldType.isEmpty() || supportArrays)` as its condition, but the original code path only entered `parseArray` when `supportArrays` was true or the type was an OpenSearch ARRAY type. This change may cause array parsing to be triggered for fields with a known non-array type (e.g., a keyword field that happens to contain a JSON array due to multi-value), potentially breaking existing behavior for those fields.

[opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java [202-209]](https://github.com/opensearch-project/sql/pull/5064/files#diff-fef3b03341db11f5fc8b5f1f1470de8be5555a22f43d945dea3bdc25b68f3f48R202-R209)

```diff
 if (content.isArray()
-    && (fieldType.isEmpty() || supportArrays)
+    && supportArrays
     && !fieldType
         .map(t -> t.equals(OpenSearchDataType.of(OpenSearchDataType.MappingType.GeoPoint)))
         .orElse(false)) {
   ExprType type = fieldType.orElse(ARRAY);
   return parseArray(content, field, type, supportArrays);
 }
```
Suggestion importance[1-10]: 7

__

Why: The new early array-detection block with (fieldType.isEmpty() || supportArrays) could trigger array parsing for fields with known non-array types, potentially breaking existing behavior. The suggestion to restrict to supportArrays only is more conservative and safer, though it may not handle the fieldType.isEmpty() case that was intentionally added for unmapped fields in aggregation results.

Medium
Add null checks for accumulator arguments

If either acc1 or acc2 is null, the method will throw a NullPointerException. Add
null checks for both accumulators at the start of the method to handle cases where a
shard returns no data.

common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java [258-264]

 public static Map<String, Object> combinePatternAccumulators(
     Map<String, Object> acc1, Map<String, Object> acc2, int maxSampleCount) {
+  if (acc1 == null) return acc2 != null ? acc2 : new HashMap<>();
+  if (acc2 == null) return acc1;
 
   Map<String, Map<String, Object>> patterns1 =
       (Map<String, Map<String, Object>>) acc1.get("patternGroupMap");
   Map<String, Map<String, Object>> patterns2 =
       (Map<String, Map<String, Object>>) acc2.get("patternGroupMap");
Suggestion importance[1-10]: 6

__

Why: The combinePatternAccumulators method could throw a NullPointerException if either acc1 or acc2 is null, which is a valid scenario when a shard returns no data. Adding null guards is a reasonable defensive improvement.

Low
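The null-guard pattern the suggestion proposes can be shown with a simplified combiner. The merge body here (per-key count summing) is a stand-in for the real pattern-group merge, not the plugin's `combinePatternAccumulators`:

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeCombine {
    // Null-tolerant combine: when one shard produced no state, fall back to
    // the other accumulator (or a fresh empty one) instead of dereferencing
    // null. Both early returns preserve the non-null input untouched.
    static Map<String, Integer> combine(Map<String, Integer> acc1, Map<String, Integer> acc2) {
        if (acc1 == null) {
            return acc2 != null ? acc2 : new HashMap<>();
        }
        if (acc2 == null) {
            return acc1;
        }
        Map<String, Integer> merged = new HashMap<>(acc1);
        acc2.forEach((key, count) -> merged.merge(key, count, Integer::sum));
        return merged;
    }
}
```

Copying `acc1` into a new map before merging also avoids mutating either input, which matters if OpenSearch retains references to the shard states.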
Verify cached DataContext doc reference safety

The dataContext is created once in the constructor with getDoc(), but getDoc()
returns the doc map reference at construction time. If OpenSearch replaces the doc
map reference (rather than updating it in-place) between documents, the cached
dataContext will hold a stale reference. Verify that OpenSearch always updates the
same map instance in-place; if not, the DataContext must be recreated per document.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java [75-78]

+// If OpenSearch updates doc values in-place (same map reference), this is safe.
+// Otherwise, override execute() to create a new MapContext per document.
 this.dataContext =
     new ScriptedMetricDataContext.MapContext(
         params, state, getDoc(), lookup.getLeafSearchLookup(leafContext).source());
Suggestion importance[1-10]: 5

__

Why: The suggestion raises a valid concern about whether getDoc() returns the same map instance updated in-place between documents. However, the improved code only adds a comment without actually fixing the potential issue, making it a verification suggestion rather than a concrete fix.

Low
Fix fragile type-based argument disambiguation

The type-based heuristic to distinguish thresholdPercentage from
variableCountThreshold when args.size() == 5 is fragile. If a user passes an
integer-typed threshold percentage or a decimal-typed variable count threshold, the
arguments will be silently swapped, producing incorrect results. The argument
positions should be fixed and documented rather than inferred from types.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/udaf/PatternScriptedMetricUDAF.java [126-139]

+// args[4] = thresholdPercentage, args[5] = variableCountThreshold (fixed positions)
 if (args.size() > 5) {
   thresholdPercentage = args.get(4);
   variableCountThreshold = args.get(5);
 } else if (args.size() > 4) {
-  RexNode arg4 = args.get(4);
-  SqlTypeName arg4Type = arg4.getType().getSqlTypeName();
-  if (arg4Type == SqlTypeName.DOUBLE
-      || arg4Type == SqlTypeName.DECIMAL
-      || arg4Type == SqlTypeName.FLOAT) {
-    thresholdPercentage = arg4;
-  } else {
-    variableCountThreshold = arg4;
-  }
+  thresholdPercentage = args.get(4);
 }
Suggestion importance[1-10]: 5

__

Why: The type-based heuristic to distinguish thresholdPercentage from variableCountThreshold is indeed fragile and could silently produce incorrect results. However, the improved code changes the semantics when args.size() == 5 (only sets thresholdPercentage, not variableCountThreshold), which may not be the intended fix and could break the existing logic.

Low
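The fixed-position convention the suggestion argues for can be sketched as below. The defaults and names are hypothetical (the real defaults live in `BrainLogParser`); the point is that missing trailing arguments take documented defaults rather than being inferred from runtime types:

```java
import java.util.List;

public class PositionalArgs {
    // Hypothetical defaults for illustration only.
    static final double DEFAULT_THRESHOLD_PERCENTAGE = 0.3;
    static final int DEFAULT_VARIABLE_COUNT_THRESHOLD = 5;

    record PatternArgs(double thresholdPercentage, int variableCountThreshold) {}

    // Fixed positional convention: index 4 = thresholdPercentage,
    // index 5 = variableCountThreshold. No type sniffing: an integer-typed
    // threshold or decimal-typed count can never swap the two parameters.
    static PatternArgs parse(List<Object> args) {
        double threshold = args.size() > 4
            ? ((Number) args.get(4)).doubleValue()
            : DEFAULT_THRESHOLD_PERCENTAGE;
        int variableCount = args.size() > 5
            ? ((Number) args.get(5)).intValue()
            : DEFAULT_VARIABLE_COUNT_THRESHOLD;
        return new PatternArgs(threshold, variableCount);
    }
}
```

This matches the positional convention the review notes is already used in `buildMapScript` and `buildCombineScript`.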
General
Clarify which circuit breakers are bypassed

The warning mentions "certain memory circuit breakers" but doesn't specify which
ones are bypassed. Users need to understand the specific risks to make an informed
decision. Consider clarifying which circuit breakers are bypassed (e.g., the
parent/request circuit breakers) and whether there are any recommended safeguards or
limits to set before enabling this feature.

docs/user/ppl/cmd/patterns.md [102]

-> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. This bypasses certain memory circuit breakers and may cause out-of-memory errors on nodes when processing very large datasets. Use with caution and monitor cluster resource usage.
+> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. Scripted metric aggregations bypass the request and parent memory circuit breakers, which may cause out-of-memory errors on data nodes when processing very large datasets. Consider setting appropriate JVM heap sizes and monitoring node memory usage before enabling this feature in production. Use with caution.
Suggestion importance[1-10]: 4

__

Why: The suggestion improves documentation clarity by specifying which circuit breakers are bypassed, but this is a documentation enhancement rather than a critical fix. The existing warning is already informative enough for most users.

Low
Fix inconsistent code block language tag

The code block uses bash ignore as the language hint, which is inconsistent with the
existing cluster settings example earlier in the document that uses just bash or
json. Using ignore may prevent syntax highlighting in some renderers. Consider using
a consistent language tag like json since the content is a JSON body for a REST API
call.

docs/user/ppl/cmd/patterns.md [93-100]

-```bash ignore
+```json
 PUT _cluster/settings
 {
   "persistent": {
     "plugins.calcite.udaf_pushdown.enabled": true
   }
 }
Suggestion importance[1-10]: 3

__

Why: The `bash ignore` language tag is inconsistent with other code blocks in the document and may affect syntax highlighting. However, looking at the existing code block at line 83 which uses `bash ignore` as well, this appears to be an intentional pattern in this document, reducing the importance of this suggestion.


Low

Suggestions up to commit 0dece50



Fix overly broad early array-handling condition

The new early array-check condition uses `fieldType.isEmpty() || supportArrays`, which means it will intercept arrays even when `supportArrays` is false and `fieldType` is present (non-empty). This changes existing behavior for known-typed fields when `supportArrays` is false, potentially bypassing the original single-value extraction path. The condition should be `fieldType.isEmpty() && !supportArrays` or restructured to only apply when `fieldType` is absent.

[opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java [202-209]](https://github.com/opensearch-project/sql/pull/5064/files#diff-fef3b03341db11f5fc8b5f1f1470de8be5555a22f43d945dea3bdc25b68f3f48R202-R209)

```diff
+// Only apply early array handling when field type is unknown (not in mapping)
 if (content.isArray()
-    && (fieldType.isEmpty() || supportArrays)
+    && fieldType.isEmpty()
     && !fieldType
         .map(t -> t.equals(OpenSearchDataType.of(OpenSearchDataType.MappingType.GeoPoint)))
         .orElse(false)) {
-  ExprType type = fieldType.orElse(ARRAY);
+  ExprType type = ARRAY;
   return parseArray(content, field, type, supportArrays);
 }
```
Suggestion importance[1-10]: 7

__

Why: The condition fieldType.isEmpty() || supportArrays could intercept arrays for known-typed fields when supportArrays is false, potentially changing existing behavior. This is a legitimate concern about the correctness of the new early array-check path, though the suggested fix may be too restrictive by removing the supportArrays branch entirely.

Medium
Guard against null pattern maps before merging

If either patterns1 or patterns2 is null (e.g., when acc1 or acc2 was never
populated with a patternGroupMap), passing null to PatternUtils.mergePatternGroups
may cause a NullPointerException. Add null-checks or default to empty maps before
merging.

common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java [262-268]

 public static Map<String, Object> combinePatternAccumulators(
     Map<String, Object> acc1, Map<String, Object> acc2, int maxSampleCount) {
 
   Map<String, Map<String, Object>> patterns1 =
-      (Map<String, Map<String, Object>>) acc1.get("patternGroupMap");
+      acc1.get("patternGroupMap") != null
+          ? (Map<String, Map<String, Object>>) acc1.get("patternGroupMap")
+          : new HashMap<>();
   Map<String, Map<String, Object>> patterns2 =
-      (Map<String, Map<String, Object>>) acc2.get("patternGroupMap");
+      acc2.get("patternGroupMap") != null
+          ? (Map<String, Map<String, Object>>) acc2.get("patternGroupMap")
+          : new HashMap<>();
 
   Map<String, Map<String, Object>> merged =
       PatternUtils.mergePatternGroups(patterns1, patterns2, maxSampleCount);
Suggestion importance[1-10]: 6

__

Why: If acc1 or acc2 was never populated with a patternGroupMap, passing null to PatternUtils.mergePatternGroups could cause a NullPointerException. The fix adds null-safety, though in practice the accumulators are always initialized via initPatternAccumulator() which sets an empty map.

Low
Verify doc values reference is live across documents

The dataContext is created once in the constructor using getDoc(), but OpenSearch
updates the doc values reference per document. If getDoc() returns a snapshot rather
than a live reference, the dataContext will read stale doc values for subsequent
documents. Verify that getDoc() returns a live/mutable reference that OpenSearch
updates in-place; if not, the DataContext must be recreated per execute() call.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java [75-78]

+// If getDoc() does not return a live reference updated by OpenSearch per document,
+// create the DataContext per execute() call instead:
 this.dataContext =
     new ScriptedMetricDataContext.MapContext(
         params, state, getDoc(), lookup.getLeafSearchLookup(leafContext).source());
+// NOTE: Confirm getDoc() is a live reference; otherwise move DataContext creation to execute()
Suggestion importance[1-10]: 5

__

Why: The concern about getDoc() returning a stale snapshot is valid, but the suggestion only asks to verify behavior rather than providing a concrete fix. The improved_code is essentially the same as existing_code with a comment added.

Low
General
Use fixed argument positions instead of type-based disambiguation

The type-based disambiguation for arg4 when args.size() == 5 is fragile: if a user
passes an integer literal for thresholdPercentage or a decimal for
variableCountThreshold, the wrong parameter will be assigned. The argument positions
should be fixed and documented rather than inferred from runtime types, matching the
positional convention used in buildMapScript and buildCombineScript
(args[4]=thresholdPercentage, args[5]=variableCountThreshold).

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/udaf/PatternScriptedMetricUDAF.java [127-140]

+// Use fixed positional convention: args[4]=thresholdPercentage, args[5]=variableCountThreshold
 if (args.size() > 5) {
   thresholdPercentage = args.get(4);
   variableCountThreshold = args.get(5);
 } else if (args.size() > 4) {
-  RexNode arg4 = args.get(4);
-  SqlTypeName arg4Type = arg4.getType().getSqlTypeName();
-  if (arg4Type == SqlTypeName.DOUBLE
-      || arg4Type == SqlTypeName.DECIMAL
-      || arg4Type == SqlTypeName.FLOAT) {
-    thresholdPercentage = arg4;
-  } else {
-    variableCountThreshold = arg4;
-  }
+  thresholdPercentage = args.get(4);
 }
Suggestion importance[1-10]: 6

__

Why: The type-based disambiguation for arg4 is fragile and inconsistent with the positional convention used in buildMapScript and buildCombineScript. Using fixed positions is more robust and maintainable, though the improved code drops variableCountThreshold assignment when only 5 args are present.

Low
Clarify which circuit breakers are bypassed

The warning mentions "certain memory circuit breakers" without specifying which ones
are bypassed. This could leave users without enough information to assess the risk.
Consider clarifying which specific circuit breakers are bypassed (e.g., the
parent/field data circuit breakers) so users can make an informed decision.

docs/user/ppl/cmd/patterns.md [102]

-> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. This bypasses certain memory circuit breakers and may cause out-of-memory errors on nodes when processing very large datasets. Use with caution and monitor cluster resource usage.
+> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. Scripted metric aggregations bypass the parent and field data memory circuit breakers, which may cause out-of-memory errors on data nodes when processing very large datasets. Use with caution and monitor cluster memory usage closely.
Suggestion importance[1-10]: 4

__

Why: The suggestion improves documentation clarity by specifying which circuit breakers are bypassed, helping users make more informed decisions. However, it's a documentation improvement with moderate impact, and the specific circuit breakers mentioned (parent and field data) may not be fully accurate without verification.

Low
Suggestions up to commit a4a6e4e
General
Handle null aggregation result gracefully

When result is null, the instanceof List check returns false and the code falls
through to throw an exception with a null-safe message. However, an empty result
(null) from a scripted metric aggregation on an empty index is a valid scenario and
should return an empty list rather than throwing an exception. Handle the null case
explicitly to avoid unexpected failures on empty indices.

opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ScriptedMetricParser.java [40-47]

+if (result == null) {
+  return List.of();
+}
 if (result instanceof List) {
   return List.of(Map.of(name, result));
 }
 throw new IllegalArgumentException(
     String.format(
         "Expected List<Map<String, Object>> from scripted metric but got %s",
-        result == null ? "null" : result.getClass().getSimpleName()));
+        result.getClass().getSimpleName()));
Suggestion importance[1-10]: 6

__

Why: A null result from a scripted metric aggregation on an empty index is a realistic scenario, and throwing an exception in that case would cause unexpected failures. Returning an empty list for null results is a more robust approach and prevents runtime errors on empty indices.

Low
Replace magic number with enum constant reference

The magic number 3 is used directly instead of referencing the
Source.SPECIAL_VARIABLE.getValue() enum constant. This creates a maintenance risk —
if the enum values change, this code will silently break. Use the enum value
directly to keep the code consistent and safe.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/ScriptParameterHelper.java [108-113]

 public int addSpecialVariable(String variableName) {
   int index = sources.size();
-  sources.add(3); // SPECIAL_VARIABLE = 3
+  sources.add(CalciteScriptEngine.Source.SPECIAL_VARIABLE.getValue());
   digests.add(variableName);
   return index;
 }
Suggestion importance[1-10]: 5

__

Why: Using the magic number 3 instead of CalciteScriptEngine.Source.SPECIAL_VARIABLE.getValue() creates a maintenance risk if enum values change. This is a valid maintainability concern, though the risk is low since the enum is defined in the same codebase.

Low
Avoid mutating input state during reduce phase

The first state (states.get(0)) is directly cast and used as the initial combined
accumulator without copying. Since combinePatternAccumulators creates a new map for
subsequent merges, the first state's logMessages and patternGroupMap are passed into
the first merge but the original state object is not mutated. However, if
states.get(0) is the same object reference used elsewhere (e.g., by OpenSearch),
mutating it could cause issues. More critically, the buffered logMessages in
states.get(0) are included in the combine but never processed through the parser
before the final producePatternResult call, which handles remaining logs correctly —
this is fine. The logic is safe as written, but consider using
initPatternAccumulator() as the starting point and iterating all states for clarity
and safety.

common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java [460-465]

-// Combine all states into a single accumulator
-Map<String, Object> combined = (Map<String, Object>) states.get(0);
-for (int i = 1; i < states.size(); i++) {
-  Map<String, Object> state = (Map<String, Object>) states.get(i);
+// Combine all states into a single accumulator starting from an empty one
+Map<String, Object> combined = initPatternAccumulator();
+for (Object stateObj : states) {
+  Map<String, Object> state = (Map<String, Object>) stateObj;
   combined = combinePatternAccumulators(combined, state, maxSampleCount);
 }
Suggestion importance[1-10]: 4

__

Why: Starting from an empty accumulator and iterating all states is cleaner and avoids any risk of aliasing the first state object. However, since combinePatternAccumulators creates a new map for the result, the original states.get(0) is not mutated, making this primarily a readability/safety improvement rather than a bug fix.

Low
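The safer fold the suggestion describes, starting from a fresh identity accumulator instead of aliasing `states.get(0)`, can be sketched as follows. The per-state merge here is a simplified stand-in for `combinePatternAccumulators`:

```java
import java.util.ArrayList;
import java.util.List;

public class ReduceFromIdentity {
    // Fold every shard state into a fresh identity accumulator; no input
    // state object is mutated or shared with the returned result, so the
    // caller (e.g., OpenSearch's reduce phase) can safely retain the inputs.
    static List<String> reduceAll(List<List<String>> states) {
        List<String> combined = new ArrayList<>(); // identity accumulator
        for (List<String> state : states) {
            combined.addAll(state);
        }
        return combined;
    }
}
```

The uniform loop over all states is also simpler to read than special-casing the first element.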
Clarify which circuit breakers are bypassed

The warning mentions "certain memory circuit breakers" without specifying which ones
are bypassed. This could leave users without enough information to make an informed
decision. Consider clarifying which specific circuit breakers are bypassed (e.g.,
the parent/request circuit breakers) so users understand the exact risk.

docs/user/ppl/cmd/patterns.md [102]

-> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. This bypasses certain memory circuit breakers and may cause out-of-memory errors on nodes when processing very large datasets. Use with caution and monitor cluster resource usage.
+> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. This bypasses the request and parent memory circuit breakers (which normally protect against excessive memory usage during query execution) and may cause out-of-memory errors on nodes when processing very large datasets. Use with caution and monitor cluster resource usage.
Suggestion importance[1-10]: 4

__

Why: The suggestion improves clarity by specifying which circuit breakers are bypassed, but this is a minor documentation enhancement. The exact circuit breakers bypassed may depend on implementation details that the PR author would need to verify.

Low
Fix inconsistent code block language hint

The code block uses bash ignore as the language hint, which is inconsistent with the
other cluster settings examples in the same file that use bash. Using ignore will
prevent syntax highlighting and may confuse readers or tooling. It should be changed
to match the existing style.

docs/user/ppl/cmd/patterns.md [93-100]

-```bash ignore
+```bash
 PUT _cluster/settings
 {
   "persistent": {
     "plugins.calcite.udaf_pushdown.enabled": true
   }
 }
Suggestion importance[1-10]: 3

__

Why: The `bash ignore` language hint is inconsistent with other code blocks in the file that use `bash`. This is a minor style/consistency issue that affects syntax highlighting but not functionality.


Low

Possible issue



Ensure doc values are fresh per document execution

The `MapContext` is created once in the constructor using `getDoc()`, which returns the doc map at construction time. However, OpenSearch updates the doc values internally before each `execute()` call by updating the same map reference. If `getDoc()` returns a snapshot rather than the live reference, doc values will be stale for all documents after the first. Verify that `getDoc()` returns the same mutable map reference that OpenSearch updates per document, otherwise the DataContext must be recreated per `execute()` call.

[opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java [73-78]](https://github.com/opensearch-project/sql/pull/5064/files#diff-71e9a88d7cdaf170b68b083346df8ec904a1c21ea50643c66d8a03f405e7e0d1R73-R78)

```diff
-this.dataContext =
-    new ScriptedMetricDataContext.MapContext(
-        params, state, getDoc(), lookup.getLeafSearchLookup(leafContext).source());
+@Override
+@SuppressWarnings("unchecked")
+public void execute() {
+  // Recreate DataContext per document to ensure fresh doc values and source lookup
+  DataContext ctx = new ScriptedMetricDataContext.MapContext(
+      getParams(), (Map<String, Object>) getState(), getDoc(),
+      lookup.getLeafSearchLookup(leafContext).source());
+  Object[] result = function.apply(ctx);
+  ScriptedMetricDataContext.mergeResultIntoState(result, (Map<String, Object>) getState());
+}
```
Suggestion importance[1-10]: 5

__

Why: The concern about stale doc values is valid in principle, but the code comment explicitly explains that doc and sourceLookup are updated internally by OpenSearch before each execute() call, meaning the same map reference is mutated in-place. The suggestion to recreate the context per document would add overhead without necessarily fixing a real bug, and the existing approach is a deliberate optimization.

Low
Suggestions up to commit a4a6e4e
Possible issue
Flush buffered messages before combining accumulators

The combinePatternAccumulators method does not process the buffered logMessages from
either accumulator before merging patternGroupMap. If either accumulator has
unprocessed messages in its buffer, those messages will be silently dropped during
the combine phase, leading to incorrect pattern counts and missing data in the final
result.
Before merging the pattern group maps, the buffered log messages from both
accumulators should be parsed and merged into their respective patternGroupMap
entries first.

common/src/main/java/org/opensearch/sql/common/patterns/PatternAggregationHelpers.java [260-269]

+@SuppressWarnings("unchecked")
 public static Map<String, Object> combinePatternAccumulators(
     Map<String, Object> acc1, Map<String, Object> acc2, int maxSampleCount) {
 
+  // Flush buffered messages from acc1 before combining
+  List<String> logMessages1 = (List<String>) acc1.get("logMessages");
   Map<String, Map<String, Object>> patterns1 =
       (Map<String, Map<String, Object>>) acc1.get("patternGroupMap");
+  if (logMessages1 != null && !logMessages1.isEmpty()) {
+    BrainLogParser parser = new BrainLogParser(
+        BrainLogParser.DEFAULT_VARIABLE_COUNT_THRESHOLD,
+        (float) BrainLogParser.DEFAULT_FREQUENCY_THRESHOLD_PERCENTAGE);
+    patterns1 = PatternUtils.mergePatternGroups(
+        patterns1, parser.parseAllLogPatterns(logMessages1, maxSampleCount), maxSampleCount);
+    logMessages1.clear();
+  }
+
+  // Flush buffered messages from acc2 before combining
+  List<String> logMessages2 = (List<String>) acc2.get("logMessages");
   Map<String, Map<String, Object>> patterns2 =
       (Map<String, Map<String, Object>>) acc2.get("patternGroupMap");
+  if (logMessages2 != null && !logMessages2.isEmpty()) {
+    BrainLogParser parser = new BrainLogParser(
+        BrainLogParser.DEFAULT_VARIABLE_COUNT_THRESHOLD,
+        (float) BrainLogParser.DEFAULT_FREQUENCY_THRESHOLD_PERCENTAGE);
+    patterns2 = PatternUtils.mergePatternGroups(
+        patterns2, parser.parseAllLogPatterns(logMessages2, maxSampleCount), maxSampleCount);
+    logMessages2.clear();
+  }
 
   Map<String, Map<String, Object>> merged =
       PatternUtils.mergePatternGroups(patterns1, patterns2, maxSampleCount);
Suggestion importance[1-10]: 7

__

Why: The combinePatternAccumulators method merges patternGroupMap entries but ignores buffered logMessages in both accumulators. However, looking at the code more carefully, the combine phase in producePatternResultFromStates calls combinePatternAccumulators which merges the logMessages lists (lines 272-280), and then producePatternResult flushes remaining logMessages. So the buffered messages are not dropped but carried forward. The suggestion has some merit but the current design intentionally defers flushing to the reduce phase.

Medium
Handle null scripted metric result gracefully

When result is null (e.g., no documents matched or all shards returned null), the
code throws an IllegalArgumentException instead of returning an empty result. A null
result from a scripted metric aggregation is a valid scenario (e.g., empty index or
no matching documents) and should be handled gracefully by returning an empty list.

opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ScriptedMetricParser.java [40-46]

 if (result instanceof List) {
     return List.of(Map.of(name, result));
+  }
+  if (result == null) {
+    return List.of();
   }
   throw new IllegalArgumentException(
       String.format(
           "Expected List<Map<String, Object>> from scripted metric but got %s",
-          result == null ? "null" : result.getClass().getSimpleName()));
+          result.getClass().getSimpleName()));
Suggestion importance[1-10]: 6

__

Why: A null result from a scripted metric aggregation (e.g., empty index) would cause an IllegalArgumentException instead of returning an empty list. This is a valid edge case that should be handled gracefully to avoid unexpected errors in production.

Low
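The null-result edge case above is easy to exercise in isolation. A minimal sketch, where parseResult is a stand-in for the PR's ScriptedMetricParser logic rather than its exact signature:

```java
import java.util.List;
import java.util.Map;

// Sketch: treat a null scripted-metric result (empty index, no matching
// documents) as an empty result set instead of throwing.
public class NullSafeMetricParse {
    static List<Map<String, Object>> parseResult(String name, Object result) {
        if (result instanceof List) {
            return List.of(Map.of(name, result));
        }
        if (result == null) {
            // Valid scenario: no shard produced a value.
            return List.of();
        }
        throw new IllegalArgumentException(
            String.format(
                "Expected List from scripted metric but got %s",
                result.getClass().getSimpleName()));
    }

    public static void main(String[] args) {
        System.out.println(parseResult("patterns", null).size());         // 0
        System.out.println(parseResult("patterns", List.of("a")).size()); // 1
    }
}
```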
Ensure per-document source lookup is correctly advanced

The DataContext is created once in the constructor using getDoc() and
lookup.getLeafSearchLookup(leafContext).source(). getDoc() returns a reference to
the doc map that OpenSearch updates in-place before each execute() call, but the
SourceLookup returned by lookup.getLeafSearchLookup(leafContext).source() may need
to be advanced per document. If that SourceLookup is not the same instance that
OpenSearch advances, source field access will return stale data for all documents
after the first.
Verify that the SourceLookup obtained via
lookup.getLeafSearchLookup(leafContext).source() is the same instance that
OpenSearch advances per document, or obtain it lazily per execute() call to ensure
correctness.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/CalciteScriptedMetricMapScriptFactory.java [65-78]

 public CalciteScriptedMetricMapScript(
     Function1<DataContext, Object[]> function,
     Map<String, Object> params,
     Map<String, Object> state,
     SearchLookup lookup,
     LeafReaderContext leafContext) {
   super(params, state, lookup, leafContext);
   this.function = function;
-  // Create DataContext once and reuse for all documents in this segment.
-  // OpenSearch updates doc values and source lookup internally before each execute().
+  // Use the SourceLookup from the leaf search lookup - OpenSearch advances this per document.
+  // getDoc() returns the same map reference that OpenSearch updates in-place per document.
   this.dataContext =
       new ScriptedMetricDataContext.MapContext(
-          params, state, getDoc(), lookup.getLeafSearchLookup(leafContext).source());
+          params, state, getDoc(), getLeafLookup().source());
 }
Suggestion importance[1-10]: 5

__

Why: The concern about SourceLookup staleness is valid in principle, but the comment in the code already acknowledges that OpenSearch updates doc values and source lookup internally. The suggestion to use getLeafLookup().source() may not be a valid API call on ScriptedMetricAggContexts.MapScript. The issue needs verification but the improved code may not compile correctly.

Low
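The staleness concern above generalizes to a lazy-retrieval pattern: instead of capturing a possibly stale object at construction time, capture a supplier and resolve it on each call. A minimal Java sketch with illustrative names, not the OpenSearch SourceLookup API:

```java
import java.util.function.Supplier;

// Sketch: defer a per-document lookup to call time rather than snapshotting
// a value in the constructor that the engine may later replace.
public class LazyLookupSketch {
    static class Context {
        private final Supplier<String> sourceSupplier; // resolved per execute()

        Context(Supplier<String> sourceSupplier) {
            this.sourceSupplier = sourceSupplier;
        }

        String execute() {
            // Fetch the current source on every call instead of reusing
            // a snapshot taken at construction time.
            return sourceSupplier.get();
        }
    }

    public static void main(String[] args) {
        final String[] currentDoc = {"doc-1"};
        Context ctx = new Context(() -> currentDoc[0]);
        System.out.println(ctx.execute()); // doc-1
        currentDoc[0] = "doc-2"; // the engine advances to the next document
        System.out.println(ctx.execute()); // doc-2
    }
}
```

If getDoc() and the SourceLookup are genuinely advanced in-place by OpenSearch, constructing the DataContext once is safe; the supplier pattern is only needed when the instance itself is replaced per document.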
Fix inconsistent argument index mapping between script phases

The argument index mapping in buildReduceScript does not line up cleanly with
buildMapScript. In buildMapScript, args[4] is thresholdPercentage and args[5] is
variableCountThreshold; buildReduceScript matches this ordering when args.size() > 5,
but for args.size() == 5 it falls back to type-based disambiguation of args.get(4),
which is fragile and error-prone. The argument ordering should be consistent between
map and reduce scripts.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/udaf/PatternScriptedMetricUDAF.java [108-121]

 if (args.size() > 5) {
   thresholdPercentage = args.get(4);
   variableCountThreshold = args.get(5);
 } else if (args.size() > 4) {
-  RexNode arg4 = args.get(4);
-  SqlTypeName arg4Type = arg4.getType().getSqlTypeName();
-  if (arg4Type == SqlTypeName.DOUBLE
-      || arg4Type == SqlTypeName.DECIMAL
-      || arg4Type == SqlTypeName.FLOAT) {
-    thresholdPercentage = arg4;
-  } else {
-    variableCountThreshold = arg4;
-  }
+  // args[4] = thresholdPercentage (consistent with buildMapScript ordering)
+  thresholdPercentage = args.get(4);
 }
Suggestion importance[1-10]: 4

__

Why: The type-based disambiguation for args.size() == 5 is fragile, but the improved code removes the variableCountThreshold assignment for the 5-argument case entirely, which changes behavior. The existing code at least attempts to handle both cases, while the suggestion silently drops variableCountThreshold when only 5 args are present.

Low
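The fragility discussed above comes from disambiguating optional trailing arguments by runtime type. A sketch of the fixed-position alternative, where the parameter names mirror the PR's but the parsing helper itself is hypothetical:

```java
import java.util.List;

// Sketch: fixed positional handling of optional trailing arguments,
// avoiding type-based disambiguation when only some are present.
public class FixedArgOrder {
    record PatternArgs(Double thresholdPercentage, Integer variableCountThreshold) {}

    static PatternArgs parse(List<Object> args) {
        Double threshold = null; // args[4], when present
        Integer varCount = null; // args[5], when present
        if (args.size() > 4) {
            threshold = (Double) args.get(4);
        }
        if (args.size() > 5) {
            varCount = (Integer) args.get(5);
        }
        return new PatternArgs(threshold, varCount);
    }

    public static void main(String[] args) {
        // All six arguments present: positions 4 and 5 map unambiguously.
        PatternArgs a = parse(List.of("field", "method", 10, 5, 0.3, 7));
        System.out.println(a.thresholdPercentage() + " " + a.variableCountThreshold());
        // Five arguments: position 4 is always thresholdPercentage.
        PatternArgs b = parse(List.of("field", "method", 10, 5, 0.3));
        System.out.println(b.thresholdPercentage() + " " + b.variableCountThreshold());
    }
}
```

The trade-off the Why note identifies applies here too: a strictly positional scheme cannot express "variableCountThreshold without thresholdPercentage" in five arguments, so callers must pass an explicit placeholder if they want only the later parameter.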
General
Clarify which circuit breakers are bypassed

The warning mentions "certain memory circuit breakers" without specifying which ones
are bypassed. This is important security/operational information that users need to
make an informed decision. Consider clarifying which specific circuit breakers are
bypassed (e.g., the parent/field data circuit breakers) so users understand the
exact risk.

docs/user/ppl/cmd/patterns.md [102]

-> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. This bypasses certain memory circuit breakers and may cause out-of-memory errors on nodes when processing very large datasets. Use with caution and monitor cluster resource usage.
+> **Warning**: Enabling UDAF pushdown executes user-defined aggregation functions as scripted metric aggregations on OpenSearch data nodes. Scripted metric aggregations bypass the parent and field data memory circuit breakers, which may cause out-of-memory errors on data nodes when processing very large datasets. Use with caution and monitor cluster heap usage closely.
Suggestion importance[1-10]: 4

__

Why: The suggestion improves documentation clarity by specifying which circuit breakers are bypassed, but this is a minor documentation enhancement rather than a critical fix. The existing warning is already informative enough for most users.

Low
Fix incorrect code block language tag

The code block uses bash ignore as the language hint, but this is a REST API call,
not a bash command. Other cluster settings examples in the same file use json as the
language. Using an inconsistent or incorrect language tag may cause syntax
highlighting issues in rendered documentation.

docs/user/ppl/cmd/patterns.md [93-100]

-```bash ignore
+```json
 PUT _cluster/settings
 {
   "persistent": {
     "plugins.calcite.udaf_pushdown.enabled": true
   }
 }
 ```
Suggestion importance[1-10]: 4

__

Why: The `bash ignore` language tag is inconsistent with other REST API examples in the file that use `json`. Fixing this improves syntax highlighting consistency in rendered documentation, though it's a minor style issue.

Low

…reduce

Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions

github-actions bot commented Mar 3, 2026

Persistent review updated to latest commit 0dece50

Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions

github-actions bot commented Mar 3, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit d815a87.

| Path | Line | Severity | Description |
| --- | --- | --- | --- |
| integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_brain_agg_push.yaml | 22 | medium | Base64-encoded payloads with Java serialization magic bytes (rO0ABX prefix, indicating Java object serialization) are embedded in OpenSearch scripted metric scripts. These serialized Calcite RexNode expressions are sent to and deserialized on data nodes. Java deserialization of remotely-supplied payloads is a known RCE vector (CVE pattern). While the serialization appears to be from server-generated query plans, the lack of explicit deserialization safeguards in CalciteScriptEngine warrants investigation. |
| opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/ScriptedMetricDataContext.java | 107 | medium | The mergeResultIntoState() method performs an unchecked cast of arbitrary execution results into the shared state map (state.putAll((Map) result[0])). Combined with the UDAF pushdown explicitly bypassing OpenSearch memory circuit breakers (documented in the PR), a malicious or malformed payload could overwrite state entries or trigger unbounded memory growth on data nodes without circuit breaker protection. The bypass of safety mechanisms with no alternative bounds checking is a concern. |
| opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/scriptedmetric/udaf/PatternScriptedMetricUDAF.java | 93 | low | PATTERN_FLUSH_UDF is referenced in buildCombineScript() and defined in PPLBuiltinOperators.java, but has no corresponding entry in BuiltinFunctionName.java (only PATTERN_INIT_UDF, PATTERN_ADD_UDF, PATTERN_COMBINE_UDF, and PATTERN_RESULT_UDF are registered). This registry inconsistency could indicate an incomplete or hidden registration path for a script executed on remote data nodes. |

The table above displays the top 10 most important findings.

Total: 3 | Critical: 0 | High: 0 | Medium: 2 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.
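The Java-deserialization finding above is the kind of risk that JEP 290 serialization filters are designed to address. A minimal, self-contained sketch; the filter pattern is illustrative, not CalciteScriptEngine's actual configuration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Sketch: restrict which classes an ObjectInputStream may deserialize,
// so an untrusted payload cannot instantiate arbitrary gadget classes.
public class DeserFilterSketch {
    public static void main(String[] args) throws Exception {
        // Serialize a harmless String for the demonstration.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject("hello");
        }

        // Allow only classes from the java.base module; reject everything else.
        ObjectInputFilter filter =
            ObjectInputFilter.Config.createFilter("java.base/*;!*");
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            ois.setObjectInputFilter(filter);
            System.out.println(ois.readObject()); // hello
        }
    }
}
```

In practice such a filter would allowlist only the Calcite expression classes the engine actually expects, rejecting the rest at stream-decoding time rather than after instantiation.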

@github-actions

github-actions bot commented Mar 3, 2026

Persistent review updated to latest commit d815a87

Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions

github-actions bot commented Mar 3, 2026

Persistent review updated to latest commit bf583fa


Labels

enhancement (New feature or request), pushdown (pushdown related issues)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Pushdown any UDAF by scripted metric aggregations

4 participants