Amazon Kinesis giúp bạn dễ dàng thu thập, xử lý và phân tích luồng dữ liệu truyền liên tục, real-time để bạn có thể nhận được thông tin chi tiết kịp thời và phản ứng nhanh với thông tin mới. Trong trường hợp này, trình duyệt sẽ gửi các điểm vẽ tới Kinesis stream. Thông thường, chúng ta sẽ tạo một Kinesis consumer client và đọc trực tiếp các sự kiện từ stream và xử lý chúng, và chúng ta sẽ thực hiện điều này trong kịch bản tiếp theo. Đối với tình huống này, giả sử chúng ta muốn các thông tin được gửi đến hàng đợi SQS giống như chúng ta đã làm trong các tình huống trước đây.
Vì không có cách nào được xây dựng sẵn để định tuyến các message đã được gửi tới Kinesis stream vào hàng đợi SQS, chúng ta cần một bên trung gian thực hiện việc xử lý/ định tuyến đó cho chúng ta. Trong trường hợp này, chúng ta sẽ triển khai trung gian dưới dạng một hàm AWS Lambda sẽ tạo cầu nối giữa Kinesis stream và hàng đợi SQS mà chúng ta xác định làm mục tiêu.
Kiến trúc cho kịch bản này trông giống như sau:
Tương tự như kịch bản trước, message được hiển thị bởi một subscriber hàng đợi Amazon SQS. Nhưng trong trường hợp này, các message được gửi đến một Kinesis stream, và sau đó được chuyển đến hàng đợi SQS bởi hàm Lambda.
Nội dung
Làm theo các bước bên dưới để triển khai hàm Lambda.
KinesisToSQS
Kinesis Event
từ danh sách.Chúng ta sẽ sử dụng cách triển khai mặc định để kiểm tra xem cấu hình có đúng không trước khi cập nhật cách triển khai nhằm thực hiện một tác vụ cụ thể.
kinesissam-202103-LambdaFunction-XXXXXXXXXXXX
.Lab 5 Kinesis to SQS
.LambdaKinesisToSQSRole
đã được bài thực hành cung cấp cho bạn.Trong khi bạn đang đợi tải lên hoàn tất, hãy xem nhanh cách triển khai đơn giản mà IDE đã tạo cho bạn.
public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Integer> {
@Override
public Integer handleRequest(KinesisEvent event, Context context) {
context.getLogger().log("Input: " + event);
for (KinesisEventRecord record : event.getRecords()) {
String payload = new String(record.getKinesis().getData().array());
context.getLogger().log("Payload: " + payload);
}
return event.getRecords().size();
}
}
Việc triển khai này sẽ đơn giản là ghi nhật ký sự kiện đầu vào mà hàm Lambda nhận được, và sau đó đối với mỗi bản ghi trong gói, chúng sẽ ghi lại nội dung. Đây là một điểm khởi đầu tốt để cho phép chúng ta xác nhận rằng cấu hình của chúng ta đang đọc chính xác dữ liệu từ Kinesis stream.
Xung quanh vị trí này sẽ là những thống kê khác về stream. Lưu ý rằng Incoming data - sum (Count) khớp với cùng một số bạn thấy trong trang web demo của chúng ta, trong Messages Sent? Bạn có thể cần phải di chuyển chuột qua biểu đồ để xem khối lượng dữ liệu thực tế. Trong ví dụ được hiển thị ở đây, bạn sẽ trỏ chuột vào cuối dòng để hiển thị số lượng (hiển thị 118). Số điểm của hai điểm trên biểu đồ sẽ bằng số liệu Messages Sent trên trang web của chúng ta (hình ảnh cuối cùng, hiển thị Messages Sent từ trang web của chúng ta. 118)
kinesissam-202103-LambdaFunction-XXXXXXXXXXXX
. Chọn vào nó để quản lý cấu hình của nó.iDevelopDrawingData
từ danh sách.Trim horizon
. Giữ thiết lập Batch size ở 100
.Tùy thuộc vào số lượng điểm vẽ bạn đã xuất vào Kinesis stream, bạn sẽ thấy một số khác nhau cho Invocation count. Khi chúng ta bật trigger, chúng ta đặt Batch size thành 100 (là giá trị mặc định). Vì vậy, hàm Lambda sẽ được gọi với tối đa 100 bản ghi cùng một lúc. Nếu bạn đã gửi 225 message vào luồng, việc này sẽ sử dụng 3 invocation của hàm Lambda - Math.ceil(225/100) == 3.
Khi viết mã hàm Lambda của bạn bằng Java, bạn có thể phát chuỗi nhật ký vào CloudWatch log bằng cách chỉ cần gọi phương thức .log() của Logger:
context.getLogger().log("Log message to emit: " + someMessage);
Quay lại tình huống của chúng ta, chúng ta muốn đọc các message từ Kinesis stream và gửi chúng vào hàng đợi SQS, nơi triển khai trên trình duyệt của chúng ta để có thể đọc chúng và vẽ chúng trên khung của subscriber.
/KinesisToSQS/src/main/java/com/amazonaws/lambda/demo/LambdaFunctionHandler.java
bằng đoạn mã sau:package com.amazonaws.lambda.demo;
import java.awt.geom.AffineTransform;
import java.awt.geom.Point2D;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.fasterxml.jackson.databind.ObjectMapper;
public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Integer> {
final int MAX_SQS_BATCH_SIZE = 10;
private AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
@Override
public Integer handleRequest(KinesisEvent event, Context context) {
Iterator<KinesisEventRecord> iterator = event.getRecords().iterator();
AffineTransform transformer = new AffineTransform(1, 0, 0, -1, 0, 400);
ObjectMapper mapper = new ObjectMapper();
Point2D dstPoint = new Point2D.Double();
boolean sendMessagesToSQS = true;
try
{
//
// Read the SQS Queue Target from the Lambda Environment Variables
//
String sqsUrl = System.getenv("TargetSQSUrl");
if ( sqsUrl == null || sqsUrl.isEmpty() )
{
context.getLogger().log("WARNING:: Environment Variable [TargetSQSUrl] is not set. No messages will be sent via SQS");
sendMessagesToSQS = false;
}
while ( iterator.hasNext() )
{
int messageCounter = 0;
// Prepare a batch request to write all the messages we have received in this invocation
Collection<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>();
while ( iterator.hasNext() && messageCounter++ < MAX_SQS_BATCH_SIZE )
{
String payload = new String(iterator.next().getKinesis().getData().array());
context.getLogger().log("Payload: " + payload);
DrawPoint transformedPoint = new DrawPoint();
try {
transformedPoint = mapper.readValue(payload, DrawPoint.class);
// Transform the point
Point2D srcPoint = new Point2D.Double(transformedPoint.getX(), transformedPoint.getY());
transformer.transform(srcPoint, dstPoint);
// Update the payload
transformedPoint.setX((int)dstPoint.getX());
transformedPoint.setY((int)dstPoint.getY());
// Add this payload into our batch
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(
"msg_" + messageCounter,
mapper.writeValueAsString(transformedPoint)
);
entries.add(entry);
}
catch (Exception e)
{
context.getLogger().log("Unable to deserialise " + payload + " as a DrawPoint! " + e.getMessage());
}
}
if ( sendMessagesToSQS && entries.size() > 0 )
{
// We have reached the end of the records or we have reached the maximum
// batch size allowed for SQS, so we need to send our entries
context.getLogger().log("Sending batch of " + (messageCounter - 1) + " events to SQS...");
SendMessageBatchRequest batch = new SendMessageBatchRequest()
.withQueueUrl(sqsUrl);
batch.setEntries(entries);
// Perform the message sending
sqs.sendMessageBatch(batch);
}
}
}
catch (Exception e)
{
context.getLogger().log("EXCEPTION::Aborting Lambda processing");
context.getLogger().log(e.getStackTrace().toString());
}
return event.getRecords().size();
}
// Inner class must be marked as static in order for the JSON mapper to deserialise
private static class DrawPoint {
private int x;
private int y;
private long timestamp;
private boolean clear;
public int getX() {
return x;
}
public void setX(int x) {
this.x = x;
}
public int getY() {
return y;
}
public void setY(int y) {
this.y = y;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public boolean isClear() {
return clear;
}
public void setClear(boolean clear) {
this.clear = clear;
}
}
}
Bạn cũng có thể tải xuống tập tin này từ đây.
Sau khi quá trình tải lên hoàn tất, trên AWS Lambda console, chọn vào tab Configuration cho hàm Lambda kinesissam-202103-LambdaFunction-XXXXXXXXXXXX
. Cuộn xuống phần Environment variables (nếu bạn không thấy, hãy chọn vào tên hàm Lambda của bạn trong Designer dashboard) và thêm một biến mới bằng khóa TargetSQSUrl
và đặt giá trị thành QueueUrl1
từ phòng thí nghiệm Additional Info panel.
Chọn vào Save.
Các điểm vẽ bây giờ sẽ được nhận qua hàng đợi SQS và được hiển thị trên khung. Lưu ý rằng hình dạng bạn vẽ có bị đảo ngược theo chiều Y không? Điều này là do việc triển khai chúng ta đã sử dụng cho hàm Lambda thực hiện một phép biến đổi trên các điểm vẽ, việc này chứng minh chúng ta có thể thao tác dữ liệu trong Kinesis stream trước khi chuyển nó vào hàng đợi SQS.
Hãy dành một chút thời gian để xem lại mã triển khai handler để đảm bảo bạn hiểu cách hoạt động của nó.