OCI Streaming Service – Java SDK

This note shows how to publish messages to, and consume messages from, the OCI Streaming Service using the Java SDK. It assumes the stream has already been created.

First install Maven 3 or later, Java 8 or later, and java-oci-sdk, all with yum.

Take note of each stream's OCID from the managed service. Note that the Java SDK does not expect a port to be specified on the endpoint URL.

As a prerequisite, create the Maven configuration file (pom.xml) in the working directory:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the OCI Streaming Java SDK examples (Producer and Consumer).
  Every dependency is declared once, with an explicit version, so the build is
  reproducible and Maven does not warn about duplicate declarations.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>oci.example</groupId>
    <artifactId>StreamsJava</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Set the encoding explicitly so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single version for all OCI SDK modules so they cannot drift apart. -->
        <oci.sdk.version>3.25.4</oci.sdk.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-streaming</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common-httpclient-jersey</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>
        <!-- Used by Producer (StringUtils). Pinned instead of the deprecated LATEST keyword. -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.13.0</version>
        </dependency>
        <!-- Used by Consumer (Uninterruptibles). Pinned instead of the deprecated LATEST keyword. -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.2-jre</version>
        </dependency>
    </dependencies>
</project>

To publish messages

#1 Create the test file src/main/java/oci/sdk/oss/example/Producer.java (the directory must match the package declaration below), and customise the sections relevant to your OCI stream service

package oci.sdk.oss.example;

import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.streaming.StreamClient;
import com.oracle.bmc.streaming.model.PutMessagesDetails;
import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry;
import com.oracle.bmc.streaming.model.PutMessagesResultEntry;
import com.oracle.bmc.streaming.requests.PutMessagesRequest;
import com.oracle.bmc.streaming.responses.PutMessagesResponse;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * Example producer that publishes a batch of test messages to an OCI stream.
 *
 * <p>Customise the four constants at the top of {@link #main(String[])} (config file
 * path, config profile, stream OCID and message endpoint) before running.
 */
public class Producer {
  /**
   * Builds an authenticated {@link StreamClient} for the configured endpoint and
   * publishes the example messages to the configured stream.
   *
   * @param args unused
   * @throws Exception if the OCI config file cannot be read or the client fails
   */
  public static void main(String[] args) throws Exception {

    final String configurationFilePath = "/home/oracle/.oci/config";
    final String profile = "xxx";
    final String ociStreamOcid = "ocid1.stream.oc1.me-dubai-1.xxx";
    final String ociMessageEndpoint = "https://xxx.streaming.me-dubai-1.oci.oraclecloud.com";

    // Read the configured file and profile. parseDefault() would ignore both values
    // above and always read the DEFAULT profile from the default config location.
    final ConfigFileReader.ConfigFile configFile =
        ConfigFileReader.parse(configurationFilePath, profile);
    final AuthenticationDetailsProvider provider =
        new ConfigFileAuthenticationDetailsProvider(configFile);

    // Streams are assigned a specific endpoint url based on where they are
    // provisioned, so the client is created against the stream's message endpoint.
    // try-with-resources closes the client so its HTTP resources are released
    // when publishing is done.
    try (StreamClient streamClient =
        StreamClient.builder().endpoint(ociMessageEndpoint).build(provider)) {
      // publish some messages to the stream
      publishExampleMessages(streamClient, ociStreamOcid);
    }
  }

  /**
   * Publishes 50 key/value messages ("messageKey<i>" / "messageValue<i>") to the
   * stream in a single put request and prints the per-message result.
   *
   * @param streamClient client bound to the stream's message endpoint
   * @param streamId OCID of the target stream
   */
  private static void publishExampleMessages(StreamClient streamClient, String streamId) {
    // build up a putRequest and publish some messages to the stream
    List<PutMessagesDetailsEntry> messages = new ArrayList<>();
    for (int i = 0; i < 50; i++) {
      messages.add(
          PutMessagesDetailsEntry.builder()
              .key(String.format("messageKey%s", i).getBytes(UTF_8))
              .value(String.format("messageValue%s", i).getBytes(UTF_8))
              .build());
    }

    System.out.println(
        String.format("Publishing %s messages to stream %s.", messages.size(), streamId));
    PutMessagesDetails messagesDetails = PutMessagesDetails.builder().messages(messages).build();

    PutMessagesRequest putRequest = PutMessagesRequest.builder()
        .streamId(streamId)
        .putMessagesDetails(messagesDetails)
        .build();

    PutMessagesResponse putResponse = streamClient.putMessages(putRequest);

    // Each result entry reports either an error or the partition/offset the
    // message was written to, so failures are reported per message.
    for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) {
      if (StringUtils.isNotBlank(entry.getError())) {
        System.out.println(
            String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage()));
      } else {
        System.out.println(
            String.format(
                "Published message to partition %s, offset %s.",
                entry.getPartition(),
                entry.getOffset()));
      }
    }
  }

}

#2 Run the file

mvn clean install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for oci.example:StreamsJava:jar:1.0-SNAPSHOT
[WARNING] 'dependencies.dependency.(groupId:artifactId:type:classifier)' must be unique: org.apache.commons:commons-lang3:jar -> duplicate declaration of version LATEST @ line 40, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building StreamsJava 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ StreamsJava ---
[INFO] Deleting /home/oracle/streams/sdk/target
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ StreamsJava ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/sdk/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ StreamsJava ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 2 source files to /home/oracle/streams/sdk/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ StreamsJava ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/sdk/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ StreamsJava ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ StreamsJava ---
[INFO] No tests to run.
[INFO] Surefire report directory: /home/oracle/streams/sdk/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ StreamsJava ---
[INFO] Building jar: /home/oracle/streams/sdk/target/StreamsJava-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ StreamsJava ---
[INFO] Installing /home/oracle/streams/sdk/target/StreamsJava-1.0-SNAPSHOT.jar to /home/oracle/.m2/repository/oci/example/StreamsJava/1.0-SNAPSHOT/StreamsJava-1.0-SNAPSHOT.jar
[INFO] Installing /home/oracle/streams/sdk/pom.xml to /home/oracle/.m2/repository/oci/example/StreamsJava/1.0-SNAPSHOT/StreamsJava-1.0-SNAPSHOT.pom
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ StreamsJava ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Publishing 50 messages to stream ocid1.stream.oc1.me-dubai-1.amaaaaaagakrxmyafvrqsrcjeisqw5xfi2sgw2ptzge5jprqhojdrhlfs5za.
Published message to partition 0, offset 76.
Published message to partition 0, offset 77.
Published message to partition 0, offset 78.
Published message to partition 0, offset 79.
Published message to partition 0, offset 80.
Published message to partition 0, offset 81.
Published message to partition 0, offset 82.
Published message to partition 0, offset 83.
Published message to partition 0, offset 84.
Published message to partition 0, offset 85.
Published message to partition 0, offset 86.
Published message to partition 0, offset 87.
Published message to partition 0, offset 88.
Published message to partition 0, offset 89.
Published message to partition 0, offset 90.
Published message to partition 0, offset 91.
Published message to partition 0, offset 92.
Published message to partition 0, offset 93.
Published message to partition 0, offset 94.
Published message to partition 0, offset 95.
Published message to partition 0, offset 96.
Published message to partition 0, offset 97.
Published message to partition 0, offset 98.
Published message to partition 0, offset 99.
Published message to partition 0, offset 100.
Published message to partition 0, offset 101.
Published message to partition 0, offset 102.
Published message to partition 0, offset 103.
Published message to partition 0, offset 104.
Published message to partition 0, offset 105.
Published message to partition 0, offset 106.
Published message to partition 0, offset 107.
Published message to partition 0, offset 108.
Published message to partition 0, offset 109.
Published message to partition 0, offset 110.
Published message to partition 0, offset 111.
Published message to partition 0, offset 112.
Published message to partition 0, offset 113.
Published message to partition 0, offset 114.
Published message to partition 0, offset 115.
Published message to partition 0, offset 116.
Published message to partition 0, offset 117.
Published message to partition 0, offset 118.
Published message to partition 0, offset 119.
Published message to partition 0, offset 120.
Published message to partition 0, offset 121.
Published message to partition 0, offset 122.
Published message to partition 0, offset 123.
Published message to partition 0, offset 124.
Published message to partition 0, offset 125.
[WARNING] thread Thread[jersey-client-async-executor-0,5,oci.sdk.oss.example.Producer] was interrupted but is still alive after waiting at least 15000msecs
[WARNING] thread Thread[jersey-client-async-executor-0,5,oci.sdk.oss.example.Producer] will linger despite being asked to die via interruption
[WARNING] NOTE: 1 thread(s) did not finish despite being asked to  via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
[WARNING] Couldn't destroy threadgroup o
[error]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 20.140s
[INFO] Finished at: Fri Oct 13 15:59:29 GMT 2023
[INFO] Final Memory: 35M/386M
[INFO] ------------------------------------------------------------------------

To consume messages

#1 Create the test file src/main/java/oci/sdk/oss/example/Consumer.java (the directory must match the package declaration below), and customise the sections relevant to your OCI stream service


package oci.sdk.oss.example;

import com.google.common.util.concurrent.Uninterruptibles;
import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.streaming.StreamClient;
import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
import com.oracle.bmc.streaming.model.Message;
import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
import com.oracle.bmc.streaming.requests.GetMessagesRequest;
import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
import com.oracle.bmc.streaming.responses.GetMessagesResponse;

import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * Example consumer that reads messages from an OCI stream using a group cursor.
 *
 * <p>Customise the four constants at the top of {@link #main(String[])} (config file
 * path, config profile, stream OCID and message endpoint) before running.
 */
public class Consumer {
  /**
   * Builds an authenticated {@link StreamClient} for the configured endpoint,
   * creates a group cursor and runs a short polling loop against the stream.
   *
   * @param args unused
   * @throws Exception if the OCI config file cannot be read or the client fails
   */
  public static void main(String[] args) throws Exception {

    final String configurationFilePath = "/home/oracle/.oci/config";
    final String profile = "xxx";
    final String ociStreamOcid = "ocid1.stream.oc1.me-dubai-1.xxx";
    final String ociMessageEndpoint = "https://xxx.streaming.me-dubai-1.oci.oraclecloud.com";

    // Read the configured file and profile. parseDefault() would ignore both values
    // above and always read the DEFAULT profile from the default config location.
    final ConfigFileReader.ConfigFile configFile =
        ConfigFileReader.parse(configurationFilePath, profile);
    final AuthenticationDetailsProvider provider =
        new ConfigFileAuthenticationDetailsProvider(configFile);

    // Streams are assigned a specific endpoint url based on where they are
    // provisioned, so the client is created against the stream's message endpoint.
    // try-with-resources closes the client so its HTTP resources are released
    // when the loop is done.
    try (StreamClient streamClient =
        StreamClient.builder().endpoint(ociMessageEndpoint).build(provider)) {
      // A cursor can be created as part of a consumer group.
      // Committed offsets are managed for the group, and partitions
      // are dynamically balanced amongst consumers in the group.
      System.out.println("Starting a simple message loop with a group cursor");
      String groupCursor =
          getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
      simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
    }
  }

  /**
   * Polls the stream ten times, up to 25 messages per request, printing each
   * message as "key: value" and pausing one second between requests.
   *
   * @param streamClient client bound to the stream's message endpoint
   * @param streamId OCID of the stream to read from
   * @param initialCursor cursor value to start reading from
   */
  private static void simpleMessageLoop(
      StreamClient streamClient, String streamId, String initialCursor) {
    String cursor = initialCursor;
    for (int i = 0; i < 10; i++) {

      GetMessagesRequest getRequest = GetMessagesRequest.builder()
          .streamId(streamId)
          .cursor(cursor)
          .limit(25)
          .build();

      GetMessagesResponse getResponse = streamClient.getMessages(getRequest);

      // process the messages
      System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
      for (Message message : getResponse.getItems()) {
        System.out.println(
            String.format(
                "%s: %s",
                message.getKey() == null ? "Null" : new String(message.getKey(), UTF_8),
                new String(message.getValue(), UTF_8)));
      }

      // getMessages is a throttled method; clients should retrieve sufficiently
      // large message batches, as to avoid too many http requests.
      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);

      // use the next-cursor for iteration
      cursor = getResponse.getOpcNextCursor();
    }
  }

  /**
   * Creates a group cursor of type TrimHorizon with commit-on-get enabled for the
   * given consumer group and instance.
   *
   * @param streamClient client bound to the stream's message endpoint
   * @param streamId OCID of the stream to read from
   * @param groupName name of the consumer group
   * @param instanceName name of this consumer instance within the group
   * @return the cursor value to pass to the first getMessages call
   */
  private static String getCursorByGroup(
      StreamClient streamClient, String streamId, String groupName, String instanceName) {
    System.out.println(
        String.format(
            "Creating a cursor for group %s, instance %s.", groupName, instanceName));

    CreateGroupCursorDetails cursorDetails = CreateGroupCursorDetails.builder()
        .groupName(groupName)
        .instanceName(instanceName)
        .type(CreateGroupCursorDetails.Type.TrimHorizon)
        .commitOnGet(true)
        .build();

    CreateGroupCursorRequest createCursorRequest = CreateGroupCursorRequest.builder()
        .streamId(streamId)
        .createGroupCursorDetails(cursorDetails)
        .build();

    CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(createCursorRequest);
    return groupCursorResponse.getCursor().getValue();
  }

}

#2 Run the following command

mvn clean install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for oci.example:StreamsJava:jar:1.0-SNAPSHOT
[WARNING] 'dependencies.dependency.(groupId:artifactId:type:classifier)' must be unique: org.apache.commons:commons-lang3:jar -> duplicate declaration of version LATEST @ line 40, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building StreamsJava 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ StreamsJava ---
[INFO] Deleting /home/oracle/streams/sdk/target
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ StreamsJava ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/sdk/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ StreamsJava ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 2 source files to /home/oracle/streams/sdk/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ StreamsJava ---
[debug] execute contextualize
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oracle/streams/sdk/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ StreamsJava ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ StreamsJava ---
[INFO] No tests to run.
[INFO] Surefire report directory: /home/oracle/streams/sdk/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ StreamsJava ---
[INFO] Building jar: /home/oracle/streams/sdk/target/StreamsJava-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ StreamsJava ---
[INFO] Installing /home/oracle/streams/sdk/target/StreamsJava-1.0-SNAPSHOT.jar to /home/oracle/.m2/repository/oci/example/StreamsJava/1.0-SNAPSHOT/StreamsJava-1.0-SNAPSHOT.jar
[INFO] Installing /home/oracle/streams/sdk/pom.xml to /home/oracle/.m2/repository/oci/example/StreamsJava/1.0-SNAPSHOT/StreamsJava-1.0-SNAPSHOT.pom
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ StreamsJava ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting a simple message loop with a group cursor
Creating a cursor for group exampleGroup, instance exampleInstance-1.
Read 25 messages.
messageKey0: messageValue0
messageKey1: messageValue1
messageKey2: messageValue2
messageKey3: messageValue3
messageKey4: messageValue4
messageKey5: messageValue5
messageKey6: messageValue6
messageKey7: messageValue7
messageKey8: messageValue8
messageKey9: messageValue9
messageKey10: messageValue10
messageKey11: messageValue11
messageKey12: messageValue12
messageKey13: messageValue13
messageKey14: messageValue14
messageKey15: messageValue15
messageKey16: messageValue16
messageKey17: messageValue17
messageKey18: messageValue18
messageKey19: messageValue19
messageKey20: messageValue20
messageKey21: messageValue21
messageKey22: messageValue22
messageKey23: messageValue23
messageKey24: messageValue24
Read 25 messages.
messageKey25: messageValue25
messageKey26: messageValue26
messageKey27: messageValue27
messageKey28: messageValue28
messageKey29: messageValue29
messageKey30: messageValue30
messageKey31: messageValue31
messageKey32: messageValue32
messageKey33: messageValue33
messageKey34: messageValue34
messageKey35: messageValue35
messageKey36: messageValue36
messageKey37: messageValue37
messageKey38: messageValue38
messageKey39: messageValue39
messageKey40: messageValue40
messageKey41: messageValue41
messageKey42: messageValue42
messageKey43: messageValue43
messageKey44: messageValue44
messageKey45: messageValue45
messageKey46: messageValue46
messageKey47: messageValue47
messageKey48: messageValue48
messageKey49: messageValue49
Read 0 messages.
Read 0 messages.
Read 0 messages.
Read 0 messages.
Read 0 messages.
Read 0 messages.
Read 0 messages.
Read 0 messages.
[WARNING] thread Thread[jersey-client-async-executor-0,5,oci.sdk.oss.example.Consumer] was interrupted but is still alive after waiting at least 14999msecs
[WARNING] thread Thread[jersey-client-async-executor-0,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-1,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-2,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-3,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-4,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-5,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-6,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-7,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-8,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-9,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] thread Thread[jersey-client-async-executor-10,5,oci.sdk.oss.example.Consumer] will linger despite being asked to die via interruption
[WARNING] NOTE: 11 thread(s) did not finish despite being asked to  via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
[WARNING] Couldn't destroy threadgroup oke0(Native Method)
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 30.413s
[INFO] Finished at: Fri Oct 13 16:03:03 GMT 2023
[INFO] Final Memory: 35M/390M
[INFO] ------------------------------------------------------------------------