
Introducing Large Models into Flink Real-Time Data Processing

Blog type: Big Data | Release time: 2025-08-12 18:00:00

This post uses working code to test calling a large language model from Flink during data cleansing. This lets a pipeline hand off complex tasks, such as translation, to an external model.

Demo

Install Flink 2.1 with docker-compose. You also need to download the required dependencies. The most important one is the provider JAR for the OpenAI model functions (flink-model-openai), which supplies the ModelProviderFactory implementation that ML_PREDICT needs. The compose file below mounts this JAR into /opt/flink/lib in the jobmanager, taskmanager and sql-client containers.
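If you prefer to fetch the JAR with a script, the sketch below does two things. First, it builds the download URL for the org.apache.flink:flink-model-openai:2.1.0 artifact from the standard Maven repository layout: the groupId with dots turned into slashes, then the artifactId, then the version. Second, it downloads the file. The helper names `maven_jar_url` and `download_jar` are hypothetical, as is any target directory you pass in. Point the directory at the host path that your compose file mounts.

```python
import os
import urllib.request

# Maven Central base URL; plain JARs live at
# <base>/<groupId as path>/<artifactId>/<version>/<artifactId>-<version>.jar
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Build the Maven Central download URL for an unclassified JAR."""
    group_path = group_id.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


def download_jar(url: str, target_dir: str) -> str:
    """Download the JAR into target_dir (created if missing) and return the local path."""
    os.makedirs(target_dir, exist_ok=True)
    path = os.path.join(target_dir, url.rsplit("/", 1)[-1])
    urllib.request.urlretrieve(url, path)
    return path
```

For example, `download_jar(maven_jar_url("org.apache.flink", "flink-model-openai", "2.1.0"), "<mounted host dir>")` would save `flink-model-openai-2.1.0.jar` into that directory, which is the file name the compose file expects.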

version: "2.2"
services:
  jobmanager:
    image: flink:2.1.0-scala_2.12
    ports:
      - "8081:8081"  # Flink web UI and REST API
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      # OpenAI model provider JAR, mounted into Flink's lib directory
      - /Users/luguanxing/Data/flink-2.1-test/flink-model-openai-2.1.0.jar:/opt/flink/lib/flink-model-openai-2.1.0.jar
  taskmanager:
    image: flink:2.1.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      # the TaskManagers execute ML_PREDICT, so they need the provider JAR as well
      - /Users/luguanxing/Data/flink-2.1-test/flink-model-openai-2.1.0.jar:/opt/flink/lib/flink-model-openai-2.1.0.jar
  sql-client:
    image: flink:2.1.0-scala_2.12
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
    volumes:
      # the client must also resolve the 'openai' provider when CREATE MODEL runs
      - /Users/luguanxing/Data/flink-2.1-test/flink-model-openai-2.1.0.jar:/opt/flink/lib/flink-model-openai-2.1.0.jar
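Once the jobmanager and taskmanager containers are running, you can check from the host that a TaskManager has registered before you submit any query. The sketch below reads the JobManager's REST `GET /overview` endpoint on the published port 8081. It then compares `slots-total` with the `taskmanager.numberOfTaskSlots: 2` setting above. The helper names are hypothetical, and `localhost` assumes the ports mapping from the compose file.

```python
import json
import urllib.request

# JobManager REST API, published on port 8081 by the compose file (assumed local host).
FLINK_REST = "http://localhost:8081"


def fetch_overview(base_url: str = FLINK_REST) -> dict:
    """GET /overview from the Flink REST API and return the parsed JSON."""
    with urllib.request.urlopen(f"{base_url}/overview", timeout=5) as resp:
        return json.load(resp)


def check_slots(overview: dict, slots_per_tm: int = 2) -> bool:
    """True if at least one TaskManager registered with the expected number of slots.

    slots_per_tm mirrors taskmanager.numberOfTaskSlots in the compose file.
    """
    tms = overview.get("taskmanagers", 0)
    return tms > 0 and overview.get("slots-total", 0) == tms * slots_per_tm
```

With `scale: 1`, `check_slots(fetch_overview())` should be true once the cluster is up. If it is false, ML_PREDICT queries will wait for slots that never appear.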



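Start the cluster, then open the SQL client. `docker compose run` only starts the services that sql-client depends on (the jobmanager), so bring up the taskmanager explicitly first:

docker compose up -d jobmanager taskmanager
docker compose run sql-client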



Create a mock data source:

CREATE TEMPORARY VIEW input_table(text) AS
VALUES
  ('Hello how are you?'),
  ('What is your name?'),
  ('Good morning everyone!'),
  ('I love learning new languages.'),
  ('This is a test sentence.');



Declare the AI model. Specify the API endpoint and a system prompt; here the prompt tells the model to translate the English input into Chinese.

-- Declare an AI model
CREATE MODEL `my_model`
INPUT (text STRING)
OUTPUT (response STRING)
WITH(
  'provider' = 'openai',
  -- OpenAI Chat Completions endpoint
  'endpoint' = 'https://api.openai.com/v1/chat/completions',
  -- placeholder: replace with your own API key
  'api-key' = 'abcdefg',
  'system-prompt' = 'translate to Chinese',
  'model' = 'gpt-4o'
);
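The sketch below shows what the model options could mean at the HTTP level. It assumes that the provider sends one OpenAI Chat Completions request per row. In that request, 'system-prompt' becomes the system message, the `text` column becomes the user message, and the assistant reply fills the `response` column. This mapping is an assumption about the provider's internals and is not taken from Flink's source. The helper names are hypothetical. The request and response shapes follow the public Chat Completions format.

```python
def build_chat_request(model: str, system_prompt: str, text: str) -> dict:
    """Chat Completions body for one row: the system prompt plus the row text as the user message."""
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": text},
        ],
    }


def build_headers(api_key: str) -> dict:
    """Bearer-token headers, as used by the OpenAI API."""
    return {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}


def extract_response(body: dict) -> str:
    """Assistant text from a Chat Completions reply (the value assumed to land in `response`)."""
    return body["choices"][0]["message"]["content"]
```

For the first mock row, `build_chat_request("gpt-4o", "translate to Chinese", "Hello how are you?")` yields the payload that would be POSTed to the endpoint configured above.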



Run the SQL query. ML_PREDICT passes the `text` column of each row to the model through the OpenAI API and cleans (here, translates) the data in real time.

-- Basic usage
SELECT * FROM ML_PREDICT(
  TABLE input_table,
  MODEL my_model,
  DESCRIPTOR(text)
);
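Conceptually, ML_PREDICT keeps the columns of the input table and appends the model's OUTPUT columns. The query therefore returns `text` and `response` side by side. The Python sketch below emulates only this row-by-row result shape. `ml_predict` and `fake_translate` are hypothetical names, and the fake predictor stands in for the remote model so the sketch runs offline.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, str]


def ml_predict(rows: Iterable[Row], predict: Callable[[str], str],
               input_col: str = "text", output_col: str = "response") -> List[Row]:
    """Keep every input column and append the model's output column to each row."""
    results: List[Row] = []
    for row in rows:
        out = dict(row)  # copy so the caller's rows are left unchanged
        out[output_col] = predict(row[input_col])
        results.append(out)
    return results


def fake_translate(text: str) -> str:
    """Offline stand-in for the remote model call (not a real translation)."""
    return f"[translated] {text}"
```

The real operator runs inside Flink tasks and may parallelize or batch the model calls. This sequential loop only illustrates which columns come back.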



The query result shows each original sentence next to the model's response, confirming that the English text was translated into Chinese.



References

Installation: https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/resource-providers/standalone/docker/#flink-with-docker-compose

Documentation (Apache Flink 2.1.0 release announcement): https://flink.apache.org/2025/07/31/apache-flink-2.1.0-ushers-in-a-new-era-of-unified-real-time-data--ai-with-comprehensive-upgrades/

OpenAI dependency: https://mvnrepository.com/artifact/org.apache.flink/flink-model-openai/2.1.0