Flink

Iceberg Sink

Flink can be used to land data from a source into an Iceberg table using the Iceberg sink. A typical use case is to read from a Kafka topic source, perform some transformations, and then write to a table. Here is an example of configuring the Iceberg sink with Tabular, in Java:

// Catalog properties for connecting to the Tabular-hosted Iceberg REST catalog.
Map<String, String> catalogProps =
    Map.of(
        "uri", "https://api.tabular.io/ws",
        "credential", "<your-tabular-credential>",
        "warehouse", "<your-warehouse-name>",
        "auth.default-refresh-enabled", "true");

CatalogLoader catalogLoader = CatalogLoader.custom(
        "iceberg",
        catalogProps,
        new Configuration(),
        "org.apache.iceberg.rest.RESTCatalog");

// Load the target table's schema so incoming rows can be converted to Iceberg records.
TableIdentifier tableIdentifier = TableIdentifier.parse("<db>.<table>");
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier);
Catalog catalog = catalogLoader.loadCatalog();
Schema schema = catalog.loadTable(tableIdentifier).schema();

// Append the row DataStream to the Iceberg table.
FlinkSink.forRow(stream, FlinkSchemaUtil.toSchema(schema)).tableLoader(tableLoader).append();

Here is an example in Scala:

val catalogProps =
  ImmutableMap.of(
    "uri", "https://api.tabular.io/ws",
    "credential", "<your-tabular-credential>",
    "warehouse", "<your-warehouse-name>",
    "auth.default-refresh-enabled", "true")

val catalogLoader = CatalogLoader.custom(
	"iceberg",
	catalogProps,
	new Configuration,
	"org.apache.iceberg.rest.RESTCatalog")

val tableIdentifier = TableIdentifier.parse("<db>.<table>")
val tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier)
val catalog = catalogLoader.loadCatalog
val schema = catalog.loadTable(tableIdentifier).schema

FlinkSink.forRow(stream.javaStream, FlinkSchemaUtil.toSchema(schema)).tableLoader(tableLoader).append
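
In both snippets, `stream` is assumed to be an existing row-typed DataStream, for example one produced by a Kafka source, and nothing is written until the job is submitted. A minimal sketch of the surrounding job, in Java, with the environment and source as placeholders:

// Sketch only: `env` and the upstream Kafka source are assumptions, not part of the example above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// ... build `stream` (e.g. from a Kafka source), apply transformations,
// and configure the FlinkSink as shown above ...

// The sink only writes once the job is submitted.
env.execute("kafka-to-iceberg");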

Iceberg Source

Flink can also read data from an Iceberg table, either as an incremental streaming workload that picks up new data as it is committed, or as a batch load, for example to backfill data. Here is an example of configuring the Iceberg source with Tabular to batch load data, in Java:

Map<String, String> catalogProps =
    Map.of(
        "uri", "https://api.tabular.io/ws",
        "credential", "<your-tabular-credential>",
        "warehouse", "<your-warehouse-name>",
        "auth.default-refresh-enabled", "true");

// Connect to the Tabular-hosted Iceberg REST catalog.
CatalogLoader catalogLoader = CatalogLoader.custom(
        "iceberg",
        catalogProps,
        new Configuration(),
        "org.apache.iceberg.rest.RESTCatalog");

TableIdentifier inputTableIdentifier = TableIdentifier.parse("<db>.<table>");

// Only read rows from the last seven days.
String startDate = Instant.now().minus(7, ChronoUnit.DAYS).toString();
List<Expression> filters =
    List.of(Expressions.greaterThanOrEqual("event_ts", startDate));

// Build a bounded (batch) source over the filtered table data.
DataStream<RowData> source =
    FlinkSource.forRowData()
        .env(env)
        .tableLoader(TableLoader.fromCatalog(catalogLoader, inputTableIdentifier))
        .filters(filters)
        .build();
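
The builder returns a regular DataStream<RowData>, so downstream processing is ordinary Flink. For instance, a minimal sketch that simply prints the backfilled rows and runs the job, assuming `env` is the job's StreamExecutionEnvironment:

// Attach any downstream operators or sinks; print() is only for illustration.
source.print();

// Submit the batch read.
env.execute("iceberg-backfill");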

Using the Flink Kubernetes Operator, you can deploy a Flink job to a Kubernetes cluster: once the operator is installed on the cluster, you submit the job by applying a manifest. Refer to the Flink Kubernetes Operator documentation for more details.
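
As a rough illustration (the image name, jar path, Flink version, and resource sizes below are placeholders, not values from this guide), a FlinkDeployment manifest might look like this:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: iceberg-flink-job
spec:
  image: <your-flink-job-image>
  flinkVersion: v1_16
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/<your-job>.jar
    parallelism: 2
    upgradeMode: stateless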

Blog Post

Read more about using Flink with Tabular in our blog post.