Run Custom Model on Board
Overview
After completing the model conversion, the next step is to deploy and run the custom model on the GM-3568JHF development board. This chapter will detail two main deployment methods and demonstrate how to write a complete image classification application through practical cases.
5.1 Comparison of Two Deployment Methods
Python API Deployment
Advantages
- High Development Efficiency: Concise Python syntax, fast development speed.
- Convenient Debugging: Rich support for debugging tools and libraries.
- Rich Ecosystem: A large number of third-party libraries can be used directly.
- Prototype Verification: Suitable for rapid prototyping and algorithm verification.
- Strong Flexibility: Easy to modify and extend functionality.
Disadvantages
- Performance Overhead: Python interpretation execution has performance loss.
- Memory Usage: Uses more memory relative to C++.
- Startup Time: Interpreter startup and module loading take time.
- Complex Dependencies: Requires Python runtime environment and related libraries.
Applicable Scenarios
# Suitable for the following scenarios:
scenarios = [
"Algorithm prototype verification",
"Rapid functional demonstration",
"Teaching and learning",
"Complex data processing",
"Integration with other Python services",
"Applications with less strict performance requirements"
]

C/C++ API Deployment
Advantages
- Excellent Performance: Compiled execution, high running efficiency.
- Memory Efficiency: Small memory footprint, high resource utilization.
- Fast Startup: No interpreter overhead, short startup time.
- Simple Deployment: Compiled executable file, few dependencies.
- System Integration: Easy to integrate with system services and other C/C++ programs.
Disadvantages
- Complex Development: Requires handling memory management, pointers, etc.
- Difficult Debugging: Relatively limited debugging tools.
- Development Cycle: Longer compilation-test cycle.
- Flexibility: Modifying functionality requires recompilation.
Applicable Scenarios
// Suitable for the following scenarios:
std::vector<std::string> scenarios = {
"Production environment deployment",
"Applications with high real-time requirements",
"Resource-constrained embedded systems",
"System services and daemons",
"Interaction with hardware low-level",
"Large-scale batch processing"
};

Performance Comparison Test
# Performance test script
import time
import numpy as np
from rknnlite.api import RKNNLite
def benchmark_python_api(model_path, test_data, iterations=100):
    """Benchmark the RKNNLite Python API: model load, runtime init, inference.

    Args:
        model_path: Path to a .rknn model file.
        test_data: numpy array fed as the single model input each iteration.
        iterations: Number of timed inference runs.

    Returns:
        Dict with load/init times, inference time statistics and average FPS,
        or None if loading or runtime initialization fails.
    """
    rknn = RKNNLite()

    # Time model loading.
    start_time = time.time()
    ret = rknn.load_rknn(model_path)
    load_time = time.time() - start_time
    if ret != 0:
        print("Failed to load model")
        rknn.release()  # free the context on the failure path too
        return None

    # Time runtime (NPU context) initialization.
    start_time = time.time()
    ret = rknn.init_runtime()
    init_time = time.time() - start_time
    if ret != 0:
        print("Failed to init runtime")
        rknn.release()
        return None

    try:
        # Timed inference loop; outputs are discarded, only latency matters.
        inference_times = []
        for _ in range(iterations):
            start_time = time.time()
            outputs = rknn.inference(inputs=[test_data])
            inference_times.append(time.time() - start_time)
    finally:
        # Always release the NPU context, even if inference raises.
        rknn.release()

    # Aggregate statistics over all iterations.
    avg_inference_time = np.mean(inference_times)
    return {
        'load_time': load_time,
        'init_time': init_time,
        'avg_inference_time': avg_inference_time,
        'min_inference_time': np.min(inference_times),
        'max_inference_time': np.max(inference_times),
        'std_inference_time': np.std(inference_times),
        'fps': 1.0 / avg_inference_time,
    }
def print_benchmark_results(python_results, cpp_results=None):
    """Print a formatted comparison of benchmark numbers.

    Args:
        python_results: dict produced by benchmark_python_api().
        cpp_results: optional dict with the same keys measured for the
            C++ API; when given, a speedup summary is printed as well.
    """
    banner = "=" * 60
    print(banner)
    print("Performance Test Results Comparison")
    print(banner)

    # Python API section.
    print(f"\nPython API Performance:")
    print(f" Model Load Time: {python_results['load_time']:.4f}s")
    print(f" Runtime Init: {python_results['init_time']:.4f}s")
    print(f" Avg Inference Time: {python_results['avg_inference_time']:.4f}s")
    print(f" Min Inference Time: {python_results['min_inference_time']:.4f}s")
    print(f" Max Inference Time: {python_results['max_inference_time']:.4f}s")
    print(f" Inference Time Std Dev: {python_results['std_inference_time']:.4f}s")
    print(f" Avg FPS: {python_results['fps']:.2f}")

    # Without C++ numbers there is nothing left to compare.
    if not cpp_results:
        return

    print(f"\nC++ API Performance:")
    print(f" Model Load Time: {cpp_results['load_time']:.4f}s")
    print(f" Runtime Init: {cpp_results['init_time']:.4f}s")
    print(f" Avg Inference Time: {cpp_results['avg_inference_time']:.4f}s")
    print(f" Avg FPS: {cpp_results['fps']:.2f}")

    print(f"\nPerformance Improvement:")
    speedup = python_results['avg_inference_time'] / cpp_results['avg_inference_time']
    print(f" C++ vs Python Inference Speedup: {speedup:.2f}x")
    fps_gain = (cpp_results['fps'] - python_results['fps']) / python_results['fps'] * 100
    print(f" FPS Improvement: {fps_gain:.1f}%")
# Usage example
if __name__ == "__main__":
model_path = "models/resnet18_rk3568.rknn"
test_data = np.random.rand(1, 3, 224, 224).astype(np.float32)
python_results = benchmark_python_api(model_path, test_data)
    print_benchmark_results(python_results)

5.2 Python API Deployment Explanation
Basic API Usage
Core Class and Methods
from rknnlite.api import RKNNLite
import numpy as np
import cv2
class RKNNInference:
    """RKNN Inference Wrapper Class.

    Enforces the call order load_model() -> init_runtime() -> inference()
    and tracks that order with boolean state flags.
    """

    def __init__(self, model_path, verbose=True):
        # Path to the .rknn model file; loading is deferred to load_model().
        self.model_path = model_path
        self.rknn = RKNNLite(verbose=verbose)
        # State flags guarding the required call order.
        self.is_loaded = False
        self.is_initialized = False

    def load_model(self):
        """Load RKNN model; raises RuntimeError on a non-zero return code."""
        print(f"Loading model: {self.model_path}")
        ret = self.rknn.load_rknn(self.model_path)
        # RKNN APIs signal failure with a non-zero return code.
        if ret != 0:
            raise RuntimeError(f"Failed to load model, error code: {ret}")
        self.is_loaded = True
        print("Model loaded successfully")

    def init_runtime(self, target='rk3568', device_id=None):
        """Initialize runtime environment; requires load_model() first."""
        if not self.is_loaded:
            raise RuntimeError("Please load model first")
        print("Initializing runtime environment...")
        # NOTE(review): on-device RKNNLite.init_runtime() typically takes
        # core_mask rather than target/device_id — confirm these keyword
        # arguments exist in the rknnlite version being deployed.
        ret = self.rknn.init_runtime(target=target, device_id=device_id)
        if ret != 0:
            raise RuntimeError(f"Failed to init runtime, error code: {ret}")
        self.is_initialized = True
        print("Runtime initialized successfully")

    def inference(self, input_data):
        """Execute inference.

        Accepts a single numpy array or a list of arrays and returns the
        list of output arrays from RKNNLite.inference().
        """
        if not self.is_initialized:
            raise RuntimeError("Please init runtime first")
        # Normalize input into the list form rknn.inference() expects.
        if isinstance(input_data, np.ndarray):
            inputs = [input_data]
        elif isinstance(input_data, list):
            inputs = input_data
        else:
            raise ValueError("Input data must be numpy array or list of arrays")
        # Execute inference; a None result signals failure.
        outputs = self.rknn.inference(inputs=inputs)
        if outputs is None:
            raise RuntimeError("Inference failed")
        return outputs

    def get_model_info(self):
        """Get model info (input/output tensor descriptions)."""
        if not self.is_loaded:
            raise RuntimeError("Please load model first")
        # NOTE(review): get_input_info()/get_output_info() are not present in
        # every rknnlite release — verify these methods exist in the deployed
        # API before relying on this helper.
        input_info = self.rknn.get_input_info()
        output_info = self.rknn.get_output_info()
        return {
            'input_info': input_info,
            'output_info': output_info
        }

    def release(self):
        """Release resources held by the underlying RKNNLite context."""
        if hasattr(self, 'rknn') and self.rknn:
            self.rknn.release()
        print("Resources released")
# Usage example
def basic_usage_example():
    """End-to-end demo: load a model, run one inference, release resources."""
    runner = RKNNInference("models/resnet18_rk3568.rknn")
    try:
        # Required order: load -> init -> (optional info) -> inference.
        runner.load_model()
        runner.init_runtime()
        print("Model info:", runner.get_model_info())
        # Random NCHW float32 tensor stands in for a real preprocessed image.
        dummy_input = np.random.rand(1, 3, 224, 224).astype(np.float32)
        outputs = runner.inference(dummy_input)
        print(f"Inference output shape: {[output.shape for output in outputs]}")
    finally:
        # Always give the NPU context back, even on failure.
        runner.release()
if __name__ == "__main__":
    basic_usage_example()

Image Preprocessing Module
import cv2
import numpy as np
from typing import Tuple, List, Optional
class ImagePreprocessor:
    """Image preprocessing for classification-style models.

    Pipeline: BGR->RGB, resize (optionally letterboxed), [0,1] scaling,
    mean/std normalization, HWC->NCHW with a leading batch dimension.
    """

    def __init__(self, target_size=(224, 224), mean=(0.485, 0.456, 0.406),
                 std=(0.229, 0.224, 0.225), bgr_to_rgb=True):
        # (height, width) the model expects.
        self.target_size = target_size
        # Per-channel normalization constants (ImageNet defaults).
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)
        # OpenCV loads BGR; most training pipelines expect RGB.
        self.bgr_to_rgb = bgr_to_rgb

    def resize_image(self, image: np.ndarray, keep_ratio: bool = True
                     ) -> Tuple[np.ndarray, Optional[float], Optional[Tuple[int, int]]]:
        """Resize to target size.

        Returns:
            (resized_image, scale, (offset_x, offset_y)); scale and offset
            are None when keep_ratio is False.
            (Fixed: the original annotation claimed a bare np.ndarray return
            although a 3-tuple was always returned.)
        """
        h, w = image.shape[:2]
        target_h, target_w = self.target_size
        if not keep_ratio:
            # Plain resize; aspect ratio may be distorted.
            resized = cv2.resize(image, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
            return resized, None, None
        # Letterbox: scale so the image fits, then center it on a black canvas.
        scale = min(target_w / w, target_h / h)
        new_w, new_h = int(w * scale), int(h * scale)
        resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
        canvas = np.zeros((target_h, target_w, 3), dtype=image.dtype)
        # Paste position (centered).
        start_h = (target_h - new_h) // 2
        start_w = (target_w - new_w) // 2
        canvas[start_h:start_h + new_h, start_w:start_w + new_w] = resized
        return canvas, scale, (start_w, start_h)

    def normalize(self, image: np.ndarray) -> np.ndarray:
        """Scale to [0,1] then standardize with the configured mean/std."""
        image = image.astype(np.float32) / 255.0
        return (image - self.mean) / self.std

    def preprocess(self, image: np.ndarray, keep_ratio: bool = True) -> Tuple[np.ndarray, dict]:
        """Full pipeline.

        Returns:
            (NCHW float32 batch-of-1 tensor, metadata dict needed to map
            results back to the original image).
        """
        original_shape = image.shape[:2]
        # BGR -> RGB for 3-channel inputs only.
        if self.bgr_to_rgb and len(image.shape) == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if keep_ratio:
            image, scale, offset = self.resize_image(image, keep_ratio=True)
            preprocess_info = {
                'original_shape': original_shape,
                'scale': scale,
                'offset': offset,
                'target_size': self.target_size
            }
        else:
            image, _, _ = self.resize_image(image, keep_ratio=False)
            preprocess_info = {
                'original_shape': original_shape,
                'target_size': self.target_size
            }
        image = self.normalize(image)
        # HWC -> CHW, then add the batch dimension -> NCHW.
        image = np.transpose(image, (2, 0, 1))
        image = np.expand_dims(image, axis=0)
        return image, preprocess_info
# Usage example
def preprocess_example():
    """Run the preprocessing pipeline on a sample image and report shapes."""
    # ImageNet-style preprocessor.
    pipeline = ImagePreprocessor(
        target_size=(224, 224),
        mean=(0.485, 0.456, 0.406),
        std=(0.229, 0.224, 0.225),
    )

    image_path = "test_images/cat.jpg"
    image = cv2.imread(image_path)
    # cv2.imread returns None instead of raising on a bad path.
    if image is None:
        print(f"Cannot read image: {image_path}")
        return

    print(f"Original image shape: {image.shape}")
    processed_image, info = pipeline.preprocess(image, keep_ratio=True)
    print(f"Processed image shape: {processed_image.shape}")
    print(f"Preprocess info: {info}")
    return processed_image, info
if __name__ == "__main__":
    preprocess_example()

Post-processing Module
import numpy as np
from typing import List, Tuple, Dict, Optional
class PostProcessor:
    """Post-processing Base Class.

    Subclasses implement process() to turn raw model outputs into a
    structured result dict.
    """

    def __init__(self):
        # The base class holds no state.
        pass

    def process(self, outputs: List[np.ndarray], **kwargs) -> Dict:
        """Process model output; must be overridden by subclasses."""
        raise NotImplementedError
class ClassificationPostProcessor(PostProcessor):
    """Classification Task Post-processing.

    Converts raw logits into per-sample top-k predictions with softmax
    probabilities and optional human-readable class names.
    """

    def __init__(self, class_names: Optional[List[str]] = None, top_k: int = 5):
        super().__init__()
        # Optional id -> name mapping; ids without a name stay unnamed.
        self.class_names = class_names
        # Number of highest-probability classes to report per sample.
        self.top_k = top_k

    def softmax(self, x: np.ndarray) -> np.ndarray:
        """Numerically stable softmax over the last axis."""
        # Subtracting the row max prevents overflow in exp().
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def process(self, outputs: List[np.ndarray], **kwargs) -> Dict:
        """Process classification model output.

        Args:
            outputs: model outputs; outputs[0] holds the logits with shape
                (batch, num_classes), or a higher-rank tensor that flattens
                to that shape.

        Returns:
            Dict with per-sample 'results' (top-k predictions) and 'batch_size'.

        Raises:
            ValueError: if outputs is empty.
        """
        if len(outputs) == 0:
            raise ValueError("Output is empty")
        # First output tensor is assumed to be the classification logits.
        logits = outputs[0]
        # Flatten trailing dims to (batch, num_classes). The original code
        # had identical 3D and 4D branches; one condition covers both.
        if logits.ndim > 2:
            logits = logits.reshape(logits.shape[0], -1)
        probabilities = self.softmax(logits)
        results = []
        for probs in probabilities:
            # Indices of the top-k probabilities, highest first.
            top_indices = np.argsort(probs)[::-1][:self.top_k]
            top_probs = probs[top_indices]
            predictions = []
            for idx, prob in zip(top_indices, top_probs):
                prediction = {
                    'class_id': int(idx),
                    'probability': float(prob),
                    'confidence': float(prob)
                }
                if self.class_names and idx < len(self.class_names):
                    prediction['class_name'] = self.class_names[idx]
                predictions.append(prediction)
            results.append({
                'predictions': predictions,
                'top1_class_id': int(top_indices[0]),
                'top1_probability': float(top_probs[0])
            })
        return {
            'results': results,
            'batch_size': len(results)
        }
class DetectionPostProcessor(PostProcessor):
    """Object Detection Post-processing.

    Decodes YOLOv5-style predictions: confidence filtering, xywh->xyxy
    conversion, optional rescaling to the original image size, then NMS.
    """

    def __init__(self, class_names: Optional[List[str]] = None,
                 conf_threshold: float = 0.5, nms_threshold: float = 0.4):
        super().__init__()
        # Optional id -> name mapping for labeling detections.
        self.class_names = class_names
        # Minimum objectness confidence required to keep a prediction.
        self.conf_threshold = conf_threshold
        # IoU threshold used by non-maximum suppression.
        self.nms_threshold = nms_threshold

    def xywh2xyxy(self, boxes: np.ndarray) -> np.ndarray:
        """Convert bounding box format from (cx, cy, w, h) to (x1, y1, x2, y2)."""
        xyxy = boxes.copy()
        xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
        xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
        xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
        xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2
        return xyxy

    def nms(self, boxes: np.ndarray, scores: np.ndarray, threshold: float) -> List[int]:
        """Greedy Non-Maximum Suppression; returns indices of kept boxes."""
        if len(boxes) == 0:
            return []
        # Area of every box, reused in the IoU computation below.
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        # Candidate indices sorted by descending score.
        order = scores.argsort()[::-1]
        keep = []
        while len(order) > 0:
            # The highest-scoring remaining box is always kept.
            i = order[0]
            keep.append(i)
            if len(order) == 1:
                break
            # IoU of the kept box against every remaining candidate.
            xx1 = np.maximum(boxes[i, 0], boxes[order[1:], 0])
            yy1 = np.maximum(boxes[i, 1], boxes[order[1:], 1])
            xx2 = np.minimum(boxes[i, 2], boxes[order[1:], 2])
            yy2 = np.minimum(boxes[i, 3], boxes[order[1:], 3])
            w = np.maximum(0, xx2 - xx1)
            h = np.maximum(0, yy2 - yy1)
            intersection = w * h
            # NOTE(review): union can be zero for degenerate boxes, which
            # would divide by zero here — confirm inputs are well-formed.
            union = areas[i] + areas[order[1:]] - intersection
            iou = intersection / union
            # Drop candidates that overlap the kept box above the threshold.
            indices = np.where(iou <= threshold)[0]
            order = order[indices + 1]
        return keep

    def process(self, outputs: List[np.ndarray], input_shape: Tuple[int, int] = (640, 640),
                original_shape: Optional[Tuple[int, int]] = None) -> Dict:
        """Process detection model output.

        Args:
            outputs: outputs[0] is assumed to be YOLOv5-format predictions
                shaped [batch, num_anchors, 4 + 1 + num_classes].
            input_shape: (h, w) the network was fed.
            original_shape: optional (h, w) of the source image; when given,
                boxes are rescaled to it.

        Returns:
            Dict with 'detections' (bbox/confidence/class entries) and 'count'.

        Raises:
            ValueError: if outputs is empty.
        """
        if len(outputs) == 0:
            raise ValueError("Output is empty")
        # YOLOv5 output format: [batch, num_anchors, 85] (4 + 1 + 80)
        predictions = outputs[0]
        if len(predictions.shape) == 3:
            predictions = predictions[0]  # Take first batch
        # Filter low confidence detections (objectness lives in column 4).
        conf_mask = predictions[:, 4] >= self.conf_threshold
        predictions = predictions[conf_mask]
        if len(predictions) == 0:
            return {'detections': [], 'count': 0}
        # Extract bounding boxes, objectness and per-class probabilities.
        boxes = predictions[:, :4]
        confidences = predictions[:, 4]
        class_probs = predictions[:, 5:]
        # Final class score = objectness * class probability.
        class_scores = confidences[:, np.newaxis] * class_probs
        class_ids = np.argmax(class_scores, axis=1)
        scores = np.max(class_scores, axis=1)
        # Convert (cx, cy, w, h) -> (x1, y1, x2, y2).
        boxes = self.xywh2xyxy(boxes)
        # Scale boxes to the original image size.
        # NOTE(review): this plain scaling does not undo letterbox padding;
        # if preprocessing used keep_ratio, the offset must be removed first.
        if original_shape:
            scale_x = original_shape[1] / input_shape[1]
            scale_y = original_shape[0] / input_shape[0]
            boxes[:, [0, 2]] *= scale_x
            boxes[:, [1, 3]] *= scale_y
        # Class-agnostic NMS on the combined scores.
        keep_indices = self.nms(boxes, scores, self.nms_threshold)
        # Build the final result entries.
        detections = []
        for i in keep_indices:
            detection = {
                'bbox': boxes[i].tolist(),
                'confidence': float(scores[i]),
                'class_id': int(class_ids[i])
            }
            if self.class_names and class_ids[i] < len(self.class_names):
                detection['class_name'] = self.class_names[class_ids[i]]
            detections.append(detection)
        return {
            'detections': detections,
            'count': len(detections)
        }
# Usage example
def postprocess_example():
    """Exercise both post-processors on random model outputs."""
    # --- classification ---
    class_names = ['cat', 'dog', 'bird', 'fish', 'horse']
    cls_post = ClassificationPostProcessor(class_names=class_names, top_k=3)
    fake_logits = [np.random.rand(1, 5)]
    cls_results = cls_post.process(fake_logits)
    print("Classification Results:")
    for result in cls_results['results']:
        print(f"Top-1: {result['predictions'][0]}")

    # --- detection ---
    coco_classes = ['person', 'bicycle', 'car', 'motorcycle', 'airplane']
    det_post = DetectionPostProcessor(class_names=coco_classes)
    fake_preds = [np.random.rand(1, 25200, 85)]  # YOLOv5 output format
    det_results = det_post.process(fake_preds)
    print(f"\nDetection Results: Found {det_results['count']} objects")
if __name__ == "__main__":
    postprocess_example()

5.3 C/C++ API Deployment Explanation
Basic C++ API Usage
// rknn_inference.h
#ifndef RKNN_INFERENCE_H
#define RKNN_INFERENCE_H
#include <vector>
#include <string>
#include <memory>
#include "rknn_api.h"
class RKNNInference {
public:
    RKNNInference();
    ~RKNNInference();

    // Non-copyable: the class owns a raw rknn_context plus raw attribute
    // arrays, so the implicit copy would lead to double rknn_destroy()/
    // delete[] when both copies are destroyed.
    RKNNInference(const RKNNInference&) = delete;
    RKNNInference& operator=(const RKNNInference&) = delete;

    // Basic functions
    int loadModel(const std::string& model_path);
    int initRuntime();
    int inference(const std::vector<void*>& inputs, std::vector<void*>& outputs);
    void release();

    // Info getters
    rknn_input_output_num getIONum() const { return io_num_; }
    rknn_tensor_attr* getInputAttrs() const { return input_attrs_; }
    rknn_tensor_attr* getOutputAttrs() const { return output_attrs_; }

    // Utility functions
    static void printTensorAttr(const rknn_tensor_attr& attr);
    static size_t getTensorSize(const rknn_tensor_attr& attr);

private:
    rknn_context ctx_;               // RKNN runtime context handle
    rknn_input_output_num io_num_;   // input/output tensor counts
    rknn_tensor_attr* input_attrs_;  // owned array, sized io_num_.n_input
    rknn_tensor_attr* output_attrs_; // owned array, sized io_num_.n_output
    bool is_loaded_;
    bool is_initialized_;

    void cleanup();                  // frees attrs and destroys ctx_
};
#endif // RKNN_INFERENCE_H

// rknn_inference.cpp
#include "rknn_inference.h"
#include <iostream>
#include <fstream>
#include <cstring>
// Zero-initialize all handles and flags; no RKNN work happens until
// loadModel() is called.
RKNNInference::RKNNInference()
    : ctx_(0), input_attrs_(nullptr), output_attrs_(nullptr),
      is_loaded_(false), is_initialized_(false) {
    memset(&io_num_, 0, sizeof(io_num_));
}

// Ensure the context and attribute arrays are freed even when release()
// was never called explicitly.
RKNNInference::~RKNNInference() {
    cleanup();
}
// Read the whole .rknn file into memory, create the RKNN context and cache
// the input/output tensor attributes. Returns 0 on success, -1 for file
// errors, or the negative rknn error code.
int RKNNInference::loadModel(const std::string& model_path) {
    std::cout << "Loading model: " << model_path << std::endl;
    // Open at the end (ios::ate) so tellg() yields the file size directly.
    std::ifstream file(model_path, std::ios::binary | std::ios::ate);
    if (!file.is_open()) {
        std::cerr << "Failed to open model file: " << model_path << std::endl;
        return -1;
    }
    size_t model_size = file.tellg();
    file.seekg(0, std::ios::beg);
    std::vector<char> model_data(model_size);
    if (!file.read(model_data.data(), model_size)) {
        std::cerr << "Failed to read model file" << std::endl;
        return -1;
    }
    file.close();
    // Init RKNN context from the in-memory model image.
    int ret = rknn_init(&ctx_, model_data.data(), model_size, 0, nullptr);
    if (ret < 0) {
        std::cerr << "rknn_init failed: " << ret << std::endl;
        return ret;
    }
    // Query how many input and output tensors the model has.
    // NOTE(review): the error paths below leak ctx_ (and any attr arrays
    // already allocated) until the destructor runs — consider calling
    // cleanup() before returning on failure.
    ret = rknn_query(ctx_, RKNN_QUERY_IN_OUT_NUM, &io_num_, sizeof(io_num_));
    if (ret < 0) {
        std::cerr << "rknn_query RKNN_QUERY_IN_OUT_NUM failed: " << ret << std::endl;
        return ret;
    }
    std::cout << "Model input num: " << io_num_.n_input
              << ", output num: " << io_num_.n_output << std::endl;
    // Query and cache one attribute record per input tensor.
    input_attrs_ = new rknn_tensor_attr[io_num_.n_input];
    memset(input_attrs_, 0, sizeof(rknn_tensor_attr) * io_num_.n_input);
    for (uint32_t i = 0; i < io_num_.n_input; i++) {
        input_attrs_[i].index = i;
        ret = rknn_query(ctx_, RKNN_QUERY_INPUT_ATTR, &input_attrs_[i], sizeof(rknn_tensor_attr));
        if (ret < 0) {
            std::cerr << "rknn_query input attr " << i << " failed: " << ret << std::endl;
            return ret;
        }
        std::cout << "Input " << i << " attr:" << std::endl;
        printTensorAttr(input_attrs_[i]);
    }
    // Query and cache one attribute record per output tensor.
    output_attrs_ = new rknn_tensor_attr[io_num_.n_output];
    memset(output_attrs_, 0, sizeof(rknn_tensor_attr) * io_num_.n_output);
    for (uint32_t i = 0; i < io_num_.n_output; i++) {
        output_attrs_[i].index = i;
        ret = rknn_query(ctx_, RKNN_QUERY_OUTPUT_ATTR, &output_attrs_[i], sizeof(rknn_tensor_attr));
        if (ret < 0) {
            std::cerr << "rknn_query output attr " << i << " failed: " << ret << std::endl;
            return ret;
        }
        std::cout << "Output " << i << " attr:" << std::endl;
        printTensorAttr(output_attrs_[i]);
    }
    is_loaded_ = true;
    std::cout << "Model loaded successfully" << std::endl;
    return 0;
}
// Mark the context ready for inference. Requires loadModel() to have
// succeeded. Returns 0 on success.
int RKNNInference::initRuntime() {
    if (!is_loaded_) {
        std::cerr << "Model not loaded" << std::endl;
        return -1;
    }
    std::cout << "Initializing runtime..." << std::endl;
    // NOTE(review): rknn_init_runtime() is not part of the stock RKNPU2
    // rknn_api.h (rknn_init() already initializes the runtime there) —
    // confirm this symbol exists in the SDK being linked against.
    int ret = rknn_init_runtime(ctx_, nullptr);
    if (ret < 0) {
        std::cerr << "rknn_init_runtime failed: " << ret << std::endl;
        return ret;
    }
    is_initialized_ = true;
    std::cout << "Runtime initialized successfully" << std::endl;
    return 0;
}
// Run one inference pass. `inputs` holds one raw buffer per model input;
// `outputs` receives one malloc()'d float buffer per model output —
// ownership transfers to the caller, who must free() each pointer.
int RKNNInference::inference(const std::vector<void*>& inputs, std::vector<void*>& outputs) {
    if (!is_initialized_) {
        std::cerr << "Runtime not initialized" << std::endl;
        return -1;
    }
    if (inputs.size() != io_num_.n_input) {
        std::cerr << "Input size mismatch: expected " << io_num_.n_input
                  << ", got " << inputs.size() << std::endl;
        return -1;
    }
    // Set inputs
    std::vector<rknn_input> rknn_inputs(io_num_.n_input);
    for (uint32_t i = 0; i < io_num_.n_input; i++) {
        rknn_inputs[i].index = i;
        rknn_inputs[i].buf = inputs[i];
        // NOTE(review): size is computed from the attr's element type, but
        // `type` is hardcoded to UINT8 below — these disagree whenever the
        // model input is not uint8; confirm callers pass NHWC uint8 buffers.
        rknn_inputs[i].size = getTensorSize(input_attrs_[i]);
        rknn_inputs[i].pass_through = 0;
        rknn_inputs[i].type = RKNN_TENSOR_UINT8;
        rknn_inputs[i].fmt = RKNN_TENSOR_NHWC;
    }
    int ret = rknn_inputs_set(ctx_, io_num_.n_input, rknn_inputs.data());
    if (ret < 0) {
        std::cerr << "rknn_inputs_set failed: " << ret << std::endl;
        return ret;
    }
    // Execute inference
    ret = rknn_run(ctx_, nullptr);
    if (ret < 0) {
        std::cerr << "rknn_run failed: " << ret << std::endl;
        return ret;
    }
    // Get outputs; want_float asks the runtime to dequantize to float32.
    std::vector<rknn_output> rknn_outputs(io_num_.n_output);
    for (uint32_t i = 0; i < io_num_.n_output; i++) {
        rknn_outputs[i].want_float = 1;
        rknn_outputs[i].is_prealloc = 0;
    }
    ret = rknn_outputs_get(ctx_, io_num_.n_output, rknn_outputs.data(), nullptr);
    if (ret < 0) {
        std::cerr << "rknn_outputs_get failed: " << ret << std::endl;
        return ret;
    }
    // Copy output data into caller-owned heap buffers.
    outputs.resize(io_num_.n_output);
    for (uint32_t i = 0; i < io_num_.n_output; i++) {
        size_t output_size = rknn_outputs[i].size;
        outputs[i] = malloc(output_size);
        memcpy(outputs[i], rknn_outputs[i].buf, output_size);
    }
    // Release RKNN-internal output buffers now that we hold copies.
    rknn_outputs_release(ctx_, io_num_.n_output, rknn_outputs.data());
    return 0;
}
// Public alias for cleanup(); safe to call multiple times.
void RKNNInference::release() {
    cleanup();
}

// Free attribute arrays and destroy the RKNN context. Idempotent because
// every pointer/handle is nulled immediately after being freed.
void RKNNInference::cleanup() {
    if (input_attrs_) {
        delete[] input_attrs_;
        input_attrs_ = nullptr;
    }
    if (output_attrs_) {
        delete[] output_attrs_;
        output_attrs_ = nullptr;
    }
    if (ctx_) {
        rknn_destroy(ctx_);
        ctx_ = 0;
    }
    is_loaded_ = false;
    is_initialized_ = false;
}
// Dump one tensor attribute record on a single line, e.g.
//   index=0, name=images, n_dims=4, dims=[1, 224, 224, 3], ...
void RKNNInference::printTensorAttr(const rknn_tensor_attr& attr) {
    std::cout << " index=" << attr.index << ", name=" << attr.name
              << ", n_dims=" << attr.n_dims << ", dims=[";
    // Emit ", " before every element except the first.
    const char* sep = "";
    for (uint32_t i = 0; i < attr.n_dims; i++) {
        std::cout << sep << attr.dims[i];
        sep = ", ";
    }
    std::cout << "], n_elems=" << attr.n_elems
              << ", size=" << attr.size << ", fmt=" << attr.fmt
              << ", type=" << attr.type << ", qnt_type=" << attr.qnt_type << std::endl;
}
// Byte size of a tensor: element count multiplied by the element width.
size_t RKNNInference::getTensorSize(const rknn_tensor_attr& attr) {
    size_t size = 1;
    for (uint32_t i = 0; i < attr.n_dims; i++) {
        size *= attr.dims[i];
    }
    switch (attr.type) {
    case RKNN_TENSOR_FLOAT32:
        return size * sizeof(float);
    case RKNN_TENSOR_FLOAT16:
        return size * sizeof(uint16_t);
    case RKNN_TENSOR_INT8:
    case RKNN_TENSOR_UINT8:
        return size * sizeof(uint8_t);
    case RKNN_TENSOR_INT16:
    case RKNN_TENSOR_UINT16:
        return size * sizeof(uint16_t);
    case RKNN_TENSOR_INT32:
    case RKNN_TENSOR_UINT32:
        return size * sizeof(uint32_t);
    case RKNN_TENSOR_INT64:
    case RKNN_TENSOR_UINT64:
        return size * sizeof(uint64_t);
    default:
        // Unknown element type: fall back to the bare element count.
        // NOTE(review): this silently under-reports byte size for any
        // multi-byte type not listed above — confirm callers never hit it.
        return size;
    }
}

Image Processing Utils Class
// image_utils.h
#ifndef IMAGE_UTILS_H
#define IMAGE_UTILS_H
#include <opencv2/opencv.hpp>
#include <vector>
class ImageUtils {
public:
    // Metadata produced by resizeKeepRatio(), needed to map model-space
    // coordinates back onto the original image.
    struct PreprocessInfo {
        cv::Size original_size;  // size before resizing
        cv::Size target_size;    // letterboxed output size
        float scale;             // uniform scale factor that was applied
        cv::Point2f offset;      // top-left corner where the image was pasted
    };

    // Letterbox resize: scale preserving aspect ratio, centered on a
    // zero-filled canvas of target_size; fills `info` with the mapping data.
    static cv::Mat resizeKeepRatio(const cv::Mat& image, cv::Size target_size,
                                   PreprocessInfo& info);
    // [0,1] scaling followed by per-channel (x - mean) / std; float32 output.
    static cv::Mat normalize(const cv::Mat& image,
                             const std::vector<float>& mean = {0.485, 0.456, 0.406},
                             const std::vector<float>& std = {0.229, 0.224, 0.225});
    // Raw-byte helpers for handing pixel data to the RKNN C API.
    static std::vector<uint8_t> matToUint8(const cv::Mat& image);
    static cv::Mat uint8ToMat(const std::vector<uint8_t>& data, cv::Size size, int type);

    // Draw results
    static void drawClassification(cv::Mat& image, const std::string& class_name,
                                   float confidence, cv::Point position = cv::Point(10, 30));
    static void drawDetection(cv::Mat& image, const cv::Rect& bbox,
                              const std::string& label, float confidence);
};
#endif // IMAGE_UTILS_H

// image_utils.cpp
#include "image_utils.h"
#include <iostream>
// Letterbox resize: scale the image to fit inside target_size while keeping
// its aspect ratio, then center it on a black canvas. Records the applied
// scale and paste offset in `info` for later coordinate un-mapping.
cv::Mat ImageUtils::resizeKeepRatio(const cv::Mat& image, cv::Size target_size,
                                    PreprocessInfo& info) {
    info.original_size = image.size();
    info.target_size = target_size;

    // Uniform scale: the tighter of the two axis ratios.
    const float sx = static_cast<float>(target_size.width) / image.cols;
    const float sy = static_cast<float>(target_size.height) / image.rows;
    info.scale = std::min(sx, sy);

    const int scaled_w = static_cast<int>(image.cols * info.scale);
    const int scaled_h = static_cast<int>(image.rows * info.scale);

    cv::Mat scaled;
    cv::resize(image, scaled, cv::Size(scaled_w, scaled_h), 0, 0, cv::INTER_LINEAR);

    // Zero-filled canvas of the requested size.
    cv::Mat canvas = cv::Mat::zeros(target_size, image.type());

    // Centering offset of the pasted region.
    const int dx = (target_size.width - scaled_w) / 2;
    const int dy = (target_size.height - scaled_h) / 2;
    info.offset = cv::Point2f(dx, dy);

    scaled.copyTo(canvas(cv::Rect(dx, dy, scaled_w, scaled_h)));
    return canvas;
}
// Scale pixel values to [0,1] as float32, then standardize each channel as
// (x - mean[c]) / stddev[c]. Channels beyond the mean/stddev vectors are
// left un-standardized.
//
// Fix: the definition previously named this parameter `std`, which shadows
// the std:: namespace inside the body and makes `std::vector<cv::Mat>`
// fail to compile; renamed to `stddev` (declaration parameter names may
// differ from the definition, so the header is unaffected).
cv::Mat ImageUtils::normalize(const cv::Mat& image,
                              const std::vector<float>& mean,
                              const std::vector<float>& stddev) {
    cv::Mat normalized;
    // uint8 [0,255] -> float32 [0,1].
    image.convertTo(normalized, CV_32F, 1.0 / 255.0);
    std::vector<cv::Mat> channels;
    cv::split(normalized, channels);
    // Bound by both vectors so a short stddev cannot be read out of range.
    for (size_t i = 0; i < channels.size() && i < mean.size() && i < stddev.size(); i++) {
        channels[i] = (channels[i] - mean[i]) / stddev[i];
    }
    cv::merge(channels, normalized);
    return normalized;
}