@@ -130,9 +130,9 @@ For this, we need two parts running:
130130> </details>
131131
132132The Kafka server must be accessible to *external* applications.
133- For this we need a
134- [static IP address](https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address)
135- for the Kafka server to live.
133+ For this we need an
134+ [external static IP address](https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address)
135+ for the Kafka server to live, not an internal IP address.
136136
137137> ℹ️ If you already have a Kafka server running, you can skip this section.
138138> Just make sure to store its IP address into an environment variable.
@@ -183,9 +183,21 @@ To learn more about pricing, see the
183183```sh
184184export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"
185185
186+ # Note: A `:` in the project name signifies a project within an organization
187+ # (e.g. `example.com:project-id`). Replace the `:` with `/` so that the
188+ # Kafka image path can be resolved correctly.
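# For example, with the hypothetical project `example.com:project-id`, the
# image path would become:
#   export KAFKA_IMAGE="gcr.io/example.com/project-id/samples/dataflow/kafka:latest"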
189+
186190# Build the Kafka server image into Container Registry.
187191gcloud builds submit --tag $KAFKA_IMAGE kafka/
188192
193+ # If a different topic, address, Kafka port, or ZooKeeper port is desired,
194+ # set the following environment variables before starting the server.
195+ # Otherwise, the default values defined in the Dockerfile will be used:
196+ export KAFKA_TOPIC=<topic-name>
197+ export KAFKA_ADDRESS=<kafka-address>
198+ export KAFKA_PORT=<kafka-port>
199+ export ZOOKEEPER_PORT=<zookeeper-port>
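# For example, values consistent with the rest of this guide (illustrative only;
# the actual defaults are defined in the Dockerfile):
#   export KAFKA_TOPIC=messages
#   export KAFKA_ADDRESS=<static-ip-reserved-earlier>
#   export KAFKA_PORT=9092
#   export ZOOKEEPER_PORT=2181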
200+
189201# Create and start a new instance.
190202# The --address flag binds the VM's address to the static address we created.
191203# The --container-env KAFKA_ADDRESS is an environment variable passed to the
@@ -200,6 +212,70 @@ gcloud compute instances create-with-container kafka-vm \
200212 --tags "kafka-server"
201213```
202214
215+ Note: The Kafka server should be running at this point, but in its current state
216+ no messages are being sent to a topic, which will cause the KafkaToBigQuery
217+ template to fail.
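
To sanity-check that the container actually started, you can list the containers
running on the VM. This is just a sketch; it assumes the `kafka-vm` instance and
the `$ZONE` variable used above.

```sh
# The Kafka container should appear in the output once the VM has pulled the image.
gcloud compute ssh kafka-vm --zone "$ZONE" --command "docker ps"
```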
218+
219+ 
220+ ### Sending messages to the Kafka server
221+
222+ SSH into the `kafka-vm` instance created earlier and run the commands below,
223+ depending on your timing: messages sent before the template is started will be
224+ available when the template starts, and messages sent after the template has
225+ started will be processed as they arrive.
227+
228+ Prerequisite: SSH into the Kafka VM
229+ 
230+ ```sh
231+ gcloud compute ssh kafka-vm --zone "$ZONE"
232+ ```
233+
234+ 1. Create a Topic
235+ 
236+ ```sh
237+ docker run --rm --network host bitnami/kafka:3.4.0 \
238+ /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
239+ --create --topic <topic-name> --partitions 1 --replication-factor 1
240+ ```
241+
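To confirm the topic exists, you can optionally list the topics on the broker
(a sketch using the same container image as above):

```sh
docker run --rm --network host bitnami/kafka:3.4.0 \
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```
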
242+ 2. Send Messages to the Topic
243+
244+ Run the console producer to send messages. After running the command, type a
245+ message and press Enter. You can send multiple messages. Press Ctrl+C to stop
246+ the producer.
247+
248+ Note: You can run this step either before starting the Dataflow template
249+ (messages will be ready) or while it's running (messages will be processed as
250+ they arrive).
251+
252+ ```sh
253+ docker run -i --rm --network host bitnami/kafka:3.4.0 \
254+ /opt/bitnami/kafka/bin/kafka-console-producer.sh \
255+ --bootstrap-server localhost:9092 --topic <topic-name>
256+ ```
257+
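If you prefer not to type messages interactively, you can also pipe a few
messages into the producer. This is only a sketch: the JSON field names below
follow the example schema shown later in this guide and are an assumption about
what your pipeline expects.

```sh
# Pipe two illustrative JSON messages into the console producer.
printf '%s\n' \
'{"url": "https://beam.apache.org/", "rating": "positive"}' \
'{"url": "https://cloud.google.com/dataflow/", "rating": "negative"}' |
docker run -i --rm --network host bitnami/kafka:3.4.0 \
/opt/bitnami/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic <topic-name>
```
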
258+ 3. (Optional) Verify the Messages
259+
260+ You can check that your messages were sent correctly by starting a consumer.
261+ This will print all messages from the beginning of the topic. Press Ctrl+C to
262+ exit.
263+
264+ ```sh
265+ docker run -it --rm --network host bitnami/kafka:3.4.0 \
266+ /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
267+ --bootstrap-server localhost:9092 --topic <topic-name> --from-beginning
268+ ```
269+
270+ 4. (Optional) Delete a Topic
271+
272+ ```sh
273+ docker run --rm --network host bitnami/kafka:3.4.0 \
274+ /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
275+ --delete --topic <topic-name>
276+ ```
277+
278+
203279### Creating and running a Flex Template
204280
205281> <details><summary>
@@ -257,6 +333,21 @@ gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
257333 --region "$REGION"
258334```
259335
336+ Note: If one of the parameters is a deeply nested JSON object or dictionary,
337+ use the gcloud `--flags-file` flag to pass a YAML file containing all of the
338+ parameters, including the nested dictionary. Passing the nested dictionary
339+ directly on the command line results in a gcloud error. The parameters file
340+ can look like this:
341+ 
342+ ```yaml
343+ --parameters:
344+   inputTopic: messages
345+   outputTable: $PROJECT:$DATASET.$TABLE
346+   bootstrapServer: $KAFKA_ADDRESS:9092
347+   schema:
348+     '{type: object, properties: {processing_time: {type: TIMESTAMP}, url: {type: STRING}, rating: {type: STRING}}}'
349+ ```
350+
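For example, assuming the file above is saved as `parameters.yaml` and that the
template spec path is stored in a variable such as `$TEMPLATE_PATH` (both names
are placeholders here), a sketch of the run command becomes:

```sh
gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location "$TEMPLATE_PATH" \
--flags-file parameters.yaml \
--region "$REGION"
```
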
260351Run the following query to check the results in BigQuery.
261352
262353```