From 34e405b31baa9dd43062477b9bdabe324c75f44b Mon Sep 17 00:00:00 2001 From: Raghav Aggarwal Date: Wed, 25 Feb 2026 21:10:32 +0530 Subject: [PATCH] TEZ-4686: NPE when tez-client tries to connect to AM via ZK Mode --- .../registry/zookeeper/ZkFrameworkClient.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java index 1d85f44dcd..33641e41ee 100644 --- a/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java @@ -19,6 +19,8 @@ package org.apache.tez.client.registry.zookeeper; import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -110,6 +112,28 @@ public void close() { */ @Override public YarnClientApplication createApplication() { + if (amRecord == null) { + long startTime = System.currentTimeMillis(); + while (!isZkInitialized() && (System.currentTimeMillis() - startTime) < 5000) { + try { + TimeUnit.MILLISECONDS.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for ZK registry sync", e); + } + } + + List records = amRegistryClient.getAllRecords(); + if (records != null && !records.isEmpty()) { + amRecord = records.getFirst(); + LOG.info( + "No AppId provided, discovered AM from Zookeeper: {}", amRecord.getApplicationId()); + } else { + throw new RuntimeException( + "No AM record found in Zookeeper. Ensure the AM is running and registered."); + } + } + ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); ApplicationId appId = amRecord.getApplicationId(); context.setApplicationId(appId);