首页
  • GM-3568JHF
  • M4-R1
  • M5-R1
  • SC-3568HA
  • M-K1HSE
  • CF-NRS1
  • CF-CRA2
  • 1684XB-32T
  • 1684X-416T
  • RDK-X5
  • RDK-S100
  • C-3568BQ
  • C-3588LQ
  • GC-3568JBAF
  • C-K1BA
商城
  • English
  • 简体中文
首页
  • GM-3568JHF
  • M4-R1
  • M5-R1
  • SC-3568HA
  • M-K1HSE
  • CF-NRS1
  • CF-CRA2
  • 1684XB-32T
  • 1684X-416T
  • RDK-X5
  • RDK-S100
  • C-3568BQ
  • C-3588LQ
  • GC-3568JBAF
  • C-K1BA
商城
  • English
  • 简体中文
  • 1684XB-32T

    • 一、简介

      • AIBOX-1684XB-32简介
    • 二、快速上手

      • 初次使用
      • 网络配置
      • 磁盘使用
      • 内存分配
      • 风扇策略
      • 固件升级
      • 交叉编译
      • 模型量化
    • 三、应用开发

      • 开发简介

        • Sophgo SDK开发
        • SOPHON-DEMO简介
      • 大语言模型

        • 部署Llama3示例
        • Sophon LLM_api_server开发
        • 部署MiniCPM-V-2_6
        • Qwen-2-5-VL图片视频识别DEMO
        • Qwen3-chat-DEMO
        • Qwen3-Qwen Agent-MCP开发
        • Qwen3-langchain-AI Agent
      • 深度学习

        • ResNet(图像分类)
        • LPRNet(车牌识别)
        • SAM(通用图像分割基础模型)
        • YOLOv5(目标检测)
        • OpenPose(人体关键点检测)
        • PP-OCR(光学字符识别)
    • 四、资料下载

      • 资料下载
  • 1684X-416T

    • 简介

      • AIBOX-1684X-416简介
    • Demo简单操作指引

      • shimeta智慧监控demo的简单使用说明
  • RDK-X5

    • 简介

      • RDK-X5 硬件简介
    • 快速开始

      • RDK-X5 快速开始
    • 应用开发

      • AI在线模型开发

        • 实验01-接入火山引擎豆包 AI
        • 实验02-图片分析
        • 实验03-多模态视觉分析定位
        • 实验04-多模态图文比较分析
        • 实验05-多模态文档表格分析
        • 实验06-摄像头运用-AI视觉分析
      • 大语言模型

        • 实验01-语音识别
        • 实验02-语音对话
        • 实验03-多模态图片分析-语音对话
        • 实验04-多模态图片比较-语音对话
        • 实验05-多模态文档分析-语音对话
        • 实验06-多模态视觉运用-语音对话
      • ROS2基础开发

        • 实验01-搭建环境
        • 实验02-工作包的创建及编译
        • 实验03-运行 ROS2 话题通信节点
        • 实验04-ROS2 相机应用
      • 40pin-IO开发

        • 实验01-GPIO 输出(LED闪烁)
        • 实验02-GPIO 输入
        • 实验03-按键控制 LED
        • 实验04-PWM 输出
        • 实验05-串口输出
        • 实验06-IIC 实验
        • 实验07-SPI 实验
      • USB模块开发使用

        • 实验01-USB 语音模块使用
        • 实验02-声源定位模块使用
      • 机器视觉技术实战

        • 实验01-打开 USB 摄像头
        • 实验02-颜色识别检测
        • 实验03-手势识别体验
        • 实验04-YOLOv5物体检测
  • RDK-S100

    • 简介

      • RDK-S100 硬件简介
    • 快速开始

      • RDK-S100 硬件简介
    • 应用开发

      • AI在线模型开发

        • 实验01-接入火山引擎豆包 AI
        • 实验02-图片分析
        • 实验03-多模态视觉分析定位
        • 实验04-多模态图文比较分析
        • 实验05-多模态文档表格分析
        • 实验06-摄像头运用-AI视觉分析
      • 大语言模型

        • 实验01-语音识别
        • 实验02-语音对话
        • 实验03-多模态图片分析-语音对话
        • 实验04-多模态图片比较-语音对话
        • 实验05-多模态文档分析-语音对话
        • 实验06-多模态视觉运用-语音对话
      • ROS2基础开发

        • 实验01-搭建环境
        • 实验02-工作包的创建及编译
        • 实验03-运行 ROS2 话题通信节点
        • 实验04-ROS2 相机应用
      • 40pin-IO开发

        • 实验01-GPIO 输出(LED闪烁)
        • 实验02-GPIO 输入
        • 实验03-按键控制 LED
        • 实验04-PWM 输出
        • 实验05-串口输出
        • 实验06-IIC 实验
        • 实验07-SPI 实验
      • USB模块开发使用

        • 实验01-USB 语音模块使用
        • 实验02-声源定位模块使用
      • 机器视觉技术实战

        • 实验01-打开 USB 摄像头
        • 实验02-图像处理基础
        • 实验03-目标检测
        • 实验04-图像分割

AI在线开发

实验06-摄像头运用-AI视觉分析

实验准备:

  1. 确保系统已安装python3以及opencv数据库
  2. 准备一个usb摄像头

实验步骤:

  1. 将摄像头接入主板,运行ls /dev/video*,检查摄像头是否接入,程序中使用默认摄像头接口video0,如接口不符可自行更改。
  2. cd AI_online #进入功能包
  3. python examples/06_camera_input_loop.py #运行示例程序

运行终端如下:

TOOL

摄像头画面示例:

TOOL
"""
06_camera_input_loop.py

功能:
- 打开摄像头窗口实时显示画面
- 在终端输入问题后,将“当前帧”发送到 AI 做图文分析并返回回答
- 适用于识别当前画面中有什么、颜色判断(如红色或蓝色木块)等

依赖:
- OpenCV: pip install opencv-python
- 已配置好的 DoubaoAPIClient:请在 utils/config.py 中填写 API_KEY / MODEL_ENDPOINT / API_BASE_URL
"""
import sys
import os
import threading
import time
from typing import Optional

# 尝试导入 OpenCV
try:
    import cv2
except ImportError:
    print("未安装 OpenCV,请先执行: pip install opencv-python")
    sys.exit(1)

# 加入父目录,便于示例脚本直接运行
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.api_client import DoubaoAPIClient


def encode_frame_to_jpeg_bytes(frame) -> Optional[bytes]:
    """将当前帧编码为 JPEG 字节,失败返回 None"""
    try:
        # 轻度缩放,降低带宽与延迟(保持 16:9/4:3 等比例)
        max_w = 960
        h, w = frame.shape[:2]
        if w > max_w:
            scale = max_w / float(w)
            new_size = (int(w * scale), int(h * scale))
            frame = cv2.resize(frame, new_size, interpolation=cv2.INTER_AREA)
        ok, buf = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 85])
        if not ok:
            return None
        return buf.tobytes()
    except Exception as e:
        print(f"帧编码失败: {e}")
        return None


class CameraQALoop:
    """摄像头输入循环 + 终端问答,将当前画面发送到 AI 进行分析"""
    def __init__(self, camera_index: int = 0, window_name: str = "Camera Feed"):
        self.camera_index = camera_index
        self.window_name = window_name
        self.cap: Optional[cv2.VideoCapture] = None
        self.running = False
        self.latest_frame = None
        self.lock = threading.Lock()
        self.client: Optional[DoubaoAPIClient] = None
        self.input_thread: Optional[threading.Thread] = None

    def _init_camera(self) -> bool:
        self.cap = cv2.VideoCapture(self.camera_index)
        if not self.cap.isOpened():
            print(f"无法打开摄像头(index={self.camera_index}),请检查设备或更换索引")
            return False
        # 可选:设置分辨率,视设备而定
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        return True

    def _init_client(self) -> bool:
        try:
            self.client = DoubaoAPIClient()
            return True
        except Exception as e:
            print(f"API 客户端初始化失败:{e}\n请检查 utils/config.py 中的 API_KEY / MODEL_ENDPOINT / API_BASE_URL 配置是否正确")
            return False

    def _print_intro(self):
        print("\n=== 摄像头问答模式已启动 ===")
        print("使用说明:")
        print("1) 已打开摄像头窗口,请在终端直接输入你的问题并回车(例如:\"现在摄像头范围里有什么?\" / \"是红色木块还是蓝色木块?\")")
        print("2) 我会用当前画面进行分析并在终端返回答案。")
        print("3) 终端输入 quit 或 exit 可退出;窗口内按 Q 也可退出。\n")

    def _answer_with_current_frame(self, question: str):
        # 读取最新帧
        with self.lock:
            frame = None if self.latest_frame is None else self.latest_frame.copy()
        if frame is None:
            print("暂时没有可用画面,请稍后再试……")
            return

        image_bytes = encode_frame_to_jpeg_bytes(frame)
        if image_bytes is None:
            print("当前帧编码失败,未能发送给 AI")
            return

        # 系统提示词:引导模型专注当前图像进行客观识别与颜色判断
        system_prompt = (
            "你是一位视觉助手。请始终基于用户提供的当前图像来回答问题,"
            "需要进行:物体识别、颜色判断、场景/位置描述、简单关系判断。"
            "当图像中信息不足或不确定时,请明确说明不确定并简要给出可能性。"
        )

        try:
            print("\n[AI] 正在分析当前画面,请稍候……")
            answer = self.client.chat_with_image(
                text=question,
                image_data=image_bytes,
                image_format="bytes",  # 直接发送内存字节
                system_prompt=system_prompt,
                max_tokens=800,
                temperature=0.3
            )
            if answer:
                print(f"[AI 答复] {answer}\n")
            else:
                print("[AI] 未返回有效答案,请重试或检查网络/API 配置\n")
        except Exception as e:
            print(f"[AI] 分析失败:{e}\n")

    def _input_loop(self):
        """终端输入线程:阻塞读取用户问题,触发当前帧分析"""
        while self.running:
            try:
                question = input("请输入问题(或输入 quit/exit 退出):").strip()
            except EOFError:
                # 终端被关闭或无输入源
                question = "quit"
            if question.lower() in ("quit", "exit"):
                self.running = False
                break
            if not question:
                continue
            self._answer_with_current_frame(question)

    def start(self):
        if not self._init_camera():
            return
        if not self._init_client():
            # 即使 AI 客户端失败,也允许预览摄像头;但无法问答
            print("提示:你仍可查看摄像头窗口,但无法进行 AI 问答。")
        self.running = True
        self._print_intro()

        # 启动输入线程
        self.input_thread = threading.Thread(target=self._input_loop, daemon=True)
        self.input_thread.start()

        # 摄像头显示主循环
        try:
            while self.running:
                ret, frame = self.cap.read()
                if not ret:
                    print("读取摄像头帧失败,尝试继续……")
                    time.sleep(0.05)
                    continue
                # 更新当前帧
                with self.lock:
                    self.latest_frame = frame
                # 在窗口显示
                cv2.imshow(self.window_name, frame)
                key = cv2.waitKey(1) & 0xFF
                if key in (ord('q'), ord('Q')):
                    self.running = False
                    break
            print("正在退出……")
        finally:
            self.stop()

    def stop(self):
        try:
            if self.cap:
                self.cap.release()
            cv2.destroyAllWindows()
        except Exception:
            pass
        self.running = False
        # 等待输入线程结束
        if self.input_thread and self.input_thread.is_alive():
            try:
                self.input_thread.join(timeout=1.0)
            except Exception:
                pass
        print("已关闭摄像头与窗口。")


def main():
    import argparse
    parser = argparse.ArgumentParser(description="摄像头输入循环 + AI 图文问答")
    parser.add_argument("--index", type=int, default=0, help="摄像头索引(默认0)")
    args = parser.parse_args()

    loop = CameraQALoop(camera_index=args.index)
    loop.start()


if __name__ == "__main__":
    main()
在 GitHub 上编辑此页
上次更新:
贡献者: wuziqing
Prev
实验05-多模态文档表格分析