17 commits
0f259d9
docs: sync agent connector docs from airbyte-agent-connectors repo (#…
octavia-bot[bot] Feb 6, 2026
849354f
docs: sync agent connector docs from airbyte-agent-connectors repo (#…
octavia-bot[bot] Feb 6, 2026
b8e2434
docs(source-bing-ads): fix broken URL, grammar, and spelling in docum…
devin-ai-integration[bot] Feb 6, 2026
f5bafe1
fix(docs): add Definition ID to enterprise connector docs header (#72…
rwask Feb 6, 2026
3291905
docs: sync agent connector docs from airbyte-agent-connectors repo (#…
octavia-bot[bot] Feb 6, 2026
559e491
feat: airbyte bigquery destination custom partitioning/clustering logic
pragyash258 Jan 25, 2026
104617b
test(bigquery): Fix unit tests after DestinationCatalog refactor
pragyash258 Jan 26, 2026
8ec00b5
fix(bigquery): Remove duplicate @Singleton from StreamConfigProvider
pragyash258 Jan 26, 2026
cc770ec
feat: patch BQ connector with custom partitioning logic
pragyash258 Jan 26, 2026
1f0a342
chore: add remaining modified files for custom BQ logic
pragyash258 Jan 26, 2026
493a382
Add support for DAY/MONTH/YEAR based partitioning
pragyash258 Feb 2, 2026
104ed4e
Fix BQ partitioning test defaults
pragyash258 Feb 5, 2026
7a7be1f
Merge branch 'master' into codex/bq-custom-partitioning-cdk
pragyash258 Feb 7, 2026
156322c
Fix destination-bigquery CI failures
pragyash258 Feb 7, 2026
da3b59d
Apply formatting and update spec fixtures
pragyash258 Feb 7, 2026
fb35f93
Skip no-creds integration tests
pragyash258 Feb 7, 2026
97ad6b4
Revert "Skip no-creds integration tests"
pragyash258 Feb 8, 2026
Diff view
@@ -1,3 +1,3 @@
testExecutionConcurrency=-1
JunitMethodExecutionTimeout=10m
- cdkVersion=0.2.0
+ cdkVersion=0.2.8
@@ -174,7 +174,7 @@ object BigQueryUtils {
private val connectorNameOrDefault: String
get() =
Optional.ofNullable(System.getenv("WORKER_CONNECTOR_IMAGE"))
- .map { name: String -> name.replace("airbyte/", "").replace(":", "/") }
+ .map { name: String -> name.trim().replace("airbyte/", "").replace(":", "/") }
.orElse("destination-bigquery")
}

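The only functional change above is the added trim() on the WORKER_CONNECTOR_IMAGE value before it is rewritten into a connector name. A minimal sketch of the same derivation, assuming the env var can carry stray whitespace; the standalone function below is illustrative, not part of the connector:

import java.util.Optional

// Illustrative restatement of connectorNameOrDefault, with trim() applied before the rewrites.
fun connectorNameOrDefault(image: String?): String =
    Optional.ofNullable(image)
        .map { name: String -> name.trim().replace("airbyte/", "").replace(":", "/") }
        .orElse("destination-bigquery")

// connectorNameOrDefault("airbyte/destination-bigquery:2.14.0 ") == "destination-bigquery/2.14.0"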
@@ -11,7 +11,10 @@ import com.google.cloud.bigquery.BigQueryOptions
import io.airbyte.cdk.load.check.DestinationCheckerSync
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.orchestration.db.ColumnNameGenerator
import io.airbyte.cdk.load.orchestration.db.DefaultTempTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.FinalTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.RawTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DefaultDirectLoadTableSqlOperations
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableExecutionConfig
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableWriter
@@ -27,11 +30,15 @@ import io.airbyte.cdk.load.write.StreamStateStore
import io.airbyte.cdk.load.write.WriteOperation
import io.airbyte.integrations.destination.bigquery.check.BigqueryCheckCleaner
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
import io.airbyte.integrations.destination.bigquery.stream.StreamConfigProvider
import io.airbyte.integrations.destination.bigquery.write.bulk_loader.BigQueryBulkOneShotUploader
import io.airbyte.integrations.destination.bigquery.write.bulk_loader.BigQueryBulkOneShotUploaderStep
import io.airbyte.integrations.destination.bigquery.write.bulk_loader.BigqueryBulkLoadConfiguration
import io.airbyte.integrations.destination.bigquery.write.bulk_loader.BigqueryConfiguredForBulkLoad
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.BigQueryDatabaseHandler
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.BigqueryColumnNameGenerator
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.BigqueryFinalTableNameGenerator
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.BigqueryRawTableNameGenerator
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.direct_load_tables.BigqueryDirectLoadDatabaseInitialStatusGatherer
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.direct_load_tables.BigqueryDirectLoadNativeTableOperations
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.direct_load_tables.BigqueryDirectLoadSqlGenerator
@@ -54,6 +61,9 @@ private val logger = KotlinLogging.logger {}
class BigqueryBeansFactory {
@Singleton fun getConfig(config: DestinationConfiguration) = config as BigqueryConfiguration

@Singleton
fun getStreamConfigProvider(config: BigqueryConfiguration) = StreamConfigProvider(config)

@Singleton
@Requires(condition = BigqueryConfiguredForBulkLoad::class)
fun getBulkLoadConfig(config: BigqueryConfiguration) = BigqueryBulkLoadConfiguration(config)
@@ -80,7 +90,7 @@ class BigqueryBeansFactory {

@Singleton
fun getChecker(
- catalog: DestinationCatalog,
+ @Named("safeDestinationCatalog") catalog: DestinationCatalog,
@Named("inputStream") stdinPipe: InputStream,
taskLauncher: DestinationTaskLauncher,
syncManager: SyncManager,
@@ -92,22 +102,59 @@ class BigqueryBeansFactory {
BigqueryCheckCleaner(),
)

@Singleton
fun getRawTableNameGenerator(
config: BigqueryConfiguration,
streamConfigProvider: StreamConfigProvider
): RawTableNameGenerator {
return BigqueryRawTableNameGenerator(config, streamConfigProvider)
}

@Singleton
fun getFinalTableNameGenerator(
config: BigqueryConfiguration,
streamConfigProvider: StreamConfigProvider
): FinalTableNameGenerator {
return BigqueryFinalTableNameGenerator(config, streamConfigProvider)
}

@Singleton
fun getColumnNameGenerator(): ColumnNameGenerator {
return BigqueryColumnNameGenerator()
}

@Singleton
fun getWriter(
bigquery: BigQuery,
config: BigqueryConfiguration,
- names: TableCatalog,
// micronaut will only instantiate a single instance of StreamStateStore,
// so accept it as a * generic and cast as needed.
// we use a different type depending on whether we're in legacy raw tables vs
// direct-load tables mode.
streamStateStore: StreamStateStore<*>,
+ streamConfigProvider: StreamConfigProvider,
+ names: TableCatalog,
): DestinationWriter {
val destinationHandler = BigQueryDatabaseHandler(bigquery, config.datasetLocation.region)
// TableCatalog is still injected here; in the CDK it is built from the RawTableNameGenerator,
// FinalTableNameGenerator, and ColumnNameGenerator beans, so defining those beans above is
// enough for the stream-aware name generators to be picked up by this writer.

if (config.legacyRawTablesOnly) {
// force smart cast
@Suppress("UNCHECKED_CAST")
streamStateStore as StreamStateStore<TypingDedupingExecutionConfig>

return TypingDedupingWriter(
names,
BigqueryTypingDedupingDatabaseInitialStatusGatherer(bigquery),
@@ -127,6 +174,7 @@ class BigqueryBeansFactory {
BigqueryDirectLoadSqlGenerator(
projectId = config.projectId,
cdcDeletionMode = config.cdcDeletionMode,
streamConfigProvider = streamConfigProvider,
),
destinationHandler,
),
@@ -154,6 +202,7 @@
destinationHandler,
projectId = config.projectId,
tempTableNameGenerator,
streamConfigProvider,
),
sqlTableOperations = sqlTableOperations,
streamStateStore = streamStateStore,
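StreamConfigProvider is injected throughout the factory above, but its source file is not part of this diff view. A minimal sketch of what such a provider might look like, under the assumption that overrides are resolved by a "namespace.stream_name" key first, then by bare stream name, and otherwise fall back to the connector-level defaults; the configFor method name and the lookup order are assumptions, not the actual implementation:

package io.airbyte.integrations.destination.bigquery.stream

import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
import io.airbyte.integrations.destination.bigquery.spec.StreamLevelConfig

// Hypothetical sketch only; the real StreamConfigProvider is not shown in this diff.
class StreamConfigProvider(private val config: BigqueryConfiguration) {
    // Assumed lookup: "namespace.name", then "name", then the connector-level default_* settings.
    fun configFor(namespace: String?, name: String): StreamLevelConfig {
        val override =
            namespace?.let { config.streamConfigMap["$it.$name"] } ?: config.streamConfigMap[name]
        return StreamLevelConfig(
            partitioningField = override?.partitioningField ?: config.defaultPartitioningField,
            partitioningGranularity = override?.partitioningGranularity
                ?: config.defaultPartitioningGranularity,
            clusteringField = override?.clusteringField ?: config.defaultClusteringField,
            tableSuffix = override?.tableSuffix ?: config.defaultTableSuffix,
            dataset = override?.dataset,
        )
    }
}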
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2026 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery

import io.airbyte.cdk.Operation
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DefaultDestinationCatalogFactory
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.command.Overwrite
import io.airbyte.cdk.load.command.SoftDelete
import io.airbyte.cdk.load.command.Update
import io.airbyte.cdk.load.config.CHECK_STREAM_NAMESPACE
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.commons.lang3.RandomStringUtils

@Factory
@Replaces(DefaultDestinationCatalogFactory::class)
class SafeDestinationCatalogFactory {
@Requires(property = Operation.PROPERTY, notEquals = "check")
@Singleton
@Primary
@Named("safeDestinationCatalog")
fun syncCatalog(
catalog: ConfiguredAirbyteCatalog,
namespaceMapper: NamespaceMapper,
jsonSchemaToAirbyteType: JsonSchemaToAirbyteType,
): DestinationCatalog {
val streams =
catalog.streams.map { stream ->
val importType =
when (stream.destinationSyncMode) {
null -> throw IllegalArgumentException("Destination sync mode was null")
DestinationSyncMode.OVERWRITE -> Overwrite
DestinationSyncMode.APPEND -> Append
DestinationSyncMode.APPEND_DEDUP ->
Dedupe(
primaryKey = stream.primaryKey ?: emptyList(),
cursor = stream.cursorField ?: emptyList()
)
DestinationSyncMode.UPDATE -> Update
DestinationSyncMode.SOFT_DELETE -> SoftDelete
}

DestinationStream(
unmappedName = stream.stream.name,
unmappedNamespace = stream.stream.namespace,
importType = importType,
schema = jsonSchemaToAirbyteType.convert(stream.stream.jsonSchema),
generationId = stream.generationId ?: 0,
minimumGenerationId = stream.minimumGenerationId ?: 0,
syncId = stream.syncId ?: 0,
namespaceMapper = namespaceMapper,
)
}
return DestinationCatalog(streams)
}

@Requires(property = Operation.PROPERTY, value = "check")
@Singleton
@Primary
@Named("safeDestinationCatalog")
fun checkCatalog(
namespaceMapper: NamespaceMapper,
@Named("checkNamespace") checkNamespace: String?,
): DestinationCatalog {
// Copied from DefaultDestinationCatalogFactory to maintain behavior
val date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))
val random = RandomStringUtils.randomAlphabetic(5).lowercase()
val namespace = checkNamespace ?: "${CHECK_STREAM_NAMESPACE}_$date$random"
return DestinationCatalog(
listOf(
DestinationStream(
unmappedNamespace = namespace,
unmappedName = "test$date$random",
importType = Append,
schema =
ObjectType(linkedMapOf("test" to FieldType(IntegerType, nullable = true))),
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
namespaceMapper = namespaceMapper
)
)
)
}
}
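For the check path above, the generated stream lands in a dated, randomized namespace and table. A worked example of that naming, with the date and the random 5-letter suffix invented:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Mirrors the naming in checkCatalog(); "kqzpw" stands in for the random suffix.
fun exampleCheckStreamName(date: LocalDate, random: String): String =
    "test${date.format(DateTimeFormatter.ofPattern("yyyyMMdd"))}$random"

// exampleCheckStreamName(LocalDate.of(2026, 2, 7), "kqzpw") == "test20260207kqzpw"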
@@ -8,21 +8,23 @@ import io.airbyte.cdk.load.check.CheckCleaner
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.integrations.destination.bigquery.BigqueryBeansFactory
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
import io.airbyte.integrations.destination.bigquery.stream.StreamConfigProvider
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.BigqueryFinalTableNameGenerator
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.BigqueryRawTableNameGenerator
import io.airbyte.integrations.destination.bigquery.write.typing_deduping.toTableId

class BigqueryCheckCleaner : CheckCleaner<BigqueryConfiguration> {
override fun cleanup(config: BigqueryConfiguration, stream: DestinationStream) {
val streamConfigProvider = StreamConfigProvider(config)
val bq = BigqueryBeansFactory().getBigqueryClient(config)
bq.getTable(
- BigqueryRawTableNameGenerator(config)
+ BigqueryRawTableNameGenerator(config, streamConfigProvider)
.getTableName(stream.mappedDescriptor)
.toTableId()
)
?.delete()
bq.getTable(
- BigqueryFinalTableNameGenerator(config)
+ BigqueryFinalTableNameGenerator(config, streamConfigProvider)
.getTableName(stream.mappedDescriptor)
.toTableId()
)
@@ -19,13 +19,26 @@ data class BigqueryConfiguration(
val cdcDeletionMode: CdcDeletionMode,
val internalTableDataset: String,
val legacyRawTablesOnly: Boolean,
val defaultPartitioningField: String?,
val defaultClusteringField: String?,
val defaultTableSuffix: String?,
val defaultPartitioningGranularity: PartitioningGranularity?,
val streamConfigMap: Map<String, StreamLevelConfig>,
) : DestinationConfiguration() {
override val numOpenStreamWorkers = 3
// currently the base cdk declares 0.2 as the default.
// use 0.4 so that we support 20MiB records.
override val maxMessageQueueMemoryUsageRatio = 0.4
}

data class StreamLevelConfig(
val partitioningField: String? = null,
val partitioningGranularity: PartitioningGranularity? = null,
val clusteringField: String? = null,
val tableSuffix: String? = null,
val dataset: String? = null,
)

sealed interface LoadingMethodConfiguration

data object BatchedStandardInsertConfiguration : LoadingMethodConfiguration
@@ -66,6 +79,22 @@ class BigqueryConfigurationFactory :
pojo.internalTableDataset!!
},
legacyRawTablesOnly = pojo.legacyRawTablesOnly ?: false,
defaultPartitioningField = pojo.defaultPartitioningField,
defaultClusteringField = pojo.defaultClusteringField,
defaultTableSuffix = pojo.defaultTableSuffix,
defaultPartitioningGranularity = pojo.defaultPartitioningGranularity,
streamConfigMap =
pojo.streams?.associate {
it.name to
StreamLevelConfig(
partitioningField = it.partitioningField,
partitioningGranularity = it.partitioningGranularity,
clusteringField = it.clusteringField,
tableSuffix = it.tableSuffix,
dataset = it.dataset
)
}
?: emptyMap(),
)
}
}
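As a reading aid for the factory code above, a small illustration of how the spec's streams list becomes streamConfigMap. The stream name and field values are invented, the map key is simply whatever the spec's name field holds (e.g. "users" or "public.users"), and the import paths assume these types live in the spec package:

import io.airbyte.integrations.destination.bigquery.spec.PartitioningGranularity
import io.airbyte.integrations.destination.bigquery.spec.SingleStreamConfiguration
import io.airbyte.integrations.destination.bigquery.spec.StreamLevelConfig

// One invented per-stream override, as it would arrive from the connector spec.
val specStreams = listOf(
    SingleStreamConfiguration(
        name = "public.users",
        partitioningField = "created_at",
        partitioningGranularity = PartitioningGranularity.MONTH,
        clusteringField = "account_id",
        tableSuffix = "_custom",
    )
)

// The same associate { } shape as in BigqueryConfigurationFactory above.
val streamConfigMap: Map<String, StreamLevelConfig> =
    specStreams.associate {
        it.name to
            StreamLevelConfig(
                partitioningField = it.partitioningField,
                partitioningGranularity = it.partitioningGranularity,
                clusteringField = it.clusteringField,
                tableSuffix = it.tableSuffix,
                dataset = it.dataset,
            )
    }

// streamConfigMap["public.users"]?.partitioningField == "created_at"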
@@ -105,8 +105,68 @@ class BigquerySpecification : ConfigurationSpecification() {
@get:JsonProperty("raw_data_dataset")
@get:JsonSchemaInject(json = """{"group": "advanced", "order": 8}""")
val internalTableDataset: String? = null

@get:JsonSchemaTitle("Default Partitioning Field")
@get:JsonPropertyDescription(
"Default field to use for partitioning (e.g. _airbyte_extracted_at)"
)
@get:JsonProperty("default_partitioning_field")
@get:JsonSchemaInject(json = """{"group": "advanced", "order": 10}""")
val defaultPartitioningField: String? = null

@get:JsonSchemaTitle("Default Partitioning Granularity")
@get:JsonPropertyDescription(
"Default partitioning granularity: DAY, MONTH, or YEAR. Defaults to DAY."
)
@get:JsonProperty("default_partitioning_granularity")
@get:JsonSchemaInject(json = """{"group": "advanced", "order": 11}""")
val defaultPartitioningGranularity: PartitioningGranularity? = null

@get:JsonSchemaTitle("Default Clustering Field")
@get:JsonPropertyDescription("Default field to use for clustering (e.g. _airbyte_extracted_at)")
@get:JsonProperty("default_clustering_field")
@get:JsonSchemaInject(json = """{"group": "advanced", "order": 12}""")
val defaultClusteringField: String? = null

@get:JsonSchemaTitle("Default Table Suffix")
@get:JsonPropertyDescription("Default suffix to append to table names")
@get:JsonProperty("default_table_suffix")
@get:JsonSchemaInject(json = """{"group": "advanced", "order": 13}""")
val defaultTableSuffix: String? = null

@get:JsonSchemaTitle("Stream Configuration")
@get:JsonPropertyDescription(
"""Per-stream configuration overrides.""",
)
@get:JsonProperty("streams")
@get:JsonSchemaInject(json = """{"group": "advanced", "order": 14}""")
val streams: List<SingleStreamConfiguration>? = null
}

/** Per-stream configuration for custom partitioning, clustering, and table naming. */
data class SingleStreamConfiguration(
@get:JsonSchemaTitle("Stream Name")
@get:JsonPropertyDescription("Name of the stream (or namespace.stream_name)")
@JsonProperty("name")
val name: String = "",
@get:JsonSchemaTitle("Partitioning Field")
@JsonProperty("partitioning_field")
val partitioningField: String? = null,
@get:JsonSchemaTitle("Partitioning Granularity")
@get:JsonPropertyDescription(
"Partitioning granularity for the partitioning field. Allowed values: DAY, MONTH, YEAR. Defaults to DAY."
)
@JsonProperty("partitioning_granularity")
val partitioningGranularity: PartitioningGranularity? = null,
@get:JsonSchemaTitle("Clustering Field")
@JsonProperty("clustering_field")
val clusteringField: String? = null,
@get:JsonSchemaTitle("Table Suffix")
@JsonProperty("table_suffix")
val tableSuffix: String? = null,
@get:JsonSchemaTitle("Target Dataset") @JsonProperty("dataset") val dataset: String? = null,
)

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
@@ -214,6 +274,12 @@ enum class CdcDeletionMode(@get:JsonValue val cdcDeletionMode: String) {
SOFT_DELETE("Soft delete"),
}

enum class PartitioningGranularity(@get:JsonValue val granularity: String) {
DAY("DAY"),
MONTH("MONTH"),
YEAR("YEAR"),
}

@Singleton
class BigquerySpecificationExtension : DestinationSpecificationExtension {
override val supportedSyncModes =
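Taken together, the new spec fields amount to a configuration surface like the hypothetical fragment below. All values are invented; only the keys come from the @JsonProperty names above. Presumably, streams without a matching entry in "streams" fall back to the default_* values via StreamConfigProvider:

// Hypothetical connector-config fragment exercising the new options; values are invented.
val exampleConfigFragment =
    """
    {
      "default_partitioning_field": "_airbyte_extracted_at",
      "default_partitioning_granularity": "DAY",
      "default_clustering_field": "_airbyte_extracted_at",
      "default_table_suffix": "_airbyte",
      "streams": [
        {
          "name": "public.events",
          "partitioning_field": "event_time",
          "partitioning_granularity": "MONTH",
          "clustering_field": "user_id",
          "table_suffix": "_v2",
          "dataset": "analytics_events"
        }
      ]
    }
    """.trimIndent()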