Confluent: Developing a Streaming Microservices Application

Lab · 1 hour 30 minutes · 5 Credits · Intermediate

This lab was developed with our partner, Confluent. Your personal information may be shared with Confluent, the lab sponsor, if you have opted-in to receive product updates, announcements, and offers in your Account Profile.

GSP741

Overview

This lab is based on a small microservices ecosystem, showcasing an order management workflow, such as one might find in retail and online shopping. It is built using Kafka Streams, whereby business events that describe the order management workflow propagate through this ecosystem. The blog post Building a Microservices Ecosystem with Kafka Streams and ksqlDB outlines the approach used in the lab.

Objectives

In this lab, you will:

  • Persist events into Kafka by producing records that represent customer orders
  • Write a service that validates customer orders
  • Write a service that joins streaming order information with streaming payment information and data from a customer database
  • Define one set of criteria to filter records in a stream, and another set to branch records into separate streams
  • Create a session window to define five-minute windows for processing
  • Create a state store for the Inventory Service
  • Create one persistent query that enriches the orders stream with customer information using a stream-table join

Setup and requirements

Before you click the Start Lab button

Read these instructions. Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources will be made available to you.

This hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
Note: Use an Incognito or private browser window to run this lab. This prevents any conflicts between your personal account and the Student account, which might otherwise cause extra charges to be incurred on your personal account.
  • Time to complete the lab---remember, once you start, you cannot pause a lab.
Note: If you already have your own personal Google Cloud account or project, do not use it for this lab to avoid extra charges to your account.

How to start your lab and sign in to the Google Cloud console

  1. Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is the Lab Details panel with the following:

    • The Open Google Cloud console button
    • Time remaining
    • The temporary credentials that you must use for this lab
    • Other information, if needed, to step through this lab
  2. Click Open Google Cloud console (or right-click and select Open Link in Incognito Window if you are running the Chrome browser).

    The lab spins up resources, and then opens another tab that shows the Sign in page.

    Tip: Arrange the tabs in separate windows, side-by-side.

    Note: If you see the Choose an account dialog, click Use Another Account.
  3. If necessary, copy the Username below and paste it into the Sign in dialog.

    {{{user_0.username | "Username"}}}

    You can also find the Username in the Lab Details panel.

  4. Click Next.

  5. Copy the Password below and paste it into the Welcome dialog.

    {{{user_0.password | "Password"}}}

    You can also find the Password in the Lab Details panel.

  6. Click Next.

    Important: You must use the credentials the lab provides you. Do not use your Google Cloud account credentials. Note: Using your own Google Cloud account for this lab may incur extra charges.
  7. Click through the subsequent pages:

    • Accept the terms and conditions.
    • Do not add recovery options or two-factor authentication (because this is a temporary account).
    • Do not sign up for free trials.

After a few moments, the Google Cloud console opens in this tab.

Note: To view a menu with a list of Google Cloud products and services, click the Navigation menu at the top-left.

Activate Cloud Shell

Cloud Shell is a virtual machine that is loaded with development tools. It offers a persistent 5 GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources.

  1. Click Activate Cloud Shell at the top of the Google Cloud console.

When you are connected, you are already authenticated, and the project is set to your PROJECT_ID. The output contains a line that declares the PROJECT_ID for this session:

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab-completion.

  2. (Optional) You can list the active account name with this command:

gcloud auth list

  3. Click Authorize.

Output:

ACTIVE: *
ACCOUNT: {{{user_0.username | "ACCOUNT"}}}

To set the active account, run:
$ gcloud config set account `ACCOUNT`

  4. (Optional) You can list the project ID with this command:

gcloud config list project

Output:

[core]
project = {{{project_0.project_id | "PROJECT_ID"}}}

Note: For full documentation of gcloud, in Google Cloud, refer to the gcloud CLI overview guide.

Task 1. Persist events

An event is simply a thing that happened or occurred. An event in a business is some fact that occurred, such as a sale, an invoice, a trade, or a customer experience, and it is the source of truth. In event-oriented architectures, events are first-class citizens that constantly push data into applications. Client applications can then react to these streams of events in real time and decide what to do next.

Event-oriented architecture flow diagram

In this section, you will persist events into Kafka by producing records that represent customer orders. This event happens in the Orders Service, which provides a REST interface to POST and GET Orders. Posting an Order is essentially a REST call, and it creates the event in Kafka.

  1. In Cloud Shell, start by cloning the Examples repository and the Kafka Streams Examples repository. You will use the latter for testing purposes.

git clone https://github.com/confluentinc/examples
git clone https://github.com/confluentinc/kafka-streams-examples.git
  2. Change the Java version to 8 (since this is the version used in the branched Kafka Streams Examples repository) and set the JAVA_HOME environment variable:

sudo apt-get install -y openjdk-8-jre
sudo apt-get install -y openjdk-8-jdk
export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'
  3. Next, run the following to navigate to the kafka-streams-examples folder and switch to the 5.5.1-post branch:

# change directory and change to 5.5.1-post branch
cd ~/kafka-streams-examples/
git checkout 5.5.1-post
  4. Next, navigate to the microservices-orders/exercises directory and switch to the 5.5.1-post branch:

# change directory and change to 5.5.1-post branch
cd ~/examples/microservices-orders/exercises
git checkout 5.5.1-post
  5. Click Open Editor on the toolbar of Cloud Shell. (You can switch between Cloud Shell and the code editor by using the Open Editor and Open Terminal icons as required, or click the Open in new window button to leave the Editor open in a separate tab.)

  6. Open the OrdersService.java file by navigating to examples/microservices-orders/exercises/OrdersService.java in the Editor.

  7. In OrdersService.java, fill in TODO 1.1: create a new ProducerRecord with a key specified by bean.getId() and a value of the bean, for the orders topic whose name is specified by ORDERS.name():

producer.send(new ProducerRecord<>(ORDERS.name(), bean.getId(), bean),
  8. Next, fill in TODO 1.2: produce the newly created record using the existing producer, and use the OrdersService#callback function to send the response and the record key (the two fragments combine into a single call, sketched at the end of this task):
callback(response, bean.getId()));
  9. Navigate back to Cloud Shell. To test your code, run the following commands to copy your version of the file to the main project, compile, and run the unit test:

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/OrdersService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.OrdersServiceTest test -f ~/kafka-streams-examples/pom.xml

This may take a couple of minutes to complete.

Note: You can proceed to the next section while you compile and run the tests in the background.
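
For reference, the two fragments from TODO 1.1 and TODO 1.2 combine into a single statement. The sketch below is illustrative, assuming the surrounding request-handler code from the exercise file:

// How TODO 1.1 and TODO 1.2 fit together inside the POST handler
// (names come from the exercise file; this is not a complete method).
producer.send(new ProducerRecord<>(ORDERS.name(), bean.getId(), bean),
    callback(response, bean.getId()));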

Task 2. Event-driven applications

Service-based architectures are often designed to be request-driven, in which services send commands to other services to tell them what to do, await a response, or send queries to get the resulting state. Building services on a protocol of requests and responses forces a complicated web of synchronous dependencies that bind services together.

Request driven way versus event driven way illustration

In contrast, in an event-driven design, the event stream is the inter-service communication that enables services to cross deployment boundaries and avoids synchronous execution. When and how downstream services respond to those events is within their control, which reduces the coupling between services and enables an architecture with more pluggability. Read more on Build Services on a Backbone of Events from the Confluent Developer documentation.

Event-driven processing graphic illustrating the OrderDetailsService trigger between order and order validations

In this section, you will write a service that validates customer orders. Instead of using a series of synchronous calls to submit and validate orders, the order event itself triggers the OrderDetailsService. When a new order is created, it is written to the topic orders, from which OrderDetailsService has a consumer polling for new records.

  1. In the Cloud Shell Editor, navigate to examples/microservices-orders/exercises and open the OrderDetailsService.java file.

  2. In OrderDetailsService.java, implement TODO 2.1: subscribe the existing consumer to a Collections#singletonList with the orders topic whose name is specified by Topics.ORDERS.name():

consumer.subscribe(singletonList(Topics.ORDERS.name()));
  3. Implement TODO 2.2: validate the order using OrderDetailsService#isValid and save the validation result to type OrderValidationResult:
isValid(order) ? PASS : FAIL;
  4. Building on the previous step, implement TODO 2.3: create a new record using OrderDetailsService#result() that takes the order and validation result:
result(order, isValid(order) ? PASS : FAIL);
  5. Finally, implement TODO 2.4: produce the newly created record using the existing producer:

producer.send(result(order, isValid(order) ? PASS : FAIL));

Note: You only need to put the provided code for TODO 2.1 and TODO 2.4 into the file, because the TODO 2.4 statement already contains the code for TODO 2.2 and TODO 2.3. A sketch of the complete consume-validate-produce loop appears at the end of this task.
  6. Navigate back to Cloud Shell. To test your code, run the following commands to copy your version of the file to the main project, compile, and run the unit test:

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/OrderDetailsService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.OrderDetailsServiceTest test -f ~/kafka-streams-examples/pom.xml

This may take a couple of minutes to complete.

Note: If you receive a COMPILATION ERROR, wait a minute and re-run the above commands.
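
For reference, here is a simplified outline of how the OrderDetailsService pieces fit together as a consume-validate-produce loop. It is a sketch, not the exercise code: the running flag, the record generic types, and the poll scaffolding are assumptions.

// Simplified outline of the OrderDetailsService loop (illustrative only).
consumer.subscribe(singletonList(Topics.ORDERS.name()));            // TODO 2.1
while (running) {
  ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, Order> record : records) {
    Order order = record.value();
    // TODO 2.2-2.4: validate the order, build the result record, produce it
    producer.send(result(order, isValid(order) ? PASS : FAIL));
  }
}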

Task 3. Enriching streams with joins

Streams can be enriched with data from other streams or tables through joins. A join enriches data by performing lookups in a streaming context where data is updated continuously and concurrently. For example, applications backing an online retail store might enrich new data records with information from multiple databases.

In this scenario, it may be that a stream of customer transactions is enriched with sales price, inventory, customer information, etc. These lookups can be performed at very large scale and with a low processing latency.

Illustration of a customer transaction stream scenario

A popular pattern is to make the information in the databases available in Kafka through so-called change data capture (CDC), together with Kafka's Connect API to pull in the data from the database.

Once the data is in Kafka, client applications can perform very fast and efficient joins of such tables and streams, rather than requiring the application to make a query to a remote database over the network for each record. Read more on an overview of distributed, real-time joins and implementing joins in Kafka Streams.

Kafka stream flow diagram

In this exercise, you will write a service that joins streaming order information with streaming payment information and data from a customer database. First, the payment stream needs to be rekeyed to use the same key as the order stream before the two streams are joined.

The resulting stream is then joined with the customer information that was read into Kafka by a JDBC source from a customer database. Additionally, this service performs dynamic routing: an enriched order record is written to a topic that is determined from the value of the level field of the corresponding customer.

  1. In the Cloud Shell editor, navigate to examples/microservices-orders/exercises/EmailService.java and open the file.

  2. In EmailService.java, implement TODO 3.1: create a new KStream called payments from payments_original, using KStream#selectKey to rekey on order id specified by payment.getOrderId() instead of payment id.

You will replace this:

final KStream<String, Payment> payments_original = builder.stream(PAYMENTS.name(), Consumed.with(PAYMENTS.keySerde(), PAYMENTS.valueSerde()));

With this:

final KStream<String, Payment> payments = builder.stream(PAYMENTS.name(),
        Consumed.with(PAYMENTS.keySerde(), PAYMENTS.valueSerde()))
    // Rekey payments to be by OrderId for the windowed join
    .selectKey((s, payment) -> payment.getOrderId());
  3. Next, implement TODO 3.2: do a stream-table join with the customers table, which requires three arguments:

  • The GlobalKTable for the stream-table join.
  • The customer id, specified by order.getCustomerId(), using a KeyValueMapper that gets the customer id from the tuple in the record's value.
  • A method that computes a value for the result record, in this case EmailTuple::setCustomer.

.join(customers, (key1, tuple) -> tuple.order.getCustomerId(), EmailTuple::setCustomer)
  4. Implement TODO 3.3: route each enriched order record to a topic that is dynamically determined from the value of the customerLevel field of the corresponding customer (a minimal, self-contained sketch of this join-and-route pattern appears at the end of this task):
.to((orderId, orderEnriched, record) -> orderEnriched.getCustomerLevel(), Produced.with(ORDERS_ENRICHED.keySerde(), ORDERS_ENRICHED.valueSerde()));
  5. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test:

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/EmailService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.EmailServiceTest test -f ~/kafka-streams-examples/pom.xml

This may take a couple of minutes to complete.
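
For reference, the following minimal sketch shows the shape of a stream-to-GlobalKTable join followed by dynamic routing, as used in TODO 3.2 and TODO 3.3. It is not the exercise code: the topic names (orders-demo, customers-demo, orders-&lt;level&gt;) and the simplified String values are hypothetical.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class JoinAndRouteSketch {
  public static void main(String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    // Orders keyed by order id; the simplified value is just the customer id.
    final KStream<String, String> orders =
        builder.stream("orders-demo", Consumed.with(Serdes.String(), Serdes.String()));

    // Customers keyed by customer id; the simplified value is the customer level.
    final GlobalKTable<String, String> customers =
        builder.globalTable("customers-demo", Consumed.with(Serdes.String(), Serdes.String()));

    orders
        // The KeyValueMapper extracts the customer id from the order value (TODO 3.2 shape)...
        .join(customers, (orderId, customerId) -> customerId,
            // ...and the ValueJoiner combines the order with the looked-up customer level.
            (orderValue, customerLevel) -> customerLevel)
        // Dynamic routing: the destination topic is computed per record (TODO 3.3 shape).
        .to((orderId, customerLevel, recordContext) -> "orders-" + customerLevel,
            Produced.with(Serdes.String(), Serdes.String()));

    System.out.println(builder.build().describe());
  }
}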

Task 4. Filtering and branching

A stream of events can be captured in a Kafka topic. Client applications can then manipulate this stream based on user-defined criteria, creating new streams of data that they, or downstream services, can act on. These operations help create new streams with more logically consistent data.

In some cases, the application may need to filter events from an input stream that match certain criteria, which results in a new stream with just a subset of records from the original stream.

In other cases, the application may need to branch events, whereby each event is tested against a predicate and then routed to a stream that matches, which results in multiple new streams split from the original stream.

Filtering and branching events illustration

In this section, you will define one set of criteria to filter records in a stream. Then you will define another set of criteria to branch records into two different streams.

  1. In the Cloud Shell Editor, navigate to examples/microservices-orders/exercises and open the FraudService.java file.

  2. In FraudService.java, implement TODO 4.1: filter this stream to include only orders in "CREATED" state, i.e., it should satisfy the predicate OrderState.CREATED.equals(order.getState()):

.filter((id, order) -> OrderState.CREATED.equals(order.getState()));
  3. Next, implement TODO 4.2: create a KStream<String, OrderValue> array from the ordersWithTotals stream by branching the records based on OrderValue#getValue (a short example of working with the branched streams appears at the end of this task).

  • First branched stream: FRAUD_CHECK will fail for the predicate where the order value >= FRAUD_LIMIT
  • Second branched stream: FRAUD_CHECK will pass for the predicate where the order value < FRAUD_LIMIT

@SuppressWarnings("unchecked")
final KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
    (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
    (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);
  4. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test:

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/FraudService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.FraudServiceTest test -f ~/kafka-streams-examples/pom.xml

This may take a couple of minutes to complete.
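
For context, each element of the forks array is an ordinary KStream, so the failing and passing branches can be processed independently. The snippet below is illustrative only and is not part of the exercise; it simply logs each branch using KStream#peek:

// Illustrative only: inspect the two branches produced by TODO 4.2.
forks[0].peek((id, orderValue) ->
    System.out.println("FRAUD_CHECK failed for order " + id + ", value " + orderValue.getValue()));
forks[1].peek((id, orderValue) ->
    System.out.println("FRAUD_CHECK passed for order " + id + ", value " + orderValue.getValue()));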

Task 5. Stateful operations

An aggregation operation takes one input stream or table, and yields a new table by combining multiple input records into a single output record.

Examples of aggregations are computing count or sum, because they combine current record values with previous record values. These are stateful operations because they maintain data during processing.

Aggregations are always key-based operations, and Kafka's Streams API ensures that records for the same key are always routed to the same stream processing task. Oftentimes, these are combined with windowing capabilities in order to run computations in real time over a window of time.

Aggregations operation illustration

In this section, you will use a session window with a five-minute inactivity gap to group records for processing.

Additionally, you will use a stateful operation, reduce, to collapse duplicate records in a stream. Before running reduce you will group the records to repartition the data, which is generally required before using an aggregation operator.

  1. In the Cloud Shell Editor, navigate to examples/microservices-orders/exercises and open the ValidationsAggregatorService.java file.

  2. In ValidationsAggregatorService.java, implement TODO 5.1: window the data using KGroupedStream#windowedBy, specifically using SessionWindows.with to define session windows with a five-minute inactivity gap:

.windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
  3. Next, implement TODO 5.2: group the records by key using KStream#groupByKey, providing the existing Serialized instance for ORDERS:
.groupByKey(serdes6)
  4. Implement TODO 5.3: use the reduce aggregation operator to collapse the records in this stream to a single order for a given key (the chained calls are sketched at the end of this task):
.reduce((order, v1) -> order)
  5. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test:

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/ValidationsAggregatorService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.ValidationsAggregatorServiceTest test -f ~/kafka-streams-examples/pom.xml

This may take a couple of minutes to complete.
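
For reference, the three fragments above chain together roughly as follows. This is an outline, not the exercise code: the ordersStream and aggregated names are assumptions, while serdes6 and the lambdas come from the steps above.

// Outline only: group by key, window into sessions, then reduce to one order per key.
final KTable<Windowed<String>, Order> aggregated = ordersStream
    .groupByKey(serdes6)                                     // TODO 5.2
    .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))  // TODO 5.1
    .reduce((order, v1) -> order);                           // TODO 5.3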

Task 6. State stores

Kafka Streams provides so-called state stores, which are disk-resident hash tables held inside the API for the client application.

The state store can be used within stream processing applications to store and query data, an important capability when implementing stateful operations. It can be used to remember recently received input records, to track rolling aggregates, to de-duplicate input records, etc.

Stream processing applications graphic that illustrates how state stores can be used to implement stateful operations

It is also backed by a Kafka topic and comes with all the Kafka guarantees. Consequently, other applications can also interactively query another application's state store.

Querying state stores is always read-only to guarantee that the underlying state stores will never be mutated out-of-band (i.e., you cannot add new entries).
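
For example, a read-only handle on a store can be obtained through Kafka Streams interactive queries. The sketch below is illustrative, assuming a running KafkaStreams instance named streams and a hypothetical store named "my-store" with String keys and Long values:

// Illustrative only: obtain a read-only view of a key-value state store.
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("my-store",
        QueryableStoreTypes.<String, Long>keyValueStore()));
Long value = store.get("some-key");   // lookups only; no put() is exposed here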

State store interaction with Kafka topics illustration

In this section, you will create a state store for the Inventory Service. This state store is initialized with data from a Kafka topic before the service starts processing, and then it is updated as new orders are created.

  1. In the Cloud Shell editor, navigate to examples/microservices-orders/exercises and open the InventoryService.java file.

  2. In InventoryService.java, implement TODO 6.1: create a state store called RESERVED_STOCK_STORE_NAME, using Stores#keyValueStoreBuilder and Stores#persistentKeyValueStore.

  • The key Serde is derived from the topic specified by WAREHOUSE_INVENTORY
  • The value Serde is derived from Serdes.Long() because it represents a count
.keyValueStoreBuilder(Stores.persistentKeyValueStore(RESERVED_STOCK_STORE_NAME), Topics.WAREHOUSE_INVENTORY.keySerde(), Serdes.Long())
  3. Implement TODO 6.2: update the reserved stock in the KeyValueStore called reservedStocksStore (the surrounding read-modify-write pattern is sketched at the end of this task).
  • The key is the product in the order, using OrderBean#getProduct
  • The value is the sum of the current reserved stock and the quantity in the order, using OrderBean#getQuantity
reservedStocksStore.put(order.getProduct(), reserved + order.getQuantity());
  4. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test:

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/InventoryService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.InventoryServiceTest test -f ~/kafka-streams-examples/pom.xml

This may take a couple of minutes to complete.

Note: If you receive a COMPILATION ERROR, wait a minute and re-run the above commands.
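
For reference, the TODO 6.2 statement is the write half of a read-modify-write against the state store. The sketch below shows the surrounding pattern, assuming the order bean and null-handling from the exercise file:

// Illustrative outline: read the current reservation, add the new order's
// quantity, and write the updated count back to the state store.
Long reserved = reservedStocksStore.get(order.getProduct());
if (reserved == null) {
  reserved = 0L;   // nothing reserved yet for this product
}
reservedStocksStore.put(order.getProduct(), reserved + order.getQuantity());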

Task 7. Enrichment with ksqlDB

Confluent ksqlDB is the streaming SQL engine that enables real-time data processing against Apache Kafka. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka, without requiring you to write code in a programming language such as Java or Python.

ksqlDB is scalable, elastic, fault tolerant, and it supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and sessionization.

KSQL stream engine interaction with Kafka Topics illustration

You can use ksqlDB to merge streams of data in real time by using a SQL-like join syntax. A ksqlDB join and a relational database join are similar in that they both combine data from two sources based on common values. The result of a ksqlDB join is a new stream or table that's populated with the column values that you specify in a SELECT statement. ksqlDB also supports several aggregate functions, like COUNT and SUM. You can use these to build stateful aggregates on streaming data.

In this section, you will create one persistent query that enriches the orders stream with customer information using a stream-table join. You will create another persistent query that detects fraudulent behavior by counting the number of orders in a given window.

  1. In the Cloud Console, from the Navigation menu, navigate to Compute Engine > VM Instances.

  2. Click SSH for the microservices-vm instance.

  3. In your SSH window, run the following script to bootstrap the VM:

sudo apt-get install -y openjdk-8-jre
sudo apt-get install -y openjdk-8-jdk
export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'
curl -O https://raw.githubusercontent.com/GoogleCloudPlatform/training-data-analyst/master/quests/confluent/bootstrap_vm.sh
chmod +x bootstrap_vm.sh
./bootstrap_vm.sh
  4. Once the commands finish executing, exit out of your SSH window.

  5. Now, click SSH again to restart the session.

  6. Clone the repository, and switch to the 5.5.1-post branch:

git clone https://github.com/confluentinc/examples
# change directory and change to 5.5.1-post branch
cd ~/examples/microservices-orders/exercises
git checkout 5.5.1-post

Note: The #TODO implementations you've written won't be used in this last part of the lab. You can simply copy over the files from the repository again.
  7. Run the following to change to the microservices-orders directory, and build the Docker image:

sudo chmod -R 777 ~/examples/microservices-orders/
cd ~/examples/microservices-orders
export LD_LIBRARY_PATH='/usr/bin/openssl'
sudo sysctl -w vm.max_map_count=262144
docker-compose up -d --build
  8. After a minute or so, run the docker-compose ps status command to ensure that everything has started correctly:

docker-compose ps

Note: Make sure your ksqldb-server is up (healthy) and running, and the kafka-setup and ksql-query-setup are in the Exit 0 state before you run the next command, as this will ensure the queries have been executed. This may take up to 5 minutes to start up.
  9. Launch the ksqlDB CLI:
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
  10. From the ksqlDB CLI prompt, execute the following command to see the Kafka topics created. For this lab, you will be using the orders and customers topics.
SHOW TOPICS;

You should see the following topics:

ksqlDB CLI prompt displaying a list of created Kafka topics

Click Check my progress to verify the objective. Enrichment with ksqlDB

  11. Next, you will create some persistent queries. Start by creating a new ksqlDB stream that does a stream-table join between orders and my_customers_table based on customer id:

CREATE STREAM my_orders WITH (kafka_topic='orders', value_format='AVRO');

--Next 3 steps are required to create a TABLE with keys of String type
--1. Create a stream
CREATE STREAM my_customers_with_wrong_format_key WITH (kafka_topic='customers', value_format='AVRO');

--2. Derive a new stream with the required key changes.
--The CAST statement converts the key to the required format.
--The PARTITION BY clause re-partitions the stream based on the new, converted key.
CREATE STREAM my_customers_with_proper_key
  WITH (KAFKA_TOPIC='my_customers-with-proper-key') AS
  SELECT CAST(id as BIGINT) as customerid, firstname, lastname, email, address, level
  FROM my_customers_with_wrong_format_key
  PARTITION BY CAST(id as BIGINT);

--3. Create the table on the properly keyed stream
CREATE TABLE my_customers_table (rowkey bigint KEY, customerid bigint, firstname varchar, lastname varchar, email varchar, address varchar, level varchar)
  WITH (KAFKA_TOPIC='my_customers-with-proper-key', VALUE_FORMAT='AVRO', KEY='customerid');

--Join customer information based on customer id
CREATE STREAM my_orders_cust1_joined AS
  SELECT my_customers_table.customerid AS customerid, firstname, lastname, state, product, quantity, price
  FROM my_orders
  LEFT JOIN my_customers_table ON my_orders.customerid = my_customers_table.customerid;
  12. Finally, create a new ksqlDB table that flags any customer who submits more than 2 orders in a 30-second time window:

--Fraud alert if a customer submits more than 2 orders in a 30 second time window
CREATE TABLE MY_FRAUD_ORDER AS
  SELECT CUSTOMERID, LASTNAME, FIRSTNAME, COUNT(*) AS COUNTS
  FROM my_orders_cust1_joined
  WINDOW TUMBLING (SIZE 30 SECONDS)
  GROUP BY CUSTOMERID, LASTNAME, FIRSTNAME
  HAVING COUNT(*)>2;
  13. The CLI parser gives immediate feedback on whether your ksqlDB queries worked. Use the following command to see the rows in each query. You can exit the process by typing CTRL + C at any point.

-- Visualize frauds
SELECT * FROM MY_FRAUD_ORDER EMIT CHANGES;

Click Check my progress to verify the objective. Create streams and tables in ksqlDB CLI

Congratulations!

In this lab, you started by producing records representing customer orders to persist into Kafka. You then wrote services that validated customer orders and joined streaming order information with streaming payment information and data from a customer database. Next, you defined one set of criteria to filter records in a stream. Lastly, you created a session window to define five-minute windows for processing, created a state store for the Inventory Service, and created one persistent query that enriched the orders stream with customer information using a stream-table join.

Next steps / learn more

Google Cloud training and certification

...helps you make the most of Google Cloud technologies. Our classes include technical skills and best practices to help you get up to speed quickly and continue your learning journey. We offer fundamental to advanced level training, with on-demand, live, and virtual options to suit your busy schedule. Certifications help you validate and prove your skill and expertise in Google Cloud technologies.

Manual Last Updated April 23, 2024

Lab Last Tested April 23, 2024

Copyright 2024 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.