首页
商城
  • English
  • 简体中文
首页
商城
  • English
  • 简体中文
  • 产品系列

    • FPGA+ARM

      • GM-3568JHF

        • 一、简介

          • GM-3568JHF 简介
        • 二、快速开始

          • 00 前言
          • 01 环境搭建
          • 02 编译说明
          • 03 烧录指南
          • 04 调试工具
          • 05 软件更新
          • 06 查看信息
          • 07 测试命令
          • 08 应用编译
          • 09 源码获取
        • 三、外设与接口

          • 01 USB
          • 02 显示与触摸
          • 03 以太网
          • 04 WIFI
          • 05 蓝牙
          • 06 TF-Card
          • 07 音频
          • 08 串口
          • 09 CAN
          • 10 RTC
        • 四、应用开发

          • 01 UART读写案例
          • 02 按键检测案例
          • 03 LED灯闪烁案例
          • 04 MIPI屏幕检测案例
          • 05 读取 USB 设备信息案例
          • 06 FAN 检测案例
          • 07 FPGA FSPI 通信案例
          • 08 FPGA DMA 读写案例
          • 09 GPS调试案例
          • 10 以太网测试案例
          • 11 RS485读写案例
          • 12 FPGA IIC 读写案例
          • 13 PN532 NFC读卡案例
          • 14 TF卡读写案例
        • 五、QT开发

          • 01 ARM64交叉编译器环境搭建
          • 02 QT 程序加入开机自启服务
        • 六、RKNN_NPU开发

          • 01 RK3568 NPU 概述
          • 02 开发环境搭建
          • 运行官方 YOLOv5 示例
        • 七、FPGA开发

          • ARM与FPGA通讯
          • FPGA开发手册
        • 八、其他

          • 01 根目录文件系统的修改
          • 02 系统自启服务
        • 九、资料下载

          • 资料下载
    • ShimetaPi

      • M4-R1

        • 一、简介

          • M4-R1简介
        • 二、快速上手

          • 01 OpenHarmony概述
          • 02 镜像烧录
          • 03 应用开发快速上手
          • 04 设备开发快速上手
        • 三、应用开发

          • 3.1 ArkUI

            • 1 ArkTS语言简介
            • 2 UI 组件-Row 容器介绍
            • 3 UI 组件-Column 容器介绍
            • 4 UI 组件-Text 组件
            • 5 UI 组件-Toggle 组件
            • 6 UI 组件-Slider 组件
            • 7 UI 组件-Animation 组件&Transition 组件
          • 3.2 资料获取

            • 1 OpenHarmony 官方资料
          • 3.3 开发须知

            • 1 Full-SDK替换教程
            • 2 引入和使用三方库
            • 3 HDC调试
            • 4 命令行恢复出厂模式
            • 5 升级App为system权限
          • 3.4 构建第一个应用

            • 1 构建第一个ArkTs应用-HelloWorld
          • 3.5 案例

            • 01 串口调试助手应用案例
            • 02 手写板应用案例
            • 03 数字时钟应用案例
            • 04 WIFI 信息获取应用案例
        • 四、设备开发

          • 4.1 Ubuntu环境开发

            • 01 环境搭建
            • 02 下载源码
            • 03 编译源码
          • 4.2 使用DevEco Device Tool 工具

            • 01 工具简介
            • 02 开发环境的搭建
            • 03 导入SDK
            • 04 HUAWEI DevEco Tool 功能介绍
        • 五、内核外设与接口

          • 5.1 指南
          • 5.2 设备树介绍
          • 5.3 NAPI 入门
          • 5.4 ArkTS入门
          • 5.5 NAPI开发实战演示
          • 5.6 GPIO介绍
          • 5.7 I2C通讯
          • 5.8 SPI通信
          • 5.9 PWM 控制
          • 5.10 串口通讯
          • 5.11 TF卡
          • 5.12 屏幕
          • 5.13 触摸
          • 5.14 Ethernet(以太网)
          • 5.15 M.2 硬盘
          • 5.16 音频
          • 5.17 WIFI & BT
          • 5.18 摄像头
        • 六、资料下载

          • 资料下载
      • M5-R1

        • 一、简介

          • M5-R1 开发文档
        • 二、快速上手

          • 镜像烧录
          • 环境搭建
          • 下载源码
        • 三、外设与接口

          • 3.1 树莓派接口
          • 3.2 GPIO接口
          • 3.3 I2C接口
          • 3.4 SPI通信
          • 3.5 PWM控制
          • 3.6 串口通信
          • 3.7 TF卡插槽
          • 3.8 显示屏
          • 3.9 触摸屏
          • 3.10 音频
          • 3.11 RTC
          • 3.12 以太网
          • 3.13 M.2接口
          • 3.14 MINI PCIE接口
          • 3.15 摄像头
          • 3.16 WIFI蓝牙
        • 四、资料下载

          • 资料下载
    • 开源鸿蒙

      • SC-3568HA

        • 一、简介

          • SC-3568HA简介
        • 二、快速上手

          • OpenHarmony概述
          • 镜像烧录
          • 开发环境准备
          • Hello World应用以及部署
        • 三、应用开发

          • 3.1 ArkUI

            • 第一章 ArkTS语言简介
            • 第二章 UI组件介绍和实际应用(上)
            • 第三章 UI组件介绍和实际应用(中)
            • 第四章 UI组件介绍和实际应用(下)
          • 3.2 拓展

            • 第一章 入门指引
            • 第二章 三方库的引用和使用
            • 第三章 应用编译以及部署
            • 第四章 命令行恢复出厂设置
            • 第五章 系统调试--HDC调试
            • 第六章 APP 稳定性测试
            • 第七章 应用测试
        • 四、设备开发

          • 4.1 环境搭建
          • 4.2 源码下载
          • 4.3 源码编译
        • 五、内核的外设与接口

          • 5.1 树莓派接口
          • 5.2 GPIO 接口
          • 5.3 I2C 接口
          • 5.4 SPI 通信
          • 5.5 PWM 控制
          • 5.6 串口通信
          • 5.7 TF卡插槽
          • 5.8 显示屏
          • 5.9 触摸屏
          • 5.10 音频
          • 5.11 RTC
          • 5.12 以太网
          • 5.13 M.2接口
          • 5.14 MINI PCIE接口
          • 5.15 摄像头
          • 5.16 WIFI蓝牙
          • 5.17 树莓派拓展板
        • 六、资料下载

          • 资料下载
      • M-K1HSE

        • 一、简介

          • 1.1 产品简介
        • 二、快速开始

          • 2.1 调试工具安装
          • 2.2 开发环境搭建
          • 2.3 源码下载
          • 2.4 编译说明
          • 2.5 烧录指南
          • 2.6 APT 更新源
          • 2.7 查看板卡信息
          • 2.8 命令行 LED 和按键测试
          • 2.9 GCC 编译程序
        • 三、应用开发

          • 3.1 基础应用开发

            • 3.1.1 开发环境准备
            • 3.1.2 第一个应用 HelloWorld
            • 3.1.3 开发 HAR 包
          • 3.2 外设应用案例

            • 3.2.1 UART 读写
            • 3.2.2 按键实验
            • 3.2.3 LED 闪烁
        • 四、外设与接口

          • 4.1 标准外设

            • 4.1.1 USB
            • 4.1.2 显示与触摸
            • 4.1.3 以太网
            • 4.1.4 WIFI
            • 4.1.5 蓝牙
            • 4.1.6 TF卡
            • 4.1.7 音频
            • 4.1.8 串口
            • 4.1.9 CAN
            • 4.1.10 RTC
          • 4.2 接口

            • 4.2.1 音频
            • 4.2.2 RS485
            • 4.2.3 显示
            • 4.2.4 触摸
        • 五、系统定制开发

          • 5.1 系统移植
          • 5.2 系统定制
          • 5.3 驱动开发
          • 5.4 系统调试
          • 5.5 OTA 升级
        • 六、资料下载

          • 6.1 资料下载
    • EVS相机

      • CF-NRS1

        • 一、简介

          • 1.1 关于 CF-NRS1
          • 1.2 基于事件的概念
          • 1.3 快速开始
          • 1.4 资源
        • 二、开发

          • 2.1 开发概览

            • 2.1.1 Shimetapi 混合相机 SDK 简介
          • 2.2 环境与API

            • 2.2.1 环境说明
            • 2.2.2 开发 API 说明
          • 2.3 Linux开发

            • 2.3.1 Linux SDK 简介
            • 2.3.2 Linux SDK API
            • 2.3.3 Linux 算法
            • 2.3.4 Linux 算法 API
          • 2.4 服务与Web

            • 2.4.1 EVS 服务器
            • 2.4.2 时间服务器
            • 2.4.3 EVS Web
        • 三、资料下载

          • 3.1 资料下载
        • 四、常见问题

          • 4.1 常见问题
      • CF-CRA2

        • 一、简介

          • 1.1 关于 CF-CRA2
        • 二、资料下载

          • 2.1 资料下载
      • EVS模块

        • 一、相关概念
        • 二、硬件准备与环境配置
        • 三、示例程序使用指南
        • 资料下载
    • AI硬件

      • 1684XB-32T

        • 一、简介

          • AIBOX-1684XB-32简介
        • 二、快速上手

          • 初次使用
          • 网络配置
          • 磁盘使用
          • 内存分配
          • 风扇策略
          • 固件升级
          • 交叉编译
          • 模型量化
        • 三、应用开发

          • 3.1 开发简介

            • Sophgo SDK开发
            • SOPHON-DEMO简介
          • 3.2 大语言模型

            • 部署Llama3示例
            • Sophon LLM_api_server开发
            • 部署MiniCPM-V-2_6
            • Qwen-2-5-VL图片视频识别DEMO
            • Qwen3-chat-DEMO
            • Qwen3-Qwen Agent-MCP开发
            • Qwen3-langchain-AI Agent
          • 3.3 深度学习

            • ResNet(图像分类)
            • LPRNet(车牌识别)
            • SAM(通用图像分割基础模型)
            • YOLOv5(目标检测)
            • OpenPose(人体关键点检测)
            • PP-OCR(光学字符识别)
        • 四、资料下载

          • 资料下载
      • 1684X-416T

        • 一、简介

          • 1.1 产品简介
        • 二、Demo简单操作指引

          • 2.1 智慧监控Demo使用说明
      • RDK-X5

        • 一、简介

          • RDK-X5 硬件简介
        • 二、快速开始

          • RDK-X5 快速开始
        • 三、应用开发

          • 3.1 AI在线模型开发

            • 实验01-接入火山引擎豆包 AI
            • 实验02-图片分析
            • 实验03-多模态视觉分析定位
            • 实验04-多模态图文比较分析
            • 实验05-多模态文档表格分析
            • 实验06-摄像头运用-AI视觉分析
          • 3.2 大语言模型

            • 实验01-语音识别
            • 实验02-语音对话
            • 实验03-多模态图片分析-语音对话
            • 实验04-多模态图片比较-语音对话
            • 实验05-多模态文档分析-语音对话
            • 实验06-多模态视觉运用-语音对话
          • 3.3 40pin-IO开发

            • 实验01-GPIO 输出(LED闪烁)
            • 实验02-GPIO 输入
            • 实验03-按键控制 LED
            • 实验04-PWM 输出
            • 实验05-串口输出
            • 实验06-IIC 实验
            • 实验07-SPI 实验
          • 3.4 USB模块开发使用

            • 实验01-USB 语音模块使用
            • 实验02-声源定位模块使用
          • 3.5 机器视觉技术实战

            • 实验01-打开 USB 摄像头
            • 实验02-颜色识别检测
            • 实验03-手势识别体验
            • 实验04-YOLOv5物体检测
          • 3.6 ROS2基础开发

            • 实验01-搭建环境
            • 实验02-工作包的创建及编译
            • 实验03-运行 ROS2 话题通信节点
            • 实验04-ROS2 相机应用
      • RDK-S100

        • 一、简介

          • 1.1 关于 RDK-S100
        • 二、快速开始

          • 2.1 首次使用
        • 三、应用开发

          • 3.1 AI在线模型开发

            • 3.1.1 接入火山引擎豆包 AI
            • 3.1.2 图片分析
            • 3.1.3 多模态视觉分析定位
            • 3.1.4 多模态图文比较分析
            • 3.1.5 多模态文档表格分析
            • 3.1.6 摄像头运用-AI视觉分析
          • 3.2 大语言模型

            • 3.2.1 语音识别
            • 3.2.2 语音对话
            • 3.2.3 多模态图片分析-语音对话
            • 3.2.4 多模态图片比较-语音对话
            • 3.2.5 多模态文档分析-语音对话
            • 3.2.6 多模态视觉运用-语音对话
          • 3.3 40pin-IO开发

            • 3.3.1 GPIO 输出(LED闪烁)
            • 3.3.2 GPIO 输入
            • 3.3.3 按键控制 LED
            • 3.3.4 PWM 输出
            • 3.3.5 串口输出
            • 3.3.6 IIC 实验
            • 3.3.7 SPI 实验
          • 3.4 USB模块开发使用

            • 3.4.1 USB 语音模块使用
            • 3.4.2 声源定位模块使用
          • 3.5 机器视觉技术实战

            • 3.5.1 打开 USB 摄像头
            • 3.5.2 图像处理基础
            • 3.5.3 目标检测
            • 3.5.4 图像分割
          • 3.6 ROS2基础开发

            • 3.6.1 搭建环境
            • 3.6.2 工作包的创建及编译
            • 3.6.3 运行 ROS2 话题通信节点
            • 3.6.4 ROS2 相机应用
    • 核心板

      • C-3568BQ

        • 一、简介

          • C-3568BQ 简介
      • C-3588LQ

        • 一、简介

          • C-3588LQ 简介
      • GC-3568JBAF

        • 一、简介

          • GC-3568JBAF 简介
      • C-K1BA

        • 一、简介

          • C-K1BA 简介

语音LLM应用

实验01-语音识别

实验准备:

1.安装 ALSA 工具(用于录音和播放)

sudo apt-get install alsa-utils

2.安装所需的依赖

1.pip install -r requirements.txt

2.python -m pip install websocket-client

3.sudo apt-get update && sudo apt-get install -y ffmpeg

3.注册讯飞账号

  1. 登录讯飞开放平台 https://www.xfyun.com.cn
  2. 点击进入控制台
  3. 注册登录账号
  4. 创建应用
  5. 开通语音识别-语音听写服务
  6. 获取APIID、APISecret、APIKey、语音听写接口地址四项信息
  7. 将四项信息保存(后续代码填入config.py中)

实验步骤:

  1. 检查使用语音模块 (确保语音模块与RDK主板以及喇叭已正确连接)

终端运行: arecord -l #识别麦克风的卡号与设备号(关注 card X 和 device Y )

终端运行: aplay -l #检查扬声器/输出设备

终端运行: sudo arecord -f S16_LE -r 16000 -c 1 -d 5 /tmp/test_mic.wav #使用默认设备录 5 秒,16k/单声道/16bit:

终端运行: aplay /tmp/test_mic.wav #播放音频

  1. 将APIID、APISecret、APIKey、语音听写接口地址四项信息填入config.py
TOOL
  1. cd AI_online_voice #进入功能包
  2. python examples/01_voice_chat.py #运行示例程序 输入r 开始测试

终端运行效果如下:

TOOL

实验效果:开始录音(默认为5秒,如若需要修改时长,输入 r+时长 既可),录音完毕后播放音频,随后将音频上传讯飞语音听写大模型,最后将识别结果返回Linux终端)

"""
01_voice_chat.py

功能:
- 录制语音
- 使用讯飞 WebSocket API 将语音转为文本
- 在终端打印识别结果(专注于语音转文字)

依赖:
- arecord: 用于录制音频(Linux)
- aplay: 用于播放音频(Linux)
- websocket-client: 用于与讯飞 WebSocket API 通信
- 请在 AI_online_voice/config.py 中填写 XUNFEI_APPID / XUNFEI_API_KEY / XUNFEI_API_SECRET / XUNFEI_WS_URL
"""

import os
import sys
import time
from typing import Optional

# 加入父目录,便于示例脚本直接运行
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# 仅保留音频处理,暂不使用豆包对话
from utils.audio_processor import AudioProcessor

# 讯飞 WebSocket 所需依赖与配置
try:
    import websocket  # websocket-client
except ImportError:
    websocket = None
# 新增:导入超时异常类型用于精细日志
try:
    from websocket import WebSocketTimeoutException
except Exception:
    class WebSocketTimeoutException(Exception):
        pass

import json
import base64
import hmac
import hashlib
import ssl
import wave
from email.utils import formatdate
from urllib.parse import urlparse, quote

from config import (
    XUNFEI_APPID,
    XUNFEI_API_KEY,
    XUNFEI_API_SECRET,
    XUNFEI_WS_URL,
    REQUEST_TIMEOUT,
)


class XunfeiRealtimeSpeechClient:
    """讯飞语音识别(IAT流式WebSocket版)客户端(更新的消息格式与解析)"""

    def __init__(self, app_id: str = None, api_key: str = None, api_secret: str = None, ws_url: str = None):
        self.app_id = app_id or XUNFEI_APPID
        self.api_key = api_key or XUNFEI_API_KEY
        self.api_secret = api_secret or XUNFEI_API_SECRET
        self.ws_url = ws_url or XUNFEI_WS_URL
        self.timeout = REQUEST_TIMEOUT
        self._validate_config()

    def _validate_config(self):
        if not self.app_id or self.app_id == "你的讯飞APPID":
            raise ValueError("请配置正确的讯飞APPID")
        if not self.api_key or self.api_key == "你的讯飞API_KEY":
            raise ValueError("请配置正确的讯飞API_KEY")
        if not self.api_secret or self.api_secret == "你的讯飞API_SECRET":
            raise ValueError("请配置正确的讯飞API_SECRET")
        if websocket is None:
            raise RuntimeError("未安装 websocket-client,请先安装:python -m pip install websocket-client")

    def _rfc1123_date(self) -> str:
        # 生成GMT时间,RFC1123格式
        return formatdate(usegmt=True)

    def _assemble_auth_url(self) -> str:
        """根据APIKey与APISecret生成带鉴权参数的WS URL"""
        parsed = urlparse(self.ws_url)
        host = parsed.netloc
        path = parsed.path
        date = self._rfc1123_date()

        # signature 原始串:
        signature_origin = f"host: {host}\n" + f"date: {date}\n" + f"GET {path} HTTP/1.1"
        # 使用 apiSecret 做 HMAC-SHA256
        signature_sha = hmac.new(self.api_secret.encode("utf-8"), signature_origin.encode("utf-8"), hashlib.sha256).digest()
        signature = base64.b64encode(signature_sha).decode("utf-8")

        # authorization 原始串
        authorization_origin = (
            f"api_key=\"{self.api_key}\", "
            f"algorithm=\"hmac-sha256\", "
            f"headers=\"host date request-line\", "
            f"signature=\"{signature}\""
        )
        authorization = base64.b64encode(authorization_origin.encode("utf-8")).decode("utf-8")

        # 拼接最终URL
        auth_url = (
            f"{self.ws_url}?authorization={quote(authorization)}&date={quote(date)}&host={quote(host)}"
        )
        return auth_url

    def _parse_result_segments(self, result_obj: dict) -> str:
        """解析服务端 data.result.ws 结构为纯文本"""
        try:
            parts = []
            ws_arr = result_obj.get("ws")
            if isinstance(ws_arr, list):
                for ws in ws_arr:
                    cw_arr = ws.get("cw") if isinstance(ws, dict) else None
                    if isinstance(cw_arr, list):
                        for cw in cw_arr:
                            w = cw.get("w") if isinstance(cw, dict) else None
                            if w:
                                parts.append(w)
            return "".join(parts)
        except Exception:
            return ""

    def _safe_json_loads(self, text: str):
        try:
            return json.loads(text)
        except Exception:
            try:
                cleaned = text.strip()
                start = cleaned.find("{")
                end = cleaned.rfind("}")
                if start != -1 and end != -1 and end > start:
                    return json.loads(cleaned[start:end+1])
            except Exception:
                return None

    def transcribe_audio_ws(self, audio_file: str) -> Optional[str]:
        """将音频文件以流式方式发送到讯飞IAT WS接口并获取识别文本"""
        if not os.path.exists(audio_file):
            print(f"音频文件不存在: {audio_file}")
            return None

        # 解析wav
        try:
            wf = wave.open(audio_file, "rb")
        except Exception as e:
            print(f"打开音频文件失败: {e}")
            return None

        framerate = wf.getframerate()
        channels = wf.getnchannels()
        sampwidth = wf.getsampwidth()  # bytes per sample

        # 建议参数:16k, 单声道, 16bit
        if framerate not in (8000, 16000):
            print(f"采样率异常({framerate}),建议使用16k或8k")
        if channels != 1:
            print(f"通道数为{channels},建议使用单声道")
        if sampwidth != 2:
            print(f"位深为{sampwidth*8}bit,建议16bit")

        auth_url = self._assemble_auth_url()
        ws = None
        try:
            ws = websocket.create_connection(
                auth_url,
                timeout=self.timeout,
                sslopt={"cert_reqs": ssl.CERT_NONE},
            )
            ws.settimeout(self.timeout)

            # 计算每帧40ms对应的帧数
            frames_per_chunk = max(1, int(framerate * 0.04))

            # 构建格式字符串,例如 audio/L16;rate=16000;channel=1
            fmt = f"audio/L{sampwidth*8};rate={framerate};channel={channels}"

            # 初始化增量聚合与最终状态标记
            final_text_parts = []
            saw_final_status = False

            # 发送首帧(status=0)
            first_chunk = wf.readframes(frames_per_chunk)
            first_payload = base64.b64encode(first_chunk).decode("utf-8") if first_chunk else ""
            first_frame = {
                "common": {"app_id": self.app_id},
                "business": {
                    "domain": "iat",
                    "language": "zh_cn",
                    "accent": "mandarin",
                    "vinfo": 1,
                    "vad_eos": 2000,
                    "ptt": 0,
                },
                "data": {
                    "status": 0,
                    "format": fmt,
                    "encoding": "raw",
                    "audio": first_payload,
                },
            }
            try:
                ws.send(json.dumps(first_frame, separators=(",", ":")))
            except Exception as e:
                print(f"发送首帧失败: {e}")
                print("可能原因:鉴权失败或 WS URL 错误导致服务端立即关闭连接")
                return None

            # 增强:首帧后循环尝试接收,打印并积累增量结果
            try:
                ws.settimeout(1.0)
                for _ in range(3):
                    try:
                        pre_resp_text = ws.recv()
                    except WebSocketTimeoutException:
                        break
                    if not pre_resp_text:
                        break
                    pre_resp = self._safe_json_loads(pre_resp_text)
                    if not pre_resp:
                        print(f"[首帧返回-非JSON] {pre_resp_text}")
                        break
                    code = pre_resp.get("code")
                    message = pre_resp.get("message")
                    if code is None:
                        header = pre_resp.get("header", {})
                        code = header.get("code", 0)
                        message = header.get("message")
                    data = pre_resp.get("data", {})
                    status = data.get("status")
                    print(f"[首帧返回] code={code}, status={status}, message={message}")
                    if code != 0:
                        desc = message or "识别错误"
                        print(f"识别错误(连接初期): code={code}, message={desc}")
                        return None
                    result = data.get("result")
                    if result:
                        segment = self._parse_result_segments(result)
                        if segment:
                            final_text_parts.append(segment)
                            print(f"[增量结果-首帧] {segment}")
                    if status == 2:
                        saw_final_status = True
                        break
            except Exception as e:
                print(f"[首帧接收日志] {e}")
            finally:
                ws.settimeout(self.timeout)

            # 发送中间帧(status=1)
            while True:
                chunk = wf.readframes(frames_per_chunk)
                if not chunk or saw_final_status:
                    break
                frame = {
                    "common": {"app_id": self.app_id},
                    "data": {
                        "status": 1,
                        "format": fmt,
                        "encoding": "raw",
                        "audio": base64.b64encode(chunk).decode("utf-8"),
                    },
                }
                try:
                    ws.send(json.dumps(frame, separators=(",", ":")))
                except Exception as e:
                    print(f"发送中间帧失败: {e}")
                    print("可能原因:连接已被服务端关闭(鉴权/配置错误、URL错误、参数不匹配)")
                    return None
                # 每次发送后短暂接收,积累增量结果
                try:
                    ws.settimeout(0.5)
                    resp_text_mid = ws.recv()
                    if resp_text_mid:
                        resp_mid = self._safe_json_loads(resp_text_mid)
                        if not resp_mid:
                            print(f"[中间帧返回-非JSON] {resp_text_mid}")
                        else:
                            code_mid = resp_mid.get("code")
                            msg_mid = resp_mid.get("message")
                            if code_mid is None:
                                header_mid = resp_mid.get("header", {})
                                code_mid = header_mid.get("code", 0)
                                msg_mid = header_mid.get("message")
                            data_mid = resp_mid.get("data", {})
                            status_mid = data_mid.get("status")
                            print(f"[中间帧返回] code={code_mid}, status={status_mid}, message={msg_mid}")
                            if code_mid != 0:
                                print(f"识别错误(发送中间帧后): code={code_mid}, message={msg_mid}")
                                return None
                            result_mid = data_mid.get("result")
                            if result_mid:
                                seg_mid = self._parse_result_segments(result_mid)
                                if seg_mid:
                                    final_text_parts.append(seg_mid)
                                    print(f"[增量结果-中间] {seg_mid}")
                            if status_mid == 2:
                                saw_final_status = True
                                break
                except WebSocketTimeoutException:
                    pass
                except Exception as e:
                    print(f"接收中间帧返回失败: {e}")
                    return None
                finally:
                    ws.settimeout(self.timeout)
                time.sleep(0.04)

            # 若尚未收到最终状态,发送结束帧
            if not saw_final_status:
                last_frame = {
                    "common": {"app_id": self.app_id},
                    "data": {
                        "status": 2,
                        "format": fmt,
                        "encoding": "raw",
                        "audio": "",
                    },
                }
                try:
                    ws.send(json.dumps(last_frame, separators=(",", ":")))
                except Exception as e:
                    print(f"发送结束帧失败: {e}")
                    # 即使结束帧发送失败,只要已有增量文本也返回
                    return "".join(final_text_parts) if final_text_parts else None

            # 接收最终结果(容错:超时但已有增量文本则直接返回)
            if not saw_final_status:
                while True:
                    try:
                        resp_text = ws.recv()
                    except Exception as e:
                        print(f"接收结果失败: {e}")
                        return "".join(final_text_parts) if final_text_parts else None
                    if not resp_text:
                        continue
                    resp = self._safe_json_loads(resp_text)
                    if not resp:
                        continue
                    code = resp.get("code")
                    message = resp.get("message")
                    if code is None:
                        header = resp.get("header", {})
                        code = header.get("code", 0)
                        message = header.get("message")
                    if code != 0:
                        desc = message or "识别错误"
                        print(f"识别错误: code={code}, message={desc}")
                        break
                    data = resp.get("data", {})
                    status = data.get("status")
                    result = resp.get("result") or data.get("result")
                    if result:
                        segment = self._parse_result_segments(result)
                        if segment:
                            final_text_parts.append(segment)
                    if status == 2:
                        break
            return "".join(final_text_parts) if final_text_parts else None
        finally:
            try:
                wf.close()
            except Exception:
                pass
            if ws is not None:
                try:
                    ws.close()
                except Exception:
                    pass


class VoiceChatApp:
    """语音对话应用(仅语音转文字与打印)"""

    def __init__(self):
        """初始化应用"""
        self.processor = None
        self.xunfei_ws_client = None
        self.running = False

    def initialize(self) -> bool:
        """初始化客户端和处理器"""
        try:
            self.processor = AudioProcessor()
            self.xunfei_ws_client = XunfeiRealtimeSpeechClient()
            return True
        except Exception as e:
            print(f"初始化失败: {e}")
            return False

    def print_welcome(self):
        """打印欢迎信息"""
        print("\n" + "=" * 50)
        print("语音转文字 - 讯飞 WebSocket API")
        print("=" * 50)
        print("使用说明:")
        print("1. 输入 'r' 或 'record' 开始录音并进行识别(默认5秒)")
        print("2. 输入 'p' 或 'play' <文件> 播放音频文件")
        print("3. 输入 'q' 或 'quit' 退出应用")
        print("4. 输入 'h' 或 'help' 显示帮助信息")
        print("=" * 50 + "\n")

    def print_help(self):
        """打印帮助信息"""
        print("\n" + "=" * 50)
        print("命令列表:")
        print("  r, record [秒数]    - 录制语音 (默认5秒) 并用WebSocket识别,终端打印文本")
        print("  p, play <文件>      - 播放音频文件")
        print("  q, quit             - 退出应用")
        print("  h, help             - 显示帮助信息")
        print("=" * 50 + "\n")

    def handle_command(self, command: str) -> bool:
        """处理命令"""
        parts = command.strip().split()
        if not parts:
            return True

        cmd = parts[0].lower()

        if cmd in ('q', 'quit', 'exit'):
            return False

        elif cmd in ('h', 'help'):
            self.print_help()

        elif cmd in ('r', 'record'):
            # 解析录音时长
            duration = 5
            if len(parts) > 1:
                try:
                    duration = int(parts[1])
                except ValueError:
                    print("无效的时长,使用默认值5秒")

            # 录制音频
            audio_file = self.processor.record(duration)
            if not audio_file:
                print("录音失败")
                return True

            # 新增:录音后强制转换为 16k/1ch/16bit PCM WAV
            converted_file = self.processor.convert_to_wav(audio_file)
            use_file = converted_file or audio_file
            if converted_file:
                print(f"已转换为16k/1ch/16bit: {converted_file}")
            else:
                print("转换失败,使用原始录音进行识别")

            # 新增:打印文件名与完整路径,并先播放音频
            try:
                import os
                file_name = os.path.basename(use_file)
                print(f"原始录音文件: {audio_file}")
                print(f"用于播放与识别的文件: {use_file}")
                print(f"开始播放: {file_name} | {use_file}")
                play_ok = self.processor.play(use_file)
                if not play_ok:
                    print("播放失败,但继续进行识别")
            except Exception as e:
                print(f"播放流程异常: {e},继续进行识别")

            # 使用讯飞WS实时识别
            print("正在进行实时语音识别(WebSocket)...")
            text = self.xunfei_ws_client.transcribe_audio_ws(use_file)

            if text:
                print(f"识别结果: {text}")
            else:
                print("识别失败或未返回结果")

        elif cmd in ('p', 'play'):
            if len(parts) < 2:
                print("请提供要播放的音频文件路径")
                return True
            audio_path = parts[1]
            if not os.path.exists(audio_path):
                print(f"音频文件不存在: {audio_path}")
                return True
            self.processor.play(audio_path)

        else:
            print("未知命令,请输入 'h' 或 'help' 查看帮助")

        return True

    def run(self):
        if not self.initialize():
            return

        self.running = True
        self.print_welcome()

        while self.running:
            try:
                command = input("请输入命令 (r/p/h/q): ").strip()
            except (KeyboardInterrupt, EOFError):
                print("\n收到退出信号,正在退出...")
                break

            if not command:
                continue

            if not self.handle_command(command):
                break

        print("应用已退出")


def main():
    app = VoiceChatApp()
    app.run()

if __name__ == "__main__":
    main()
在 GitHub 上编辑此页
上次更新:
贡献者: wuziqing
Next
实验02-语音对话