It's been a bit of a battle, with a steep Python learning curve, but I've finally created a single Python file that sends a time stamp, detection class, confidence and bounding box coordinates to the Arduino base station. Obviously there are still a number of dependencies in various directories - some close by and others deeply embedded in the Python system somewhere - but here's my 'top of the stack' code (an example of the transmitted payload follows the listing):
#!/usr/bin/python3
# ****************************************************************************
# Copyright(c) 2017 Intel Corporation.
# License: MIT See LICENSE file in root directory.
# ****************************************************************************
# Detect objects on a LIVE camera feed using the
# Intel® Movidius™ Neural Compute Stick (NCS) and send the data over a LoRa network
import os
import cv2
import sys
import numpy
import ntpath
import argparse
import mvnc.mvncapi as mvnc
from time import localtime, strftime
from utils import visualize_output
from utils import deserialize_output
from pySX127x_master import basic_test01
import datetime
import array as arr
import numpy as np
from time import sleep
from pySX127x_master.SX127x.LoRa import *
from pySX127x_master.SX127x.LoRaArgumentParser import LoRaArgumentParser
from pySX127x_master.SX127x.board_config import BOARD
#from pySX127x_master import Tegwyns_LoRa_Beacon
BOARD.setup()
parser = LoRaArgumentParser("A simple LoRa beacon")
parser.add_argument('--single', '-S', dest='single', default=False, action="store_true", help="Single transmission")
parser.add_argument('--wait', '-w', dest='wait', default=1, action="store", type=float, help="Waiting time between transmissions (default is 1s)")
myString = ""
#from pySX127x_master.Tegwyns_LoRa_Beacon import *
class LoRaBeacon(LoRa):
def __init__(self, verbose=False):
super(LoRaBeacon, self).__init__(verbose)
self.set_mode(MODE.SLEEP)
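        # DIO0 mapping value 1 = TxDone interrupt in LoRa mode (per the SX127x datasheet)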
self.set_dio_mapping([1,0,0,0,0,0])
def on_rx_done(self):
print("\nRxDone")
print(self.get_irq_flags())
        print(list(map(hex, self.read_payload(nocheck=True))))  # list() needed to print a map object under Python 3
self.set_mode(MODE.SLEEP)
self.reset_ptr_rx()
self.set_mode(MODE.RXCONT)
def on_cad_done(self):
print("\non_CadDone")
print(self.get_irq_flags())
def on_rx_timeout(self):
print("\non_RxTimeout")
print(self.get_irq_flags())
def on_valid_header(self):
print("\non_ValidHeader")
print(self.get_irq_flags())
def on_payload_crc_error(self):
print("\non_PayloadCrcError")
print(self.get_irq_flags())
def on_fhss_change_channel(self):
print("\non_FhssChangeChannel")
print(self.get_irq_flags())
def start(self):
global args
sys.stdout.write("\rstart\r")
stamp = str(datetime.datetime.now())
        text = bytearray('PING LoRa Test PI: ' + stamp + ' ' + myString, 'utf-8')
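        # four leading header bytes ahead of the message text, matching the pySX127x beacon examples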
self.write_payload([0x00, 0x00, 0x00, 0x00] + list(text))
self.set_mode(MODE.TX)
lora = LoRaBeacon(verbose=False)
args = parser.parse_args(lora)
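# pa_select=1 routes transmit power through the SX127x PA_BOOST pin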
lora.set_pa_config(pa_select=1)
#lora.set_rx_crc(True)
#lora.set_agc_auto_on(True)
#lora.set_lna_gain(GAIN.NOT_USED)
#lora.set_coding_rate(CODING_RATE.CR4_6)
#lora.set_implicit_header_mode(False)
#lora.set_pa_config(max_power=0x04, output_power=0x0F)
#lora.set_pa_config(max_power=0x04, output_power=0b01000000)
#lora.set_low_data_rate_optim(True)
#lora.set_pa_ramp(PA_RAMP.RAMP_50_us)
#print(lora)
#assert(lora.get_lna()['lna_gain'] == GAIN.NOT_USED)
assert(lora.get_agc_auto_on() == 1)
print("Security cam config:")
print(" Wait %f s" % args.wait)
print(" Single tx = %s" % args.single)
print("")
lora.start()
# "Class of interest" - Display detections only if they match this class ID
CLASS_PERSON = 15
# Detection threshold: Minimum confidence to tag as a valid detection
CONFIDENCE_THRESHOLD = 0.60 # 60% confident
# Variable to store commandline arguments
ARGS = None
# OpenCV object for video capture
camera = None
# ---- Step 1: Open the enumerated device and get a handle to it -------------
def open_ncs_device():
# Look for enumerated NCS device(s); quit program if none found.
devices = mvnc.EnumerateDevices()
if len( devices ) == 0:
print( "No devices found" )
quit()
# Get a handle to the first enumerated device and open it
device = mvnc.Device( devices[0] )
device.OpenDevice()
return device
# ---- Step 2: Load a graph file onto the NCS device -------------------------
def load_graph( device ):
# Read the graph file into a buffer
with open( ARGS.graph, mode='rb' ) as f:
blob = f.read()
# Load the graph buffer into the NCS
graph = device.AllocateGraph( blob )
return graph
# ---- Step 3: Pre-process the images ----------------------------------------
def pre_process_image( frame ):
    # Resize image [Image size is defined by the chosen network, during training]
img = cv2.resize( frame, tuple( ARGS.dim ) )
    # Convert BGR to RGB [OpenCV reads images in BGR; some networks expect RGB]
if( ARGS.colormode == "rgb" ):
img = img[:, :, ::-1]
# Mean subtraction & scaling [A common technique used to center the data]
img = img.astype( numpy.float16 )
img = ( img - numpy.float16( ARGS.mean ) ) * ARGS.scale
return img
# ---- Step 4: Read & print inference results from the NCS -------------------
def infer_image( graph, img, frame ):
#from pySX127x_master.Tegwyns_LoRa_Beacon import LoRaBeacon
#from pySX127x_master import Tegwyns_LoRa_Beacon
# Load the image as a half-precision floating point array
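    # ('user object' is an arbitrary tag that LoadTensor attaches to this
    # inference; it comes back as userobj from GetResult below)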
graph.LoadTensor( img, 'user object' )
# Get the results from NCS
output, userobj = graph.GetResult()
# Get execution time
inference_time = graph.GetGraphOption( mvnc.GraphOption.TIME_TAKEN )
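    # (TIME_TAKEN returns the per-layer execution times; fetched but not used further here)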
# Deserialize the output into a python dictionary
output_dict = deserialize_output.ssd(
output,
                      CONFIDENCE_THRESHOLD,
frame.shape )
# Print the results (each image/frame may have multiple objects)
for i in range( 0, output_dict['num_detections'] ):
# Filter a specific class/category
if( output_dict.get( 'detection_classes_' + str(i) ) == CLASS_PERSON ):
cur_time = strftime( "%Y_%m_%d_%H_%M_%S", localtime() )
print( "Person detected on " + cur_time )
print(".... Press q to quit ..... ")
# Extract top-left & bottom-right coordinates of detected objects
(y1, x1) = output_dict.get('detection_boxes_' + str(i))[0]
(y2, x2) = output_dict.get('detection_boxes_' + str(i))[1]
#print (y1, x1)
# Prep string to overlay on the image
display_str = (
labels[output_dict.get('detection_classes_' + str(i))]
+ ": "
+ str( output_dict.get('detection_scores_' + str(i) ) )
+ "%" )
print (display_str)
print (y1, x1)
print (y2, x2)
# Overlay bounding boxes, detection class and scores
frame = visualize_output.draw_bounding_box(
y1, x1, y2, x2,
frame,
thickness=4,
color=(255, 255, 0),
display_str=display_str )
global myString
            myString = display_str + " , (" + str(y1) + "," + str(x1) + "),(" + str(y2) + "," + str(x2) + ")"
###########################################################################################
lora.start()
###########################################################################################
# Capture snapshots
photo = ( os.path.dirname(os.path.realpath(__file__))
+ "/captures/photo_"
+ cur_time + ".jpg" )
cv2.imwrite( photo, frame )
# If a display is available, show the image on which inference was performed
if 'DISPLAY' in os.environ:
cv2.imshow( 'NCS live inference', frame )
# ---- Step 5: Unload the graph and close the device -------------------------
def close_ncs_device( device, graph ):
graph.DeallocateGraph()
device.CloseDevice()
camera.release()
cv2.destroyAllWindows()
# ---- Main function (entry point for this script ) --------------------------
def main():
device = open_ncs_device()
graph = load_graph( device )
# Main loop: Capture live stream & send frames to NCS
while( True ):
ret, frame = camera.read()
img = pre_process_image( frame )
infer_image( graph, img, frame )
        # Display the frame for 5ms before moving on to the next frame.
        # Break out of the loop if 'q' is pressed.
if( cv2.waitKey( 5 ) & 0xFF == ord( 'q' ) ):
break
close_ncs_device( device, graph )
# ---- Define 'main' function as the entry point for this script -------------
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="DIY smart security camera PoC using \
Intel® Movidius™ Neural Compute Stick." )
parser.add_argument( '-g', '--graph', type=str,
default='../../caffe/SSD_MobileNet/graph',
help="Absolute path to the neural network graph file." )
parser.add_argument( '-v', '--video', type=int,
default=0,
help="Index of your computer's V4L2 video device. \
ex. 0 for /dev/video0" )
parser.add_argument( '-l', '--labels', type=str,
default='../../caffe/SSD_MobileNet/labels.txt',
help="Absolute path to labels file." )
parser.add_argument( '-M', '--mean', type=float,
nargs='+',
default=[127.5, 127.5, 127.5],
help="',' delimited floating point values for image mean." )
    parser.add_argument( '-S', '--scale', type=float,
                         default=0.00789,
                         help="Scale factor applied to the image after mean subtraction." )
parser.add_argument( '-D', '--dim', type=int,
nargs='+',
default=[300, 300],
help="Image dimensions. ex. -D 224 224" )
parser.add_argument( '-c', '--colormode', type=str,
default="bgr",
help="RGB vs BGR color sequence. This is network dependent." )
ARGS = parser.parse_args()
# Create a VideoCapture object
camera = cv2.VideoCapture( ARGS.video )
# Set camera resolution
camera.set( cv2.CAP_PROP_FRAME_WIDTH, 620 )
camera.set( cv2.CAP_PROP_FRAME_HEIGHT, 480 )
# Load the labels file
    labels = [ line.rstrip('\n') for line in
               open( ARGS.labels ) if line != 'classes\n' ]
main()
# ==== End of file ===========================================================
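For anyone wiring up the receiving end, here's a minimal sketch of what actually goes over the air: four zero header bytes followed by a UTF-8 string combining the timestamp with myString, which is built in infer_image() from the label, score and box corners. The detection values below are invented purely for illustration:

# Sketch of the LoRa payload assembled by start() and infer_image()
# (the score and y1, x1, y2, x2 values here are made-up examples)
import datetime

stamp = str(datetime.datetime.now())
display_str = "person: 87.5%"          # label + score, as built in infer_image()
y1, x1, y2, x2 = 120, 88, 410, 260     # top-left and bottom-right box corners
myString = display_str + " , (" + str(y1) + "," + str(x1) + "),(" + str(y2) + "," + str(x2) + ")"
text = bytearray('PING LoRa Test PI: ' + stamp + ' ' + myString, 'utf-8')
payload = [0x00, 0x00, 0x00, 0x00] + list(text)
print(bytes(payload[4:]).decode('utf-8'))
# -> PING LoRa Test PI: 2018-06-14 10:22:31.123456 person: 87.5% , (120,88),(410,260)

With the default paths baked into the argparse section, the script can be launched with just python3 followed by the script name, or pointed at other files with the -g and -l options.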