Bringing Large Language Models into Flink Real-Time Data Processing
This post uses working code to test bringing a large language model into Flink data cleansing, so that the pipeline can call on external capabilities for more complex tasks such as translation.
Demo
Use docker-compose to start a Flink 2.1 cluster. You also need to download the extra dependencies, in particular the provider JAR for the OpenAI model function (flink-model-openai, which supplies the ModelProviderFactory implementation that ML_PREDICT requires), and mount it into /opt/flink/lib in every container, as the volumes entries in the compose file do.
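One way to fetch that JAR is directly from Maven Central. The sketch below assumes the standard Maven repository layout applies to the org.apache.flink:flink-model-openai:2.1.0 artifact listed in the references; flink_jar_url and fetch_flink_jar are hypothetical helper names, not part of Flink.

```shell
#!/bin/sh
# Sketch: download a Flink artifact JAR from Maven Central.
# Assumption: the standard layout
#   <repo>/<group path>/<artifact>/<version>/<artifact>-<version>.jar
# applies to org.apache.flink:flink-model-openai:2.1.0.
# flink_jar_url and fetch_flink_jar are hypothetical helpers.

MAVEN_REPO="https://repo1.maven.org/maven2"

# Print the download URL for an org.apache.flink artifact.
flink_jar_url() {
  artifact="$1"
  version="$2"
  printf '%s/org/apache/flink/%s/%s/%s-%s.jar\n' \
    "$MAVEN_REPO" "$artifact" "$version" "$artifact" "$version"
}

# Download the JAR into dest_dir (created if missing); fails on HTTP errors.
fetch_flink_jar() {
  artifact="$1"
  version="$2"
  dest_dir="$3"
  mkdir -p "$dest_dir" || return 1
  curl -fL -o "$dest_dir/$artifact-$version.jar" \
    "$(flink_jar_url "$artifact" "$version")"
}
```

For example, `fetch_flink_jar flink-model-openai 2.1.0 /Users/luguanxing/Data/flink-2.1-test` places the JAR at the host path that the volumes entries in the compose file mount; adjust the directory to your own machine.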
version: "2.2"
services:
  jobmanager:
    image: flink:2.1.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - /Users/luguanxing/Data/flink-2.1-test/flink-model-openai-2.1.0.jar:/opt/flink/lib/flink-model-openai-2.1.0.jar
  taskmanager:
    image: flink:2.1.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - /Users/luguanxing/Data/flink-2.1-test/flink-model-openai-2.1.0.jar:/opt/flink/lib/flink-model-openai-2.1.0.jar
  sql-client:
    image: flink:2.1.0-scala_2.12
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
    volumes:
      - /Users/luguanxing/Data/flink-2.1-test/flink-model-openai-2.1.0.jar:/opt/flink/lib/flink-model-openai-2.1.0.jar
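The compose file mounts the provider JAR into every service, and a quick check that it actually landed in /opt/flink/lib can save debugging a missing-provider error later. A minimal sketch, assuming it runs from the directory holding the compose file after the cluster is up; check_provider_jar is a hypothetical helper name.

```shell
#!/bin/sh
# Sketch: confirm the OpenAI provider JAR exists in /opt/flink/lib of each
# running compose service. check_provider_jar is a hypothetical helper.
# Run from the directory containing the compose file.

JAR_NAME="flink-model-openai-2.1.0.jar"

# Print ok/missing per service; return non-zero if any service lacks the JAR.
check_provider_jar() {
  missing=0
  for svc in "$@"; do
    # -T: no pseudo-TTY, so this also works in scripts
    if docker compose exec -T "$svc" test -f "/opt/flink/lib/$JAR_NAME"; then
      echo "ok: $svc"
    else
      echo "missing: $svc"
      missing=1
    fi
  done
  return "$missing"
}
```

Calling `check_provider_jar jobmanager taskmanager` prints one line per service. The sql-client service is started on demand with `docker compose run`, so it is not a long-running container and is not checked this way.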
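Start the JobManager and TaskManager in the background, then enter the SQL client (docker compose run only starts the services listed under depends_on, so without the first command there would be no TaskManager slots to run the query):
docker compose up -d jobmanager taskmanager
docker compose run sql-client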
Add a mock data source:
CREATE TEMPORARY VIEW input_table(text) AS VALUES
  ('Hello how are you?'),
  ('What is your name?'),
  ('Good morning everyone!'),
  ('I love learning new languages.'),
  ('This is a test sentence.');
Register the AI model, specifying the API endpoint and the system prompt. Here the prompt tells the model to translate the English input into Chinese.
-- Declare an AI model
CREATE MODEL `my_model`
INPUT (text STRING)
OUTPUT (response STRING)
WITH (
  'provider' = 'openai',
  'endpoint' = 'https://api.openai.com/v1/chat/completions',
  'api-key' = 'abcdefg',  -- placeholder: replace with your own OpenAI API key
  'system-prompt' = 'translate to Chinese',
  'model' = 'gpt-4o'
);
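Before running ML_PREDICT, it can help to confirm that the key and model work by sending one row straight to OpenAI's Chat Completions endpoint with the same model and system prompt as my_model. This is a sketch under assumptions: the key is exported as OPENAI_API_KEY, the helper names are hypothetical, and the body only approximates what the Flink provider sends, which is an implementation detail of the connector.

```shell
#!/bin/sh
# Sketch: send one input row to the OpenAI Chat Completions API directly.
# Assumptions: OPENAI_API_KEY is exported; json_escape, chat_body and
# ask_model are hypothetical helpers; the payload approximates, but may not
# match, what the Flink OpenAI provider builds.

ENDPOINT="https://api.openai.com/v1/chat/completions"
MODEL="gpt-4o"
SYSTEM_PROMPT="translate to Chinese"

# Escape backslashes and double quotes for use inside a JSON string.
# Does not handle newlines or other control characters.
json_escape() {
  printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g'
}

# Build the request body: system prompt plus the row text as the user message.
chat_body() {
  printf '{"model":"%s","messages":[{"role":"system","content":"%s"},{"role":"user","content":"%s"}]}' \
    "$MODEL" "$(json_escape "$SYSTEM_PROMPT")" "$(json_escape "$1")"
}

# POST one row and print the raw JSON response.
ask_model() {
  curl -sS "$ENDPOINT" \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $OPENAI_API_KEY" \
    -d "$(chat_body "$1")"
}
```

Running `ask_model 'Hello how are you?'` should return a JSON response whose choices[0].message.content holds the translation. If you point the model at an OpenAI-compatible service instead, change ENDPOINT and MODEL to match the 'endpoint' and 'model' options in CREATE MODEL.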
Run the query. ML_PREDICT sends the column named in DESCRIPTOR (text) to the model through the OpenAI API and appends the model's output column (response) to each row, cleansing the data in real time:
-- Basic usage
SELECT * FROM ML_PREDICT(
  TABLE input_table,
  MODEL my_model,
  DESCRIPTOR(text)
);
Comparing the input with the query results shows that the English sentences were successfully translated into Chinese.
References
Installation
Documentation
OpenAI dependency: https://mvnrepository.com/artifact/org.apache.flink/flink-model-openai/2.1.0