-
Availability map
08/20/2018 at 09:07 • 0 comments
What I got (red means: no live image available at that time/location):
Apparently, the live image is more or less consistently unavailable between roughly 38° east and 95° east. That's based on just one day of data, though, starting from Unix timestamp 1534627704 (August 18th 2018, 21:28 UTC).
How I got there:
Here's a script that, whenever HDEV live image availability data arrives via MQTT, requests the current ISS position from a web service and combines the two. The result is written to stdout, from where I redirected it to a file. I had this script running for 24 hours (it self-terminates after that period):
# http://open-notify.org/Open-Notify-API/ISS-Location-Now/
import urllib2
import json
import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    client.subscribe("iss-hdev-availability/available-bool")

def on_message(client, userdata, message):
    try:
        available = message.payload
        req = urllib2.Request("http://api.open-notify.org/iss-now.json")
        response = urllib2.urlopen(req)
        obj = json.loads(response.read())
        timestamp = obj['timestamp']
        lat = obj['iss_position']['latitude']
        lon = obj['iss_position']['longitude']
        print("{},{},{},{}".format(timestamp, lat, lon, available))
    except urllib2.URLError:  # was bare URLError, which isn't imported
        pass

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("test.mosquitto.org", 1883)
client.loop_start()

start_time = time.time()
duration = 24*3600
while True:
    try:
        if time.time() > (start_time + duration):
            break
        time.sleep(1)
    except KeyboardInterrupt:
        break
client.loop_stop()
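Getting the CSV file is then just a matter of redirecting stdout, along these lines (the script name is only an example):

python availability-logger.py > hdev-availability.csv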
So now we have live image availability vs location, for a 24h period. The dataset is here:
https://cdn.hackaday.io/files/14729630165536/hdev-availability.csv
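Each row follows the print statement in the script above: Unix timestamp, latitude, longitude, availability flag. A made-up row, purely to illustrate the layout:

1534627704,-28.46,-54.80,1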
(it's also in the project files)
That data can be drawn on a map. I found out that there's a map toolkit for matplotlib (Basemap), and installed it. The rest is quite simple:
import matplotlib as mpl
mpl.use('Agg')  # because the AWS EC2 machine doesn't have tkInter installed
import matplotlib.pyplot as plt
from mpl_toolkits.basemap import Basemap
import numpy as np
import csv

# https://matplotlib.org/basemap/users/cyl.html
m = Basemap(projection='cyl', llcrnrlat=-90, urcrnrlat=90,
            llcrnrlon=-180, urcrnrlon=180, resolution='c')
m.drawcoastlines()

x = []
y = []
xn = []
yn = []
npoints = 0
with open('hdev-availability.csv') as csvfile:
    csvreader = csv.reader(csvfile)
    for row in csvreader:
        npoints += 1
        lat = float(row[1])
        lon = float(row[2])
        available = int(row[3])
        if available:
            x.append(lon)
            y.append(lat)
        else:
            xn.append(lon)
            yn.append(lat)

plt.title("HDEV live image availability (last {} h)".format(int(npoints*6/3600)))
m.scatter(x, y, 3, marker='o', color='black', latlon=True)
m.scatter(xn, yn, 3, marker='o', color='red', latlon=True)
plt.savefig('hdev-availability-map.png')
# plt.show()
In the last line you see the save-image-to-file operation, and the result is the map shown at the top and again here:
-
CPU load observations
08/18/2018 at 19:02 • 0 comments
My image analysis is running in the Amazon cloud and uses roughly 3 to 4 % CPU. One thing that sticks out, though, is that the CPU load periodically increases slightly. The period is roughly that of the ISS's orbit (about 90 minutes, i.e. 16 orbits per 24 hours):
I'll try to line this up with live image availability to see if it's actually related to the image analysis part.
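A minimal sketch of how that correlation could be logged, assuming the psutil package is installed (topic and broker as used elsewhere in this project):

# Sketch: log CPU load next to each availability update (psutil assumed installed)
import time
import psutil
import paho.mqtt.client as mqtt

def on_message(client, userdata, message):
    # one CSV line per availability update: timestamp, available, CPU load in percent
    print("{},{},{}".format(int(time.time()), message.payload, psutil.cpu_percent()))

client = mqtt.Client()
client.on_message = on_message
client.connect("test.mosquitto.org", 1883)
client.subscribe("iss-hdev-availability/available-bool")
client.loop_forever()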
Edit: it turns out that the image analysis consumes less CPU while the ISS is in the night half of its orbit, where the cameras show a black image.
-
Making use of the data
08/17/2018 at 21:14 • 0 comments
Now that the live image availability data is published as an MQTT topic that anyone can subscribe to, here's a script that combines this data with the ISS position:
# http://open-notify.org/Open-Notify-API/ISS-Location-Now/
import urllib2
import json
import paho.mqtt.client as mqtt
import time

def on_message(client, userdata, message):
    try:
        available = message.payload
        req = urllib2.Request("http://api.open-notify.org/iss-now.json")
        response = urllib2.urlopen(req)
        obj = json.loads(response.read())
        timestamp = obj['timestamp']
        lat = obj['iss_position']['latitude']
        lon = obj['iss_position']['longitude']
        print("{},{},{},{}".format(timestamp, lat, lon, available))
    except urllib2.URLError:  # was bare URLError, which isn't imported
        pass

client = mqtt.Client("issclient")
client.on_message = on_message
client.connect("test.mosquitto.org", 1883)
client.subscribe("iss-hdev-availability/available-bool")
client.loop_start()

while True:
    try:
        time.sleep(1)
    except KeyboardInterrupt:
        break
client.loop_stop()
I've got a modified version of this running which will self-terminate after 24 hours, to collect data for an availability map.
-
Success!
08/17/2018 at 08:40 • 0 comments
I tried writing a python script that would use streamlink to connect to a stream, grab a frame every now and then, and analyse it. Whatever I tried, it would either be horribly slow or the stream would time out after just a few seconds.
What works, though, is using ffmpeg as a player at a low-ish framerate and dumping images to disk:
streamlink -O http://ustream.tv/channel/iss-hdev-payload worst | ffmpeg -i - -r $fps -f image2 -update 1 out.jpg
Now I have a more or less recent frame every few seconds (I picked 0.2 fps, i.e. one frame every 5 seconds, for my application). I found that out.jpg from that command is usually ahead of the image shown by the ustream web viewer (think 10 to 20 seconds).
The command shown above runs in an endless loop in the cloud, so I don't have to keep a server running at home. The endless loop is needed because the stream sometimes times out or has other problems, and streamlink then terminates.
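A minimal sketch of what such a wrapper loop can look like (shell; the 5 second pause is arbitrary):

fps=0.2
while true; do
    streamlink -O http://ustream.tv/channel/iss-hdev-payload worst | \
        ffmpeg -i - -r $fps -f image2 -update 1 out.jpg
    sleep 5  # don't hammer the server when the stream is down
done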
Now out.jpg can be analysed. Since it's now just a regular file, I have to check whether it has been modified since the last image analysis. I tried pyinotify for this purpose and it seems to work well. Caveat: ffmpeg seems to write the file three times per update, and each of these accesses generates a notification; a little deadtime between analyses fixed this. The analysis itself is a structural similarity measurement between the most recently grabbed frame (out.jpg) and a snapshot of the "no stream available" image. The result is then published to the mosquitto test broker. Here's the quick and dirty code:
import pyinotify
import time
from skimage.measure import compare_ssim
import imutils
import cv2
import paho.mqtt.client as mqtt

threshold = 0.9

class EventHandler(pyinotify.ProcessEvent):
    template_img = cv2.cvtColor(cv2.imread("novideo.jpg"), cv2.COLOR_BGR2GRAY)
    template_width = template_img.shape[1]

    def __init__(self):
        self.start_time = time.time()
        self.last_event = 0
        self.deadtime = 1
        self.updated = False
        self.client = mqtt.Client("analyse_client")
        self.client.connect("test.mosquitto.org", 1883)
        self.client.loop_start()

    def analyse(self):
        self.updated = True
        now = time.time()
        if now > (self.last_event + self.deadtime):
            recent_img = cv2.cvtColor(cv2.imread("out.jpg"), cv2.COLOR_BGR2GRAY)
            recent_width = recent_img.shape[1]
            f = float(self.template_width)/float(recent_width)
            recent_img = cv2.resize(recent_img, (0,0), fx=f, fy=f)
            (score, diff) = compare_ssim(recent_img, self.template_img, full=True)
            t = now - self.start_time
            available = score < threshold
            print("modified @t = {:.3f} s; score = {:.3f}; available = {}".format(t, score, available))
            self.client.publish("iss-hdev-availability/available-bool", int(available))
            self.last_event = now

    def process_IN_CLOSE_WRITE(self, event):
        self.analyse()

def main():
    wm = pyinotify.WatchManager()  # Watch Manager
    handler = EventHandler()
    notifier = pyinotify.ThreadedNotifier(wm, handler)
    notifier.start()
    wdd = wm.add_watch('out.jpg', pyinotify.IN_CLOSE_WRITE)
    last_event = handler.last_event
    while True:
        try:
            time.sleep(30)
            if handler.updated:
                handler.updated = False
            else:
                print("no updates, exiting")
                break
        except KeyboardInterrupt:
            print("keyboard interrupt, exiting")
            break
    #wm.rm_watch(wdd.values())
    notifier.stop()

if __name__ == "__main__":
    main()
This script also runs in an endless loop in the cloud, since notifications stop arriving after the grabbing command has been restarted. I don't know why, but a loop does the job.
So if you just want to know if an actual live image from the ISS HDEV experiment is available (without having to look at the actual video output), the topic "iss-hdev-availability/available-bool" on test.mosquitto.org can be subscribed to by anyone. Here's a screenshot from my phone:
Of course it also works with a command line tool:
mosquitto_sub -h test.mosquitto.org -t iss-hdev-availability/available-bool
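And if you'd rather consume the topic from python, a minimal paho-mqtt subscriber might look like this (a sketch, mirroring the scripts above):

import paho.mqtt.client as mqtt

def on_message(client, userdata, message):
    # payload is "1" when a live image is available, "0" otherwise
    print("live image available: {}".format(message.payload))

client = mqtt.Client()
client.on_message = on_message
client.connect("test.mosquitto.org", 1883)
client.subscribe("iss-hdev-availability/available-bool")
client.loop_forever()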
-
streamlink
08/14/2018 at 07:56 • 0 comments
Streamlink is a fork of livestreamer and currently seems to be more stable when it comes to playing ustream streams. I had it running on an RPi 3 B+ yesterday and didn't have any connection loss (read error / stream timeout) for an hour, until I simply closed it.
With livestreamer I wasn't able to get past a few seconds.
So basically I'm at the start again, just being able to play the HDEV stream.
-
livestreamer seems to cause problems
10/22/2016 at 21:07 • 0 comments
Currently, using livestreamer on the Pi to get stream data results in this error:
Traceback (most recent call last):
  File "01_parsepipeline.py", line 195, in <module>
    main()
  File "01_parsepipeline.py", line 172, in main
    streams = livestreamer.streams(url)
  File "/usr/local/lib/python2.7/dist-packages/livestreamer/session.py", line 355, in streams
    return plugin.streams(**params)
  File "/usr/local/lib/python2.7/dist-packages/livestreamer/plugin/plugin.py", line 233, in streams
    ostreams = list(ostreams)
  File "/usr/local/lib/python2.7/dist-packages/livestreamer/plugins/ustreamtv.py", line 550, in _get_live_streams
    streams = self._get_desktop_streams(channel_id)
  File "/usr/local/lib/python2.7/dist-packages/livestreamer/plugins/ustreamtv.py", line 513, in _get_desktop_streams
    provider_url = provider["url"]
KeyError: 'url'
Using the exact same code on my laptop works, kinda.
-
Works on laptop. Not on Pi.
10/19/2016 at 20:22 • 4 comments
This is the current script, which outputs the expected difference between the current stream image and a reference:
#!/usr/bin/env python
# GST_DEBUG=3,python:5,gnl*:5 python 01_parsepipeline.py http://www.ustream.tv/channel/17074538 worst novideo.png
from __future__ import print_function
import sys
import gi
from gi.repository import GObject as gobject, Gst as gst
from livestreamer import Livestreamer, StreamError, PluginError, NoPluginError
import cv2
import numpy

def exit(msg):
    print(msg, file=sys.stderr)
    sys.exit()

class Player(object):
    def __init__(self):
        self.fd = None
        self.mainloop = gobject.MainLoop()

        # This creates a playbin pipeline and using the appsrc source
        # we can feed it our stream data
        self.pipeline = gst.parse_launch('uridecodebin uri=appsrc:// name=decoder \
            decoder. ! videorate ! video/x-raw,framerate=1/1 ! tee name=t \
            t. ! queue ! videoconvert ! video/x-raw,format=RGB ! appsink name=appsink \
            decoder. ! queue ! audioconvert ! fakesink')
        if self.pipeline is None:
            exit("couldn't build pipeline")

        decoder = self.pipeline.get_by_name('decoder')
        if decoder is None:
            exit("couldn't get decoder")
        decoder.connect("source-setup", self.on_source_setup)

        vsink = self.pipeline.get_by_name('appsink')
        if vsink is None:
            exit("couldn't get sink")
        vsink.set_property("emit-signals", True)
        vsink.set_property("max-buffers", 1)
        vsink.connect("new-sample", self.on_new_sample)

        # Creates a bus and set callbacks to receive errors
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message::eos", self.on_eos)
        self.bus.connect("message::error", self.on_error)

    def on_new_sample(self, sink):
        sample = sink.emit("pull-sample")
        buf = sample.get_buffer()
        caps = sample.get_caps()
        height = caps.get_structure(0).get_value('height')
        width = caps.get_structure(0).get_value('width')
        (result, mapinfo) = buf.map(gst.MapFlags.READ)
        if result == True:
            arr = numpy.ndarray(
                (height, width, 3),
                buffer=buf.extract_dup(0, buf.get_size()),
                dtype=numpy.uint8)
            resized_refimage = cv2.resize(refArray, (width, height))
            diff = cv2.norm(arr, resized_refimage, cv2.NORM_L2)
            buf.unmap(mapinfo)
            s = "diff = " + str(diff)
            print(s)
        return gst.FlowReturn.OK

    def exit(self, msg):
        self.stop()
        exit(msg)

    def stop(self):
        # Stop playback and exit mainloop
        self.pipeline.set_state(gst.State.NULL)
        self.mainloop.quit()
        # Close the stream
        if self.fd:
            self.fd.close()

    def play(self, stream):
        # Attempt to open the stream
        try:
            self.fd = stream.open()
        except StreamError as err:
            self.exit("Failed to open stream: {0}".format(err))
        # Start playback
        self.pipeline.set_state(gst.State.PLAYING)
        self.mainloop.run()

    def on_source_setup(self, element, source):
        # When this callback is called the appsrc expects
        # us to feed it more data
        print("source setup")
        source.connect("need-data", self.on_source_need_data)
        print("done")

    def on_pad_added(self, element, pad):
        string = pad.query_caps(None).to_string()
        print(string)
        if string.startswith('video/'):
            #type = pad.get_caps()[0].get_name()
            #print(type)
            #if type.startswith("video"):
            pad.link(self.vconverter.get_static_pad("sink"))

    def on_source_need_data(self, source, length):
        # Attempt to read data from the stream
        try:
            data = self.fd.read(length)
        except IOError as err:
            self.exit("Failed to read data from stream: {0}".format(err))
        # If data is empty it's the end of stream
        if not data:
            source.emit("end-of-stream")
            return
        # Convert the Python bytes into a GStreamer Buffer
        # and then push it to the appsrc
        buf = gst.Buffer.new_wrapped(data)
        source.emit("push-buffer", buf)
        #print("sent " + str(length) + " bytes")

    def on_eos(self, bus, msg):
        # Stop playback on end of stream
        self.stop()

    def on_error(self, bus, msg):
        # Print error message and exit on error
        error = msg.parse_error()[1]
        self.exit(error)

def main():
    if len(sys.argv) < 4:
        exit("Usage: {0} <url> <quality> <reference png image path>".format(sys.argv[0]))

    # Initialize and check GStreamer version
    gi.require_version("Gst", "1.0")
    gobject.threads_init()
    gst.init(None)

    # Collect arguments
    url = sys.argv[1]
    quality = sys.argv[2]
    refImage = sys.argv[3]

    global refArray
    image = cv2.imread(refImage)
    refArray = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # refArray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    refArray = cv2.blur(refArray, (3,3))

    # Create the Livestreamer session
    livestreamer = Livestreamer()

    # Enable logging
    livestreamer.set_loglevel("debug")
    livestreamer.set_logoutput(sys.stdout)

    # Attempt to fetch streams
    try:
        streams = livestreamer.streams(url)
    except NoPluginError:
        exit("Livestreamer is unable to handle the URL '{0}'".format(url))
    except PluginError as err:
        exit("Plugin error: {0}".format(err))

    if not streams:
        exit("No streams found on URL '{0}'".format(url))

    # Look for specified stream
    if quality not in streams:
        exit("Unable to find '{0}' stream on URL '{1}'".format(quality, url))

    # We found the stream
    stream = streams[quality]

    # Create the player and start playback
    player = Player()

    # Blocks until playback is done
    player.play(stream)

if __name__ == "__main__":
    main()
Unfortunately, it doesn't really work on the Pi. Probably relevant warnings and errors:
From livestreamer:
[plugin.ustreamtv][warning] python-librtmp is not installed, but is needed to access the desktop streams
From gstreamer:
0:00:17.111833668 9423 0xb0641520 ERROR vaapidecode ../../../gst/vaapi/gstvaapidecode.c:1025:gst_vaapidecode_ensure_allowed_caps: failed to retrieve VA display
and:
0:00:17.130139346 9423 0xb0641520 WARN uridecodebin gsturidecodebin.c:939:unknown_type_cb:<decoder> warning: No decoder available for type 'video/x-h264, stream-format=(string)byte-stream, alignment=(string)nal, width=(int)426, height=(int)240, framerate=(fraction)30/1, parsed=(boolean)true, pixel-aspect-ratio=(fraction)1/1, level=(string)2.1, profile=(string)main'.
python-librtmp couldn't be found, so I couldn't install it. The "VA display" error seems to be related to H.264 decoding (according to Google), so getting H.264 decoding to work might solve that. I just don't know how.
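One avenue that might be worth a try (untested here, and assuming a Debian-based Raspbian): installing GStreamer's libav plugin package, which provides a software H.264 decoder, then checking whether it shows up:

sudo apt-get install gstreamer1.0-libav
gst-inspect-1.0 avdec_h264  # should print decoder details if the install worked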
The reference image I use is in the project files.
-
Python script
10/11/2016 at 19:29 • 0 comments
So here's the first working python script, ignoring all good practices like checking return values or doing things in the right place. It also doesn't output zeros and ones yet, but a float: the L2 norm between the current frame and the reference frame. In a way, though, it does the job:
#!/usr/bin/env python
from __future__ import print_function
import sys
import gi
from gi.repository import GObject as gobject, Gst as gst
from streamlink import Streamlink, StreamError, PluginError, NoPluginError
import cv2
import numpy

def exit(msg):
    print(msg, file=sys.stderr)
    sys.exit()

class StreamlinkPlayer(object):
    def __init__(self):
        self.fd = None
        self.mainloop = gobject.MainLoop()

        # This creates a playbin pipeline and using the appsrc source
        # we can feed it our stream data
        self.pipeline = gst.Pipeline.new("player")
        source = gst.ElementFactory.make("uridecodebin", "decodebin")
        source.set_property("uri", "appsrc://")
        self.pipeline.add(source)

        self.vconverter = gst.ElementFactory.make("videoconvert", "vconverter")

        vsink = gst.ElementFactory.make("appsink", "videosink")
        vsink.set_property("emit-signals", True)
        vsink.set_property("max-buffers", 1)
        caps = gst.caps_from_string("video/x-raw, format=(string){RGB}")
        vsink.set_property("caps", caps)
        vsink.connect("new-sample", self.on_new_sample)

        filter = gst.ElementFactory.make("videorate", "fpsfilter")
        filter.set_property("max-rate", 1)

        self.pipeline.add(self.vconverter)
        self.pipeline.add(filter)
        self.pipeline.add(vsink)
        self.vconverter.link(filter)
        filter.link(vsink)
        # self.vsink = vsink

        source.connect("source-setup", self.on_source_setup)
        source.connect("pad-added", self.on_pad_added)

        # Creates a bus and set callbacks to receive errors
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message::eos", self.on_eos)
        self.bus.connect("message::error", self.on_error)

    def exit(self, msg):
        self.stop()
        exit(msg)

    def stop(self):
        # Stop playback and exit mainloop
        self.pipeline.set_state(gst.State.NULL)
        self.mainloop.quit()
        # Close the stream
        if self.fd:
            self.fd.close()

    def play(self, stream):
        # Attempt to open the stream
        try:
            self.fd = stream.open()
        except StreamError as err:
            self.exit("Failed to open stream: {0}".format(err))
        # Start playback
        self.pipeline.set_state(gst.State.PLAYING)
        self.mainloop.run()

    def on_source_setup(self, element, source):
        # When this callback is called the appsrc expects
        # us to feed it more data
        print("source setup")
        source.connect("need-data", self.on_source_need_data)
        print("done")

    def on_new_sample(self, appsink):
        sample = appsink.emit("pull-sample")
        buf = sample.get_buffer()
        caps = sample.get_caps()
        height = caps.get_structure(0).get_value('height')
        width = caps.get_structure(0).get_value('width')
        (result, mapinfo) = buf.map(gst.MapFlags.READ)
        if result == True:
            arr = numpy.ndarray(
                (height, width, 3),
                buffer=buf.extract_dup(0, buf.get_size()),
                dtype=numpy.uint8)
            resized_refimage = cv2.resize(refArray, (width, height))
            sum = int(0)
            diff = cv2.norm(arr, resized_refimage, cv2.NORM_L2)
            print("diff = " + str(diff))
            buf.unmap(mapinfo)
        return gst.FlowReturn.OK

    def on_pad_added(self, element, pad):
        string = pad.query_caps(None).to_string()
        print(string)
        if string.startswith('video/'):
            pad.link(self.vconverter.get_static_pad("sink"))

    def on_source_need_data(self, source, length):
        # Attempt to read data from the stream
        try:
            data = self.fd.read(length)
        except IOError as err:
            self.exit("Failed to read data from stream: {0}".format(err))
        # If data is empty it's the end of stream
        if not data:
            source.emit("end-of-stream")
            return
        # Convert the Python bytes into a GStreamer Buffer
        # and then push it to the appsrc
        buf = gst.Buffer.new_wrapped(data)
        source.emit("push-buffer", buf)

    def on_eos(self, bus, msg):
        # Stop playback on end of stream
        self.stop()

    def on_error(self, bus, msg):
        # Print error message and exit on error
        error = msg.parse_error()[1]
        self.exit(error)

def main():
    if len(sys.argv) < 4:
        exit("Usage: {0} <url> <quality> <reference png image path>".format(sys.argv[0]))

    # Initialize and check GStreamer version
    gi.require_version("Gst", "1.0")
    gobject.threads_init()
    gst.init(None)

    # Collect arguments
    url = sys.argv[1]
    quality = sys.argv[2]
    refImage = sys.argv[3]

    global refArray
    image = cv2.imread(refImage)
    refArray = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # cv2.COLOR_BGR2GRAY
    refArray = cv2.blur(refArray, (3,3))

    # Create the Streamlink session
    streamlink = Streamlink()

    # Enable logging
    streamlink.set_loglevel("info")
    streamlink.set_logoutput(sys.stdout)

    # Attempt to fetch streams
    try:
        streams = streamlink.streams(url)
    except NoPluginError:
        exit("Streamlink is unable to handle the URL '{0}'".format(url))
    except PluginError as err:
        exit("Plugin error: {0}".format(err))

    if not streams:
        exit("No streams found on URL '{0}'".format(url))

    # Look for specified stream
    if quality not in streams:
        exit("Unable to find '{0}' stream on URL '{1}'".format(quality, url))

    # We found the stream
    stream = streams[quality]

    # Create the player and start playback
    player = StreamlinkPlayer()

    # Blocks until playback is done
    player.play(stream)

if __name__ == "__main__":
    main()
Next:
- modify it to create a binary value for availability
- add more checks
- use a better comparison algorithm; the difference between available and unavailable is a bit small (about 60000 vs 49000; see the sketch after this list)
- maybe: add a config file
- maybe: add possibility to change the reference image during runtime
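For the comparison-algorithm item, a structural similarity score should separate the two cases more cleanly than the raw L2 norm. A sketch using scikit-image (the 2018 logs above end up doing exactly this; file names are only examples):

# Sketch: SSIM between the current frame and the "no video" reference image
import cv2
from skimage.measure import compare_ssim

ref = cv2.cvtColor(cv2.imread("novideo.png"), cv2.COLOR_BGR2GRAY)
cur = cv2.cvtColor(cv2.imread("current.jpg"), cv2.COLOR_BGR2GRAY)
cur = cv2.resize(cur, (ref.shape[1], ref.shape[0]))  # sizes must match for SSIM
score = compare_ssim(cur, ref)
available = score < 0.9  # high score = looks like the "no video" still
print("score = {}, available = {}".format(score, available))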
-
Doing it in python
10/10/2016 at 09:02 • 0 comments
Starting with the livestreamer player example I was able to move away from a very hacky bash script to something in python. This involved reading lots of gstreamer examples, figuring out whether they were written for gstreamer 0.10 or 1.0, reading the corresponding documentation, and simply giving it a try.
The overall approach is quite simple. The example simply shows the stream using gstreamer's playbin. That playbin is replaced by a custom pipeline, which allows me to do two things:
- limit to one frame per second
- analyze the raw frame in the python script instead of displaying it
That custom gstreamer pipeline currently looks like this:
uridecodebin -> videorate (limit to 1 fps) -> appsink
The uridecodebin element is fed with data from the live stream, and the appsink provides a raw buffer. What's left to be done is to actually compare the current frame with the reference frame, and to publish the result in some way.
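For reference, here's roughly how that sketch translates into a gst.parse_launch string (a simplified variant of the pipeline the 10/19 log above ends up using; element names assumed):

# Sketch: the same pipeline via parse_launch; appsrc:// lets us push the
# stream data in from python via the source-setup callback
pipeline = gst.parse_launch('uridecodebin uri=appsrc:// name=decoder \
    decoder. ! videorate ! video/x-raw,framerate=1/1 \
    ! videoconvert ! video/x-raw,format=RGB ! appsink name=appsink')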
-
Sample output
09/22/2016 at 20:15 • 0 comments
The script has been running for about 90 minutes now and shows no signs of giving up yet. Here's the script's output with the stream shown in the background, while no live image is available:
The output shows five zeros in a row, meaning that, at 0.2 fps (one frame every 5 seconds), the still image has already been there for about 25 seconds. After a while the signal is back and we get this:
The script spits out 1's again after about 13 x 5 = 65 seconds. Great! In the meantime, the ISS passed somewhere over Argentina or the South Atlantic.