Python 动态模块外部通信方法

本案例演示如何在工作流中通过 Python 动态模块将识别结果可靠地推送到外部系统,涵盖 HTTP 与 WebSocket 两类常见通信方式。

组成与流程

Python 外部通信工作流
  1. 人脸识别模块:运行,输出人脸预测与身份信息(含 unknown 判定)。

  2. 边界框可视化:绘制检测框以便查看识别位置。

  3. 标签可视化:在图像上叠加类别标签(身份名称)。

  4. 仪表盘展示:将可视化图像推送到仪表盘页面。

  5. 自定义 Python 模块:整理身份识别结果,构造消息并向外部服务 url 发送 HTTP POST;返回状态码与调试信息。

Python 外部通信工作流

工作流定义

查看工作流 JSON
  1{
  2  "version": "1.0",
  3  "inputs": [
  4    {
  5      "type": "WorkflowImage",
  6      "name": "image"
  7    }
  8  ],
  9  "steps": [
 10    {
 11      "type": "daoai/face_recognition_model@v1",
 12      "name": "face_recognition_model_1",
 13      "comments": null,
 14      "images": "$inputs.image",
 15      "max_faces": 10,
 16      "similarity_threshold": 0.5,
 17      "regions": null
 18    },
 19    {
 20      "type": "core/bounding_box_visualization@v1",
 21      "name": "bounding_box_visualization_1",
 22      "comments": null,
 23      "image": "$inputs.image",
 24      "copy_image": true,
 25      "predictions": "$steps.face_recognition_model_1.predictions",
 26      "color_palette": "DEFAULT",
 27      "palette_size": 10,
 28      "custom_colors": [],
 29      "color_axis": "CLASS",
 30      "thickness": 2,
 31      "roundness": 0
 32    },
 33    {
 34      "type": "core/label_visualization@v1",
 35      "name": "label_visualization_1",
 36      "comments": null,
 37      "image": "$steps.bounding_box_visualization_1.image",
 38      "copy_image": true,
 39      "predictions": "$steps.face_recognition_model_1.predictions",
 40      "color_palette": "DEFAULT",
 41      "palette_size": 10,
 42      "custom_colors": [],
 43      "color_axis": "CLASS",
 44      "text": "Class",
 45      "text_position": "TOP_LEFT",
 46      "text_color": "WHITE",
 47      "text_scale": 1,
 48      "text_thickness": 1,
 49      "text_padding": 10,
 50      "border_radius": 0
 51    },
 52    {
 53      "type": "daoai/dashboard_visualization@v1",
 54      "name": "dashboard_visualization_1",
 55      "comments": null,
 56      "image": "$steps.label_visualization_1.image"
 57    },
 58    {
 59      "type": "Face_recognition_handler",
 60      "name": "Face_recognition_handler",
 61      "url": "http://127.0.0.1:9000",
 62      "prediction": "$steps.face_recognition_model_1.predictions"
 63    }
 64  ],
 65  "outputs": [
 66    {
 67      "type": "JsonField",
 68      "name": "face_recognition_model_1_1",
 69      "coordinates_system": "own",
 70      "selector": "$steps.face_recognition_model_1.predictions"
 71    },
 72    {
 73      "type": "JsonField",
 74      "name": "bounding_box_visualization_1_1",
 75      "coordinates_system": "own",
 76      "selector": "$steps.bounding_box_visualization_1.image"
 77    },
 78    {
 79      "type": "JsonField",
 80      "name": "label_visualization_1_1",
 81      "coordinates_system": "own",
 82      "selector": "$steps.label_visualization_1.image"
 83    },
 84    {
 85      "type": "JsonField",
 86      "name": "message",
 87      "coordinates_system": "own",
 88      "selector": "$steps.Face_recognition_handler.message"
 89    },
 90    {
 91      "type": "JsonField",
 92      "name": "status_code",
 93      "coordinates_system": "own",
 94      "selector": "$steps.Face_recognition_handler.status_code"
 95    },
 96    {
 97      "type": "JsonField",
 98      "name": "debug_message",
 99      "coordinates_system": "own",
100      "selector": "$steps.Face_recognition_handler.debug_message"
101    }
102  ],
103  "dynamic_blocks_definitions": [
104    {
105      "type": "DynamicBlockDefinition",
106      "manifest": {
107        "type": "ManifestDescription",
108        "description": "处理人脸识别的结果,并发送http请求",
109        "block_type": "Face_recognition_handler",
110        "inputs": {
111          "prediction": {
112            "type": "DynamicInputDefinition",
113            "selector_types": [
114              "input_parameter",
115              "step_output"
116            ],
117            "selector_data_kind": {
118              "input_parameter": [
119                "object_detection_prediction"
120              ],
121              "step_output": [
122                "object_detection_prediction"
123              ]
124            }
125          },
126          "url": {
127            "type": "DynamicInputDefinition",
128            "selector_types": [
129              "input_parameter",
130              "step_output"
131            ],
132            "selector_data_kind": {
133              "input_parameter": [
134                "string"
135              ],
136              "step_output": [
137                "string"
138              ]
139            },
140            "value_types": [
141              "string"
142            ]
143          }
144        },
145        "outputs": {
146          "message": {
147            "type": "DynamicOutputDefinition",
148            "kind": [
149              "string"
150            ]
151          },
152          "status_code": {
153            "type": "DynamicOutputDefinition",
154            "kind": [
155              "string"
156            ]
157          },
158          "debug_message": {
159            "type": "DynamicOutputDefinition",
160            "kind": [
161              "string"
162            ]
163          },
164          "time": {
165            "type": "DynamicOutputDefinition",
166            "kind": [
167              "string"
168            ]
169          },
170          "identity": {
171            "type": "DynamicOutputDefinition",
172            "kind": [
173              "string"
174            ]
175          }
176        }
177      },
178      "code": {
179        "type": "PythonCode",
180        "run_function_code": "import time\nimport requests\nimport numpy as np\n\n\ndef run(self, prediction, url) -> BlockResult:\n    start_time = time.time()\n\n    try:\n        # class_name 是 array-like(numpy / pandas)\n        class_names = prediction.data['class_name']\n\n        message = \"未检测到人员\"\n        identity = \"\"\n\n        if class_names is not None and len(class_names) > 0:\n            # 转成字符串列表,防止 numpy 类型问题\n            class_list = [str(c) for c in class_names]\n\n            if np.any(np.char.lower(class_names.astype(str)) == \"unknown\"):\n                message = \"未在身份库中匹配到对应人员\"\n                identity = \"unknown\"\n            else:\n                unique_people = sorted(set(class_list))\n                identity = \", \".join(unique_people)\n                message = f\"识别到人员:{identity}\"\n\n        payload = {\n            \"message\": message,\n            \"timestamp\": int(time.time())\n        }\n\n        response = requests.post(\n            url,\n            json=payload,\n            timeout=5\n        )\n\n        return {\n            \"message\": message,\n            \"identity\": identity,          # ✅ 按你要求加在 return 里\n            \"status_code\": response.status_code,\n            \"debug_message\": response.text,\n            \"time\": round(time.time() - start_time, 3)\n        }\n\n    except Exception as e:\n        return {\n            \"message\": None,\n            \"identity\": \"\",\n            \"status_code\": -1,\n            \"debug_message\": str(e),\n            \"time\": round(time.time() - start_time, 3)\n        }\n"
181      }
182    }
183  ]
184}

外部通信示例

下面给出两种常见通信方式的参考实现,可直接复制到 Python 动态模块中使用。请根据实际需求修改目标 url 地址与消息内容格式。

HTTP 通信

示例:使用 HTTP POST 发送当前时间戳。

import time
import requests


def run(self) -> BlockResult:
    url = "http://127.0.0.1:8000/event"  # 按需修改

    payload = {
        "timestamp": int(time.time() * 1000)
    }

    try:
        response = requests.post(url, json=payload, timeout=5)

        return {
            "message": f"sent timestamp_ms {payload['timestamp']}",
            "status_code": response.status_code
        }

    except Exception as e:
        return {
            "message": str(e),
            "status_code": -1
        }

WebSocket 通信

使用原生库实现的最小 WebSocket 客户端,同样以发送当前时间戳为例。

import os
import ssl
import time
import json
import base64
import socket
import hashlib
from urllib.parse import urlparse


def _ws_send_text_once(ws_url: str, text: str, timeout: float = 5.0) -> None:
    u = urlparse(ws_url)
    if u.scheme not in ("ws", "wss"):
        raise ValueError(f"Unsupported scheme: {u.scheme}")

    host = u.hostname
    port = u.port or (443 if u.scheme == "wss" else 80)
    path = u.path or "/"
    if u.query:
        path += "?" + u.query

    # 1) TCP connect (+ TLS if wss)
    sock = socket.create_connection((host, port), timeout=timeout)
    if u.scheme == "wss":
        ctx = ssl.create_default_context()
        sock = ctx.wrap_socket(sock, server_hostname=host)

    try:
        # 2) WebSocket handshake
        key = base64.b64encode(os.urandom(16)).decode("ascii")
        req = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host}:{port}\r\n"
            f"Upgrade: websocket\r\n"
            f"Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            f"Sec-WebSocket-Version: 13\r\n"
            f"\r\n"
        )
        sock.sendall(req.encode("utf-8"))

        # Read handshake response headers
        buf = b""
        while b"\r\n\r\n" not in buf:
            chunk = sock.recv(4096)
            if not chunk:
                raise RuntimeError("Handshake failed: connection closed")
            buf += chunk

        header_blob = buf.split(b"\r\n\r\n", 1)[0].decode("iso-8859-1")
        lines = header_blob.split("\r\n")
        status_line = lines[0]
        if " 101 " not in status_line:
            raise RuntimeError(f"Handshake failed: {status_line}")

        headers = {}
        for line in lines[1:]:
            if ":" in line:
                k, v = line.split(":", 1)
                headers[k.strip().lower()] = v.strip()

        accept = headers.get("sec-websocket-accept")
        if not accept:
            raise RuntimeError("Handshake failed: missing Sec-WebSocket-Accept")

        magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
        expected = base64.b64encode(hashlib.sha1((key + magic).encode("ascii")).digest()).decode("ascii")
        if accept != expected:
            raise RuntimeError("Handshake failed: bad Sec-WebSocket-Accept")

        # 3) Send a single masked text frame
        payload = text.encode("utf-8")
        if len(payload) > 125:
            raise ValueError("This minimal client supports payload length <= 125 bytes")

        fin_opcode = 0x81  # FIN=1, opcode=1(text)
        mask_bit_len = 0x80 | len(payload)  # MASK=1 + payload len
        mask_key = os.urandom(4)
        masked = bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload))

        frame = bytes([fin_opcode, mask_bit_len]) + mask_key + masked
        sock.sendall(frame)

    finally:
        try:
            sock.close()
        except Exception:
            pass


def run(self) -> BlockResult:
    ws_url = "ws://127.0.0.1:8000/ws"  # 按需改
    try:
        # 毫秒级时间戳
        ts_ms = int(time.time() * 1000)

        _ws_send_text_once(ws_url, str(ts_ms), timeout=5.0)

        return {
            "status_code": 0,
            "message": f"sent timestamp_ms {ts_ms}"
        }

    except Exception as e:
        return {
            "status_code": -1,
            "message": str(e)
        }