首页
  • GM-3568JHF
  • M4-R1
  • M5-R1
  • SC-3568HA
  • M-K1HSE
  • CF-NRS1
  • CF-CRA2
  • 1684XB-32T
  • 1684X-416T
  • RDK-X5
  • RDK-S100
  • C-3568BQ
  • C-3588LQ
  • GC-3568JBAF
  • C-K1BA
商城
  • English
  • 简体中文
首页
  • GM-3568JHF
  • M4-R1
  • M5-R1
  • SC-3568HA
  • M-K1HSE
  • CF-NRS1
  • CF-CRA2
  • 1684XB-32T
  • 1684X-416T
  • RDK-X5
  • RDK-S100
  • C-3568BQ
  • C-3588LQ
  • GC-3568JBAF
  • C-K1BA
商城
  • English
  • 简体中文
  • 1684XB-32T

    • 一、简介

      • AIBOX-1684XB-32简介
    • 二、快速上手

      • 初次使用
      • 网络配置
      • 磁盘使用
      • 内存分配
      • 风扇策略
      • 固件升级
      • 交叉编译
      • 模型量化
    • 三、应用开发

      • 开发简介

        • Sophgo SDK开发
        • SOPHON-DEMO简介
      • 大语言模型

        • 部署Llama3示例
        • Sophon LLM_api_server开发
        • 部署MiniCPM-V-2_6
        • Qwen-2-5-VL图片视频识别DEMO
        • Qwen3-chat-DEMO
        • Qwen3-Qwen Agent-MCP开发
        • Qwen3-langchain-AI Agent
      • 深度学习

        • ResNet(图像分类)
        • LPRNet(车牌识别)
        • SAM(通用图像分割基础模型)
        • YOLOv5(目标检测)
        • OpenPose(人体关键点检测)
        • PP-OCR(光学字符识别)
    • 四、资料下载

      • 资料下载
  • 1684X-416T

    • 简介

      • AIBOX-1684X-416简介
    • Demo简单操作指引

      • shimeta智慧监控demo的简单使用说明
  • RDK-X5

    • 简介

      • RDK-X5 硬件简介
    • 快速开始

      • RDK-X5 快速开始
    • 应用开发

      • AI在线模型开发

        • 实验01-接入火山引擎豆包 AI
        • 实验02-图片分析
        • 实验03-多模态视觉分析定位
        • 实验04-多模态图文比较分析
        • 实验05-多模态文档表格分析
        • 实验06-摄像头运用-AI视觉分析
      • 大语言模型

        • 实验01-语音识别
        • 实验02-语音对话
        • 实验03-多模态图片分析-语音对话
        • 实验04-多模态图片比较-语音对话
        • 实验05-多模态文档分析-语音对话
        • 实验06-多模态视觉运用-语音对话
      • ROS2基础开发

        • 实验01-搭建环境
        • 实验02-工作包的创建及编译
        • 实验03-运行 ROS2 话题通信节点
        • 实验04-ROS2 相机应用
      • 40pin-IO开发

        • 实验01-GPIO 输出(LED闪烁)
        • 实验02-GPIO 输入
        • 实验03-按键控制 LED
        • 实验04-PWM 输出
        • 实验05-串口输出
        • 实验06-IIC 实验
        • 实验07-SPI 实验
      • USB模块开发使用

        • 实验01-USB 语音模块使用
        • 实验02-声源定位模块使用
      • 机器视觉技术实战

        • 实验01-打开 USB 摄像头
        • 实验02-颜色识别检测
        • 实验03-手势识别体验
        • 实验04-YOLOv5物体检测
  • RDK-S100

    • 简介

      • RDK-S100 硬件简介
    • 快速开始

      • RDK-S100 硬件简介
    • 应用开发

      • AI在线模型开发

        • 实验01-接入火山引擎豆包 AI
        • 实验02-图片分析
        • 实验03-多模态视觉分析定位
        • 实验04-多模态图文比较分析
        • 实验05-多模态文档表格分析
        • 实验06-摄像头运用-AI视觉分析
      • 大语言模型

        • 实验01-语音识别
        • 实验02-语音对话
        • 实验03-多模态图片分析-语音对话
        • 实验04-多模态图片比较-语音对话
        • 实验05-多模态文档分析-语音对话
        • 实验06-多模态视觉运用-语音对话
      • ROS2基础开发

        • 实验01-搭建环境
        • 实验02-工作包的创建及编译
        • 实验03-运行 ROS2 话题通信节点
        • 实验04-ROS2 相机应用
      • 40pin-IO开发

        • 实验01-GPIO 输出(LED闪烁)
        • 实验02-GPIO 输入
        • 实验03-按键控制 LED
        • 实验04-PWM 输出
        • 实验05-串口输出
        • 实验06-IIC 实验
        • 实验07-SPI 实验
      • USB模块开发使用

        • 实验01-USB 语音模块使用
        • 实验02-声源定位模块使用
      • 机器视觉技术实战

        • 实验01-打开 USB 摄像头
        • 实验02-图像处理基础
        • 实验03-目标检测
        • 实验04-图像分割

语音LLM应用 - 实验06-多模态视觉运用-语音对话

实验准备:

  1. 确保已接入火山引擎豆包AI以及讯飞AI(参考实验01、实验02)
  2. 接入usb摄像头(本实验以usb摄像头为例),运行ls /dev/video*,检查摄像头是否接入,程序中使用默认摄像头接口video0,如接口不符可自行更改。
  3. 安装 OpenCV: pip install opencv-python (如已安装可跳过)

实验步骤:(确保语音模块已连接)

  1. cd AI_online_voice #进入主目录
  2. python examples/06_voice_camera_analysis.py #运行示例程序

终端运行示例:

TOOLTOOL
# -*- coding: utf-8 -*-
"""
06_voice_camera_analysis.py

实验06:以摄像头接入-语音分析为主题
流程:接入摄像头 → 实时小窗口显示 → 语音输入指令 → 截图当前画面 → 将截图与语音指令一起提交给豆包分析

参考:
- 摄像头接入:AI/examples/06_camera_input_loop.py
- 语音分析指令:AI_online_voice/examples/05_voice_document_analysis.py

指令:
- r [秒数]:录音指定秒数(默认5秒),识别文本并提交当前截图进行联合分析
- p:回放最近一次录音
- h:帮助
- q:退出
"""

import os
import sys
import time
import threading
import wave
import base64
from typing import Optional

# OpenCV 依赖
try:
    import cv2
except Exception:
    cv2 = None
    print("[依赖缺失] 未安装 opencv-python,请先安装:pip install opencv-python")

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(CURRENT_DIR)
sys.path.append(PROJECT_ROOT)

from utils.audio_processor import AudioProcessor
import config

# 复用实验03中的客户端(已内联并修复鉴权逻辑)
import importlib.util
EXP03_PATH = os.path.join(PROJECT_ROOT, "examples", "03_voice_image_dialogue.py")
spec = importlib.util.spec_from_file_location("exp03", EXP03_PATH)
exp03 = importlib.util.module_from_spec(spec)
spec.loader.exec_module(exp03)

DoubaoImageClient = exp03.DoubaoImageClient
XunfeiRealtimeSpeechClient = exp03.XunfeiRealtimeSpeechClient
ROOT_CONFIG = getattr(exp03, "ROOT_CONFIG", None)


class CameraStreamer:
    """摄像头实时显示与帧维护。"""
    def __init__(self, cam_index='video0', window_name: str = "Camera Feed", width: int = 1280, height: int = 720):
        self.cam_index = cam_index  # 可为索引(int)或设备名/路径(str)
        self.window_name = window_name
        self.width = width
        self.height = height
        self.cap = None
        self.thread = None
        self.running = False
        self.current_frame = None

    def _open_capture(self, source):
        """在不同平台尝试打开摄像头,支持 'video0' 语义。"""
        # 将 'video0' 规范化为平台兼容的来源
        if isinstance(source, str):
            s = source.lower().strip()
            if s == 'video0':
                if os.name == 'nt':
                    # Windows 不存在 /dev/video0,映射为索引 0
                    source = 0
                else:
                    # 非 Windows 按设备路径打开
                    source = "/dev/video0"
            elif s.startswith("/dev/video"):
                # Linux/WSL 等直接使用设备路径
                source = s
            else:
                # 尝试将字符串转换为索引
                try:
                    source = int(s)
                except Exception:
                    # 无法解析则回退到索引 0
                    source = 0
        # 按平台选择后端
        if os.name == 'nt':
            # 依次尝试 DSHOW -> MSMF -> 默认
            cap = cv2.VideoCapture(source, cv2.CAP_DSHOW)
            if not cap or not cap.isOpened():
                cap = cv2.VideoCapture(source, cv2.CAP_MSMF)
            if not cap or not cap.isOpened():
                cap = cv2.VideoCapture(source)
        else:
            # 非 Windows 默认后端通常为 V4L2
            cap = cv2.VideoCapture(source)
        return cap

    def start(self) -> bool:
        if cv2 is None:
            print("[错误] OpenCV 未安装,无法启动摄像头窗口")
            return False
        try:
            # 打开摄像头(支持 'video0' 映射)
            self.cap = self._open_capture(self.cam_index)
            if not self.cap or not self.cap.isOpened():
                print(f"[错误] 无法打开摄像头源:{self.cam_index},请检查设备或权限")
                print("[提示] 可尝试:--camera 0 / --camera video0 / --camera /dev/video0")
                return False
            # 对齐示例参数:设置采集分辨率为 1280x720(若设备支持)
            try:
                self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
                self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
            except Exception:
                pass
            # 打印实际分辨率,便于诊断
            try:
                actual_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                actual_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                print(f"[摄像头] 已打开源={self.cam_index},实际分辨率={actual_w}x{actual_h}")
            except Exception:
                pass
            cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
            cv2.resizeWindow(self.window_name, 1280, 720)
            self.running = True
            # 不再在子线程中显示画面,改由主线程循环显示,避免部分平台窗口不出现的问题
            return True
        except Exception as e:
            print(f"[摄像头启动失败] {e}")
            return False

    def _loop(self):
        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                time.sleep(0.05)
                continue
            # 按原始分辨率显示,避免缩小
            self.current_frame = frame.copy()
            cv2.imshow(self.window_name, frame)
            # 处理窗口事件
            if cv2.waitKey(1) & 0xFF == 27:  # ESC 退出显示,仅关闭窗口,不退出程序
                pass
        try:
            cv2.destroyWindow(self.window_name)
        except Exception:
            pass
    def update_display(self) -> int:
        """读取一帧并显示在窗口,由主线程循环调用。返回按键码(无按键为 -1)。"""
        if not self.cap:
            return -1
        ret, frame = self.cap.read()
        if not ret:
            time.sleep(0.05)
            return -1
        self.current_frame = frame.copy()
        cv2.imshow(self.window_name, frame)
        key = cv2.waitKey(1) & 0xFF
        return key

    def snapshot_to_file(self, path: str) -> Optional[str]:
        if cv2 is None:
            return None
        frame = self.current_frame
        if frame is None:
            print("[提示] 当前没有可用帧,请稍后重试")
            return None
        try:
            # 将 BGR 帧编码为 JPEG 并保存
            ok, buf = cv2.imencode(".jpg", frame)
            if not ok:
                print("[错误] 帧编码失败")
                return None
            with open(path, "wb") as f:
                f.write(buf.tobytes())
            return path
        except Exception as e:
            print(f"[快照保存失败] {e}")
            return None

    def stop(self):
        self.running = False
        try:
            time.sleep(0.1)
        except Exception:
            pass
        try:
            if self.cap:
                self.cap.release()
        except Exception:
            pass


class VoiceCameraAnalysisApp:
    def __init__(self, cam_source: Optional[str] = None):
        self.processor = AudioProcessor()
        self.asr = XunfeiRealtimeSpeechClient()
        self.doubao = DoubaoImageClient()
        # 允许通过参数或环境变量选择摄像头源,默认使用 'video0'
        source = cam_source if cam_source is not None else os.getenv("CAMERA_SOURCE", "video0")
        self.camera = CameraStreamer(cam_index=source)
        self.last_audio: Optional[str] = None
        self.last_wav: Optional[str] = None
        self.snapshot_path = os.path.join(PROJECT_ROOT, "assets", "camera_snapshot.jpg")
        self._ensure_assets_dir()

    def _ensure_assets_dir(self):
        assets_dir = os.path.join(PROJECT_ROOT, "assets")
        os.makedirs(assets_dir, exist_ok=True)

    def print_help(self):
        print("\n指令帮助:")
        print("  r [秒数]  录音指定秒数(默认5秒),并提交当前摄像头截图 + 语音文本进行分析")
        print("  p        回放最近一次录音")
        print("  h        查看帮助")
        print("  q        退出\n")

    def _take_snapshot(self) -> Optional[str]:
        path = self.snapshot_path
        snap = self.camera.snapshot_to_file(path)
        if not snap:
            print("[错误] 无法获取截图。请确认摄像头已启动且有画面。")
            return None
        return snap

    def handle_record(self, duration_sec: int):
        print(f"[操作] 开始录音 {duration_sec} 秒…")
        audio_file = self.processor.record(duration_sec)
        if not audio_file:
            print("[错误] 录音失败")
            return
        self.last_audio = audio_file
        try:
            with wave.open(audio_file, "rb") as wf:
                print(f"[音频信息] rate={wf.getframerate()}, ch={wf.getnchannels()}, bits={wf.getsampwidth()*8}")
        except Exception:
            pass
        wav_path = self.processor.convert_to_wav(audio_file)
        if not wav_path:
            print("[错误] 转换 WAV 失败")
            return
        self.last_wav = wav_path
        print("[识别] 讯飞实时识别…")
        text = self.asr.transcribe_audio_ws(wav_path)
        if not text:
            print("[识别失败] 未获取到文本")
            return
        print(f"[识别结果] {text}")

        print("[摄像头] 获取当前画面截图…")
        snap_path = self._take_snapshot()
        if not snap_path:
            return
        print(f"[截图] {snap_path}")

        print("[豆包] 提交截图 + 指令进行分析…")
        try:
            sys_prompt = getattr(ROOT_CONFIG, "SYSTEM_PROMPT", None) if ROOT_CONFIG else None
            # 复用豆包图像接口:文本 + 图片
            reply = self.doubao.chat_with_image_file(text, snap_path, system_prompt=sys_prompt)
            if reply:
                print("[豆包回复]", reply)
            else:
                print("[豆包回复] None")
        except Exception as e:
            print("[豆包错误]", e)

    def handle_play(self):
        if not self.last_audio:
            print("[提示] 尚无可回放的录音。请先使用 r 指令录音。")
            return
        print("[播放] 回放最近一次录音…")
        self.processor.play(self.last_audio)

    def run_legacy(self):
        print("\n=== 06 摄像头接入 + 语音分析(讯飞 + 豆包)实验 ===")
        self.print_help()
        ok = self.camera.start()
        if not ok:
            print("[错误] 摄像头未能启动,后续分析将无法截图")
        else:
            print("[提示] 摄像头窗口已启动(ESC 可关闭窗口但不影响程序)。")
        while True:
            try:
                cmd = input("请输入指令 (r/p/h/q): ").strip()
            except (EOFError, KeyboardInterrupt):
                print("\n[退出]")
                break
            if not cmd:
                continue
            if cmd == "q":
                print("[退出]")
                break
            if cmd == "h":
                self.print_help()
                continue
            if cmd == "p":
                self.handle_play()
                continue
            if cmd.startswith("r"):
                parts = cmd.split()
                duration = 5
                if len(parts) >= 2:
                    try:
                        duration = int(parts[1])
                    except Exception:
                        print("[提示] 秒数无效,使用默认 5 秒")
                self.handle_record(duration)
    def run(self):
        print("\n=== 06 摄像头接入 + 语音分析(讯飞 + 豆包)实验 ===")
        self.print_help()
        ok = self.camera.start()
        if not ok:
            print("[错误] 摄像头未能启动,无法显示实时画面与截图分析。")
            return
        print("[提示] 摄像头窗口已启动(窗口内按 Q 退出,或在终端输入 q)。")

        stop_flag = False

        def input_loop():
            nonlocal stop_flag
            while not stop_flag:
                try:
                    cmd = input("请输入指令 (r/p/h/q): ").strip()
                except (EOFError, KeyboardInterrupt):
                    print("\n[退出]")
                    stop_flag = True
                    break
                if not cmd:
                    continue
                if cmd == "q":
                    print("[退出]")
                    stop_flag = True
                    break
                if cmd == "h":
                    self.print_help()
                    continue
                if cmd == "p":
                    self.handle_play()
                    continue
                if cmd.startswith("r"):
                    parts = cmd.split()
                    duration = 5
                    if len(parts) >= 2:
                        try:
                            duration = int(parts[1])
                        except Exception:
                            print("[提示] 秒数无效,使用默认 5 秒")
                    self.handle_record(duration)

        t = threading.Thread(target=input_loop, daemon=True)
        t.start()

        # 主线程循环显示摄像头画面
        while not stop_flag:
            try:
                key = self.camera.update_display()
                if key in (ord('q'), ord('Q')):
                    stop_flag = True
                    break
            except Exception:
                time.sleep(0.05)
                continue

        self.camera.stop()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="摄像头接入 + 语音分析")
    parser.add_argument("--camera", type=str, default=os.getenv("CAMERA_SOURCE", "video0"),
                        help="摄像头源: 索引(如 0)或设备名(如 video0)/路径(/dev/video0)")
    args = parser.parse_args()
    VoiceCameraAnalysisApp(cam_source=args.camera).run()
在 GitHub 上编辑此页
上次更新:
贡献者: wuziqing
Prev
实验05-多模态文档分析-语音对话