diff --git a/java/Iceberg/S3TableSQLJSON/README.md b/java/Iceberg/S3TableSQLJSON/README.md
new file mode 100644
index 0000000..89b6ebd
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/README.md
@@ -0,0 +1,85 @@
+# Flink Iceberg Sink using SQL API with S3 Tables
+
+* Flink version: 1.19.0
+* Flink API: SQL API
+* Iceberg version: 1.6.1
+* Language: Java (11)
+* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)
+  and [S3 Tables](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables.html)
+
+This example demonstrates how to use
+[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Amazon S3 Tables Catalog.
+
+For simplicity, the application internally generates synthetic data: random stock prices.
+Data is generated as POJO objects, simulating a real source, for example a Kafka source, receiving records
+that can be converted to table format for SQL operations, as sketched below.
+
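+A minimal sketch of this pattern, mirroring what `S3TableSQLJSONExample` does (`sinkTableName` stands for the fully-qualified Iceberg table name resolved from the runtime configuration):
+
+```java
+// Convert the DataStream of StockPrice POJOs into a Table and expose it to SQL
+Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream);
+tableEnv.createTemporaryView("stockPriceTable", stockPriceTable);
+
+// Continuously insert the generated records into the Iceberg table
+tableEnv.executeSql("INSERT INTO " + sinkTableName +
+        " SELECT `timestamp`, symbol, price, volumes FROM stockPriceTable");
+```
+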
+### Prerequisites
+
+#### Create a Table Bucket
+The sample application expects the S3 table bucket to exist, and requires its ARN for configuration:
+```bash
+aws s3tables create-table-bucket --name flink-example
+{
+ "arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example"
+
+}
+```
+
+If you have already created a table bucket, you can retrieve its ARN like this:
+
+```bash
+aws s3tables list-table-buckets
+```
+
+This lists your table buckets. Select the ARN of the bucket you wish to write to and paste it into the configuration file in this project.
+
+#### Create a Namespace in the Table Bucket (Database)
+The sample application expects the namespace in the table bucket to exist:
+```bash
+aws s3tables create-namespace \
+ --table-bucket-arn arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example \
+ --namespace default
+```
+
+#### IAM Permissions
+
+The application must have IAM permissions to:
+* Read from and write to the S3 table bucket and its tables
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties.
+
+When running locally, the configuration is read from the
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
+
+Runtime parameters:
+
+| Group ID | Key | Default | Description |
+|-----------|--------------------------|------------------|---------------------------------------------------------------------------------------------------------------------|
+| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. |
+| `Iceberg` | `table.bucket.arn` | (mandatory) | ARN of the S3 Tables bucket, for example `arn:aws:s3tables:region:account:bucket/my-bucket`. |
+| `Iceberg` | `catalog.db` | `iceberg` | Name of the S3 Tables Catalog database. |
+| `Iceberg` | `catalog.table`          | `prices_s3tables` | Name of the S3 Tables Catalog table.                                                                                  |
+
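+For reference, this is how the application resolves these properties (`applicationProperties` is the map returned by `KinesisAnalyticsRuntime.getApplicationProperties()`):
+
+```java
+Properties icebergProperties = applicationProperties.get("Iceberg");
+String tableBucketArn = icebergProperties.getProperty("table.bucket.arn");
+String database = icebergProperties.getProperty("catalog.db", "iceberg");
+String table = icebergProperties.getProperty("catalog.table", "prices_s3tables");
+```
+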
+### Checkpoints
+
+Checkpointing must be enabled. Iceberg commits writes on checkpoint.
+
+When running locally, the application enables checkpointing programmatically, every 30 seconds.
+When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
+
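+For reference, this is how the application enables checkpointing when it detects a local environment (when deployed, this code path is skipped and the application configuration applies):
+
+```java
+if (env instanceof LocalStreamEnvironment) {
+    // Iceberg commits buffered writes on every checkpoint
+    env.enableCheckpointing(30000);
+}
+```
+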
+### Known limitations
+
+The Flink Iceberg integration currently has the following limitations:
+* Doesn't support Iceberg tables with hidden partitioning
+* Doesn't support adding, removing, or renaming columns
+
+### Running locally, in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+
+Make sure to configure the appropriate AWS credentials and region when running locally, and ensure the provided S3 Tables bucket ARN is valid and accessible.
+
+See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
\ No newline at end of file
diff --git a/java/Iceberg/S3TableSQLJSON/pom.xml b/java/Iceberg/S3TableSQLJSON/pom.xml
new file mode 100644
index 0000000..511ba19
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/pom.xml
@@ -0,0 +1,238 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>s3-table-sql-flink</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <target.java.version>11</target.java.version>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+
+        <flink.version>1.19.0</flink.version>
+        <avro.version>1.11.3</avro.version>
+        <scala.version>2.12</scala.version>
+        <hadoop.version>3.4.0</hadoop.version>
+        <iceberg.version>1.6.1</iceberg.version>
+        <kda.runtime.version>1.2.0</kda.runtime.version>
+        <log4j.version>2.23.1</log4j.version>
+        <junit5.version>5.8.1</junit5.version>
+    </properties>
+
+    <dependencies>
+        <!-- Flink dependencies, provided by the runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-dropwizard</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Table planner, required to run SQL locally -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Amazon Managed Service for Apache Flink runtime -->
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+            <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- S3 Tables dependencies -->
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3tables</artifactId>
+            <version>2.31.50</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.s3tables</groupId>
+            <artifactId>s3-tables-catalog-for-iceberg</artifactId>
+            <version>0.1.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Hadoop dependencies, required by Iceberg -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <!-- Iceberg dependencies -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws-bundle</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-1.19</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${junit5.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Logging dependencies -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Java compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- Shade plugin to build the uber-jar -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>S3TableSQLJSONExample</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java
new file mode 100644
index 0000000..d3f8faf
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: MIT-0
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this
+ * software and associated documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the rights to use, copy, modify,
+ * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+ * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class S3TableSQLJSONExample {
+ // Constants
+ private static final String CATALOG_NAME = "s3";
+ private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+ private static final Logger LOG = LoggerFactory.getLogger(S3TableSQLJSONExample.class);
+
+ // Configuration properties
+ private static String tableBucketArn;
+ private static String s3TableDatabase;
+ private static String s3Table;
+
+ public static void main(String[] args) throws Exception {
+ // 1. Initialize environments - using standard environment instead of WebUI for production consistency
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ // 2. Load properties and configure environment
+        Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+ Properties icebergProperties = applicationProperties.get("Iceberg");
+
+ // Configure local development settings if needed
+ if (isLocal(env)) {
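+            // Checkpoint every 30 seconds: Iceberg commits buffered writes on checkpoint.
+            // When deployed, checkpointing is controlled by the application configuration instead.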
+ env.enableCheckpointing(30000);
+ env.setParallelism(2);
+ }
+
+        // 3. Set up configuration properties with validation and register the S3 Tables catalog
+        setupS3TableProperties(icebergProperties);
+        Catalog s3Catalog = createS3Catalog();
+
+        tableEnv.registerCatalog(CATALOG_NAME, s3Catalog);
+
+ // 4. Create data generator source
+ Properties dataGenProperties = applicationProperties.get("DataGen");
+        DataStream<StockPrice> stockPriceDataStream = env.fromSource(
+ createDataGenerator(dataGenProperties),
+ WatermarkStrategy.noWatermarks(),
+ "DataGen");
+
+ // 5. Convert DataStream to Table and create view
+ Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream);
+ tableEnv.createTemporaryView("stockPriceTable", stockPriceTable);
+
+ String sinkTableName = CATALOG_NAME + "." + s3TableDatabase + "." + s3Table;
+
+        // 6. Define and create the table, with a schema matching the Avro schema from the DataStream example
+ String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" +
+ "`timestamp` STRING, " +
+ "symbol STRING," +
+ "price FLOAT," +
+ "volumes INT" +
+ ") PARTITIONED BY (symbol)";
+
+ LOG.info("Creating table with statement: {}", createTableStatement);
+ tableEnv.executeSql(createTableStatement);
+
+ // 7. Execute SQL operations - Insert data from stock price stream
+ String insertQuery = "INSERT INTO " + sinkTableName +
+ " SELECT `timestamp`, symbol, price, volumes FROM stockPriceTable";
+ LOG.info("Executing insert statement: {}", insertQuery);
+ TableResult insertResult = tableEnv.executeSql(insertQuery);
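+        // For a streaming INSERT, executeSql() submits the job and returns immediately;
+        // the INSERT then runs continuously, committing to Iceberg on each checkpoint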
+
+ // Keep the job running to continuously insert data
+ LOG.info("Application started successfully. Inserting data into S3 table: {}", sinkTableName);
+
+ }
+
+ private static void setupS3TableProperties(Properties icebergProperties) {
+ tableBucketArn = icebergProperties.getProperty("table.bucket.arn");
+ s3TableDatabase = icebergProperties.getProperty("catalog.db", "iceberg");
+ s3Table = icebergProperties.getProperty("catalog.table", "prices_s3tables");
+
+ Preconditions.checkNotNull(tableBucketArn, "You must supply a table bucket ARN.");
+ Preconditions.checkNotNull(s3TableDatabase, "You must supply a database name");
+ Preconditions.checkNotNull(s3Table, "You must supply a table name");
+
+ // Validate ARN format
+ validateArn(tableBucketArn);
+
+ LOG.info("S3 Tables configuration: bucket={}, database={}, table={}",
+ tableBucketArn, s3TableDatabase, s3Table);
+ }
+
+    private static DataGeneratorSource<StockPrice> createDataGenerator(Properties dataGeneratorProperties) {
+ double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0"));
+ Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0");
+
+ LOG.info("Data generator: {} record/sec", recordsPerSecond);
+        return new DataGeneratorSource<>(new StockPriceGeneratorFunction(),
+ Long.MAX_VALUE,
+ RateLimiterStrategy.perSecond(recordsPerSecond),
+ TypeInformation.of(StockPrice.class));
+ }
+
+    /**
+     * Defines a configuration with the S3 Tables-specific catalog and file IO implementations,
+     * then uses it to create the Flink catalog.
+     */
+    private static Catalog createS3Catalog() {
+
+        Map<String, String> catalogProperties = new HashMap<>();
+ catalogProperties.put("type", "iceberg");
+ catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
+ catalogProperties.put("warehouse", tableBucketArn);
+ catalogProperties.put("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog");
+        // Load the S3 Tables catalog
+        CatalogLoader s3TablesCatalogLoader = CatalogLoader.custom(
+                CATALOG_NAME,
+                catalogProperties,
+                new org.apache.hadoop.conf.Configuration(),
+                "software.amazon.s3tables.iceberg.S3TablesCatalog");
+
+        return new FlinkCatalog(CATALOG_NAME, s3TableDatabase, Namespace.empty(), s3TablesCatalogLoader, true, 1000);
+ }
+
+ private static boolean isLocal(StreamExecutionEnvironment env) {
+ return env instanceof LocalStreamEnvironment;
+ }
+
+ /**
+ * Load application properties from Amazon Managed Service for Apache Flink runtime
+ * or from a local resource, when the environment is local
+ */
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+ if (isLocal(env)) {
+ LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+ return KinesisAnalyticsRuntime.getApplicationProperties(
+ Objects.requireNonNull(S3TableSQLJSONExample.class.getClassLoader()
+ .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+ } else {
+ LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+ return KinesisAnalyticsRuntime.getApplicationProperties();
+ }
+ }
+
+ public static void validateArn(String arn) {
+ String arnPattern = "^arn:aws[a-zA-Z-]*:[a-zA-Z0-9-]+:[a-zA-Z0-9-]*:[0-9]{12}:[a-zA-Z0-9-_/:.]+$";
+ Preconditions.checkArgument(arn != null && arn.matches(arnPattern),
+ "Invalid ARN format: %s. ARN must match pattern: arn:partition:service:region:account-id:resource", arn);
+ }
+}
diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java
new file mode 100644
index 0000000..c386c2a
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java
@@ -0,0 +1,60 @@
+public class StockPrice {
+ private String timestamp;
+ private String symbol;
+ private Float price;
+ private Integer volumes;
+
+ public StockPrice() {
+ }
+
+ public StockPrice(String timestamp, String symbol, Float price, Integer volumes) {
+ this.timestamp = timestamp;
+ this.symbol = symbol;
+ this.price = price;
+ this.volumes = volumes;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(String timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getSymbol() {
+ return symbol;
+ }
+
+ public void setSymbol(String symbol) {
+ this.symbol = symbol;
+ }
+
+ public Float getPrice() {
+ return price;
+ }
+
+ public void setPrice(Float price) {
+ this.price = price;
+ }
+
+ public Integer getVolumes() {
+ return volumes;
+ }
+
+ public void setVolumes(Integer volumes) {
+ this.volumes = volumes;
+ }
+
+ @Override
+ public String toString() {
+ return "StockPrice{" +
+ "timestamp='" + timestamp + '\'' +
+ ", symbol='" + symbol + '\'' +
+ ", price=" + price +
+ ", volumes=" + volumes +
+ '}';
+ }
+}
diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java
new file mode 100644
index 0000000..38e236b
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java
@@ -0,0 +1,24 @@
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import java.time.Instant;
+
+/**
+ * Function used by DataGen source to generate random records as StockPrice POJOs.
+ *
+ * The generator mimics the behavior of AvroGenericStockTradeGeneratorFunction
+ * from the IcebergDataStreamSink example.
+ */
+public class StockPriceGeneratorFunction implements GeneratorFunction<Long, StockPrice> {
+
+ private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"};
+
+ @Override
+ public StockPrice map(Long sequence) throws Exception {
+ String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)];
+ float price = RandomUtils.nextFloat(0, 10);
+ int volumes = RandomUtils.nextInt(0, 1000000);
+ String timestamp = Instant.now().toString();
+
+ return new StockPrice(timestamp, symbol, price, volumes);
+ }
+}
diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 0000000..bce4273
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,16 @@
+[
+ {
+ "PropertyGroupId": "DataGen",
+ "PropertyMap": {
+ "records.per.sec": 10.0
+ }
+ },
+ {
+ "PropertyGroupId": "Iceberg",
+ "PropertyMap": {
+ "table.bucket.arn": "arn:aws:s3tables:us-east-1:111111111:bucket/iceberg",
+ "catalog.db": "iceberg",
+ "catalog.table": "prices_s3table"
+ }
+ }
+]
\ No newline at end of file
diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/log4j2.properties b/java/Iceberg/S3TableSQLJSON/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..a6cccce
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/log4j2.properties
@@ -0,0 +1,13 @@
+# Log4j2 configuration
+status = warn
+name = PropertiesConfig
+
+# Console appender configuration
+appender.console.type = Console
+appender.console.name = ConsoleAppender
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Root logger configuration
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
\ No newline at end of file
diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/price.avsc b/java/Iceberg/S3TableSQLJSON/src/main/resources/price.avsc
new file mode 100644
index 0000000..6303e0d
--- /dev/null
+++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/price.avsc
@@ -0,0 +1,23 @@
+{
+ "type": "record",
+ "name": "Price",
+ "namespace": "com.amazonaws.services.msf.avro",
+ "fields": [
+ {
+ "name": "timestamp",
+ "type": "string"
+ },
+ {
+ "name": "symbol",
+ "type": "string"
+ },
+ {
+ "name": "price",
+ "type": "float"
+ },
+ {
+ "name": "volumes",
+ "type": "int"
+ }
+ ]
+}
\ No newline at end of file