I tried to get the HX711 routine from the pigpio library working, but it kept stopping after a couple of minutes. This is what I ended up with. Occasionally, the data gets corrupted when the Linux kernel interferes with the data transfer, so it has to be processed to avoid big spikes in the data stream.
This code works at the physical layer -- bit banging the data from the HX711. It must be called after the DOUT line transitions to a low state, indicating that the HX711 has finished a conversion.
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
import RPi.GPIO as GPIO
import time
import math
# Select BCM pin numbering for every GPIO call made in this module.
GPIO.setmode(GPIO.BCM)
class HX711:
    """Bit-banged driver that uses the HX711 as a plain 24-bit differential ADC.

    Attributes:
        PD_SCK: GPIO channel driving the HX711 clock / power-down input.
        DOUT: GPIO channel reading the HX711 serial data output.
        GAIN: Number of extra clock pulses sent after the 24 data bits; this
            selects the gain used for the *next* conversion.
    """

    # Requested gain -> number of extra PD_SCK pulses sent after the data bits.
    _GAIN_PULSES = {128: 1, 64: 3, 32: 2}

    def __init__(self, dout, pd_sck, gain=32):
        """Configure the pins and perform one throw-away conversion.

        Args:
            dout: GPIO channel connected to DOUT.
            pd_sck: GPIO channel connected to PD_SCK.
            gain: Amplifier gain; one of 128, 64 or 32.

        Raises:
            ValueError: If ``gain`` is not a supported value.
        """
        # Validate before touching any hardware so that a bad argument fails
        # immediately instead of surfacing later as a missing attribute.
        try:
            self.GAIN = self._GAIN_PULSES[gain]
        except (KeyError, TypeError):
            raise ValueError(
                "gain must be one of 128, 64 or 32, got {!r}".format(gain))

        self.PD_SCK = pd_sck
        self.DOUT = dout
        GPIO.setup(self.PD_SCK, GPIO.OUT)
        GPIO.setup(self.DOUT, GPIO.IN)

        # initialize
        GPIO.output(self.PD_SCK, False)
        time.sleep(0.4)  # settle time
        GPIO.wait_for_edge(self.DOUT, GPIO.FALLING)
        # The first read only serves to clock out the gain-selection pulses,
        # so its result is discarded.
        self.read()

    def read(self):
        """Clock one 24-bit sample out of the HX711 and return it as a signed int.

        Must be called after DOUT has gone low (conversion ready).
        PD_SCK must idle low to keep the HX711 in an active state.
        Each bit is sampled from DOUT while PD_SCK is held high, MSB first.
        Min PD_SCK high time = 0.2us, Max PD_SCK high time = 50us.
        Min PD_SCK low time = 0.2us.

        Returns:
            The conversion result in the range -8388608 .. 8388607.
        """
        raw = 0
        for _ in range(24):
            GPIO.output(self.PD_SCK, True)
            raw = (raw << 1) | GPIO.input(self.DOUT)
            GPIO.output(self.PD_SCK, False)

        # set channel and gain factor for next reading
        for _ in range(self.GAIN):
            GPIO.output(self.PD_SCK, True)
            GPIO.output(self.PD_SCK, False)

        # convert from 24-bit two's complement to a signed integer
        if raw & 0x800000:
            raw -= 0x1000000
        return raw

    def meanstdv(self, x):
        """Return ``(mean, std)`` of the sequence ``x``.

        mean = sum(x_i) / n
        std  = sqrt(sum((x_i - mean)^2) / (n - 1))   (sample standard deviation)

        ``std`` is reported as 0.0 when ``x`` holds a single value.

        Raises:
            ZeroDivisionError: If ``x`` is empty.
        """
        n = len(x)
        mean = sum(x) / float(n)
        if n > 1:
            squared_dev = sum((a - mean) ** 2 for a in x)
            std = math.sqrt(squared_dev / float(n - 1))
        else:
            std = 0.0
        return mean, std
# test code
if __name__ == '__main__':
    # Simple self-test: print every conversion until interrupted with Ctrl-C.
    DOUT = 25
    PD_SCK = 24
    gain = 128
    print("Gain = {}".format(gain))
    try:
        hx = HX711(DOUT, PD_SCK, gain)
        while True:
            # The wait is inside the try block because this is where the
            # program spends almost all of its time, and therefore where
            # Ctrl-C will normally arrive.
            GPIO.wait_for_edge(DOUT, GPIO.FALLING)
            val = hx.read()
            print("value = {0} ".format(val))
    except KeyboardInterrupt:
        # Leave the loop; do not keep reading from pins that are about to
        # be released.
        pass
    finally:
        GPIO.cleanup()
And this is an example of how to use it, along with the pulse swallowing code.
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
'''
An example of getting data from the HX711 and swallowing artifacts. No guarantees that
this particular code block will work, since I butchered it pretty heavily. There are
additional processes (not shown) that process the data into a seedlink stream and
web server -- pretty esoteric -- I didn't think that there would be much interest.
But the general method should be pretty clear.
'''
from myHX711 import HX711
import time
from datetime import datetime, timedelta, date
from multiprocessing import Process, Manager
import RPi.GPIO as GPIO
# Select BCM pin numbering for every GPIO call made in this script.
GPIO.setmode(GPIO.BCM)
# instantiate hx711
gain = 128 # allowed values: 128, 64, 32
print("gain = {}".format(gain))
# Pin assignments: DOUT and PD_SCK are handed to the HX711 driver below;
# RATE is driven directly by this script.
DOUT = 25
PD_SCK = 24
RATE = 23
GPIO.setup(DOUT, GPIO.IN)
GPIO.setup(RATE, GPIO.OUT)
# set rate high (sample_rate = 28.94 Hz with 4.00 MHz xtal)
GPIO.output(RATE, 1)
# Constructing the driver blocks until the first conversion has been read.
hx = HX711(dout=DOUT, pd_sck=PD_SCK, gain=gain)
# the Manager provides communication between processes. In this case between getData and the others.
mgr = Manager()
# makes a global list variable that is compatible with multiprocessing module.
# Each entry appended by the acquisition code is a [timestamp_ms, count] pair.
resultArray = mgr.list()
def getData(sample_rate=28.94):
    """Acquire HX711 samples forever and keep the last half hour in resultArray.

    Each time DOUT falls (conversion ready) the sample is timestamped, read,
    passed through signalProcessor, and the shared list is trimmed.

    Args:
        sample_rate: Expected conversion rate in Hz, used only to size the
            half-hour buffer. The default matches the rate quoted for RATE
            held high with a 4.00 MHz crystal.
    """
    global resultArray
    # store 1/2 hour of data
    max_samples = int(sample_rate * 3600 / 2)
    while True:
        GPIO.wait_for_edge(DOUT, GPIO.FALLING, bouncetime=1)
        tn = time.time() * 1000  # unix timestamp with millisecond resolution
        Count = hx.read()  # just returns the ADC result
        resultArray = signalProcessor(tn, Count, resultArray)
        # signalProcessor may append two entries at once, so trim in a loop;
        # removing a single item per sample would let the buffer creep upward.
        # must use pop method for manager.list object instead of list=list[x:] method
        while len(resultArray) > max_samples:
            resultArray.pop(0)
def signalProcessor(tn, data, rArray, max_jump=3000, max_period=0.05):
    """Append one sample to ``rArray``, suppressing glitches, and return it.

    Pulse swallower:
        If the new count differs from the previous one by more than
        ``max_jump`` (and the previous value was not itself a repeat of the
        one before it), OR if the count is -1 (an indication that the data
        returned by the HX711 got clobbered), the previous count is stored
        in place of the new one.
    Sample rate checker:
        If the time since the previous sample exceeds ``max_period`` an entire
        sample was lost to kernel housekeeping; the new sample is stored twice
        to fill the gap. This never happens at slower sample rates (< 30Hz).

    Args:
        tn: Timestamp of the new sample in milliseconds.
        data: Raw ADC count.
        rArray: Mutable sequence of ``[timestamp_ms, count]`` pairs; modified
            in place.
        max_jump: Largest accepted absolute change between consecutive counts.
        max_period: Longest accepted gap between samples, in seconds.

    Returns:
        The same ``rArray`` object that was passed in.
    """
    # Filtering looks two samples back, so just collect data until there is
    # enough history.
    if len(rArray) <= 2:
        rArray.append([tn, data])
        return rArray

    prevTime, prevData = rArray[-1][0], rArray[-1][1]
    elapsed_ms = tn - prevTime

    if elapsed_ms / 1000 > max_period:
        print('Bad Sample Rate {0}. Index={1}'.format(1000 / elapsed_ms, len(rArray)))
        # Store the sample twice to stand in for the one that was missed.
        for _ in range(2):
            rArray.append([tn, data])
        return rArray

    # Compare magnitudes: a spike may go in either direction.
    is_spike = abs(prevData - data) > max_jump
    # If the previous value equals the one before it, it may already be a
    # substituted value, so a second large change in a row is accepted as a
    # genuine step rather than swallowed forever.
    prev_was_repeat = prevData == rArray[-2][1]

    if (is_spike and not prev_was_repeat) or data == -1:
        rArray.append([tn, prevData])
        print("{0} Swallowed Pulse {1}".format(datetime.now(), prevData - data))
    else:
        rArray.append([tn, data])
    return rArray
def wiggle():
    """Toggle GPIO 22 forever: five minutes low, then five minutes high."""
    pin = 22
    half_period = 60 * 5  # seconds spent at each level
    GPIO.setup(pin, GPIO.OUT)
    while True:
        for label, level in (('LOW', 0), ('HIGH', 1)):
            print(label)
            GPIO.output(pin, level)
            time.sleep(half_period)
# Launch the square-wave generator and the acquisition loop as separate processes.
run_wiggle = Process(target=wiggle)
run_wiggle.start()
run_getData = Process(target=getData)
run_getData.start()
try:
    # Both workers loop forever, so this blocks until the script is interrupted.
    run_wiggle.join()
    run_getData.join()
finally:
    # Stop the workers first, then release the pins. Calling GPIO.cleanup()
    # while the workers are still running would reset the channels this
    # process configured while they are still in use.
    for worker in (run_wiggle, run_getData):
        if worker.is_alive():
            worker.terminate()
        worker.join()
    GPIO.cleanup()
Discussions
Become a Hackaday.io Member
Create an account to leave a comment. Already have an account? Log In.