From 4009d29ab585447453ad50ca9c7c7c329bfb8639 Mon Sep 17 00:00:00 2001 From: Ashley Davis Date: Thu, 5 Feb 2026 09:30:40 +0000 Subject: [PATCH 1/2] add context to DataGatherer.Fetch Signed-off-by: Ashley Davis --- pkg/agent/dummy_data_gatherer.go | 2 +- pkg/agent/run.go | 2 +- pkg/datagatherer/datagatherer.go | 2 +- pkg/datagatherer/k8sdiscovery/discovery.go | 2 +- pkg/datagatherer/k8sdynamic/dynamic.go | 2 +- pkg/datagatherer/k8sdynamic/dynamic_test.go | 5 +++-- pkg/datagatherer/local/local.go | 2 +- pkg/datagatherer/oidc/oidc.go | 4 +--- pkg/datagatherer/oidc/oidc_test.go | 4 ++-- 9 files changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/agent/dummy_data_gatherer.go b/pkg/agent/dummy_data_gatherer.go index 997c50eb..8d224d27 100644 --- a/pkg/agent/dummy_data_gatherer.go +++ b/pkg/agent/dummy_data_gatherer.go @@ -39,7 +39,7 @@ func (g *dummyDataGatherer) WaitForCacheSync(ctx context.Context) error { return nil } -func (c *dummyDataGatherer) Fetch() (any, int, error) { +func (c *dummyDataGatherer) Fetch(ctx context.Context) (any, int, error) { var err error if c.attemptNumber < c.FailedAttempts { err = fmt.Errorf("First %d attempts will fail", c.FailedAttempts) diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 609f5c4d..1af820c8 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -361,7 +361,7 @@ func gatherData(ctx context.Context, config CombinedConfig, dataGatherers map[st var dgError *multierror.Error for k, dg := range dataGatherers { - dgData, count, err := dg.Fetch() + dgData, count, err := dg.Fetch(ctx) if err != nil { dgError = multierror.Append(dgError, fmt.Errorf("error in datagatherer %s: %w", k, err)) diff --git a/pkg/datagatherer/datagatherer.go b/pkg/datagatherer/datagatherer.go index 0baab09d..afea88ec 100644 --- a/pkg/datagatherer/datagatherer.go +++ b/pkg/datagatherer/datagatherer.go @@ -14,7 +14,7 @@ type DataGatherer interface { // Fetch retrieves data. // count is the number of items that were discovered. A negative count means the number // of items was indeterminate. - Fetch() (data any, count int, err error) + Fetch(ctx context.Context) (data any, count int, err error) // Run starts the data gatherer's informers for resource collection. // Returns error if the data gatherer informer wasn't initialized Run(ctx context.Context) error diff --git a/pkg/datagatherer/k8sdiscovery/discovery.go b/pkg/datagatherer/k8sdiscovery/discovery.go index e4ce54b5..3033c85f 100644 --- a/pkg/datagatherer/k8sdiscovery/discovery.go +++ b/pkg/datagatherer/k8sdiscovery/discovery.go @@ -76,7 +76,7 @@ func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error { } // Fetch will fetch discovery data from the apiserver, or return an error -func (g *DataGathererDiscovery) Fetch() (any, int, error) { +func (g *DataGathererDiscovery) Fetch(ctx context.Context) (any, int, error) { data, err := g.cl.ServerVersion() if err != nil { return nil, -1, fmt.Errorf("failed to get server version: %v", err) diff --git a/pkg/datagatherer/k8sdynamic/dynamic.go b/pkg/datagatherer/k8sdynamic/dynamic.go index 55cb4cde..3fea7cca 100644 --- a/pkg/datagatherer/k8sdynamic/dynamic.go +++ b/pkg/datagatherer/k8sdynamic/dynamic.go @@ -385,7 +385,7 @@ func (g *DataGathererDynamic) WaitForCacheSync(ctx context.Context) error { // Fetch will fetch the requested data from the apiserver, or return an error // if fetching the data fails. -func (g *DataGathererDynamic) Fetch() (any, int, error) { +func (g *DataGathererDynamic) Fetch(ctx context.Context) (any, int, error) { if g.groupVersionResource.String() == "" { return nil, -1, fmt.Errorf("resource type must be specified") } diff --git a/pkg/datagatherer/k8sdynamic/dynamic_test.go b/pkg/datagatherer/k8sdynamic/dynamic_test.go index 1adae119..2cfcc79b 100644 --- a/pkg/datagatherer/k8sdynamic/dynamic_test.go +++ b/pkg/datagatherer/k8sdynamic/dynamic_test.go @@ -117,6 +117,7 @@ func sortGatheredResources(list []*api.GatheredResource) { func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) { ctx := t.Context() + config := ConfigDynamic{ ExcludeNamespaces: []string{"kube-system"}, GroupVersionResource: schema.GroupVersionResource{Group: "foobar", Version: "v1", Resource: "foos"}, @@ -748,7 +749,7 @@ func TestDynamicGatherer_Fetch(t *testing.T) { if waitTimeout(&wg, 30*time.Second) { t.Fatalf("unexpected timeout") } - res, expectCount, err := dynamiDg.Fetch() + res, expectCount, err := dynamiDg.Fetch(ctx) if err != nil && !tc.err { t.Errorf("expected no error but got: %v", err) } @@ -1061,7 +1062,7 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) { if waitTimeout(&wg, 5*time.Second) { t.Fatalf("unexpected timeout") } - rawRes, count, err := dynamiDg.Fetch() + rawRes, count, err := dynamiDg.Fetch(ctx) if tc.err { require.Error(t, err) } else { diff --git a/pkg/datagatherer/local/local.go b/pkg/datagatherer/local/local.go index 530cd11c..4169aca3 100644 --- a/pkg/datagatherer/local/local.go +++ b/pkg/datagatherer/local/local.go @@ -49,7 +49,7 @@ func (g *DataGatherer) WaitForCacheSync(ctx context.Context) error { } // Fetch loads and returns the data from the LocalDatagatherer's dataPath -func (g *DataGatherer) Fetch() (any, int, error) { +func (g *DataGatherer) Fetch(ctx context.Context) (any, int, error) { dataBytes, err := os.ReadFile(g.dataPath) if err != nil { return nil, -1, err diff --git a/pkg/datagatherer/oidc/oidc.go b/pkg/datagatherer/oidc/oidc.go index 6683adf7..251fa922 100644 --- a/pkg/datagatherer/oidc/oidc.go +++ b/pkg/datagatherer/oidc/oidc.go @@ -66,9 +66,7 @@ func (g *DataGathererOIDC) WaitForCacheSync(ctx context.Context) error { } // Fetch will fetch the OIDC discovery document and JWKS from the cluster API server. -func (g *DataGathererOIDC) Fetch() (any, int, error) { - ctx := context.Background() - +func (g *DataGathererOIDC) Fetch(ctx context.Context) (any, int, error) { oidcResponse, oidcErr := g.fetchOIDCConfig(ctx) jwksResponse, jwksErr := g.fetchJWKS(ctx) diff --git a/pkg/datagatherer/oidc/oidc_test.go b/pkg/datagatherer/oidc/oidc_test.go index 6230708a..75f36cd4 100644 --- a/pkg/datagatherer/oidc/oidc_test.go +++ b/pkg/datagatherer/oidc/oidc_test.go @@ -51,7 +51,7 @@ func TestFetch_Success(t *testing.T) { rc := makeRESTClient(t, ts) g := &DataGathererOIDC{cl: rc} - anyRes, count, err := g.Fetch() + anyRes, count, err := g.Fetch(t.Context()) require.NoError(t, err) require.Equal(t, 1, count) @@ -197,7 +197,7 @@ func TestFetch_Errors(t *testing.T) { rc := makeRESTClient(t, ts) g := &DataGathererOIDC{cl: rc} - anyRes, count, err := g.Fetch() + anyRes, count, err := g.Fetch(t.Context()) require.NoError(t, err) require.Equal(t, 1, count) From 7e9a310937b78ec1cf60909a73a68e2f74182bab Mon Sep 17 00:00:00 2001 From: Ashley Davis Date: Thu, 5 Feb 2026 10:35:05 +0000 Subject: [PATCH 2/2] further cleanups to context use in run.go Signed-off-by: Ashley Davis --- pkg/agent/run.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 1af820c8..77ce57c1 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -51,9 +51,9 @@ const schemaVersion string = "v2.0.0" // Run starts the agent process func Run(cmd *cobra.Command, args []string) (returnErr error) { - ctx, cancel := context.WithCancel(cmd.Context()) + baseCtx, cancel := context.WithCancel(cmd.Context()) defer cancel() - log := klog.FromContext(ctx).WithName("Run") + log := klog.FromContext(baseCtx).WithName("Run") log.Info("Starting", "version", version.PreflightVersion, "commit", version.Commit) @@ -78,7 +78,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { return fmt.Errorf("While evaluating configuration: %v", err) } - group, gctx := errgroup.WithContext(ctx) + group, gctx := errgroup.WithContext(baseCtx) defer func() { cancel() if groupErr := group.Wait(); groupErr != nil { @@ -123,13 +123,14 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { }) group.Go(func() error { + listenCtx := klog.NewContext(gctx, log) err := listenAndServe( - klog.NewContext(gctx, log), + listenCtx, &http.Server{ Addr: serverAddress, Handler: server, BaseContext: func(_ net.Listener) context.Context { - return gctx + return listenCtx }, }, ) @@ -239,7 +240,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { // be cancelled, which will cause this blocking loop to exit // instead of waiting for the time period. for { - if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil { + if err := gatherAndOutputData(gctx, eventf, config, preflightClient, dataGatherers); err != nil { return err } @@ -316,7 +317,7 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf } } else { var err error - readings, err = gatherData(klog.NewContext(ctx, log), config, dataGatherers) + readings, err = gatherData(ctx, config, dataGatherers) if err != nil { return err } @@ -338,7 +339,7 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf postCtx, cancel := context.WithTimeout(ctx, config.BackoffMaxTime) defer cancel() - return struct{}{}, postData(klog.NewContext(postCtx, log), config, preflightClient, readings) + return struct{}{}, postData(postCtx, config, preflightClient, readings) } group.Go(func() error { @@ -406,7 +407,6 @@ func gatherData(ctx context.Context, config CombinedConfig, dataGatherers map[st func postData(ctx context.Context, config CombinedConfig, preflightClient client.Client, readings []*api.DataReading) error { log := klog.FromContext(ctx).WithName("postData") - ctx = klog.NewContext(ctx, log) err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{ ClusterName: config.ClusterName, ClusterDescription: config.ClusterDescription,