Process camera images

While the Intrinsic platform provides perception Skills such as estimate_pose, your Solution might need custom perception capabilities. This tutorial shows you how to create a Skill that gets an image from a camera, processes it with OpenCV, and outputs results. It builds a barcode scanning Skill as an example.

The complete example is linked at the bottom of this page.

Setup

You need a Solution with a camera. The Solution must be deployed, and your development environment's target Solution must be set to that deployment. If you don't have these yet, follow the guides for setting up your development environment and creating your first Skill.

The example Skill implements only the execute method, not the preview method. The preview method returns what a Skill would do if executed, without actually executing it; it is useful for Skills with multiple valid outcomes (for example, there may be multiple ways to move a robot to the same pose). This Skill is different: it either sees barcodes or it doesn't, and it must actually execute to capture an image and detect them.

Because the Skill does not implement the preview method, your Solution's execution mode must be set to "Full" rather than "Preview". In "Preview" mode only the preview method is called, so this Skill would never do anything.

Execution mode "Full"

Create a new Skill

Create a new Python or C++ Skill in your development environment, and give it the ID com.example.scan_barcodes. When prompted for the folder name, use skills/scan_barcodes.

This guide uses Python type annotations. If you are using Python, add this import to the top of scan_barcodes.py:

from typing import List

Parameters and outputs

Next, decide on the Skill's parameters and outputs. Both are defined by protobuf messages; this example defines two: ScanBarcodesParams and ScanBarcodesResult.

The Skill doesn't accept any parameters, so ScanBarcodesParams is empty. The ScanBarcodesResult message has fields describing all detected barcodes.

Put the following into the file scan_barcodes/scan_barcodes.proto.

syntax = "proto3";

package com.example;

message ScanBarcodesParams {}

enum BarcodeType {
  BARCODE_UNSPECIFIED = 0;
  BARCODE_NONE = 1;
  BARCODE_EAN_8 = 2;
  BARCODE_EAN_13 = 3;
  BARCODE_UPC_A = 4;
  BARCODE_UPC_E = 5;
  BARCODE_UPC_EAN_EXTENSION = 6;
}

message Corner {
  double x = 1;
  double y = 2;
}

message Barcode {
  BarcodeType type = 1;
  string data = 2;
  repeated Corner corners = 3;
}

message ScanBarcodesResult {
  repeated Barcode barcodes = 1;
}
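For reference, a ScanBarcodesResult populated with one detection could look like this in text-proto form; all values are illustrative:

```textproto
barcodes {
  type: BARCODE_EAN_13
  data: "4006381333931"
  corners { x: 120.0 y: 80.0 }
  corners { x: 240.0 y: 80.0 }
  corners { x: 240.0 y: 160.0 }
  corners { x: 120.0 y: 160.0 }
}
```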

The Skill's manifest must be updated to indicate which protobuf messages are used for return values. Add (or edit) the return_type specified in scan_barcodes/scan_barcodes.manifest.textproto to be as follows.

return_type {
  message_full_name: "com.example.ScanBarcodesResult"
}

Equipment

The Skill needs a camera to be able to see barcodes. This is done through the Equipment interface. This interface is separate from parameters, though it causes a drop-down box to appear under the Parameters tab in Flowstate.

First, give the equipment a name. This is a human-readable name that is shown in the user interface. The Skill needs to refer to it later, so define it as a global constant at the top of the file.

Add the following to scan_barcodes.py.

# Camera slot name; make sure this matches the skill manifest.
CAMERA_EQUIPMENT_SLOT: str = "camera"

Next, the Skill needs to declare the kind of equipment it needs. This is done by adding equipment types as dependencies in the Skill's manifest. The CameraConfig equipment type tells Flowstate the Skill needs a camera. Add (or edit) the dependencies section to scan_barcodes/scan_barcodes.manifest.textproto such that it looks like the following. Note that the key matches the value of your global constant.

dependencies {
  required_equipment {
    key: "camera"
    value {
      capability_names: "CameraConfig"
    }
  }
}

Connect to the camera

Flowstate reads the manifest and, when the Skill executes, provides it with information about a camera to use. The camera client API gives the Skill access to that camera.

Add the following imports to the top of scan_barcodes.py.

from intrinsic.perception.client.v1.python.camera import cameras
from intrinsic.perception.client.v1.python.image_utils import Metadata

Replace your Skill's execute method with the following code, which accesses the camera provided by Flowstate as its first step. Note that the return type annotation has been changed to -> scan_barcodes_pb2.ScanBarcodesResult, and the template's logging.info(...) statement has been deleted.

@overrides(skill_interface.Skill)
def execute(
    self,
    request: skill_interface.ExecuteRequest[
        scan_barcodes_pb2.ScanBarcodesParams
    ],
    context: skill_interface.ExecuteContext,
) -> scan_barcodes_pb2.ScanBarcodesResult:
  # Get camera.
  camera = cameras.Camera.create(context, CAMERA_EQUIPMENT_SLOT)

Bazel needs to be informed about the new dependencies. Add the following to the deps of the py_library rule in scan_barcodes/BUILD.

py_library(
    ...
    deps = [
        ":scan_barcodes_py_pb2",
        "@ai_intrinsic_sdks//intrinsic/perception/client/v1/python:image_utils",
        "@ai_intrinsic_sdks//intrinsic/perception/client/v1/python/camera:cameras",
        ...

Capture an image

The Skill now has everything it needs to use the camera. Have the Skill capture an image each time it is executed by adding the following to the execute method, below the code that gets the camera.

# Capture from the camera and get the first intensity sensor image as a
# numpy array.
capture_result = camera.capture()
intensity_images = (
    s.array
    for s in capture_result.sensor_images.values()
    if s.array.dtype.metadata is not None
    and s.array.dtype.metadata.get(Metadata.Keys.PIXEL_TYPE)
    == Metadata.Values.PIXEL_INTENSITY
)
img = next(intensity_images, None)
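The filter above relies on numpy's ability to attach metadata to a dtype. The selection pattern can be sketched in isolation with plain numpy; the "pixel_type"/"intensity" strings below are hypothetical stand-ins for the SDK's Metadata keys, not the real values:

```python
import numpy as np

# Hypothetical stand-ins for the SDK's Metadata keys and values.
PIXEL_TYPE_KEY = "pixel_type"
PIXEL_INTENSITY = "intensity"

# numpy dtypes can carry arbitrary metadata; the camera client uses this
# mechanism to label what kind of pixels each sensor image contains.
intensity_dtype = np.dtype(np.uint8, metadata={PIXEL_TYPE_KEY: PIXEL_INTENSITY})
depth_dtype = np.dtype(np.float32, metadata={PIXEL_TYPE_KEY: "depth"})

sensor_images = {
    "rgb": np.zeros((4, 4), dtype=intensity_dtype),
    "depth": np.zeros((4, 4), dtype=depth_dtype),
}

# Same generator-filter pattern as the Skill: keep only intensity images and
# take the first one, or None if there is no match.
intensity_images = (
    a
    for a in sensor_images.values()
    if a.dtype.metadata is not None
    and a.dtype.metadata.get(PIXEL_TYPE_KEY) == PIXEL_INTENSITY
)
img = next(intensity_images, None)
```

Note that dtype equality ignores metadata, so the selected array still compares equal to a plain np.uint8 dtype.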

Process the image

The Skill can get a capture result, but it still needs to do something with it. Next, make it use OpenCV to detect barcodes.

Add an import for the OpenCV library to scan_barcodes.py.

import cv2

Add an import for numpy to scan_barcodes.py too.

import numpy as np

Then declare a detector attribute on the ScanBarcodes class, typed as OpenCV's BarcodeDetector, and initialize it when an instance of the class is created.

Modify scan_barcodes.py to add the detector to the class.

class ScanBarcodes(skill_interface.Skill):
  """Skill that connects to a camera resource and scans all visible barcodes using OpenCV."""

  detector: cv2.barcode.BarcodeDetector

  def __init__(self) -> None:
    super().__init__()
    self.detector = cv2.barcode.BarcodeDetector()

The Skill now has a detector, but it lacks the code to use it. (For Python, the OpenCV and numpy packages come from the Python Package Index; see the guide on how to add a pip dependency to Bazel.)

Add the following to the execute method in scan_barcodes.py, after the code that gets the camera capture result.

# Run the detector and check results.
(
    ok,
    decoded_data,
    decoded_types,
    detected_corners,
) = self.detector.detectAndDecodeWithType(img)
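The tuple returned by detectAndDecodeWithType can be illustrated with dummy values. The shapes below are assumptions about OpenCV's barcode API (one payload string and one type string per barcode, and corner points as a float32 array with four (x, y) points per barcode), not output from a real capture:

```python
import numpy as np

# Illustrative stand-ins for the detector's return values.
ok = True
decoded_data = ["4006381333931"]  # decoded payload per barcode
decoded_types = ["EAN_13"]        # barcode type string per barcode
detected_corners = np.array(
    [[[120.0, 80.0], [240.0, 80.0], [240.0, 160.0], [120.0, 160.0]]],
    dtype=np.float32,
)

# Each barcode pairs one payload, one type, and one quadrilateral of corners.
for data, barcode_type, corners in zip(
    decoded_data, decoded_types, detected_corners
):
    print(barcode_type, data, corners.shape)  # EAN_13 4006381333931 (4, 2)
```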

The Skill now depends on OpenCV and numpy. Bazel needs to be told about these; however, it is more work this time because these dependencies aren't included with the Intrinsic Flowstate SDK.

First, if they are not already present, add a dependency on rules_python and load the pip extension in your MODULE.bazel file.

bazel_dep(name = "rules_python", version = "0.31.0")

python = use_extension("@rules_python//python/extensions:python.bzl", "python")
python.toolchain(
    is_default = True,
    python_version = "3.11",
)

pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip")

Add the following to the bottom of your MODULE.bazel file.

pip.parse(
    hub_name = "scan_barcodes_pip_deps",
    python_version = "3.11",
    requirements_lock = "//skills/scan_barcodes:requirements.txt",
)
use_repo(pip, "scan_barcodes_pip_deps")

The pip.parse() call references a file called requirements.txt in the skills/scan_barcodes/ directory. Create this file adjacent to scan_barcodes.py and put the following content into it.

numpy==1.25.0
opencv-contrib-python-headless==4.8.0.76
opencv-python-headless==4.8.0.76

Lastly, Bazel needs to be told that the Skill uses these dependencies. Add the following to the deps of the Skill's py_library() rule.

"@scan_barcodes_pip_deps//numpy:pkg",
"@scan_barcodes_pip_deps//opencv_contrib_python_headless:pkg",
"@scan_barcodes_pip_deps//opencv_python_headless:pkg",
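Taken together with the earlier BUILD changes, the Skill's py_library rule might now look roughly like this. The name and srcs values are assumptions based on the folder layout in this guide, and the ... comment marks Skill-template dependencies that are kept as-is:

```starlark
py_library(
    name = "scan_barcodes",
    srcs = ["scan_barcodes.py"],
    deps = [
        ":scan_barcodes_py_pb2",
        "@ai_intrinsic_sdks//intrinsic/perception/client/v1/python:image_utils",
        "@ai_intrinsic_sdks//intrinsic/perception/client/v1/python/camera:cameras",
        "@scan_barcodes_pip_deps//numpy:pkg",
        "@scan_barcodes_pip_deps//opencv_contrib_python_headless:pkg",
        "@scan_barcodes_pip_deps//opencv_python_headless:pkg",
        # ... other dependencies from the Skill template ...
    ],
)
```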

Output the results from the Skill

The Skill is doing something useful now. It is using OpenCV to detect barcodes in a camera image. However, the detections aren't being output yet. The Skill needs to convert the detections into a ScanBarcodesResult.

Add the following method to the ScanBarcodes class.

def convert_to_result_proto(
    self,
    ok: bool,
    decoded_data: List[str],
    decoded_types: List[str],
    detected_corners: np.ndarray,
) -> scan_barcodes_pb2.ScanBarcodesResult:
  if not ok:
    return scan_barcodes_pb2.ScanBarcodesResult()

  barcodes: List[scan_barcodes_pb2.Barcode] = []
  for i, barcode_type in enumerate(decoded_types):
    # Skip barcodes that were detected but could not be decoded.
    if not barcode_type or barcode_type == "NONE":
      continue

    barcode_data = decoded_data[i]
    barcode_corners = detected_corners[i]

    corners: List[scan_barcodes_pb2.Corner] = []
    for barcode_corner in barcode_corners:
      corner = scan_barcodes_pb2.Corner(
          x=barcode_corner[0],
          y=barcode_corner[1],
      )
      corners.append(corner)

    barcode = scan_barcodes_pb2.Barcode(
        type=convert_barcode_type_to_proto(barcode_type),
        data=barcode_data,
        corners=corners,
    )
    barcodes.append(barcode)

  return scan_barcodes_pb2.ScanBarcodesResult(barcodes=barcodes)

That method relies on a helper function to convert the barcode type to the proto enum. Add this function to the file, but this time make it a top-level function instead of a class method.

Add the following function to scan_barcodes.py.

def convert_barcode_type_to_proto(
    barcode_type: str,
) -> scan_barcodes_pb2.BarcodeType:
  """Converts a cv2 barcode type string to the BarcodeType proto enum."""
  if barcode_type == "EAN_8":
    return scan_barcodes_pb2.BARCODE_EAN_8
  elif barcode_type == "EAN_13":
    return scan_barcodes_pb2.BARCODE_EAN_13
  elif barcode_type == "UPC_A":
    return scan_barcodes_pb2.BARCODE_UPC_A
  elif barcode_type == "UPC_E":
    return scan_barcodes_pb2.BARCODE_UPC_E
  elif barcode_type == "UPC_EAN_EXTENSION":
    return scan_barcodes_pb2.BARCODE_UPC_EAN_EXTENSION
  else:
    return scan_barcodes_pb2.BARCODE_UNSPECIFIED
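As a design note, the if/elif chain can equally be written as a dictionary lookup. The sketch below uses plain integers matching the proto enum values defined earlier in this guide, standing in for the generated scan_barcodes_pb2 constants:

```python
# Integer stand-ins for the generated BARCODE_* enum constants, matching the
# values in scan_barcodes.proto.
BARCODE_UNSPECIFIED = 0
BARCODE_EAN_8 = 2
BARCODE_EAN_13 = 3
BARCODE_UPC_A = 4
BARCODE_UPC_E = 5
BARCODE_UPC_EAN_EXTENSION = 6

# One entry per supported cv2 type string.
_TYPE_MAP = {
    "EAN_8": BARCODE_EAN_8,
    "EAN_13": BARCODE_EAN_13,
    "UPC_A": BARCODE_UPC_A,
    "UPC_E": BARCODE_UPC_E,
    "UPC_EAN_EXTENSION": BARCODE_UPC_EAN_EXTENSION,
}


def convert_barcode_type_to_proto(barcode_type: str) -> int:
    # Unknown types fall back to BARCODE_UNSPECIFIED, as in the elif chain.
    return _TYPE_MAP.get(barcode_type, BARCODE_UNSPECIFIED)


print(convert_barcode_type_to_proto("EAN_13"))  # 3
```

A dict keeps the mapping in one place and makes adding new types a one-line change.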

The very last steps are to call this method and return the converted result. Add the following to the end of the execute method in scan_barcodes.py.

# Convert result and return.
result = self.convert_to_result_proto(
    ok, decoded_data, decoded_types, detected_corners
)

logging.info("ScanBarcodesResult: %s", result)

return result

Source code

The full source code for this example is available in the intrinsic-ai/sdk-examples repository.