Amazon Kinesis Data Streams là gì? – Hướng dẫn cấu hình – Bài 21

Amazon Kinesis Data Streams (KDS) là dịch vụ của AWS cho phép thu thập dữ liệu theo thời gian thực. Đây là giải pháp tối ưu cho các trường hợp dữ liệu được tạo ra và cần xử lý ngay lập tức, chẳng hạn như luồng click từ website, dữ liệu cảm biến IoT, hoặc log hệ thống từ máy chủ.

Xem thêm:

Amazon FSx

Amazon Route 53

Auto Scaling Group trong AWS

1. Amazon Kinesis Data Streams là gì?

Amazon Kinesis Data Streams (KDS) là dịch vụ của AWS cho phép thu thập, lưu trữ và xử lý dữ liệu luồng theo thời gian thực (real-time streaming).

Một số tính năng nổi bật:

  • Dữ liệu có thể được lưu trữ tối đa 365 ngày.
  • Dữ liệu không thể bị xoá thủ công – chỉ bị loại bỏ khi hết thời hạn lưu trữ.
  • Kích thước tối đa cho một bản ghi: 1 MB.
  • Hỗ trợ ordering theo Partition Key – giúp duy trì thứ tự khi gửi nhiều bản ghi liên quan.
  • Mã hóa dữ liệu at rest bằng AWS KMS và in transit bằng HTTPS.

2. Amazon Kinesis Data Strams hoạt động như thế nào?

Dữ liệu real-time thường phát sinh từ nhiều nguồn như:

  • Hành vi người dùng trên website (clickstream)
  • Thiết bị IoT như xe đạp kết nối internet
  • Log hoặc metric hệ thống máy chủ

Các nguồn dữ liệu này được gọi là Producers – có thể là ứng dụng viết bằng code, hoặc sử dụng Kinesis Agent được cài đặt trên máy chủ để tự động gửi dữ liệu vào luồng.

Dữ liệu sau đó được truyền theo thời gian thực vào Kinesis Data Stream, dữ liệu sẽ được lưu trữ và sẵn sàng cho các thành phần Consumers xử lý như:

  • Application
  • AWS Lambda Function
  • Amazon Kinesis Data Firehose
  • Managed Service for Apache Flink

Amazon Kinesis Data Streams

3. Hai chế độ vận hành: Provisioned và On-demand

3.1. Provisioned Mode

Trong chế độ này, bạn tự cấu hình số lượng shard – đơn vị cơ bản xác định thông lượng của stream:

  • 1 shard cung cấp tối đa: 1 MB/s hoặc 1.000 bản ghi/giây (ghi vào) và 2 MB/s (đọc ra).
  • Muốn đạt thông lượng cao hơn (ví dụ: 10 MB/s), bạn cần 10 shards.
  • Chi phí tính theo số shard được provision theo giờ.
  • Cho phép scale in/out (tăng/giảm shard) thủ công.

3.2. On-demand Mode

Đây là chế độ linh hoạt, không cần cấu hình trước số lượng shard:

  • Khởi đầu với ngưỡng mặc định: ~4 MB/s hoặc 4.000 bản ghi/giây.
  • Kinesis tự động scale theo thông lượng quan sát trong 30 ngày gần nhất.
  • Chi phí tính theo dung lượng dữ liệu vào/ra và thời gian sử dụng stream.

Dựa trên đoạn note bạn cung cấp và theo phong cách chi tiết – dễ hiểu như trong data.pdf, mình sẽ viết lại thành một bài hướng dẫn hoàn chỉnh cho người mới bắt đầu.

4. Amazon Kinesis Data Streams – HandOn

Trong bài lab này, chúng ta sẽ thực hành xây dựng một pipeline xử lý dữ liệu streaming sử dụng các dịch vụ của AWS: Kinesis Data Streams, AWS Lambda và DynamoDB.

Mục tiêu là:

  • Tạo một Kinesis Data Stream để tiếp nhận dữ liệu.
  • Tạo Lambda Function thứ nhất để phát sinh dữ liệu mẫu (IoT sensor data) và gửi vào Data Stream.
  • Tạo một bảng DynamoDB để lưu dữ liệu đã xử lý.
  • Tạo Lambda Function thứ hai để xử lý dữ liệu từ Data Stream và ghi kết quả vào DynamoDB.
  • Thực hiện test pipeline, quan sát dữ liệu trong DynamoDB.
  • Cuối cùng dọn dẹp tài nguyên để tránh phát sinh chi phí.

4.1. Kiến trúc tổng quan

  1. Kinesis Data Stream (DemoStream): nơi tiếp nhận dữ liệu thời gian thực.
  2. Lambda 1 (KinesisDataGenerator): sinh dữ liệu cảm biến (sensor ID, nhiệt độ, độ ẩm, timestamp) và gửi vào Kinesis.
  3. Lambda 2 (ProcessKinesisData): được kích hoạt tự động khi có dữ liệu trong stream, xử lý và ghi vào DynamoDB.
  4. DynamoDB Table (KinesisProcessedData): nơi lưu trữ dữ liệu đã xử lý.

Amazon Kinesis Data Streams - HandOn

4.2. Các bước thực hiện

Bước 1: Tạo Kinesis Data Stream

  1. Vào AWS Console, tìm dịch vụ Kinesis.
  2. Chọn Data Streams → Create data stream.
  3. Đặt tên: DemoStream.
  4. Chọn Provisioned, để mặc định 1 shard.
  5. Nhấn Create.

Tạo Kinesis Data Stream

Tạo Kinesis Data Stream

Tạo Kinesis Data Stream

Tạo Kinesis Data Stream

Bước 2: Tạo Lambda Function 1 – Data Generator

1. Vào dịch vụ Lambda → Create function.

2. Đặt tên: KinesisDataGenerator.

3. Runtime: Python 3.x.

4. Tạo function và mở phần Code.

Tạo Lambda Function 1 – Data Generator

Tạo Lambda Function 1 – Data Generator

5. Thêm đoạn code sau (ví dụ, sinh 100 bản ghi ngẫu nhiên):

import json
import boto3
import random
import time

def lambda_handler(event, context):
    kinesis = boto3.client('kinesis')
    
    for _ in range(100):  # Generate 100 records
        data = {
            'sensor_id': random.randint(1, 10),
            'temperature': round(random.uniform(20, 30), 2),
            'humidity': round(random.uniform(30, 70), 2),
            'timestamp': int(time.time())
        }
        
        response = kinesis.put_record(
            StreamName='DemoStream',
            Data=json.dumps(data),
            PartitionKey=str(data['sensor_id'])
        )
        
        print(f"Put record in stream: {data}")
        time.sleep(0.1)  # Wait for 0.1 second before sending next record

    return {
        'statusCode': 200,
        'body': json.dumps('Data generation complete')
    }

6. Nhấn Deploy để lưu code.

Tạo Lambda Function 1 – Data Generator

7. Vào tab Configuration → General Configuration, chỉnh timeout thành 1 phút.

Tạo Lambda Function 1 – Data Generator

Tạo Lambda Function 1 – Data Generator

8. Gắn quyền cho Lambda: vào tab Permissions → Execution role → Attach policies, chọn AmazonKinesisFullAccess.

Gắn quyền cho Lambda:

Gắn quyền cho Lambda

Gắn quyền cho Lambda

Bước 3: Tạo DynamoDB Table

  1. Vào dịch vụ DynamoDB → Create Table.
  2. Table name: KinesisProcessedData.
  3. Partition key: sensor_id (Number).
  4. Sort key: timestamp (Number).
  5. Giữ mặc định và nhấn Create.

TẠO DYNAMODB TABLE

TẠO DYNAMODB TABLE

Bước 4: Tạo Lambda Function 2 – Data Processor

1. Vào dịch vụ Lambda → Create function.

2. Đặt tên: ProcessKinesisData.

3. Runtime: Python 3.x.

Tạo Lambda Function 1 – Data Generator

Tạo Lambda Function 2 – Data Processor

4. Thêm đoạn code sau:

import json
import boto3
from decimal import Decimal
import base64

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('KinesisProcessedData')

def lambda_handler(event, context):
    for record in event['Records']:
        # Decode and load the Kinesis data
        payload = json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8'))
        
        # Convert float to Decimal for DynamoDB
        payload['temperature'] = Decimal(str(payload['temperature']))
        payload['humidity'] = Decimal(str(payload['humidity']))
        
        table.put_item(Item=payload)
        
        print(f"Processed and stored record: {payload}")

    return {
        'statusCode': 200,
        'body': json.dumps('Processing complete')
    }

Tạo Lambda Function 2 – Data Processor

5. Nhấn Deploy để lưu code.

6. Gắn quyền cho Lambda:

  • AmazonKinesisFullAccess.
  • AmazonDynamoDBFullAccess.

Tạo Lambda Function 2 – Data Processor

Tạo Lambda Function 2 – Data Processor

Bước 5: Thêm Trigger cho Lambda 2

  1. Vào function ProcessKinesisData.
  2. Chọn tab Triggers → Add trigger.
  3. Chọn Kinesis, chọn stream DemoStream.
  4. Nhấn Add.

Thêm Trigger cho Lambda 2

Thêm Trigger cho Lambda 2

Thêm Trigger cho Lambda 2

Bước 6: Kiểm thử pipeline

  1. Vào Lambda KinesisDataGenerator.
  2. Nhấn Test → Create test event → Save → Test.
  3. Lambda sẽ chạy ~15 giây và gửi 100 bản ghi vào Kinesis.
  4. Vào DynamoDB → KinesisProcessedDataExplore table items.
  5. Kiểm tra dữ liệu (sensor_id, timestamp, temperature, humidity).

Kiểm thử pipeline

Kiểm thử pipeline

Kiểm thử pipeline

Kiểm thử pipeline

Kiểm thử pipeline

4.3. Dọn dẹp tài nguyên

Để tránh chi phí không mong muốn, hãy xóa toàn bộ tài nguyên sau khi test:

  • Xóa Kinesis Data Stream (DemoStream).
  • Xóa Lambda functions.
  • Xóa DynamoDB table (KinesisProcessedData).
  • Xóa CloudWatch log groups của các Lambda.
  • Xóa IAM roles đã tạo.

Cảm ơn bạn đã tham khảo AWS Certified Solutions Architect Associate – SAA-C03 trên ttnguyen.net

Bài viết cùng chủ đề:

AWS App Runner

Amazon Simple Queue Service

Nguyễn Tiến Trường

Mình viết về những điều nhỏ nhặt trong cuộc sống, Viết về câu chuyện những ngày không có em