OCI Streaming Service – producing and consuming with the Kafka client APIs

This guide shows how to publish and consume messages on the OCI Streaming Service using the Kafka client APIs, assuming the streaming service has already been created.

First install Maven 3 or later and Java 8 or later (both available via yum), then download the latest Kafka binaries from https://kafka.apache.org/downloads

Take note of the OCID of each stream in the managed service. Note that the Kafka client, unlike the SDK interface, expects the endpoint to be specified with port 9092.

As a prerequisite, create the Maven configuration file (pom.xml) in the working directory:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>oci.example</groupId>
    <artifactId>StreamsExampleWithKafkaApis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile for Java 8, the minimum version required by this example. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Avoid the platform-dependent-encoding warnings seen in the build log. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- Pin an explicit version: the LATEST keyword is deprecated and no
                 longer resolved by Maven 3.x, and it makes builds non-reproducible. -->
            <version>3.6.0</version>
        </dependency>
    </dependencies>
</project>

To publish messages

#1 Create the file src/main/java/kafka/sdk/oss/example/Producer.java and customise the sections relevant to the OCI stream service

package kafka.sdk.oss.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Publishes ten test messages to an OCI stream using the Kafka producer API.
 *
 * <p>Replace the placeholder values below with your tenancy, user, stream pool
 * OCID, auth token, and stream (topic) name before running.
 */
public class Producer {

  // NOTE(review): bootstrap.servers must be a plain host:port list; a URL
  // scheme such as "https://" is rejected by the Kafka client's address parser.
  static String bootstrapServers = "xxx.streaming.me-dubai-1.oci.oraclecloud.com:9092";
  static String tenancyName = "xxx";
  static String username = "xxxxxx";
  static String streamPoolId = "ocid1.streampool.oc1.me-dubai-1.xxx";
  static String authToken = "xxx";
  static String streamOrKafkaTopicName = "xxx-streams";

  /**
   * Builds the Kafka producer configuration for the OCI Streaming endpoint.
   *
   * <p>OCI Streaming authenticates over SASL_SSL/PLAIN with a JAAS username of
   * the form {@code tenancy/user/streamPoolOcid} and the auth token as the
   * password. Idempotence is disabled because the service does not support it.
   *
   * @return producer properties ready to pass to {@link KafkaProducer}
   */
  private static Properties getKafkaProperties() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServers);
    properties.put("security.protocol", "SASL_SSL");
    properties.put("sasl.mechanism", "PLAIN");
    properties.put("enable.idempotence", false);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    final String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
        + tenancyName + "/"
        + username + "/"
        + streamPoolId + "\" "
        + "password=\""
        + authToken + "\";";
    properties.put("sasl.jaas.config", jaasConfig);
    properties.put("retries", 3); // retries on transient errors and load balancing disconnection
    properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
    return properties;
  }

  public static void main(String[] args) {
    // try-with-resources guarantees the producer is closed (and its network
    // resources released) even if send() or flush() throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(getKafkaProperties())) {
      for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i,
            "messageValue" + i);
        // The callback runs on the producer's I/O thread once the broker
        // acknowledges (or rejects) the record.
        producer.send(record, (md, ex) -> {
          if (ex != null) {
            System.err.println("exception occurred in producer for review :" + record.value()
                + ", exception is " + ex);
            ex.printStackTrace();
          } else {
            System.err
                .println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
          }
        });
      }
      // producer.send() is async; flush() blocks until all buffered records
      // have been transmitted before the producer is closed.
      producer.flush();
    } catch (Exception e) {
      System.err.println("Error: exception " + e);
    }
  }
}

#2 Run the file

mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building StreamsExampleWithKafkaApis 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ StreamsExampleWithKafkaApis ---
[INFO] Deleting /home/oracle/streams/kafka/target
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ StreamsExampleWithKafkaApis ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/kafka/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ StreamsExampleWithKafkaApis ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 2 source files to /home/oracle/streams/kafka/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ StreamsExampleWithKafkaApis ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/kafka/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ StreamsExampleWithKafkaApis ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ StreamsExampleWithKafkaApis ---
[INFO] No tests to run.
[INFO] Surefire report directory: /home/oracle/streams/kafka/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ StreamsExampleWithKafkaApis ---
[INFO] Building jar: /home/oracle/streams/kafka/target/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ StreamsExampleWithKafkaApis ---
[INFO] Installing /home/oracle/streams/kafka/target/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.jar to /home/oracle/.m2/repository/oci/example/StreamsExampleWithKafkaApis/1.0-SNAPSHOT/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.jar
[INFO] Installing /home/oracle/streams/kafka/pom.xml to /home/oracle/.m2/repository/oci/example/StreamsExampleWithKafkaApis/1.0-SNAPSHOT/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.pom
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ StreamsExampleWithKafkaApis ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sent msg to 0 with offset 16 at 1697208154335
Sent msg to 0 with offset 17 at 1697208154335
Sent msg to 0 with offset 18 at 1697208154335
Sent msg to 0 with offset 19 at 1697208154335
Sent msg to 0 with offset 20 at 1697208154335
Sent msg to 0 with offset 21 at 1697208154335
Sent msg to 0 with offset 22 at 1697208154335
Sent msg to 0 with offset 23 at 1697208154335
Sent msg to 0 with offset 24 at 1697208154335
Sent msg to 0 with offset 25 at 1697208154335
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.175s
[INFO] Finished at: Fri Oct 13 14:42:34 GMT 2023
[INFO] Final Memory: 24M/428M
[INFO] ------------------------------------------------------------------------
$

To consume messages

#1 Create the file src/main/java/kafka/sdk/oss/example/Consumer.java and customise the sections relevant to the OCI stream service

package kafka.sdk.oss.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumes messages from an OCI stream using the Kafka consumer API, printing
 * each record's key, value, and offset, then committing the consumed offsets.
 *
 * <p>Replace the placeholder values below with your tenancy, user, stream pool
 * OCID, auth token, and stream (topic) name before running.
 */
public class Consumer {
  // NOTE(review): bootstrap.servers must be a plain host:port list; a URL
  // scheme such as "https://" is rejected by the Kafka client's address parser.
  static String bootstrapServers = "xxx.streaming.me-dubai-1.oci.oraclecloud.com:9092";
  static String tenancyName = "xxx";
  static String username = "xxx";
  static String streamPoolId = "ocid1.streampool.oc1.me-dubai-1.xxx";
  static String authToken = "xxx";
  static String streamOrKafkaTopicName = "xxx-streams";
  static String consumerGroupName = "org.apache.kafka";

  /**
   * Builds the Kafka consumer configuration for the OCI Streaming endpoint.
   *
   * <p>Auto-commit is disabled so offsets are committed explicitly via
   * {@code commitSync()} after the records have been processed;
   * {@code auto.offset.reset=earliest} makes a new group start from the
   * beginning of the stream.
   *
   * @return consumer properties ready to pass to {@link KafkaConsumer}
   */
  private static Properties getKafkaProperties() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", consumerGroupName);
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "PLAIN");
    props.put("auto.offset.reset", "earliest");
    final String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
        + tenancyName + "/"
        + username + "/"
        + streamPoolId + "\" "
        + "password=\""
        + authToken + "\";";
    props.put("sasl.jaas.config", jaasConfig);
    return props;
  }

  public static void main(String[] args) {
    // Key type is String (not Integer): StringDeserializer is configured above,
    // so declaring <Integer, String> would risk a ClassCastException at runtime.
    // try-with-resources guarantees the consumer is closed on every path.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getKafkaProperties())) {
      consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
      // poll(Duration) replaces the deprecated poll(long) overload.
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

      System.out.println("size of records polled is " + records.count());
      for (ConsumerRecord<String, String> record : records) {
        System.out
            .println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
      }

      // Commit only after the records have been processed (printed).
      consumer.commitSync();
    }
  }
}

#2 Run the following command

mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building StreamsExampleWithKafkaApis 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ StreamsExampleWithKafkaApis ---
[INFO] Deleting /home/oracle/streams/kafka/target
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ StreamsExampleWithKafkaApis ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/kafka/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ StreamsExampleWithKafkaApis ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 2 source files to /home/oracle/streams/kafka/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ StreamsExampleWithKafkaApis ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/kafka/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ StreamsExampleWithKafkaApis ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ StreamsExampleWithKafkaApis ---
[INFO] No tests to run.
[INFO] Surefire report directory: /home/oracle/streams/kafka/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ StreamsExampleWithKafkaApis ---
[INFO] Building jar: /home/oracle/streams/kafka/target/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ StreamsExampleWithKafkaApis ---
[INFO] Installing /home/oracle/streams/kafka/target/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.jar to /home/oracle/.m2/repository/oci/example/StreamsExampleWithKafkaApis/1.0-SNAPSHOT/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.jar
[INFO] Installing /home/oracle/streams/kafka/pom.xml to /home/oracle/.m2/repository/oci/example/StreamsExampleWithKafkaApis/1.0-SNAPSHOT/StreamsExampleWithKafkaApis-1.0-SNAPSHOT.pom
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ StreamsExampleWithKafkaApis ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
size of records polled is 20
Received message: (messageKey0, messageValue0) at offset 6
Received message: (messageKey1, messageValue1) at offset 7
Received message: (messageKey2, messageValue2) at offset 8
Received message: (messageKey3, messageValue3) at offset 9
Received message: (messageKey4, messageValue4) at offset 10
Received message: (messageKey5, messageValue5) at offset 11
Received message: (messageKey6, messageValue6) at offset 12
Received message: (messageKey7, messageValue7) at offset 13
Received message: (messageKey8, messageValue8) at offset 14
Received message: (messageKey9, messageValue9) at offset 15
Received message: (messageKey0, messageValue0) at offset 16
Received message: (messageKey1, messageValue1) at offset 17
Received message: (messageKey2, messageValue2) at offset 18
Received message: (messageKey3, messageValue3) at offset 19
Received message: (messageKey4, messageValue4) at offset 20
Received message: (messageKey5, messageValue5) at offset 21
Received message: (messageKey6, messageValue6) at offset 22
Received message: (messageKey7, messageValue7) at offset 23
Received message: (messageKey8, messageValue8) at offset 24
Received message: (messageKey9, messageValue9) at offset 25
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.706s
[INFO] Finished at: Fri Oct 13 14:44:50 GMT 2023
[INFO] Final Memory: 22M/329M
[INFO] ------------------------------------------------------------------------