Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java V2 update the SNS example to specify FifoThroughputScope #7305

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.example.sns;

// snippet-start:[sns.java2.PriceUpdateExample.import]

import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
import software.amazon.awssdk.policybuilder.iam.IamEffect;
import software.amazon.awssdk.policybuilder.iam.IamPolicy;
Expand Down Expand Up @@ -39,14 +40,14 @@ public class PriceUpdateExample {
public static void main(String[] args) {

final String usage = "\n" +
"Usage: " +
" <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" +
"Where:\n" +
" fifoTopicName - The name of the FIFO topic that you want to create. \n\n" +
" wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n"
+
" retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" +
" analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n";
"Usage: " +
" <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" +
"Where:\n" +
" fifoTopicName - The name of the FIFO topic that you want to create. \n\n" +
" wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n"
+
" retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" +
" analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n";
if (args.length != 4) {
System.out.println(usage);
System.exit(1);
Expand All @@ -60,9 +61,9 @@ public static void main(String[] args) {
// For convenience, the QueueData class holds metadata about a queue: ARN, URL,
// name and type.
List<QueueData> queues = List.of(
new QueueData(wholeSaleQueueName, QueueType.FIFO),
new QueueData(retailQueueName, QueueType.FIFO),
new QueueData(analyticsQueueName, QueueType.Standard));
new QueueData(wholeSaleQueueName, QueueType.FIFO),
new QueueData(retailQueueName, QueueType.FIFO),
new QueueData(analyticsQueueName, QueueType.Standard));

// Create queues.
createQueues(queues);
Expand Down Expand Up @@ -90,13 +91,14 @@ public static String createFIFOTopic(String topicName) {
try {
// Create a FIFO topic by using the SNS service client.
Map<String, String> topicAttributes = Map.of(
"FifoTopic", "true",
"ContentBasedDeduplication", "false");
"FifoTopic", "true",
"ContentBasedDeduplication", "false",
"FifoThroughputScope", "MessageGroup");

CreateTopicRequest topicRequest = CreateTopicRequest.builder()
.name(topicName)
.attributes(topicAttributes)
.build();
.name(topicName)
.attributes(topicAttributes)
.build();

CreateTopicResponse response = snsClient.createTopic(topicRequest);
String topicArn = response.topicArn();
Expand All @@ -114,10 +116,10 @@ public static String createFIFOTopic(String topicName) {
public static void subscribeQueues(List<QueueData> queues, String topicARN) {
queues.forEach(queue -> {
SubscribeRequest subscribeRequest = SubscribeRequest.builder()
.topicArn(topicARN)
.endpoint(queue.queueARN)
.protocol("sqs")
.build();
.topicArn(topicARN)
.endpoint(queue.queueARN)
.protocol("sqs")
.build();

// Subscribe to the endpoint by using the SNS service client.
// Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO
Expand All @@ -138,20 +140,20 @@ public static void publishPriceUpdate(String topicArn, String payload, String gr
String attributeValue = "wholesale";

MessageAttributeValue msgAttValue = MessageAttributeValue.builder()
.dataType("String")
.stringValue(attributeValue)
.build();
.dataType("String")
.stringValue(attributeValue)
.build();

Map<String, MessageAttributeValue> attributes = new HashMap<>();
attributes.put(attributeName, msgAttValue);
PublishRequest pubRequest = PublishRequest.builder()
.topicArn(topicArn)
.subject(subject)
.message(payload)
.messageGroupId(groupId)
.messageDeduplicationId(dedupId)
.messageAttributes(attributes)
.build();
.topicArn(topicArn)
.subject(subject)
.message(payload)
.messageGroupId(groupId)
.messageDeduplicationId(dedupId)
.messageAttributes(attributes)
.build();

final PublishResponse response = snsClient.publish(pubRequest);
System.out.println(response.messageId());
Expand All @@ -173,17 +175,17 @@ public static void createQueues(List<QueueData> queueData) {
CreateQueueResponse response;
if (isFifoQueue) {
response = sqsClient.createQueue(r -> r
.queueName(queue.queueName)
.attributes(Map.of(
QueueAttributeName.FIFO_QUEUE, "true")));
.queueName(queue.queueName)
.attributes(Map.of(
QueueAttributeName.FIFO_QUEUE, "true")));
} else {
response = sqsClient.createQueue(r -> r
.queueName(queue.queueName));
.queueName(queue.queueName));
}
queue.queueURL = response.queueUrl();
queue.queueARN = sqsClient.getQueueAttributes(b -> b
.queueUrl(queue.queueURL)
.attributeNames(QueueAttributeName.QUEUE_ARN)).attributes().get(QueueAttributeName.QUEUE_ARN);
.queueUrl(queue.queueURL)
.attributeNames(QueueAttributeName.QUEUE_ARN)).attributes().get(QueueAttributeName.QUEUE_ARN);
});
}

Expand All @@ -194,25 +196,25 @@ public static void addAccessPolicyToQueuesFINAL(List<QueueData> queues, String t
}
queues.forEach(queue -> {
IamPolicy policy = IamPolicy.builder()
.addStatement(b -> b // Allow account user to send messages to the queue.
.effect(IamEffect.ALLOW)
.addPrincipal(IamPrincipalType.AWS, account)
.addAction("SQS:*")
.addResource(queue.queueARN))
.addStatement(b -> b // Allow the SNS FIFO topic to send messages to the queue.
.effect(IamEffect.ALLOW)
.addPrincipal(IamPrincipalType.AWS, "*")
.addAction("SQS:SendMessage")
.addResource(queue.queueARN)
.addCondition(b1 -> b1
.operator(IamConditionOperator.ARN_LIKE)
.key("aws:SourceArn").value(topicARN)))
.build();
.addStatement(b -> b // Allow account user to send messages to the queue.
.effect(IamEffect.ALLOW)
.addPrincipal(IamPrincipalType.AWS, account)
.addAction("SQS:*")
.addResource(queue.queueARN))
.addStatement(b -> b // Allow the SNS FIFO topic to send messages to the queue.
.effect(IamEffect.ALLOW)
.addPrincipal(IamPrincipalType.AWS, "*")
.addAction("SQS:SendMessage")
.addResource(queue.queueARN)
.addCondition(b1 -> b1
.operator(IamConditionOperator.ARN_LIKE)
.key("aws:SourceArn").value(topicARN)))
.build();
sqsClient.setQueueAttributes(b -> b
.queueUrl(queue.queueURL)
.attributes(Map.of(
QueueAttributeName.POLICY,
policy.toJson())));
.queueUrl(queue.queueURL)
.attributes(Map.of(
QueueAttributeName.POLICY,
policy.toJson())));
});
}

Expand Down
Loading