I’d like to run a number of predictions at the same time using MXNet to get a good combination of GPU usage and low latency for a use case I’m considering. That is to say I don’t want to buffer and batch my input, I’d like to submit it as soon as it’s available, and have multiple predictions running at once.
Multi-threaded MXNet inference isn’t supported, so my plan is to use an async pattern on my main thread. In principle it seems NDArray is set up to do exactly this; however, I’m running into some gaps in the API that appear to force me to block my main thread to wait for results.
Does anyone have advice for how I could do this?
I’ve made a change locally as a workaround, but what I’d like to do is something like this (note that the can_read method on NDArray is not currently exposed in MXNet):
from time import sleep
import datetime
import mxnet as mx
# Minimum spacing between two generated frames (roughly 60 FPS).
FRAME_INTERVAL = datetime.timedelta(microseconds=16600)

# Time at which the most recent frame was produced (module load time at first).
last_frame = datetime.datetime.now()


def get_frame():
    """Return a placeholder frame at most once per ``FRAME_INTERVAL``.

    Returns:
        The result of ``mx.nd.random.uniform(0, 1, (3, 640, 480))`` when more
        than ``FRAME_INTERVAL`` has passed since the previous frame was
        produced, otherwise ``None``.
    """
    global last_frame
    this_frame = datetime.datetime.now()
    # Compare whole timedeltas. ``timedelta.microseconds`` is only the
    # sub-second component (0..999999), so it gives wrong answers as soon as
    # a full second has elapsed or the difference is negative.
    if this_frame - last_frame > FRAME_INTERVAL:
        # Restart the interval; without this every later call yields a frame.
        last_frame = this_frame
        return mx.nd.random.uniform(0, 1, (3, 640, 480))
    return None
def main():
    """Drive the demo loop: submit frames as they arrive and print results.

    Runs forever. Each pass asks ``get_frame`` for a frame, enqueues it when
    one is available, prints whatever ``get_finished_results`` returns, and
    then sleeps briefly before the next pass.
    """
    classifier = FrameClassifier(8)
    while True:
        frame = get_frame()
        if frame is not None:
            classifier.enqueue_frame(frame)
        # Printed on every pass, including when the list is empty.
        print(classifier.get_finished_results())
        sleep(.0001)
class FrameClassifier:
    """Keeps track of in-flight predictions and caps how many may be active."""

    def __init__(self, active_prediction_limit):
        """Create a classifier with no active predictions.

        Args:
            active_prediction_limit: Maximum number of predictions that may
                be active at the same time.
        """
        self.active_prediction_limit = active_prediction_limit
        # Outputs of predictions that were enqueued but not yet collected.
        self.active_predictions = []

    def get_finished_results(self):
        """Remove and return the active predictions that report readiness.

        Returns:
            A list, in enqueue order, of the predictions whose ``can_read()``
            returned a truthy value. They are dropped from
            ``active_predictions``; the rest stay in their original order.
        """
        finished_predictions = []
        still_active = []
        # Partition in one pass instead of calling ``list.remove`` per item:
        # ``remove`` searches with ``==``, which is both quadratic and unsafe
        # for array-like objects whose ``==`` is not a plain boolean.
        for prediction in self.active_predictions:
            if prediction.can_read():  # Not yet exposed.
                finished_predictions.append(prediction)
            else:
                still_active.append(prediction)
        # Slice-assign so the existing list object is updated in place.
        self.active_predictions[:] = still_active
        return finished_predictions

    def enqueue_frame(self, frame: "mx.nd.NDArray"):
        """Start a prediction for ``frame`` and record its first output.

        Args:
            frame: Input array bound to the network's ``x`` variable.

        Raises:
            RuntimeError: If ``active_prediction_limit`` predictions are
                already active.
        """
        # ``>=`` so that at most ``active_prediction_limit`` are ever active
        # (``>`` let one extra request through).
        if len(self.active_predictions) >= self.active_prediction_limit:
            raise RuntimeError('Too many requests')
        executor = self.get_executor(frame)
        prediction = executor.forward(is_train=False)
        # Only the first output of the forward pass is tracked.
        self.active_predictions.append(prediction[0])

    @staticmethod
    def get_executor(frame: "mx.nd.NDArray"):
        """Bind the placeholder network to ``frame`` on the CPU.

        Args:
            frame: Array supplied as the value of the ``x`` variable.

        Returns:
            The executor returned by ``bind``.
        """
        # Placeholder network.
        x = mx.sym.Variable('x')
        y = mx.sym.FullyConnected(x, num_hidden=1024)
        # NOTE(review): each iteration rebinds y to ``x + 1``, so the
        # FullyConnected symbol above is discarded and the bound graph is
        # just ``x + 1``. ``y = y + 1`` may have been intended, but that
        # would presumably need weight/bias arrays in the bind call below --
        # confirm before changing.
        for i in range(0, 4):
            y = x + 1
        executor = y.bind(mx.cpu(), {'x': frame})
        return executor
# Start the demo loop only when this file is executed as a script.
if __name__ == "__main__":
    main()