Iceberg Sink
Flink can land data from a source into an Iceberg table using the Iceberg sink. A typical use case is to read from a Kafka topic, apply some transformations, and write the results to a table (a sketch of building such a Kafka input stream follows the two sink examples below). Here is an example of configuring the Iceberg sink with Tabular, in Java:
Map<String, String> catalogProps =
    Map.of(
        "uri", "https://api.tabular.io/ws",
        "credential", "<your-tabular-credential>",
        "warehouse", "<your-warehouse-name>",
        "auth.default-refresh-enabled", "true");

// Load the Tabular REST catalog and resolve the target table.
CatalogLoader catalogLoader =
    CatalogLoader.custom(
        "iceberg",
        catalogProps,
        new Configuration(),
        "org.apache.iceberg.rest.RESTCatalog");
TableIdentifier tableIdentifier = TableIdentifier.parse("<db>.<table>");
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier);

// Look up the table schema so the sink can map Row fields to table columns.
Catalog catalog = catalogLoader.loadCatalog();
Schema schema = catalog.loadTable(tableIdentifier).schema();

// Append the DataStream<Row> to the Iceberg table.
FlinkSink.forRow(stream, FlinkSchemaUtil.toSchema(schema))
    .tableLoader(tableLoader)
    .append();
Here is an example in Scala:
val catalogProps =
  ImmutableMap.of(
    "uri", "https://api.tabular.io/ws",
    "credential", "<your-tabular-credential>",
    "warehouse", "<your-warehouse-name>",
    "auth.default-refresh-enabled", "true")

// Load the Tabular REST catalog and resolve the target table.
val catalogLoader = CatalogLoader.custom(
  "iceberg",
  catalogProps,
  new Configuration,
  "org.apache.iceberg.rest.RESTCatalog")
val tableIdentifier = TableIdentifier.parse("<db>.<table>")
val tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier)

// Look up the table schema so the sink can map row fields to table columns.
val catalog = catalogLoader.loadCatalog
val schema = catalog.loadTable(tableIdentifier).schema

// Append the underlying Java DataStream to the Iceberg table.
FlinkSink.forRow(stream.javaStream, FlinkSchemaUtil.toSchema(schema))
  .tableLoader(tableLoader)
  .append()
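Both sink examples assume an existing stream of rows (`stream`; in the Scala example, the Scala DataStream wrapper whose `javaStream` is passed to the sink). The following is a minimal sketch, in Java, of how such a stream might be built from a Kafka topic with the flink-connector-kafka KafkaSource; the bootstrap servers, topic name, and single-string-column row mapping are placeholder assumptions, not part of the examples above:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder Kafka source (from flink-connector-kafka); adjust servers, topic,
// and deserialization to your setup.
KafkaSource<String> kafkaSource =
    KafkaSource.<String>builder()
        .setBootstrapServers("<bootstrap-servers>")
        .setTopics("<topic>")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Map each record into a Row whose fields match the target table schema.
// Here each record is passed through as a single string column, purely for illustration.
DataStream<Row> stream =
    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
        .map(value -> Row.of(value))
        .returns(Types.ROW(Types.STRING));

In practice, the map step is where you would parse and transform each record before handing the stream to FlinkSink as shown above.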
Iceberg Source
Flink can also read data from an Iceberg table, both as an incremental streaming workload that reads new data as it is committed, and as a batch load, for example to backfill data. Here is an example of configuring the Iceberg source with Tabular to batch load the last seven days of data, in Java:
Map<String, String> catalogProps =
    Map.of(
        "uri", "https://api.tabular.io/ws",
        "credential", "<your-tabular-credential>",
        "warehouse", "<your-warehouse-name>",
        "auth.default-refresh-enabled", "true");

// Load the Tabular REST catalog and resolve the input table.
CatalogLoader catalogLoader =
    CatalogLoader.custom(
        "iceberg",
        catalogProps,
        new Configuration(),
        "org.apache.iceberg.rest.RESTCatalog");
TableIdentifier inputTableIdentifier = TableIdentifier.parse("<db>.<table>");

// Only read rows with an event timestamp in the last seven days.
String startDate = Instant.now().minus(7, ChronoUnit.DAYS).toString();
List<Expression> filters =
    List.of(Expressions.greaterThanOrEqual("event_ts", startDate));

// Build a bounded (batch) source over the filtered table data.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> source =
    FlinkSource.forRowData()
        .env(env)
        .tableLoader(TableLoader.fromCatalog(catalogLoader, inputTableIdentifier))
        .filters(filters)
        .build();
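For the incremental streaming case mentioned above, the same builder can be switched into streaming mode. Here is a minimal sketch, reusing `env`, `catalogLoader`, and `inputTableIdentifier` from the batch example; the 30-second monitor interval is an arbitrary example value:

// Continuously monitor the table and read data from newly committed snapshots.
DataStream<RowData> streamingSource =
    FlinkSource.forRowData()
        .env(env)
        .tableLoader(TableLoader.fromCatalog(catalogLoader, inputTableIdentifier))
        .streaming(true)
        .monitorInterval(Duration.ofSeconds(30))
        .build();

As with the batch example, the resulting DataStream<RowData> can be transformed or written to a sink before calling env.execute().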
Deploying a Flink Job to Kubernetes
Using the Flink Kubernetes Operator, you can deploy a Flink job to a Kubernetes cluster. Once the operator is installed on your cluster, you deploy the job by applying a FlinkDeployment manifest, as sketched below. Refer to the Flink Kubernetes Operator documentation for more details.
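As a rough sketch only (the image, Flink version, resource sizes, and jar path are placeholders to adapt to your job and cluster), a basic FlinkDeployment manifest might look like this:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: iceberg-flink-job            # placeholder name
spec:
  image: flink:1.17                  # image containing your job jar and the Iceberg Flink runtime
  flinkVersion: v1_17
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/<your-job>.jar
    parallelism: 2
    upgradeMode: stateless

Applying the manifest with kubectl apply -f creates the deployment, and the operator then manages the job's lifecycle.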
Blog Post
Read more about using Flink with Tabular in our blog post.